diff --git a/apreciabot.py b/apreciabot.py index 96f592d..23ce7d3 100755 --- a/apreciabot.py +++ b/apreciabot.py @@ -1,12 +1,20 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + from bs4 import BeautifulSoup from common import get_api from common import list_append from common import list_read from common import list_write +from common import status_reply +import mastodon import json import os import click import click_config_file +import logging +from logging.handlers import SysLogHandler +import sys class load_custom_messages(): @@ -20,6 +28,11 @@ class apreciabot(): def __init__(self, **kwargs): # Initialization self.kwargs = kwargs + if 'log_file' not in kwargs or kwargs['log_file'] is None: + log_file = os.path.join(os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd())), 'log', 'apreciabot.log') + self.kwargs['log_file'] = log_file + self._init_log() + self.custom_messages = load_custom_messages(self.kwargs['custom_message_file']).custom_messages bot_name = self.custom_messages[self.kwargs['language']]['apreciabot']['bot_name'] @@ -44,7 +57,7 @@ class apreciabot(): # Some notifications may have been deleted since last fetch # Therefore, it is better to check less than the maximum number of notifications if len(notifications) < 1: - print(self.custom_messages[self.kwargs['language']]['apreciabot']['no_notifications']) + self._log.info(self.custom_messages[self.kwargs['language']]['apreciabot']['no_notifications']) else: # for i in range(0, max_notifications - 5): # # (adelgado) I'm not sure why this previous loop, but if there are less than 5 notifications, @@ -62,12 +75,12 @@ class apreciabot(): target = "@" + content[1] user = "@" + n['account']['acct'] except: - api.status_reply(n['status'], mensaje_error) + status_reply(api, n['status'], mensaje_error) continue # The bot is meant to be anonymous so only allow directs if n['status']['visibility'] == "direct": if user == target: - api.status_reply(n['status'], mensaje_mismo, visibility="unlisted") + status_reply(api, n['status'], mensaje_mismo, visibility="unlisted") else: # Find account if it is not known by the server api.search(target, result_type="accounts") @@ -77,7 +90,7 @@ class apreciabot(): api.status_post(user + mensaje_no_encontrado, in_reply_to_id=n['status']['id'], visibility="direct" ) else: if "nobot" in bio['note']: - api.status_reply(n['status'], mensaje_nobot) + status_reply(api, n['status'], mensaje_nobot) else: #api.status_post(mensaje + target + "!", in_reply_to_id=n['status']['id'], visibility="unlisted") if ("croqueta" in content @@ -88,13 +101,50 @@ class apreciabot(): new_status = api.status_post(target + " " + mensaje_croqueta, visibility="unlisted") else: new_status = api.status_post(mensaje + target + "!", visibility="unlisted") - api.status_reply(n['status'], mensaje_muestra_aprecio_enviada + new_status['url'], visibility="direct") + status_reply(api, n['status'], mensaje_muestra_aprecio_enviada + new_status['url'], visibility="direct") elif first_mention == "@" + bot_name and n['status']['in_reply_to_id'] == None: - api.status_reply(n['status'], mensaje_aviso, visibility='direct') + status_reply(api, n['status'], mensaje_aviso, visibility='direct') list_write(bot_name, new_last_ids) + def _init_log(self): + ''' Initialize log object ''' + self._log = logging.getLogger("apreciabot") + self._log.setLevel(logging.DEBUG) + + sysloghandler = SysLogHandler() + sysloghandler.setLevel(logging.DEBUG) + self._log.addHandler(sysloghandler) + + streamhandler = logging.StreamHandler(sys.stdout) + streamhandler.setLevel(logging.getLevelName(self.kwargs.get("debug_level", 'INFO'))) + self._log.addHandler(streamhandler) + + if 'log_file' in self.kwargs: + log_file = self.kwargs['log_file'] + else: + home_folder = os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + log_folder = os.path.join(home_folder, "log") + log_file = os.path.join(log_folder, "apreciabot.log") + + if not os.path.exists(os.path.dirname(log_file)): + os.mkdir(os.path.dirname(log_file)) + + filehandler = logging.handlers.RotatingFileHandler(log_file, maxBytes=102400000) + # create formatter + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + filehandler.setFormatter(formatter) + filehandler.setLevel(logging.DEBUG) + self._log.addHandler(filehandler) + return True + @click.command() +@click.option("--debug-level", "-d", default="INFO", + type=click.Choice( + ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"], + case_sensitive=False, + ), help='Set the debug level for the standard output.') +@click.option('--log-file', '-L', help="File to store all debug messages.") @click.option('--language', '-l', default='es', help="Language.") @click.option('--custom-message-file', '-j', default='custom_messages.json', help='JSON file containing the messages.') @click.option('--instance-name', '-i', default='masto.es', help='Instance FQDN') diff --git a/click b/click new file mode 100644 index 0000000..e69de29 diff --git a/click_config_file b/click_config_file new file mode 100644 index 0000000..e69de29 diff --git a/common.py b/common.py index f99297f..316f054 100755 --- a/common.py +++ b/common.py @@ -1,20 +1,21 @@ -from mastodon import Mastodon +import mastodon from sys import exit +import re def get_api(url, token_name = ""): if token_name: try: file = open('token/' + token_name, 'r') except FileNotFoundError: - print('Token not found for ' + token_name) + print('Token not found for ' + token_name + ' in "token/'+ token_name + '"') exit() else: - token = file.read().splitlines()[0] + token = file.read().splitlines()[0].strip() file.close() else: token = "" - return Mastodon(access_token = token, api_base_url = url) + return mastodon.Mastodon(access_token = token, api_base_url = url) def list_read(name): try: @@ -55,3 +56,23 @@ def get_new_notifications(api, bot_name, types=None): list_write(bot_name + "_last_notifications", new_notifications_ids) return new_notifications +def status_reply(api, post, message, visibility): + try: + api.status_reply(post, message, visibility=visibility) + except mastodon.errors.MastodonAPIError as error: + match_len_limit = re.search(r'Text character limit of ([0-9]*) exceeded', str(error)) + if match_len_limit: + max_post_size = int(match_len_limit.group(1)) - 10 + #split_message = [message[i:i+max_post_size] for i in range(0, len(message), max_post_size)] + split_message = message.split('\n\n') + counter = 1 + for chunk_message in split_message: + try: + api.status_reply(post, f"{chunk_message} {counter}/{len(split_message)}", visibility=visibility) + counter += 1 + except mastodon.errors.MastodonAPIError as error: + print(f"Error posting: {error}") + exit(1) + else: + print(f"Error posting: {error}") + exit(1) diff --git a/custom_messages.json b/custom_messages.json index fb76b39..618d20e 100644 --- a/custom_messages.json +++ b/custom_messages.json @@ -13,7 +13,7 @@ "no_notifications": "No hay notificationes" }, "describot": { - "mensaje": "¡Hola! He detectado que has publicado imágenes o vídeo sin texto alternativo. Añadir una descripción de texto alternativa a tus vídeos e imágenes es esencial para que las personas con alguna discapacidad visual puedan disfrutar de nuestras publicaciones. \n\n Por favor, considera añadir texto alternativo a tus publicaciones la próxima vez (o edita esta publicación para añadírselo). Si necesitas ayuda para saber cómo hacerlo, consulta la publicación fijada en mi perfil: https://masto.es/@TeLoDescribot/110249937862873987 \n\n ¡Gracias por hacer de este espacio un lugar más accesible para todos! \n\n Bip bop. Esta es una cuenta automatizada, si no quieres que te mencione más, eres libre de bloquearme.", + "mensaje": "¡Hola! Has publicado imágenes o vídeo sin texto alternativo. Añadir una descripción de texto alternativa a tus vídeos e imágenes es esencial para que las personas con alguna discapacidad visual puedan disfrutar de nuestras publicaciones. \n\n Por favor, considera añadir texto alternativo a tus publicaciones la próxima vez (o edita esta publicación para añadírselo). Si necesitas ayuda para saber cómo hacerlo, consulta la publicación fijada en mi perfil: https://masto.es/@TeLoDescribot/110249937862873987 \n\n ¡Gracias por hacer de este espacio un lugar más accesible para todos! \n\n Bip bop. Esta es una cuenta automatizada, si no quieres que te mencione más, eres libre de bloquearme.", "bot_name": "describot" }, "federabot": { diff --git a/describot.py b/describot.py index ee384de..3109a25 100755 --- a/describot.py +++ b/describot.py @@ -1,12 +1,22 @@ +#!/usr/bin/env python3 +# -*- encoding: utf-8 -*- + from common import get_api from common import list_append from common import list_read from common import list_write from common import get_new_notifications +from common import status_reply +import mastodon import json import os import click import click_config_file +import mastodon +import re +import logging +from logging.handlers import SysLogHandler +import sys class load_custom_messages(): @@ -14,56 +24,112 @@ class load_custom_messages(): if os.path.exists(custom_message_file): with open(custom_message_file, 'r') as messages_pointer: custom_messages = json.load(messages_pointer) - return custom_messages + self.custom_messages = custom_messages class describot(): def __init__(self, **kwargs): # Initialization self.kwargs = kwargs + if 'log_file' not in kwargs or kwargs['log_file'] is None: + log_file = os.path.join(os.environ.get('HOME', os.environ.get('USERPROFILE', os.getcwd())), 'log', 'describot.log') + self.kwargs['log_file'] = log_file + self._init_log() + self.custom_messages = load_custom_messages(self.kwargs['custom_message_file']).custom_messages - messages = self.custom_messages[self.kwargs['language']]['describot'] - bot_name = messages['describot']['bot_name'] - + self.messages = self.custom_messages[self.kwargs['language']]['describot'] + self.bot_name = self.messages['bot_name'] - api_internal = get_api(self.kwargs['instance_name'], bot_name) - max_posts=20 - warned=[] + self.api_internal = get_api(self.kwargs['instance_name'], self.bot_name) + self.max_posts=20 + self.warned=[] - following = list_read(bot_name + "_following") - - def check_timeline(domain, api_external, timeline_name = 'local'): - last_ids = list_read(bot_name + "_" + domain + "_last_ids") - warned.extend(list_read(bot_name + "_" + domain)) - timeline = api_external.timeline(timeline=timeline_name, limit=max_posts) - new_last_ids=[] - for post in timeline: - new_last_ids.append(post['id']) - for i in range(0, len(timeline) - 2): - post = timeline[i] - if str(post['id']) not in last_ids and (str(post['account']['acct']) not in warned or (timeline_name == 'home' and post['account']['acct'] in following)): - for media in post['media_attachments']: - if media['description'] is None: - print('Warning ' + post['account']['acct']) - api_internal.status_reply(post, messages['describot']['mensaje'], visibility="unlisted") - warned.append(post['account']['acct']) - if domain != 'home': - list_append(bot_name + "_" + domain, post['account']['acct']) - break - list_write(bot_name + "_" + domain + "_last_ids", new_last_ids) - - notifications = get_new_notifications(api_internal, bot_name, types=['follow']) + self._log.debug('Getting list of followed...') + self.following = list_read(self.bot_name + "_following") + self._log.debug('Getting new notifications...') + notifications = get_new_notifications(self.api_internal, self.bot_name, types=['follow']) + self._log.debug(f"Gotten {len(notifications)} notifications") for n in notifications: - if n['account']['acct'] not in following: - print("Following: " + n['account']['acct']) + self._log.debug(n) + if n['account']['acct'] not in self.following: + self._log.info("Following: " + n['account']['acct']) api_internal.account_follow(n['account']['id']) - following.append(n['account']['acct']) - list_append(bot_name + "_following", n['account']['acct']) + self.following.append(n['account']['acct']) + list_append(self.bot_name + "_following", n['account']['acct']) + else: + self._log.debug(f"Already following {n['account']['acct']}.") + + self.check_timeline(self.kwargs['instance_name'], self.api_internal) + self.check_timeline('home', self.api_internal, timeline_name='home') - check_timeline(self.kwargs['instance_name'], api_internal) - check_timeline('home', api_internal, timeline_name='home') + def check_timeline(self, domain, api_external, timeline_name = 'local'): + self._log.debug(f"Checking timeline of domain '{domain}' with name '{timeline_name}'...") + last_ids = list_read(self.bot_name + "_" + domain + "_last_ids") + self.warned.extend(list_read(self.bot_name + "_" + domain)) + timeline = api_external.timeline(timeline=timeline_name, limit=self.max_posts) + new_last_ids=[] + self._log.debug(f"Gotten {len(timeline)} posts in the timeline") + for post in timeline: + new_last_ids.append(post['id']) + for i in range(0, len(timeline) - 2): + post = timeline[i] + if str(post['id']) not in last_ids and ( + str(post['account']['acct']) not in self.warned or ( + timeline_name == 'home' and post['account']['acct'] in self.following + ) + ): + for media in post['media_attachments']: + if media['description'] is None: + self._log.warning('Warning ' + post['account']['acct']) + status_reply(self.api_internal, post, self.messages['mensaje'], visibility="unlisted") + self.warned.append(post['account']['acct']) + if domain != 'home': + list_append(self.bot_name + "_" + domain, post['account']['acct']) + break + else: + self._log.debug(f"Post {post['id']} has media with description") + else: + self._log.debug(f"Ignoring post {post['id']}") + list_write(self.bot_name + "_" + domain + "_last_ids", new_last_ids) + + def _init_log(self): + ''' Initialize log object ''' + self._log = logging.getLogger("describot") + self._log.setLevel(logging.DEBUG) + + sysloghandler = SysLogHandler() + sysloghandler.setLevel(logging.DEBUG) + self._log.addHandler(sysloghandler) + + streamhandler = logging.StreamHandler(sys.stdout) + streamhandler.setLevel(logging.getLevelName(self.kwargs.get("debug_level", 'INFO'))) + self._log.addHandler(streamhandler) + + if 'log_file' in self.kwargs: + log_file = self.kwargs['log_file'] + else: + home_folder = os.environ.get('HOME', os.environ.get('USERPROFILE', '')) + log_folder = os.path.join(home_folder, "log") + log_file = os.path.join(log_folder, "describot.log") + + if not os.path.exists(os.path.dirname(log_file)): + os.mkdir(os.path.dirname(log_file)) + + filehandler = logging.handlers.RotatingFileHandler(log_file, maxBytes=102400000) + # create formatter + formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') + filehandler.setFormatter(formatter) + filehandler.setLevel(logging.DEBUG) + self._log.addHandler(filehandler) + return True @click.command() +@click.option("--debug-level", "-d", default="INFO", + type=click.Choice( + ["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG", "NOTSET"], + case_sensitive=False, + ), help='Set the debug level for the standard output.') +@click.option('--log-file', '-L', help="File to store all debug messages.") @click.option('--language', '-l', default='es', help="Language.") @click.option('--custom-message-file', '-j', default='custom_messages.json', help='JSON file containing the messages.') @click.option('--instance-name', '-i', default='masto.es', help='Instance FQDN') diff --git a/sample.conf b/sample.conf new file mode 100644 index 0000000..5719791 --- /dev/null +++ b/sample.conf @@ -0,0 +1,2 @@ +instance_name="social.koti.site" +debug_level="DEBUG"