import os
import re
import sched
import sys
import time
import traceback
import urllib.request  # the bare `import urllib` does not load the `request` submodule
from collections import deque
from datetime import datetime, timezone, timedelta

import bogofilter
import html2text
import toml
from mastodon import Mastodon, MastodonNotFoundError

from fedbot.bot import Bot, BotClient

# The configuration file lives next to the running script; values from
# config.toml override the defaults given here.
config_path = os.path.join(os.path.dirname(sys.argv[0]), "config.toml")
loaded_config = {
    "name": "cringebot",
    "ocr": False,
    **toml.load(config_path)}

OCR = loaded_config["ocr"]

# TODO: Move OCR support to separate module
if OCR:
    from PIL import Image
    import pytesseract

# Month number (1-12) -> season name.
SEASON = {
    **{i: "spring" for i in range(3, 6)},
    **{i: "summer" for i in range(6, 9)},
    **{i: "autumn" for i in range(9, 12)},
    **{i: "winter" for i in [12, 1, 2]}}

# Hour of day (0-23) -> coarse time-of-day name.
TIME_OF_DAY = {
    **{i: "night" for i in range(0, 4)},
    **{i: "early" for i in range(4, 8)},
    **{i: "morning" for i in range(8, 12)},
    **{i: "afternoon" for i in range(12, 18)},
    **{i: "evening" for i in range(18, 24)}}


class CringeBotClient(BotClient):
    """Classify the account's own statuses with bogofilter and delete "cringe".

    Statuses posted by the bot's own account are classified; bogofilter's SPAM
    category is treated as "cringe" (queued for deferred deletion) and HAM as
    "based".  The bot reports each verdict in a direct reply, and replies to
    those reports are parsed as commands ("cringe", "based", "unlearn") that
    retrain the classifier.
    """

    def __init__(self, bot, config):
        """Merge `config` over the defaults and set up converter and scheduler.

        The attributes are assigned before `super().__init__` runs.
        NOTE(review): presumably the base initialiser may invoke hooks such as
        `on_load_state`, which needs `deletion_scheduler` - confirm in fedbot.
        """
        config = {
            "app_name": "Cringebot",
            "rate_limit": 3,
            "retry_rate": 60,
            "poll_interval": 15,
            "max_age": 600,
            "db_dir": ".",
            "cringe_dir": "data/cringe",
            "based_dir": "data/based",
            "unsure_dir": "data/unsure",
            "register": False,
            **config}

        self.db_dir = os.path.join(os.path.dirname(sys.argv[0]), config["db_dir"])

        # Initialise HTML-to-Markdown converter
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True

        # Create scheduler for deferred deletion of posts
        self.deletion_scheduler = sched.scheduler(time.time, time.sleep)

        super().__init__(bot, config)

    def respond(self, status, message, context):
        """Send `message` as a direct reply to `status` and track the reply.

        The reply id is recorded in `state["own"]` together with `context` (so
        that later replies to it can be interpreted as commands), the reply is
        queued for deferred deletion, and the call then sleeps for the
        configured `rate_limit` seconds.
        """
        self.log("Responding with:")
        self.log(message)
        self.log()
        reply = self.api.status_reply(status, message, visibility="direct", untag=True)
        self.state["own"][reply["id"]] = context
        self.enqueue_deletion(reply["id"])
        time.sleep(self.config["rate_limit"])

    def on_start(self):
        """Log how many deletions are pending when the bot starts."""
        self.deletion_report()

    def on_poll(self):
        """Run any deletions that have become due, without blocking."""
        # Perform any scheduled deletes
        self.deletion_scheduler.run(blocking=False)

    def _category_file(self, dir_key, status_id):
        """Return the corpus file path for `status_id` under config[`dir_key`]."""
        # str() so that integer status ids do not make os.path.join raise.
        return os.path.join(
            os.path.dirname(sys.argv[0]), self.config[dir_key], str(status_id))

    def get_cringe_path(self, status_id):
        """Return the path of the "cringe" corpus file for `status_id`."""
        return self._category_file("cringe_dir", status_id)

    def get_based_path(self, status_id):
        """Return the path of the "based" corpus file for `status_id`."""
        return self._category_file("based_dir", status_id)

    def get_unsure_path(self, status_id):
        """Return the path of the "unsure" corpus file for `status_id`."""
        return self._category_file("unsure_dir", status_id)

    def is_cringe(self, status_id):
        """Return the "cringe" file path if that file exists, else None."""
        path = self.get_cringe_path(status_id)
        return path if os.path.isfile(path) else None

    def is_based(self, status_id):
        """Return the "based" file path if that file exists, else None."""
        path = self.get_based_path(status_id)
        return path if os.path.isfile(path) else None

    def is_unsure(self, status_id):
        """Return the "unsure" file path if that file exists, else None."""
        path = self.get_unsure_path(status_id)
        return path if os.path.isfile(path) else None

    def get_category_path(self, status_id):
        """Return the existing corpus file for `status_id`, or None.

        The categories are probed in the order cringe, based, unsure.
        """
        return (self.is_cringe(status_id)
                or self.is_based(status_id)
                or self.is_unsure(status_id)
                or None)

    def delete_and_write(self, delete_path, write_path, text, mode="a+"):
        """Remove `delete_path` (if it is a file) and write `text` to `write_path`.

        The parent directory of `write_path` is created when missing and the
        file is opened with `mode` using UTF-8 encoding.
        """
        if delete_path and os.path.isfile(delete_path):
            os.remove(delete_path)
        os.makedirs(os.path.dirname(write_path), exist_ok=True)
        with open(write_path, mode, encoding="utf-8") as text_file:
            text_file.write(text)

    def make_cringe(self, status_id, text):
        """Move the status to the "cringe" corpus, storing `text` plus a newline."""
        self.delete_and_write(
            self.get_category_path(status_id), self.get_cringe_path(status_id), text + "\n")

    def make_based(self, status_id, text):
        """Move the status to the "based" corpus, storing `text` plus a newline."""
        self.delete_and_write(
            self.get_category_path(status_id), self.get_based_path(status_id), text + "\n")

    def make_unsure(self, status_id, text):
        """Move the status to the "unsure" corpus, storing `text` plus a newline."""
        self.delete_and_write(
            self.get_category_path(status_id), self.get_unsure_path(status_id), text + "\n")

    def process_commands(self, status):
        """Treat `status` as a command if it replies to one of the bot's posts.

        Returns False when the parent of `status` is not recorded in
        `state["own"]`; otherwise the command status is queued for deletion,
        the first recognised token ("cringe", "based" or "unlearn") is acted
        upon, a response is sent, and True is returned.  Unrecognised tokens
        are skipped; running out of tokens yields an "Invalid command" reply.
        """
        status_id = status["id"]
        parent_id = status["in_reply_to_id"]

        # Check if bot owns the parent status
        if parent_id not in self.state["own"]:
            return False

        context = self.state["own"][parent_id]
        event = context["event"]
        target_id = context["target"]

        # Enqueue command status for deletion
        self.enqueue_deletion(status_id)

        try:
            command = self.h2t.handle(status["content"]).strip()
            self.log(f"Received command: {command}")

            # Fetch the target status
            target_status = self.api.status(target_id)
            target_mail = toot_dict_to_mail(target_status)
            target_mail_text = target_mail.format()

            # TODO: Move status classification system to separate module for use in other bots
            tokens = deque(command.split())
            while True:
                # popleft() raises IndexError once the tokens are exhausted,
                # which is reported below as an invalid command.
                token = tokens.popleft()
                if token == "cringe":
                    if len(target_mail.body) == 0:
                        self.respond(status, "Status has no content to classify",
                                     {"event": "error", "type": "empty", "target": target_id})
                        return True
                    elif event not in ["categorise", "learn"]:
                        self.respond(status, "Status is not learnable",
                                     {"event": "error", "type": "learnable", "target": target_id})
                        break
                    elif self.is_cringe(target_id):
                        self.respond(status, "Status was already learned as cringe",
                                     {"event": "error", "type": "redundant", "target": target_id})
                        break
                    elif self.is_based(target_id):
                        # Previously learned as ham: undo that before learning as spam.
                        bogofilter.run(target_mail_text, db_dir=self.db_dir,
                                       actions=[bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM])
                    else:
                        bogofilter.run(target_mail_text, db_dir=self.db_dir,
                                       actions=[bogofilter.LEARN_SPAM])
                    self.make_cringe(target_id, target_mail_text)
                    self.enqueue_deletion(target_id)
                    self.respond(status, "Learned as cringe",
                                 {"event": "learn", "target": target_id})
                    break
                elif token == "based":
                    if len(target_mail.body) == 0:
                        self.respond(status, "Status has no content to classify",
                                     {"event": "error", "type": "empty", "target": target_id})
                        return True
                    elif event not in ["categorise", "learn"]:
                        self.respond(status, "Status is not learnable",
                                     {"event": "error", "type": "learnable", "target": target_id})
                        break
                    elif self.is_based(target_id):
                        self.respond(status, "Status was already learned as based",
                                     {"event": "error", "type": "redundant", "target": target_id})
                        break
                    elif self.is_cringe(target_id):
                        # Previously learned as spam: undo that before learning as ham.
                        bogofilter.run(target_mail_text, db_dir=self.db_dir,
                                       actions=[bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM])
                    else:
                        bogofilter.run(target_mail_text, db_dir=self.db_dir,
                                       actions=[bogofilter.LEARN_HAM])
                    self.make_based(target_id, target_mail_text)
                    self.unqueue_deletion(target_id)
                    self.respond(status, "Learned as based",
                                 {"event": "learn", "target": target_id})
                    break
                elif token == "unlearn":
                    if len(target_mail.body) == 0:
                        self.respond(status, "Status has no content to classify",
                                     {"event": "error", "type": "empty", "target": target_id})
                        return True
                    elif event not in ["categorise", "learn"]:
                        self.respond(status, "Status is not learnable",
                                     {"event": "error", "type": "learnable", "target": target_id})
                        break
                    elif self.is_unsure(target_id):
                        self.respond(status, "Status was already unsure",
                                     {"event": "error", "type": "redundant", "target": target_id})
                        break
                    elif self.is_cringe(target_id):
                        bogofilter.run(target_mail_text, db_dir=self.db_dir,
                                       actions=[bogofilter.UNLEARN_SPAM])
                    elif self.is_based(target_id):
                        bogofilter.run(target_mail_text, db_dir=self.db_dir,
                                       actions=[bogofilter.UNLEARN_HAM])
                    self.make_unsure(target_id, target_mail_text)
                    self.unqueue_deletion(target_id)
                    self.respond(status, "Unlearned",
                                 {"event": "learn", "target": target_id})
                    break
        except IndexError:
            self.respond(status, "Invalid command",
                         {"event": "error", "type": "syntax", "target": target_id})
        except MastodonNotFoundError:
            self.respond(status, "Could not fetch target status",
                         {"event": "error", "type": "fetch", "target": target_id})

        return True

    def on_status(self, status):
        """Classify a new status from the bot's own account and report the verdict.

        Statuses from other accounts, boosts, the bot's own replies, statuses
        with an empty body and statuses handled as commands are not classified.
        When the `register` option is set the classified text is also stored
        in the matching corpus directory.
        """
        # Ignore statuses from other accounts
        if status["account"]["id"] != self.api.me()["id"]:
            return

        # Ignore statuses this account boosts
        if status["reblog"]:
            return

        status_id = status["id"]

        # Disabled debugging aid, kept from the original source:
        #if "Categorised as" in status["content"] and status["visibility"] == "direct":
        #    print("Deleting own status", status["id"])
        #    self.enqueue_deletion(status_id)
        #    return
        #else:
        #    return

        # Ignore bot's own statuses
        if status_id in self.state["own"]:
            return

        # Create faux HTML email of status
        mail = toot_dict_to_mail(status)
        mail_text = mail.format()
        self.log()
        self.log(mail_text)
        self.log()

        if len(mail.body) == 0:
            self.log(f"Not classifying {status_id} because it has no content")
            return

        # Process any commands
        if self.process_commands(status):
            return

        register = self.config["register"]
        actions = [bogofilter.CLASSIFY, bogofilter.REGISTER] if register else [bogofilter.CLASSIFY]
        result = bogofilter.run(mail_text, db_dir=self.db_dir, actions=actions)
        bogo_report = f"Bogofilter: Category={result.category}, Score={result.score:.4f}"

        if result.category == bogofilter.SPAM:
            self.log(f"CRINGE: Enqueuing status {status_id} for deletion")
            if register:
                self.make_cringe(status_id, mail_text)
            self.enqueue_deletion(status_id)
            self.respond(status, f"Categorised as cringe\n{bogo_report}",
                         context={"event": "categorise", "target": status_id})
        elif result.category == bogofilter.HAM:
            self.log(f"BASED: Not enqueueing status {status_id} for deletion")
            if register:
                self.make_based(status_id, mail_text)
            self.respond(status, f"Categorised as based\n{bogo_report}",
                         context={"event": "categorise", "target": status_id})
        else:
            self.log(f"UNSURE: Not enqueueing status {status_id} for deletion")
            if register:
                self.make_unsure(status_id, mail_text)
            self.respond(status, f"Categorised as unsure\n{bogo_report}",
                         context={"event": "categorise", "target": status_id})

    def on_load_state(self):
        """Return the loaded state with defaults, re-arming saved deletions.

        Every entry of the stored `deletion_queue` is scheduled again at its
        saved absolute time and the resulting scheduler event is attached to
        the entry under "scheduler_event".
        """
        state = {
            "deletion_queue": {},
            "own": {},
            **super().on_load_state()}

        for status_id, params in state["deletion_queue"].items():
            # Load deletion queue into scheduler
            due = datetime.fromisoformat(params["time"]).timestamp()
            params["scheduler_event"] = self.deletion_scheduler.enterabs(
                due, 1, self.queued_delete, argument=(status_id,))

        return state

    def on_save_state(self, state):
        """Replace `state["deletion_queue"]` with a serialisable snapshot.

        NOTE(review): the replacement entries carry only "time".  If `state`
        is the live `self.state` object they no longer hold the scheduler
        event, which is why cancellation looks events up in the scheduler
        queue instead of relying on the "scheduler_event" key.
        """
        # Transform deletion scheduler queue to a JSON friendly format
        state["deletion_queue"] = {
            event.argument[0]: {
                "time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()}
            for event in self.deletion_scheduler.queue}
        super().on_save_state(state)

    def deletion_report(self):
        """Log the number of deletions currently scheduled."""
        self.log("{} status(es) queued for deletion".format(len(self.deletion_scheduler.queue)))

    def _cancel_scheduled_deletions(self, status_id):
        """Cancel every scheduled deletion of `status_id`; return how many."""
        cancelled = 0
        # `queue` returns a snapshot list, so cancelling while iterating is safe.
        for event in self.deletion_scheduler.queue:
            if event.argument != (status_id,):
                continue
            try:
                self.deletion_scheduler.cancel(event)
            except ValueError:
                # The event left the queue between the snapshot and the cancel.
                continue
            cancelled += 1
        return cancelled

    def enqueue_deletion(self, status_id, delay=None):
        """Schedule `status_id` for deletion after `delay` seconds.

        Without an explicit `delay` the configured `max_age` multiplied by 60
        is used.  An already scheduled deletion of the same status is replaced
        rather than duplicated: a leftover duplicate could not be reached by
        `unqueue_deletion` and would still delete the status later.
        """
        if delay is None:
            delay = 60 * self.config["max_age"]
        self._cancel_scheduled_deletions(status_id)
        event = self.deletion_scheduler.enter(
            delay, 1, self.queued_delete, argument=(status_id,), kwargs={})
        self.state["deletion_queue"][status_id] = {"scheduler_event": event}
        self.deletion_report()

    def unqueue_deletion(self, status_id):
        """Cancel any scheduled deletion of `status_id` and forget its entry.

        A message is logged when the status was neither tracked in
        `state["deletion_queue"]` nor present in the scheduler.
        """
        was_tracked = self.state["deletion_queue"].pop(status_id, None) is not None
        cancelled = self._cancel_scheduled_deletions(status_id)
        if not was_tracked and not cancelled:
            self.log(f"Cannot unqueue non-queued status {status_id} for deletion")
        self.deletion_report()

    def queued_delete(self, status_id):
        """Scheduler callback: delete `status_id` on the server.

        A status that no longer exists is only logged; any other failure is
        logged with its traceback and the deletion is retried in 300 seconds.
        """
        # The scheduler has already removed the firing event from its queue,
        # so drop the matching bookkeeping entry as well.
        self.state["deletion_queue"].pop(status_id, None)
        try:
            self.log(f"Deleting status {status_id}")
            self.api.status_delete(status_id)
            self.state["own"].pop(status_id, None)
        except MastodonNotFoundError:
            self.log(f"Cannot find status {status_id} on server")
            # The status is gone, so its command context is of no further use.
            self.state["own"].pop(status_id, None)
        except Exception:
            self.log(traceback.format_exc())
            self.enqueue_deletion(status_id, 300)


# Bogofilter lexer uses these regular expressions:
#   FRONT_CHAR  [^[:blank:][:cntrl:][:digit:][:punct:]]
#   MID_CHAR    [^[:blank:][:cntrl:]:$*<>;=()&%#@+|/\\{}^\"?,\[\]]
#   BACK_CHAR   [^[:blank:][:cntrl:]:$*<>;=()&%#@+|/\\{}^\"?,\[\]._~\'\`\-]
#   TOKEN       {FRONT_CHAR}({MID_CHAR}*{BACK_CHAR})?
# C standard library defines these character classes:
#   ispunct()   !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
#   iscntrl()   Between ASCII codes 0x00 (NUL) and 0x1f (US), plus 0x7f (DEL)
# Construct regular expression for tokens based on the above:
PUNCT = r"""!"#\$%&'\(\)\*\+,\-\./:;<=>\?@\[\\\]\^_`\{\|\}~"""
CNTRL = r"\x00-\x1f\x7f"
FRONT_CHAR = rf"[^\s{CNTRL}\d{PUNCT}]"
MID_CHAR = rf"""[^\s{CNTRL}:\$\*<>;=\(\)&%#@\+\|/\\\{{\}}\^"\?,\[\]]"""
BACK_CHAR = rf"""[^\s{CNTRL}:\$\*<>;=\(\)&%#@\+\|/\\\{{\}}\^"\?,\[\]\._~'`\-]"""
# Group 1 is the whole token; group 2 is the optional part after the first
# character and is empty for single-character tokens.
TOKEN = f"({FRONT_CHAR}({MID_CHAR}*{BACK_CHAR})?)"


def _ocr_tokens(ocr_text):
    """Return the lower-cased, "ocr-" prefixed tokens found in `ocr_text`.

    Single-character matches (those with an empty second group) are dropped.
    """
    return ["ocr-" + match[0].lower()
            for match in re.findall(TOKEN, ocr_text)
            if match[1]]


def toot_dict_to_mail(toot_dict):
    """Build a faux e-mail (`bogofilter.Mail`) from a status dictionary.

    Headers:
      From     the account's `acct`
      X-Flags  visibility plus "sensitive", "poll" and "attachments" as applicable
      X-Time   season and time-of-day names
      Subject  the spoiler text, only when it is non-empty

    The body is the status content.  When OCR is enabled, the tokens recognised
    in each image attachment are appended to it; a failure on one attachment is
    printed and that attachment is skipped.
    """
    flags = [toot_dict["visibility"]]
    if toot_dict["sensitive"]:
        flags.append("sensitive")
    if toot_dict["poll"]:
        flags.append("poll")
    if toot_dict["media_attachments"]:
        flags.append("attachments")

    # NOTE(review): this is the local time at which the mail is built, not the
    # creation time of the status - confirm that this is intended.
    now = datetime.now()
    time_tags = [SEASON[now.month], TIME_OF_DAY[now.hour]]

    headers = {
        "From": toot_dict["account"]["acct"],
        "X-Flags": ", ".join(flags),
        "X-Time": ", ".join(time_tags),
    }
    if toot_dict["spoiler_text"]:
        headers["Subject"] = toot_dict["spoiler_text"]

    body = toot_dict["content"]

    if OCR:
        # A bare `import urllib` does not load the `request` submodule, so
        # import it explicitly where it is needed.
        import urllib.request

        for media in toot_dict["media_attachments"]:
            if media["type"] != "image":
                continue
            try:
                with urllib.request.urlopen(media["url"]) as image:
                    ocr_text = pytesseract.image_to_string(Image.open(image))
                    tokens = _ocr_tokens(ocr_text)
                    # Append only real tokens, so that a status without
                    # content does not end up with a whitespace-only body.
                    if tokens:
                        body += "\n\n" + " ".join(tokens)
            except Exception:
                # OCR is best-effort: report the problem and carry on.
                print("Skipping OCR on attachment due to exception")
                print(traceback.format_exc())

    return bogofilter.Mail(headers=headers, body=body)


# Start the bot and keep the main thread alive.
bot = Bot(CringeBotClient, loaded_config)
del loaded_config

bot.start()

while True:
    time.sleep(1)