now use sqlite for who posted before.

This commit is contained in:
Moon Man 2024-12-16 13:56:43 +00:00
parent 64a5d0c040
commit 72eac07711
3 changed files with 78 additions and 7 deletions

1
.gitignore vendored
View File

@ -4,3 +4,4 @@ token
./*.png ./*.png
.env .env
feditree/feditree.png feditree/feditree.png
bots.db

View File

@ -1,3 +1,4 @@
import sqlite3
from mastodon import Mastodon from mastodon import Mastodon
from sys import exit from sys import exit
@ -39,6 +40,77 @@ def list_append(name, value):
file.write(value + '\n') file.write(value + '\n')
file.close() file.close()
conn = sqlite3.connect("bots.db", isolation_level=None)
cursor = conn.cursor()
cursor.executescript('''
create table if not exists bots (
id integer primary key autoincrement,
name text not null
);
create table if not exists posted_already (
id integer primary key autoincrement,
bot_id integer not null,
user_id text not null,
created timestamp default current_timestamp,
unique(bot_id, user_id),
foreign key (bot_id) references bots(id)
);
''')
def get_or_create_bot(name):
cursor.execute("SELECT id FROM bots WHERE name = ?", (name,))
result = cursor.fetchone()
if result:
return result[0]
cursor.execute("INSERT INTO bots (name) VALUES (?)", (name,))
return cursor.lastrowid
def has_user_posted(bot_name, user_id):
user_id = str(user_id)
bot_id = get_or_create_bot(bot_name)
cursor.execute("""
SELECT EXISTS(
SELECT 1
FROM posted_already
WHERE bot_id = ? AND user_id = ?
LIMIT 1
)
""", (bot_id, user_id))
return bool(cursor.fetchone()[0])
def mark_user_posted(bot_name, user_id):
user_id = str(user_id)
bot_id = get_or_create_bot(bot_name)
try:
cursor.execute("""
INSERT INTO posted_already (bot_id, user_id)
VALUES (?, ?)
""", (bot_id, user_id))
return True
except sqlite3.IntegrityError:
return False
def unmark_user_posted(bot_name, user_id):
user_id = str(user_id)
bot_id = get_or_create_bot(bot_name)
cursor.execute("""
DELETE FROM posted_already
WHERE bot_id = ? AND user_id = ?
""", (bot_id, user_id))
return cursor.rowcount > 0 # Returns True if a row was deleted
# It is not safe to get notifications from "last_id" because some may have been deleted # It is not safe to get notifications from "last_id" because some may have been deleted
def get_new_notifications(api, bot_name, types=None): def get_new_notifications(api, bot_name, types=None):
last_notifications=list_read(bot_name + '_last_notifications') last_notifications=list_read(bot_name + '_last_notifications')

View File

@ -1,7 +1,7 @@
from dotenv import load_dotenv from dotenv import load_dotenv
from PIL import Image, ImageFile from PIL import Image, ImageFile
from common import get_api, get_new_notifications, list_read, list_append from common import get_api, get_new_notifications, list_read, list_append, has_user_posted, mark_user_posted, unmark_user_posted
import datetime import datetime
import gettext import gettext
import requests import requests
@ -120,20 +120,18 @@ def get_filtered_notifications():
localedir = './locales' localedir = './locales'
api = get_api(os.getenv("FEDI_DOMAIN"), bot_name) api = get_api(os.getenv("FEDI_DOMAIN"), bot_name)
notifications = get_filtered_notifications() notifications = get_filtered_notifications()
previous_ids = list_read(bot_name + "_previous_ids")
i18n = gettext.translation(bot_name, localedir, fallback=True, languages=['en']) i18n = gettext.translation(bot_name, localedir, fallback=True, languages=['en'])
i18n.install() i18n.install()
for notification in notifications: for notification in notifications:
try: try:
if str(notification.account.id) in previous_ids: if has_user_posted(bot_name, notification.account.id):
status = "@" + notification.account.acct + " " status = "@" + notification.account.acct + " "
status += _("I have already generated a faggotree for you this year. Try again next year!") status += _("I have already generated a faggotree for you this year. Try again next year!")
api.status_post(status, visibility="direct", in_reply_to_id=notification.status.id) api.status_post(status, visibility="direct", in_reply_to_id=notification.status.id)
continue continue
else: else:
list_append(bot_name + "_previous_ids", str(notification.account.id)) mark_user_posted(bot_name, notification.account.id)
accounts_ids = get_ordered_accounts_ids(notification.account.id) accounts_ids = get_ordered_accounts_ids(notification.account.id)
accounts = get_accounts(accounts_ids) accounts = get_accounts(accounts_ids)
image = create_image(accounts) image = create_image(accounts)
@ -142,8 +140,8 @@ for notification in notifications:
if account.username != notification.account.username: if account.username != notification.account.username:
status += " - " + account.acct + "\n" status += " - " + account.acct + "\n"
api.status_post(status, media_ids=image, visibility="unlisted", in_reply_to_id=notification.status.id) api.status_post(status, media_ids=image, visibility="unlisted", in_reply_to_id=notification.status.id)
previous_ids.append(notification.account.id)
except Exception as e: except Exception as e:
unmark_user_posted(bot_name, notification.account.id)
exc_type, exc_obj, exc_tb = sys.exc_info() exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno) print(exc_type, fname, exc_tb.tb_lineno)