zulip-fediverse-auth/auth.py

183 lines
6.0 KiB
Python

from flask import Flask, Response, redirect
from flask import request
from markupsafe import escape
from mastodon import Mastodon
from email.headerregistry import Address
import jwt
from gevent.pywsgi import WSGIServer
import os
import sqlite3
import zulip
import random
import string
SECRET = os.environ["SECRET"]
ZULIP = f"https://{os.environ['ZULIP']}/accounts/login/jwt/"
REDIRECT = f"https://{os.environ['ZULIP']}/fedi-auth/callback"
DB = os.environ.get("DB", "/var/lib/fedi-zulip/db/applications")
scopes = ["read"]
print(f"""
Zulip is: {os.environ['ZULIP']}
DB location is: {DB}
""")
con = sqlite3.connect(DB)
cur = con.cursor()
zulip_client = zulip.Client()
def get_zulip_user(handle):
print(f"Querying Zulip for handle: {handle}")
zulip_client.call_endpoint(
url=f"/users/{handle}",
method="GET"
)
def create_zulip_user(handle):
print(f"Creating Zulip user with handle: {handle}")
password = ''.join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=40))
full_name = handle.split('@')[0]
payload = {
"email": handle,
"password": password,
"full_name": full_name
}
response = zulip_client.create_user(payload)
if response["result"] == "success":
print(f"Zulip user ID: {response.user_id} created for handle: {handle}")
return True
elif response["result"] == "error" and response["msg"] == f"Email '{handle}' already in use":
print(response["msg"] + ", this is okay.")
return True
else:
print(response["msg"])
return False
cur.execute("CREATE TABLE IF NOT EXISTS applications(instance TEXT PRIMARY KEY, client TEXT, secret TEXT, disabled BOOLEAN DEFAULT FALSE)")
def get_application(instance):
res = cur.execute("SELECT client, secret FROM applications WHERE instance = ?", [instance])
return res.fetchone();
def set_application(instance, client, secret):
res = cur.execute("INSERT INTO applications(instance, client, secret) values (?, ?, ?)", (instance, client, secret));
con.commit();
app = Flask(__name__)
@app.get("/fedi-auth/")
def index():
return f"""
<!DOCTYPE html>
<html lang="en">
<body>
<h1>Login to Pleroma Chat Using Handle</h1>
<p>
You can use this page to login to {os.environ['ZULIP']} using your
Pleroma, Akkoma or Mastodon handle. Format is <code>nickname@server.tld</code>.
</p>
<form action="/fedi-auth/login" method="post">
<label for="nickname">Fediverse handle</label>
<br>
<input type="email" id="nickname" name="nickname">
<button>login</button>
</form>
</body>
</html>
"""
@app.post("/fedi-auth/login")
def login():
print("Login POST", flush=True)
instance = Address(addr_spec=request.form["nickname"]).domain
print(f"Instance is: {instance}.", flush=True)
try:
app = get_application(instance)
if app == None:
print(f"There is no OAuth application for {instance} so creating one.")
(client, secret) = Mastodon.create_app(
"zulip",
scopes=scopes,
redirect_uris=REDIRECT,
api_base_url=f"https://{instance}",
)
app = (client, secret)
set_application(instance, client, secret)
(client, secret) = app
masto = Mastodon(
client_id=client,
client_secret=secret,
api_base_url=f"https://{instance}",
)
print(f"Getting login URI for {instance}.", flush=True)
oauth = Mastodon.auth_request_url(
masto,
scopes=scopes,
force_login=True,
redirect_uris=REDIRECT,
state=instance,
)
print(f"Sending user to: {REDIRECT}", flush=True)
return redirect(oauth)
except Exception as e:
print(e)
return Response("fail", status=400)
@app.get("/fedi-auth/callback")
def callback():
oauth = request.args.get("code")
instance = request.args.get("state")
print(f"oauth: {oauth is not None} instance: {instance is not None}")
if oauth != None and instance != None:
try:
app = get_application(instance)
if app != None:
masto = Mastodon(
client_id=app[0],
client_secret=app[1],
api_base_url=f"https://{instance}",
)
Mastodon.log_in(masto, code=oauth, scopes=scopes)
creds = Mastodon.account_verify_credentials(masto)
if hasattr(creds, "error"):
print(f"Verifying credentials for instance: {instance} error: {creds.error}")
return Response("fail", status=400)
handle = f"{creds.acct}@{instance}"
success = create_zulip_user(handle)
if not success:
return Response("fail", status=400)
token = jwt.encode(
{"email": handle},
SECRET,
algorithm="HS256",
)
return f"""
<!DOCTYPE html>
<html lang="en">
<body>
<p>Please wait while you are logged in...</p>
<form name="zulip" action="{ZULIP}" method="post">
<input type="hidden" name="token" value={escape(token)}>
</form>
<script type="text/javascript">
document.zulip.submit();
</script>
</body>
</html>
"""
except Exception as e:
print(e)
pass
print("Some field wasn't set.")
return Response("fail", status=400)