zulip-fediverse-auth/auth.py

172 lines
5.5 KiB
Python

from flask import Flask, Response, redirect
from flask import request
from markupsafe import escape
from mastodon import Mastodon
from email.headerregistry import Address
import jwt
from gevent.pywsgi import WSGIServer
import os
import sqlite3
import zulip
import random
import string
SECRET = os.environ["SECRET"]
ZULIP = f"https://{os.environ['ZULIP']}/accounts/login/jwt/"
REDIRECT = f"https://{os.environ['ZULIP']}/fedi-auth/callback"
DB = os.environ.get("DB", "/var/lib/fedi-zulip/db/applications")
print(f"""
Zulip is: {os.environ['ZULIP']}
DB location is: {DB}
""")
con = sqlite3.connect(DB)
cur = con.cursor()
zulip_client = zulip.Client()
def get_zulip_user(handle):
zulip_client.call_endpoint(
url=f"/users/{handle}",
method="GET"
)
def create_zulip_user(handle):
password = ''.join(random.choices(string.ascii_uppercase + string.digits, k=40))
return zulip_client.create_user({
"email": handle,
"password": password,
"full_name": handle.split('@')[0]
})
def get_or_create_zulip_user(handle):
user = get_zulip_user(handle)
if user is None:
print(f"User: {handle} created.")
user = create_zulip_user(handle)
else:
print(f"User: {handle} already exists.")
return user
cur.execute("CREATE TABLE IF NOT EXISTS applications(instance TEXT PRIMARY KEY, client TEXT, secret TEXT, disabled BOOLEAN DEFAULT FALSE)")
def get_application(instance):
res = cur.execute("SELECT client, secret FROM applications WHERE instance = ?", [instance])
return res.fetchone();
def set_application(instance, client, secret):
res = cur.execute("INSERT INTO applications(instance, client, secret) values (?, ?, ?)", (instance, client, secret));
con.commit();
app = Flask(__name__)
@app.get("/fedi-auth/")
def index():
return f"""
<!DOCTYPE html>
<html lang="en">
<body>
<h1>Login to Pleroma Chat Using Handle</h1>
<p>
You can use this page to login to {os.environ['ZULIP']} using your
Pleroma, Akkoma or Mastodon handle. Format is <code>nickname@server</code>.
</p>
<form action="/fedi-auth/login" method="post">
<label for="nickname">Fediverse handle</label>
<br>
<input type="email" id="nickname" name="nickname">
<button>login</button>
</form>
</body>
</html>
"""
@app.post("/fedi-auth/login")
def login():
print("Login POST", flush=True)
instance = Address(addr_spec=request.form["nickname"]).domain
print(f"Instance is: {instance}.", flush=True)
try:
app = get_application(instance)
if app == None:
print(f"There is no OAuth application for {instance} so creating one.", flush=True)
(client, secret) = Mastodon.create_app(
"zulip",
scopes=["read"],
redirect_uris=REDIRECT,
api_base_url=f"https://{instance}",
)
app = (client, secret)
set_application(instance, client, secret)
(client, secret) = app
masto = Mastodon(
client_id=client,
client_secret=secret,
api_base_url=f"https://{instance}",
)
print(f"Getting login URI for {instance}.", flush=True)
oauth = Mastodon.auth_request_url(
masto,
scopes=["read"],
force_login=True,
redirect_uris=REDIRECT,
state=instance,
)
print(f"Sending user to: {REDIRECT}", flush=True)
return redirect(oauth)
except Exception as e:
print(e)
return Response("fail", status=400)
@app.get("/fedi-auth/callback")
def callback():
oauth = request.args.get("code")
instance = request.args.get("state")
print(f"oauth: {oauth is not None} instance: {instance is not None}")
if oauth != None and instance != None:
try:
app = get_application(instance)
if app != None:
masto = Mastodon(
client_id=app[0],
client_secret=app[1],
api_base_url=f"https://{instance}",
)
Mastodon.log_in(masto, code=oauth, scopes=["read"])
creds = Mastodon.account_verify_credentials(masto)
handle = f"{creds.acct}@{instance}"
zulip_user = get_or_create_zulip_user(handle)
print(zulip_user)
token = jwt.encode(
{"email": handle},
SECRET,
algorithm="HS256",
)
return f"""
<!DOCTYPE html>
<html lang="en">
<body>
<form name="zulip" action="{ZULIP}" method="post">
<input type="hidden" name="token" value={escape(token)}>
<button>login to zulip</button>
</form>
<script type="text/javascript">
document.zulip.submit();
</script>
</body>
</html>
"""
except Exception as e:
print(e)
pass
print("Some field wasn't set.")
return Response("fail", status=400)