163 lines
4.2 KiB
Python
163 lines
4.2 KiB
Python
import sqlite3
|
|
from mastodon import Mastodon
|
|
from sys import exit
|
|
|
|
def get_api(url, token_name = ""):
|
|
if token_name:
|
|
try:
|
|
file = open('token/' + token_name, 'r')
|
|
except FileNotFoundError:
|
|
print('Token not found for ' + token_name)
|
|
exit()
|
|
else:
|
|
token = file.read().splitlines()[0]
|
|
file.close()
|
|
else:
|
|
token = ""
|
|
|
|
return Mastodon(access_token = token, api_base_url = url, ratelimit_method='throw', version_check_mode='none')
|
|
|
|
def list_read(name):
|
|
try:
|
|
file = open('list/' + name, 'r')
|
|
except FileNotFoundError:
|
|
file = open('list/' + name, 'x')
|
|
file.close()
|
|
return []
|
|
else:
|
|
list = file.read().splitlines()
|
|
file.close()
|
|
return list
|
|
|
|
def list_write(name, values):
|
|
file = open('list/' + name, 'w')
|
|
for value in values:
|
|
file.write(str(value) + '\n')
|
|
file.close()
|
|
|
|
def list_append(name, value):
|
|
file = open('list/' + name, 'a')
|
|
file.write(value + '\n')
|
|
file.close()
|
|
|
|
|
|
conn = sqlite3.connect("bots.db", isolation_level=None)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.executescript('''
|
|
create table if not exists bots (
|
|
id integer primary key autoincrement,
|
|
name text not null
|
|
);
|
|
|
|
create table if not exists posted_already (
|
|
id integer primary key autoincrement,
|
|
bot_id integer not null,
|
|
user_id text not null,
|
|
created timestamp default current_timestamp,
|
|
unique(bot_id, user_id),
|
|
foreign key (bot_id) references bots(id)
|
|
);
|
|
''')
|
|
|
|
def get_or_create_bot(name):
|
|
cursor.execute("SELECT id FROM bots WHERE name = ?", (name,))
|
|
result = cursor.fetchone()
|
|
|
|
if result:
|
|
return result[0]
|
|
|
|
cursor.execute("INSERT INTO bots (name) VALUES (?)", (name,))
|
|
return cursor.lastrowid
|
|
|
|
|
|
def has_user_posted(bot_name, user_id):
|
|
user_id = str(user_id)
|
|
bot_id = get_or_create_bot(bot_name)
|
|
|
|
cursor.execute("""
|
|
SELECT EXISTS(
|
|
SELECT 1
|
|
FROM posted_already
|
|
WHERE bot_id = ? AND user_id = ?
|
|
LIMIT 1
|
|
)
|
|
""", (bot_id, user_id))
|
|
|
|
return bool(cursor.fetchone()[0])
|
|
|
|
def mark_user_posted(bot_name, user_id):
|
|
user_id = str(user_id)
|
|
bot_id = get_or_create_bot(bot_name)
|
|
|
|
try:
|
|
cursor.execute("""
|
|
INSERT INTO posted_already (bot_id, user_id)
|
|
VALUES (?, ?)
|
|
""", (bot_id, user_id))
|
|
return True
|
|
except sqlite3.IntegrityError:
|
|
return False
|
|
|
|
def unmark_user_posted(bot_name, user_id):
|
|
user_id = str(user_id)
|
|
bot_id = get_or_create_bot(bot_name)
|
|
|
|
cursor.execute("""
|
|
DELETE FROM posted_already
|
|
WHERE bot_id = ? AND user_id = ?
|
|
""", (bot_id, user_id))
|
|
|
|
return cursor.rowcount > 0 # Returns True if a row was deleted
|
|
|
|
def db_close():
|
|
try:
|
|
cursor.close()
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# It is not safe to get notifications from "last_id" because some may have been deleted
|
|
def get_new_notifications(api, bot_name, types=None):
|
|
last_notifications=list_read(bot_name + '_last_notifications')
|
|
notifications = api.notifications(types=types)
|
|
new_notifications = []
|
|
new_notifications_ids = []
|
|
max_notification = 0
|
|
if len(notifications) < 2:
|
|
max_notification = len(notifications)
|
|
else:
|
|
max_notification = len(notifications) // 2
|
|
|
|
for i in range(0, max_notification):
|
|
if str(notifications[i]['id']) not in last_notifications:
|
|
new_notifications.append(notifications[i])
|
|
|
|
for n in notifications:
|
|
new_notifications_ids.append(n['id'])
|
|
|
|
list_write(bot_name + "_last_notifications", new_notifications_ids)
|
|
return new_notifications
|
|
|
|
women_pronouns = ["she","her","ella","illa"]
|
|
nb_pronouns = ["they","them","elle", "ille"]
|
|
|
|
def is_gender(pronouns, account):
|
|
for pronoun in pronouns:
|
|
if(pronoun in account.display_name.lower()):
|
|
return True
|
|
for field in account.fields:
|
|
if(pronoun in field.value.lower()):
|
|
return True
|
|
if(pronoun in account.note.lower()):
|
|
return True
|
|
return False
|
|
|
|
def get_gender(account):
|
|
if(is_gender(nb_pronouns, account)):
|
|
return 2
|
|
if(is_gender(women_pronouns,account)):
|
|
return 1
|
|
return 0
|