import os
import re
import sched
import sys
import time
import traceback
import urllib
import urllib.request
from collections import deque
from datetime import datetime, timezone, timedelta

import bogofilter
import html2text
import toml
from mastodon import Mastodon, MastodonNotFoundError

from fedbot.bot import Bot, BotClient

# Configuration lives next to the script; values in config.toml override the defaults below
config_path = os.path.join(os.path.dirname(sys.argv[0]), "config.toml")
loaded_config = {
    "name": "cringebot",
    "ocr": False,
    **toml.load(config_path)}

OCR = loaded_config["ocr"]

# TODO: Move OCR support to separate module
if OCR:
    from PIL import Image
    import pytesseract

# Month number (1-12) -> season name
SEASON = {
    **{ i : "spring" for i in range(3, 6) },
    **{ i : "summer" for i in range(6, 9) },
    **{ i : "autumn" for i in range(9, 12) },
    **{ i : "winter" for i in [12, 1, 2] }}

# Hour of day (0-23) -> coarse time-of-day name
TIME_OF_DAY = {
    **{ i : "night" for i in range(0, 4) },
    **{ i : "early" for i in range(4, 8) },
    **{ i : "morning" for i in range(8, 12) },
    **{ i : "afternoon" for i in range(12, 18) },
    **{ i : "evening" for i in range(18, 24) }}


class CringeBotClient(BotClient):
    """Bot client that classifies the account's own statuses with bogofilter.

    Statuses classified as spam ("cringe") are scheduled for deferred
    deletion. Replies to the bot's direct messages containing ``cringe``,
    ``based`` or ``unlearn`` retrain the classifier for the status the
    message was about.
    """

    # Commands recognised in replies to the bot's own messages
    _COMMANDS = ("cringe", "based", "unlearn")

    def __init__(self, bot, config):
        """Merge *config* over the defaults and set up the converter and scheduler."""
        config = {
            "app_name": "Cringebot",
            "rate_limit": 3,
            "retry_rate": 60,
            "poll_interval": 15,
            "max_age": 600,
            "db_dir": ".",
            "cringe_dir": "data/cringe",
            "based_dir": "data/based",
            "unsure_dir": "data/unsure",
            "register": False,
            **config}

        self.db_dir = os.path.join(os.path.dirname(sys.argv[0]), config["db_dir"])

        # Initialise HTML-to-Markdown converter
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True

        # Create scheduler for deferred deletion of posts
        self.deletion_scheduler = sched.scheduler(time.time, time.sleep)

        super().__init__(bot, config)

    def respond(self, status, message, context):
        """Send a direct-message reply to *status* and schedule it for deferred deletion.

        *context* is stored in ``state["own"]`` under the reply's id so that a
        later reply to this message can be interpreted as a command.
        """
        self.log("Responding with:")
        self.log(message)
        self.log()
        reply = self.api.status_reply(status, message, visibility = "direct", untag = True)
        self.state["own"][reply["id"]] = context
        self.enqueue_deletion(reply["id"])
        # Crude rate limiting: pause after every reply
        time.sleep(self.config["rate_limit"])

    def on_start(self):
        """Log how many deletions are pending at start-up."""
        self.deletion_report()

    def on_poll(self):
        """Perform any scheduled deletes that are due, without blocking."""
        self.deletion_scheduler.run(blocking = False)

    def _data_path(self, dir_key, status_id):
        """Return the path of *status_id* inside the directory configured under *dir_key*."""
        # str() because os.path.join rejects non-string components
        # NOTE(review): ids may arrive as ints depending on server/library - confirm
        return os.path.join(os.path.dirname(sys.argv[0]), self.config[dir_key], str(status_id))

    def get_cringe_path(self, status_id):
        """Return the file path used to record *status_id* as cringe."""
        return self._data_path("cringe_dir", status_id)

    def get_based_path(self, status_id):
        """Return the file path used to record *status_id* as based."""
        return self._data_path("based_dir", status_id)

    def get_unsure_path(self, status_id):
        """Return the file path used to record *status_id* as unsure."""
        return self._data_path("unsure_dir", status_id)

    def is_cringe(self, status_id):
        """Return the cringe record path if it exists as a file, else None."""
        path = self.get_cringe_path(status_id)
        return path if os.path.isfile(path) else None

    def is_based(self, status_id):
        """Return the based record path if it exists as a file, else None."""
        path = self.get_based_path(status_id)
        return path if os.path.isfile(path) else None

    def is_unsure(self, status_id):
        """Return the unsure record path if it exists as a file, else None."""
        path = self.get_unsure_path(status_id)
        return path if os.path.isfile(path) else None

    def get_category_path(self, status_id):
        """Return the path of whichever category record exists for *status_id*, or None."""
        return self.is_cringe(status_id) or self.is_based(status_id) or self.is_unsure(status_id) or None

    def delete_and_write(self, delete_path, write_path, text, mode = "a+"):
        """Remove *delete_path* if it is an existing file, then write *text* to *write_path*.

        The parent directory of *write_path* is created when missing.
        """
        if delete_path and os.path.isfile(delete_path):
            os.remove(delete_path)
        os.makedirs(os.path.dirname(write_path), exist_ok = True)
        with open(write_path, mode, encoding = "utf-8") as text_file:
            text_file.write(text)

    def make_cringe(self, status_id, text):
        """Record *status_id* as cringe, replacing any existing category record."""
        self.delete_and_write(self.get_category_path(status_id), self.get_cringe_path(status_id), text + "\n")

    def make_based(self, status_id, text):
        """Record *status_id* as based, replacing any existing category record."""
        self.delete_and_write(self.get_category_path(status_id), self.get_based_path(status_id), text + "\n")

    def make_unsure(self, status_id, text):
        """Record *status_id* as unsure, replacing any existing category record."""
        self.delete_and_write(self.get_category_path(status_id), self.get_unsure_path(status_id), text + "\n")

    def _apply_command(self, status, command, event, target_id, target_mail, target_mail_text):
        """Apply one recognised command to the target status and reply with the outcome.

        *command* is one of ``_COMMANDS``; *event* is the event name stored in
        the context of the bot message that was replied to.
        """
        if len(target_mail.body) == 0:
            self.respond(status, "Status has no content to classify", {"event": "error", "type": "empty", "target": target_id})
            return
        if event not in ["categorise", "learn"]:
            self.respond(status, "Status is not learnable", {"event": "error", "type": "learnable", "target": target_id})
            return

        if command == "cringe":
            if self.is_cringe(target_id):
                self.respond(status, "Status was already learned as cringe", {"event": "error", "type": "redundant", "target": target_id})
                return
            # A status previously learned as based must be unlearned as ham first
            if self.is_based(target_id):
                actions = [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]
            else:
                actions = [bogofilter.LEARN_SPAM]
            bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = actions)
            self.make_cringe(target_id, target_mail_text)
            self.enqueue_deletion(target_id)
            self.respond(status, "Learned as cringe", {"event": "learn", "target": target_id})
        elif command == "based":
            if self.is_based(target_id):
                self.respond(status, "Status was already learned as based", {"event": "error", "type": "redundant", "target": target_id})
                return
            # A status previously learned as cringe must be unlearned as spam first
            if self.is_cringe(target_id):
                actions = [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]
            else:
                actions = [bogofilter.LEARN_HAM]
            bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = actions)
            self.make_based(target_id, target_mail_text)
            self.unqueue_deletion(target_id)
            self.respond(status, "Learned as based", {"event": "learn", "target": target_id})
        else:
            # "unlearn"
            if self.is_unsure(target_id):
                self.respond(status, "Status was already unsure", {"event": "error", "type": "redundant", "target": target_id})
                return
            if self.is_cringe(target_id):
                bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.UNLEARN_SPAM])
            elif self.is_based(target_id):
                bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.UNLEARN_HAM])
            self.make_unsure(target_id, target_mail_text)
            self.unqueue_deletion(target_id)
            self.respond(status, "Unlearned", {"event": "learn", "target": target_id})

    def process_commands(self, status):
        """Handle *status* as a command if it replies to one of the bot's own messages.

        Returns True when the parent status belongs to the bot (whether or not
        the command was valid), False otherwise. The first word matching a
        known command is executed; other words are ignored.
        """
        status_id = status["id"]
        parent_id = status["in_reply_to_id"]

        # Check if bot owns the parent status
        if parent_id not in self.state["own"]:
            return False

        context = self.state["own"][parent_id]
        event = context["event"]
        target_id = context["target"]

        # Enqueue command status for deletion
        self.enqueue_deletion(status_id)

        try:
            command_text = self.h2t.handle(status["content"]).strip()
            self.log("Received command: {}".format(command_text))

            # Fetch the target status
            target_status = self.api.status(target_id)
            target_mail = toot_dict_to_mail(target_status)
            target_mail_text = target_mail.format()

            # TODO: Move status classification system to separate module for use in other bots
            # Explicit search instead of popping until IndexError, so that an
            # unrelated IndexError is never misreported as a syntax error
            command = next((word for word in command_text.split() if word in self._COMMANDS), None)
            if command is None:
                self.respond(status, "Invalid command", {"event": "error", "type": "syntax", "target": target_id})
            else:
                self._apply_command(status, command, event, target_id, target_mail, target_mail_text)

        except MastodonNotFoundError:
            self.respond(status, "Could not fetch target status", {"event": "error", "type": "fetch", "target": target_id})

        return True

    def on_status(self, status):
        """Classify a new status from this account, or process it as a command."""
        # Ignore statuses from other accounts
        if status["account"]["id"] != self.api.me()["id"]:
            return

        # Ignore statuses this account boosts
        if status["reblog"]:
            return

        status_id = status["id"]

        # Ignore bot's own statuses
        if status_id in self.state["own"]:
            return

        # Create faux HTML email of status
        mail = toot_dict_to_mail(status)
        mail_text = mail.format()

        self.log()
        self.log(mail_text)
        self.log()

        if len(mail.body) == 0:
            self.log("Not classifying {} because it has no content".format(status_id))
            return

        # Process any commands
        if self.process_commands(status):
            return

        register = self.config["register"]
        actions = [bogofilter.CLASSIFY, bogofilter.REGISTER] if register else [bogofilter.CLASSIFY]
        result = bogofilter.run(mail_text, db_dir = self.db_dir, actions = actions)
        bogo_report = "Bogofilter: Category={}, Score={:.4f}".format(result.category, result.score)
        context = {"event": "categorise", "target": status_id}

        if result.category == bogofilter.SPAM:
            self.log("CRINGE: Enqueuing status {} for deletion".format(status_id))
            if register:
                self.make_cringe(status_id, mail_text)
            self.enqueue_deletion(status_id)
            self.respond(status, "Categorised as cringe\n{}".format(bogo_report), context = context)
        elif result.category == bogofilter.HAM:
            self.log("BASED: Not enqueueing status {} for deletion".format(status_id))
            if register:
                self.make_based(status_id, mail_text)
            self.respond(status, "Categorised as based\n{}".format(bogo_report), context = context)
        else:
            self.log("UNSURE: Not enqueueing status {} for deletion".format(status_id))
            if register:
                self.make_unsure(status_id, mail_text)
            self.respond(status, "Categorised as unsure\n{}".format(bogo_report), context = context)

    def on_load_state(self):
        """Load persisted state and re-arm the scheduler from the saved deletion queue."""
        state = {
            "deletion_queue": {},
            "own": {},
            **super().on_load_state()}

        for status_id, params in state["deletion_queue"].items():
            # Load deletion queue into scheduler
            params["scheduler_event"] = self.deletion_scheduler.enterabs(
                datetime.fromisoformat(params["time"]).timestamp(),
                1,
                self.queued_delete,
                argument=(status_id,))

        return state

    def on_save_state(self, state):
        """Replace the deletion queue in *state* with a JSON friendly snapshot, then save.

        Only the due time of each pending scheduler event is kept; the
        ``scheduler_event`` handles are not part of the saved form.
        """
        state["deletion_queue"] = {
            event.argument[0]: {"time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()}
            for event in self.deletion_scheduler.queue}
        super().on_save_state(state)

    def deletion_report(self):
        """Log the number of statuses currently queued for deletion."""
        self.log("{} status(es) queued for deletion".format(len(self.deletion_scheduler.queue)))

    def _cancel_scheduled(self, status_id):
        """Cancel every pending deletion event for *status_id* and return the cancelled events.

        The scheduler queue itself is scanned because ``on_save_state`` rewrites
        ``state["deletion_queue"]`` without the ``scheduler_event`` handles.
        """
        cancelled = []
        for event in self.deletion_scheduler.queue:
            # Compare as strings so ids match regardless of int/str form
            # NOTE(review): assumes JSON round-tripping may stringify ids - confirm
            if event.argument and str(event.argument[0]) == str(status_id):
                try:
                    self.deletion_scheduler.cancel(event)
                except ValueError:
                    # Event is no longer in the queue; nothing to cancel
                    continue
                cancelled.append(event)
        return cancelled

    def enqueue_deletion(self, status_id, delay = None):
        """Schedule *status_id* for deletion after *delay* seconds.

        When *delay* is None it defaults to ``60 * config["max_age"]``. A status
        that is already queued keeps a single event at the earliest of its
        existing and new due times, so that ``unqueue_deletion`` can reliably
        cancel it.
        """
        if delay is None:
            delay = 60 * self.config["max_age"]

        replaced = self._cancel_scheduled(status_id)
        due = self.deletion_scheduler.timefunc() + delay
        if replaced:
            due = min(due, min(event.time for event in replaced))

        scheduler_event = self.deletion_scheduler.enterabs(
            due, 1, self.queued_delete, argument=(status_id,), kwargs={})
        self.state["deletion_queue"][status_id] = {"scheduler_event": scheduler_event}
        self.deletion_report()

    def unqueue_deletion(self, status_id):
        """Cancel any pending deletion of *status_id*; log if it was not queued."""
        was_tracked = self.state["deletion_queue"].pop(status_id, None) is not None
        cancelled = self._cancel_scheduled(status_id)
        if not was_tracked and not cancelled:
            self.log("Cannot unqueue non-queued status {} for deletion".format(status_id))
        self.deletion_report()

    def queued_delete(self, status_id):
        """Scheduler callback: delete *status_id* on the server.

        A status missing on the server is treated as already deleted. Any other
        failure is logged and the deletion is retried after 300 seconds.
        """
        # This event has fired, so its bookkeeping entry is stale
        self.state["deletion_queue"].pop(status_id, None)
        try:
            self.log("Deleting status {}".format(status_id))
            self.api.status_delete(status_id)
            self.state["own"].pop(status_id, None)
        except MastodonNotFoundError:
            self.log("Cannot find status {} on server".format(status_id))
            # The status is gone either way, so its context is no longer needed
            self.state["own"].pop(status_id, None)
        except Exception:
            self.log(traceback.format_exc())
            self.enqueue_deletion(status_id, 300)


# Bogofilter lexer uses these regular expressions:
#   FRONT_CHAR  [^[:blank:][:cntrl:][:digit:][:punct:]]
#   MID_CHAR    [^[:blank:][:cntrl:]:$*<>;=()&%#@+|/\\{}^\"?,\[\]]
#   BACK_CHAR   [^[:blank:][:cntrl:]:$*<>;=()&%#@+|/\\{}^\"?,\[\]._~\'\`\-]
#   TOKEN       {FRONT_CHAR}({MID_CHAR}*{BACK_CHAR})?
# C standard library defines these character classes:
#   ispunct()   !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
#   iscntrl()   Between ASCII codes 0x00 (NUL) and 0x1f (US), plus 0x7f (DEL)

# Construct regular expression for tokens based on the above:
PUNCT = r"""!"#\$%&'\(\)\*\+,\-\./:;<=>\?@\[\\\]\^_`\{\|\}~"""
CNTRL = r"\x00-\x1f\x7f"
FRONT_CHAR = rf"[^\s{CNTRL}\d{PUNCT}]"
MID_CHAR = rf"""[^\s{CNTRL}:\$\*<>;=\(\)&%#@\+\|/\\\{{\}}\^"\?,\[\]]"""
BACK_CHAR = rf"""[^\s{CNTRL}:\$\*<>;=\(\)&%#@\+\|/\\\{{\}}\^"\?,\[\]\._~'`\-]"""
# Group 1 is the whole token, group 2 everything after its first character
TOKEN = f"({FRONT_CHAR}({MID_CHAR}*{BACK_CHAR})?)"


def toot_dict_to_mail(toot_dict):
    """Build a faux email (``bogofilter.Mail``) from a status dict for classification.

    Headers carry the author (``From``), visibility and content flags
    (``X-Flags``), the season and time of day (``X-Time``) and, when present,
    the spoiler text (``Subject``). The body is the status content. When OCR
    is enabled, words recognised in image attachments are appended to the
    body as lower-cased ``ocr-`` prefixed tokens.
    """
    # Imported locally so the OCR path does not rely on the file's import block
    import traceback
    import urllib.request

    flags = [toot_dict["visibility"]]
    if toot_dict["sensitive"]:
        flags.append("sensitive")
    if toot_dict["poll"]:
        flags.append("poll")
    if len(toot_dict["media_attachments"]) > 0:
        flags.append("attachments")

    # NOTE(review): this is the current local time, not the status' creation time
    now = datetime.now()
    time_tags = [SEASON[now.month], TIME_OF_DAY[now.hour]]

    headers = {
        "From": toot_dict["account"]["acct"],
        "X-Flags": ", ".join(flags),
        "X-Time": ", ".join(time_tags),
    }
    if len(toot_dict["spoiler_text"]) > 0:
        headers["Subject"] = toot_dict["spoiler_text"]

    body = toot_dict["content"]
    if OCR:
        for media in toot_dict["media_attachments"]:
            if media["type"] != "image":
                continue
            try:
                with urllib.request.urlopen(media["url"]) as image:
                    ocr_text = pytesseract.image_to_string(Image.open(image))
                    # Keep only tokens of two or more characters (non-empty second group)
                    words = [whole for whole, rest in re.findall(TOKEN, ocr_text) if len(rest) > 0]
                    tokens = ["ocr-" + word.lower() for word in words]
                    body += "\n\n" + " ".join(tokens)
            except Exception:
                # Best effort: a failed download or OCR run must not prevent classification
                print("Skipping OCR on attachment due to exception")
                print(traceback.format_exc())

    return bogofilter.Mail(headers = headers, body = body)


bot = Bot(CringeBotClient, loaded_config)
del loaded_config
bot.start()

# Keep the main thread alive
# NOTE(review): presumably bot.start() returns and the bot runs in the background - confirm
while True:
    time.sleep(1)