A bot that tracks and auto-deletes statuses on Mastodon/Pleroma accounts after a set time if they are cringe enough

import os
import sys
import time
import sched
from datetime import datetime, timezone, timedelta
from collections import deque
import toml
import re
import traceback
import bogofilter
import html2text
import urllib.request
config_path = os.path.join(os.path.dirname(sys.argv[0]), "config.toml")
loaded_config = {
"name": "cringebot",
"ocr": False,
**toml.load(config_path)}
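# The config file is plain TOML; "name" and "ocr" are consumed here, while
# client-level options (app_name, rate_limit, max_age, db_dir, register, ...)
# are merged with defaults in CringeBotClient.__init__ below. A minimal sketch,
# assuming a flat layout (the exact structure expected by fedbot may differ):
#
#   name = "cringebot"
#   ocr = false
#   max_age = 600     # minutes before a status classified as cringe is deleted
#   register = false  # when true, classified statuses are also saved and learned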
OCR = loaded_config["ocr"]
# TODO: Move OCR support to separate module
if OCR:
from PIL import Image
import pytesseract
from mastodon import Mastodon, MastodonNotFoundError
from fedbot.bot import Bot, BotClient
SEASON = {
**{ i : "spring" for i in range(3, 6) },
**{ i : "summer" for i in range(6, 9) },
**{ i : "autumn" for i in range(9, 12) },
**{ i : "winter" for i in [12, 1, 2] }}
TIME_OF_DAY = {
**{ i : "night" for i in range(0, 4) },
**{ i : "early" for i in range(4, 8) },
**{ i : "morning" for i in range(8, 12) },
**{ i : "afternoon" for i in range(12, 18) },
**{ i : "evening" for i in range(18, 24) }}
class CringeBotClient(BotClient):
def __init__(self, bot, config):
config = {
"app_name": "Cringebot",
"rate_limit": 3,
"retry_rate": 60,
"poll_interval": 15,
"max_age": 600,
"db_dir": ".",
"cringe_dir": "data/cringe",
"based_dir": "data/based",
"unsure_dir": "data/unsure",
"register": False,
**config}
self.db_dir = os.path.join(os.path.dirname(sys.argv[0]), config["db_dir"])
# Initialise HTML-to-Markdown converter
self.h2t = html2text.HTML2Text()
self.h2t.ignore_links = True
# Create scheduler for deferred deletion of posts
self.deletion_scheduler = sched.scheduler(time.time, time.sleep)
super().__init__(bot, config)
# Send a DM reply to the given status, appropriately tagged, and schedule it for deferred deletion
def respond(self, status, message, context):
self.log("Responding with:")
self.log(message)
self.log()
reply = self.api.status_reply(status, message, visibility = "direct", untag = True)
self.state["own"][reply["id"]] = context
self.enqueue_deletion(reply["id"])
time.sleep(self.config["rate_limit"])
def on_start(self):
self.deletion_report()
def on_poll(self):
# Perform any scheduled deletes
self.deletion_scheduler.run(blocking = False)
def get_cringe_path(self, status_id):
return os.path.join(os.path.dirname(sys.argv[0]), self.config["cringe_dir"], status_id)
def get_based_path(self, status_id):
return os.path.join(os.path.dirname(sys.argv[0]), self.config["based_dir"], status_id)
def get_unsure_path(self, status_id):
return os.path.join(os.path.dirname(sys.argv[0]), self.config["unsure_dir"], status_id)
def is_cringe(self, status_id):
path = self.get_cringe_path(status_id)
return path if os.path.isfile(path) else None
def is_based(self, status_id):
path = self.get_based_path(status_id)
return path if os.path.isfile(path) else None
def is_unsure(self, status_id):
path = self.get_unsure_path(status_id)
return path if os.path.isfile(path) else None
def get_category_path(self, status_id):
return self.is_cringe(status_id) or self.is_based(status_id) or self.is_unsure(status_id) or None
def delete_and_write(self, delete_path, write_path, text, mode = "a+"):
if delete_path and os.path.isfile(delete_path):
os.remove(delete_path)
os.makedirs(os.path.dirname(write_path), exist_ok = True)
with open(write_path, mode, encoding = "utf-8") as text_file:
text_file.write(text)
def make_cringe(self, status_id, text):
self.delete_and_write(self.get_category_path(status_id), self.get_cringe_path(status_id), text + "\n")
def make_based(self, status_id, text):
self.delete_and_write(self.get_category_path(status_id), self.get_based_path(status_id), text + "\n")
def make_unsure(self, status_id, text):
self.delete_and_write(self.get_category_path(status_id), self.get_unsure_path(status_id), text + "\n")
# Look for replies to the bot and return True if commands were processed
def process_commands(self, status):
status_id = status["id"]
parent_id = status["in_reply_to_id"]
# Check if bot owns the parent status
if parent_id not in self.state["own"]:
return False
context = self.state["own"][parent_id]
event = context["event"]
target_id = context["target"]
# Enqueue command status for deletion
self.enqueue_deletion(status_id)
try:
command = self.h2t.handle(status["content"]).strip()
self.log("Received command: {}".format(command))
# Fetch the target status
target_status = self.api.status(target_id)
target_mail = toot_dict_to_mail(target_status)
target_mail_text = target_mail.format()
# TODO: Move status classification system to separate module for use in other bots
tokens = deque(command.split())
while True:
token = tokens.popleft()
if token == "cringe":
if len(target_mail.body) == 0:
self.respond(status, "Status has no content to classify", {"event": "error", "type": "empty", "target": target_id})
return True
elif event not in ["categorise", "learn"]:
self.respond(status, "Status is not learnable", {"event": "error", "type": "learnable", "target": target_id})
break
elif self.is_cringe(target_id):
self.respond(status, "Status was already learned as cringe", {"event": "error", "type": "redundant", "target": target_id})
break
elif self.is_based(target_id):
bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM])
else:
bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.LEARN_SPAM])
self.make_cringe(target_id, target_mail_text)
self.enqueue_deletion(target_id)
self.respond(status, "Learned as cringe", {"event": "learn", "target": target_id})
break
elif token == "based":
if len(target_mail.body) == 0:
self.respond(status, "Status has no content to classify", {"event": "error", "type": "empty", "target": target_id})
return True
elif event not in ["categorise", "learn"]:
self.respond(status, "Status is not learnable", {"event": "error", "type": "learnable", "target": target_id})
break
elif self.is_based(target_id):
self.respond(status, "Status was already learned as based", {"event": "error", "type": "redundant", "target": target_id})
break
elif self.is_cringe(target_id):
bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM])
else:
bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.LEARN_HAM])
self.make_based(target_id, target_mail_text)
self.unqueue_deletion(target_id)
self.respond(status, "Learned as based", {"event": "learn", "target": target_id})
break
elif token == "unlearn":
if len(target_mail.body) == 0:
self.respond(status, "Status has no content to classify", {"event": "error", "type": "empty", "target": target_id})
return True
elif event not in ["categorise", "learn"]:
self.respond(status, "Status is not learnable", {"event": "error", "type": "learnable", "target": target_id})
break
elif self.is_unsure(target_id):
self.respond(status, "Status was already unsure", {"event": "error", "type": "redundant", "target": target_id})
break
elif self.is_cringe(target_id):
bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.UNLEARN_SPAM])
elif self.is_based(target_id):
bogofilter.run(target_mail_text, db_dir = self.db_dir, actions = [bogofilter.UNLEARN_HAM])
self.make_unsure(target_id, target_mail_text)
self.unqueue_deletion(target_id)
self.respond(status, "Unlearned", {"event": "learn", "target": target_id})
break
except IndexError:
self.respond(status, "Invalid command", {"event": "error", "type": "syntax", "target": target_id})
except MastodonNotFoundError:
self.respond(status, "Could not fetch target status", {"event": "error", "type": "fetch", "target": target_id})
return True
def on_status(self, status):
# Ignore statuses from other accounts
if status["account"]["id"] != self.api.me()["id"]:
return
# Ignore statuses this account boosts
if status["reblog"]:
return
status_id = status["id"]
#if "Categorised as" in status["content"] and status["visibility"] == "direct":
# print("Deleting own status", status["id"])
# self.enqueue_deletion(status_id)
# return
#else:
# return
# Ignore bot's own statuses
if status_id in self.state["own"]:
return
# Create faux HTML email of status
mail = toot_dict_to_mail(status)
mail_text = mail.format()
self.log()
self.log(mail_text)
self.log()
if len(mail.body) == 0:
self.log("Not classifying {} because it has no content".format(status_id))
return
# Process any commands
if self.process_commands(status):
return
result = bogofilter.run(mail_text, db_dir = self.db_dir, actions = [bogofilter.CLASSIFY, bogofilter.REGISTER] if self.config["register"] else [bogofilter.CLASSIFY])
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score))
if result.category == bogofilter.SPAM:
self.log("CRINGE: Enqueuing status {} for deletion".format(status_id))
if self.config["register"]:
self.make_cringe(status_id, mail_text)
self.enqueue_deletion(status_id)
self.respond(status, "Categorised as cringe\n{}".format(bogo_report), context = {"event": "categorise", "target": status_id})
elif result.category == bogofilter.HAM:
self.log("BASED: Not enqueueing status {} for deletion".format(status_id))
if self.config["register"]:
self.make_based(status_id, mail_text)
self.respond(status, "Categorised as based\n{}".format(bogo_report), context = {"event": "categorise", "target": status_id})
else:
self.log("UNSURE: Not enqueueing status {} for deletion".format(status_id))
if self.config["register"]:
self.make_unsure(status_id, mail_text)
self.respond(status, "Categorised as unsure\n{}".format(bogo_report), context = {"event": "categorise", "target": status_id})
def on_load_state(self):
state = {
"deletion_queue": {},
"own": {},
**super().on_load_state()}
for status_id, params in state["deletion_queue"].items():
# Load deletion queue into scheduler
params["scheduler_event"] = self.deletion_scheduler.enterabs(datetime.fromisoformat(params["time"]).timestamp(), 1, self.queued_delete, argument=(status_id,))
return state
def on_save_state(self, state):
# Transform the deletion scheduler queue into a JSON-friendly format
state["deletion_queue"] = {event.argument[0]: {"time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()} for event in self.deletion_scheduler.queue}
super().on_save_state(state)
def deletion_report(self):
self.log("{} status(es) queued for deletion".format(len(self.deletion_scheduler.queue)))
def enqueue_deletion(self, status_id, delay = None):
if delay is None:
delay = 60 * self.config["max_age"]
self.state["deletion_queue"][status_id] = {"scheduler_event": self.deletion_scheduler.enter(delay, 1, self.queued_delete, argument=(status_id,), kwargs={})}
self.deletion_report()
def unqueue_deletion(self, status_id):
try:
params = self.state["deletion_queue"].pop(status_id)
self.deletion_scheduler.cancel(params["scheduler_event"])
except KeyError:
self.log("Cannot unqueue non-queued status {} for deletion".format(status_id))
self.deletion_report()
def queued_delete(self, status_id):
try:
self.log("Deleting status {}".format(status_id))
self.api.status_delete(status_id)
if status_id in self.state["own"]:
del self.state["own"][status_id]
except MastodonNotFoundError:
self.log("Cannot find status {} on server".format(status_id))
except Exception:
self.log(traceback.format_exc())
self.enqueue_deletion(status_id, 300)
# Bogofilter lexer uses these regular expressions:
# FRONT_CHAR [^[:blank:][:cntrl:][:digit:][:punct:]]
# MID_CHAR [^[:blank:][:cntrl:]:$*<>;=()&%#@+|/\\{}^\"?,\[\]]
# BACK_CHAR [^[:blank:][:cntrl:]:$*<>;=()&%#@+|/\\{}^\"?,\[\]._~\'\`\-]
# TOKEN {FRONT_CHAR}({MID_CHAR}*{BACK_CHAR})?
# C standard library defines these character classes:
# ispunct() !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
# iscntrl() Between ASCII codes 0x00 (NUL) and 0x1f (US), plus 0x7f (DEL)
# Construct a regular expression for tokens based on the above:
PUNCT = r"""!"#\$%&'\(\)\*\+,\-\./:;<=>\?@\[\\\]\^_`\{\|\}~"""
CNTRL = r"\x00-\x1f\x7f"
FRONT_CHAR = rf"[^\s{CNTRL}\d{PUNCT}]"
MID_CHAR = rf"""[^\s{CNTRL}:\$\*<>;=\(\)&%#@\+\|/\\\{{\}}\^"\?,\[\]]"""
BACK_CHAR = rf"""[^\s{CNTRL}:\$\*<>;=\(\)&%#@\+\|/\\\{{\}}\^"\?,\[\]\._~'`\-]"""
TOKEN = f"({FRONT_CHAR}({MID_CHAR}*{BACK_CHAR})?)"
def toot_dict_to_mail(toot_dict):
flags = []
flags.append(toot_dict["visibility"])
if toot_dict["sensitive"]:
flags.append("sensitive")
if toot_dict["poll"]:
flags.append("poll")
if len(toot_dict["media_attachments"]) > 0:
flags.append("attachments")
time = []
now = datetime.now()
time.append(SEASON[now.month])
time.append(TIME_OF_DAY[now.hour])
headers = {}
headers["From"] = toot_dict["account"]["acct"]
headers["X-Flags"] = ", ".join(flags)
headers["X-Time"] = ", ".join(time)
if len(toot_dict["spoiler_text"]) > 0:
headers["Subject"] = toot_dict["spoiler_text"]
body = toot_dict["content"]
if OCR:
for media in toot_dict["media_attachments"]:
if media["type"] == "image":
try:
with urllib.request.urlopen(media["url"]) as image:
ocr_text = pytesseract.image_to_string(Image.open(image))
#print("ocr_text =", ocr_text)
words = [match[0] for match in filter(lambda match: len(match[1]) > 0, re.findall(TOKEN, ocr_text))]
#print("words =", words)
tokens = ["ocr-" + word.lower() for word in words]
#print("tokens =", tokens)
body += "\n\n" + " ".join(tokens)
except Exception:
print("Skipping OCR on attachment due to exception")
print(traceback.format_exc())
return bogofilter.Mail(headers = headers, body = body)
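# Illustrative example (header values only; the exact layout of the faux mail
# depends on bogofilter.Mail.format()): a public, sensitive toot from
# alice@example.social posted on a July evening produces headers like
#   {"From": "alice@example.social", "X-Flags": "public, sensitive",
#    "X-Time": "summer, evening"}
# with the toot's HTML content (plus any "ocr-" tokens) as the body.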
bot = Bot(CringeBotClient, loaded_config)
del loaded_config
bot.start()
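# Keep the main process alive; polling and scheduling are assumed to be driven
# by fedbot's Bot/BotClient machinery rather than by this loop.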
while True:
time.sleep(1)