"""Shared helpers for Mastodon bots.

Provides:
  * an API client factory that reads access tokens from ``token/<name>``,
  * tiny line-oriented list files stored under ``list/``,
  * a SQLite table (``bots.db``) remembering which users a bot has already
    posted about,
  * notification de-duplication between runs,
  * a pronoun-based lookup over an account's display name, fields and note.

Importing this module opens (and if necessary creates) ``bots.db`` in the
current working directory and ensures the schema exists.
"""
import sqlite3
from sys import exit

from mastodon import Mastodon


def get_api(url, token_name=""):
    """Build a Mastodon client for the instance at *url*.

    Args:
        url: Base URL of the Mastodon instance.
        token_name: Name of a file under ``token/`` whose first line is the
            access token. When empty, an empty access token is passed to
            the client.

    Returns:
        A ``Mastodon`` instance created with ``ratelimit_method='throw'``
        and ``version_check_mode='none'``.

    Raises:
        IndexError: If the token file exists but contains no lines.

    If the token file does not exist, a message is printed and the process
    exits via ``sys.exit()``.
    """
    token = ""
    if token_name:
        try:
            # The context manager guarantees the handle is closed even when
            # reading the first line fails.
            with open('token/' + token_name, 'r') as token_file:
                token = token_file.read().splitlines()[0]
        except FileNotFoundError:
            print('Token not found for ' + token_name)
            exit()
    return Mastodon(
        access_token=token,
        api_base_url=url,
        ratelimit_method='throw',
        version_check_mode='none',
    )


def list_read(name):
    """Return the lines of ``list/<name>`` as a list of strings.

    If the file does not exist it is created empty and ``[]`` is returned.
    """
    path = 'list/' + name
    try:
        with open(path, 'r') as list_file:
            return list_file.read().splitlines()
    except FileNotFoundError:
        # First use of this list: create an empty file so later appends and
        # reads find it. Mode 'x' refuses to overwrite an existing file.
        with open(path, 'x'):
            pass
        return []


def list_write(name, values):
    """Overwrite ``list/<name>`` with one ``str(value)`` per line."""
    with open('list/' + name, 'w') as list_file:
        list_file.writelines(str(value) + '\n' for value in values)


def list_append(name, value):
    """Append ``str(value)`` as a new line to ``list/<name>``.

    The value is converted with ``str`` (as ``list_write`` does) so that
    non-string values such as integer ids can be appended.
    """
    with open('list/' + name, 'a') as list_file:
        list_file.write(str(value) + '\n')


# isolation_level=None puts the connection in autocommit mode, so the
# statements below take effect without explicit commit() calls.
conn = sqlite3.connect("bots.db", isolation_level=None)
cursor = conn.cursor()

cursor.executescript('''
    create table if not exists bots (
        id integer primary key autoincrement,
        name text not null
    );
    create table if not exists posted_already (
        id integer primary key autoincrement,
        bot_id integer not null,
        user_id text not null,
        created timestamp default current_timestamp,
        unique(bot_id, user_id),
        foreign key (bot_id) references bots(id)
    );
''')


def get_or_create_bot(name):
    """Return the ``bots.id`` for *name*, inserting a new row if needed."""
    cursor.execute("SELECT id FROM bots WHERE name = ?", (name,))
    row = cursor.fetchone()
    if row:
        return row[0]
    cursor.execute("INSERT INTO bots (name) VALUES (?)", (name,))
    return cursor.lastrowid


def has_user_posted(bot_name, user_id):
    """Return True if *user_id* is recorded in ``posted_already`` for the bot.

    *user_id* is compared as a string. The bot row is created as a side
    effect if it does not exist yet.
    """
    user_id = str(user_id)
    bot_id = get_or_create_bot(bot_name)
    cursor.execute("""
        SELECT EXISTS(
            SELECT 1 FROM posted_already
            WHERE bot_id = ? AND user_id = ?
            LIMIT 1
        )
    """, (bot_id, user_id))
    return bool(cursor.fetchone()[0])


def mark_user_posted(bot_name, user_id):
    """Record *user_id* in ``posted_already`` for the bot.

    Returns:
        True if a new row was inserted, False if the insert raised
        ``sqlite3.IntegrityError`` (the table has a unique constraint on
        ``(bot_id, user_id)``).
    """
    user_id = str(user_id)
    bot_id = get_or_create_bot(bot_name)
    try:
        cursor.execute("""
            INSERT INTO posted_already (bot_id, user_id)
            VALUES (?, ?)
        """, (bot_id, user_id))
    except sqlite3.IntegrityError:
        return False
    return True


def unmark_user_posted(bot_name, user_id):
    """Remove the ``posted_already`` row for *user_id* and the bot.

    Returns:
        True if a row was deleted, False otherwise.
    """
    user_id = str(user_id)
    bot_id = get_or_create_bot(bot_name)
    cursor.execute("""
        DELETE FROM posted_already
        WHERE bot_id = ? AND user_id = ?
    """, (bot_id, user_id))
    return cursor.rowcount > 0


def db_close():
    """Close the module-level cursor and connection, ignoring SQLite errors.

    Each resource is closed independently so that a failure while closing
    the cursor does not prevent the connection from being closed.
    """
    for resource in (cursor, conn):
        try:
            resource.close()
        except sqlite3.Error:
            # Best-effort shutdown: nothing useful can be done here.
            pass


def get_new_notifications(api, bot_name, types=None):
    """Return notifications not seen on the previous call for this bot.

    The ids of every notification returned by ``api.notifications`` are
    stored in the list file ``<bot_name>_last_notifications`` and compared
    against on the next call. Fetching "since last id" is not used because
    it is not safe: some notifications may have been deleted in between.

    Only the first half of the fetched notifications is examined (all of
    them when fewer than two were fetched).
    NOTE(review): presumably this avoids reporting older entries that slide
    into the end of the page after deletions - confirm with the author.

    Args:
        api: Client object exposing ``notifications(types=...)``.
        bot_name: Prefix of the list file used to persist seen ids.
        types: Passed through to ``api.notifications``.

    Returns:
        A list of the examined notifications whose id was not stored by the
        previous call.
    """
    seen_ids = set(list_read(bot_name + '_last_notifications'))
    notifications = api.notifications(types=types)

    total = len(notifications)
    window = total if total < 2 else total // 2

    new_notifications = [
        notification
        for notification in notifications[:window]
        # Stored ids are read back as strings, so compare as strings.
        if str(notification['id']) not in seen_ids
    ]

    list_write(
        bot_name + "_last_notifications",
        [notification['id'] for notification in notifications],
    )
    return new_notifications


women_pronouns = ["she", "her", "ella", "illa"]
nb_pronouns = ["they", "them", "elle", "ille"]


def is_gender(pronouns, account):
    """Return True if any of *pronouns* occurs in the account's texts.

    For each pronoun, in order, the lower-cased display name, every field
    value and the note of *account* are searched.

    NOTE(review): this is a plain substring test, so e.g. "her" also matches
    "there" and "elle" matches "Michelle"; consider word-boundary matching.
    """
    for pronoun in pronouns:
        if pronoun in account.display_name.lower():
            return True
        if any(pronoun in field.value.lower() for field in account.fields):
            return True
        if pronoun in account.note.lower():
            return True
    return False


def get_gender(account):
    """Classify *account* by the pronouns found in its profile.

    Returns:
        2 if a pronoun from ``nb_pronouns`` is found, otherwise 1 if a
        pronoun from ``women_pronouns`` is found, otherwise 0.
    """
    if is_gender(nb_pronouns, account):
        return 2
    if is_gender(women_pronouns, account):
        return 1
    return 0