A bot that tracks and auto-deletes statuses on Mastodon/Pleroma accounts after a set time if they are cringe enough
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

320 lines
12 KiB

import json
import os
import sched
import sys
import time
import traceback
from collections import deque
from datetime import datetime, timezone, timedelta

import html2text
import toml
from mastodon import Mastodon, MastodonNotFoundError

import bogofilter
from bot import Bot, BotClient
# Maps a month number (1-12) to the name of its season, using the
# March-to-May = spring convention.
SEASON = {
    month: name
    for name, months in (
        ("spring", (3, 4, 5)),
        ("summer", (6, 7, 8)),
        ("autumn", (9, 10, 11)),
        ("winter", (12, 1, 2)),
    )
    for month in months
}
# Maps an hour of the day (0-23) to a coarse time-of-day label. Each entry
# below is (label, first hour, hour after the last one).
TIME_OF_DAY = {
    hour: label
    for label, start, stop in (
        ("night", 0, 4),
        ("early", 4, 8),
        ("morning", 8, 12),
        ("afternoon", 12, 18),
        ("evening", 18, 24),
    )
    for hour in range(start, stop)
}
class CringeBotClient(BotClient):
    """Bot client that classifies the account's own statuses with bogofilter.

    Statuses classified as spam ("cringe") are scheduled for deferred
    deletion. Every classification is reported in a direct-message reply, and
    replying to such a message with ``cringe``, ``based`` or ``unlearn``
    retrains the classifier for the status the message was about.

    ``self.api``, ``self.log``, ``self.state`` and ``self.config`` are used
    throughout and are presumably provided by ``BotClient`` -- confirm there.
    """

    def __init__(self, bot, config):
        """Create the converter and scheduler, then initialise the base client.

        ``register`` defaults to ``False`` unless ``config`` sets it.
        """
        # Initialise HTML-to-Markdown converter
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True

        # Create scheduler for deferred deletion of posts. It is created
        # before the base initialiser runs -- presumably because that loads
        # the state, and on_load_state() needs the scheduler (confirm).
        self.deletion_scheduler = sched.scheduler(time.time, time.sleep)

        super().__init__(bot, {"register": False, **config})

    def respond(self, status, message, context):
        """Reply to ``status`` with a direct message and track the reply.

        The reply is recorded under ``state["own"]`` together with
        ``context`` (so later replies to it can be interpreted as commands)
        and is scheduled for deferred deletion. Sleeps for
        ``config["rate_limit"]`` seconds afterwards.
        """
        self.log("Responding with:")
        self.log(message)
        self.log()
        reply = self.api.status_reply(status, message, visibility="direct", untag=True)
        self.state["own"][reply["id"]] = context
        self.enqueue_deletion(reply["id"])
        time.sleep(self.config["rate_limit"])

    def on_start(self):
        """Log how many deletions are pending."""
        self.deletion_report()

    def on_poll(self):
        """Run every scheduled deletion that is due, without blocking."""
        # Perform any scheduled deletes
        self.deletion_scheduler.run(blocking=False)

    def _set_category(self, status_id, category):
        """Put ``status_id`` in the ``category`` set only (or in none if None)."""
        for name in ("cringe", "based", "unsure"):
            if name == category:
                self.state[name].add(status_id)
            else:
                self.state[name].discard(status_id)

    def set_cringe(self, status_id):
        """Record ``status_id`` as cringe and nothing else."""
        self._set_category(status_id, "cringe")

    def set_based(self, status_id):
        """Record ``status_id`` as based and nothing else."""
        self._set_category(status_id, "based")

    def set_unsure(self, status_id):
        """Record ``status_id`` as unsure and nothing else."""
        self._set_category(status_id, "unsure")

    def set_discard(self, status_id):
        """Remove ``status_id`` from every classification set."""
        self._set_category(status_id, None)

    def process_commands(self, status):
        """Treat ``status`` as a command if it replies to one of the bot's messages.

        The first token equal to ``cringe``, ``based`` or ``unlearn`` is
        executed against the status the parent message was about; other
        tokens are skipped. Returns True if ``status`` was a reply to a
        bot-owned status (whether or not the command succeeded), else False.
        """
        status_id = status["id"]
        parent_id = status["in_reply_to_id"]

        # Check if bot owns the parent status
        if parent_id not in self.state["own"]:
            return False

        context = self.state["own"][parent_id]
        event = context["event"]
        target_id = context["target"]

        # Enqueue command status for deletion
        self.enqueue_deletion(status_id)

        try:
            command = self.h2t.handle(status["content"]).strip()
            self.log("Received command: {}".format(command))

            # Fetch the target status
            target_status = self.api.status(target_id)
            target_mail_text = toot_dict_to_mail(target_status).format()

            # Check if target status was previously classified
            was_cringe = target_id in self.state["cringe"]
            was_based = target_id in self.state["based"]
            was_unsure = target_id in self.state["unsure"]

            tokens = deque(command.split())
            while True:
                # Raises IndexError once the tokens run out without a known
                # command; that is reported as "Invalid command" below.
                token = tokens.popleft()
                if token not in ("cringe", "based", "unlearn"):
                    continue

                # Only messages about a classification or a previous learn
                # command refer to a status that can be (re)learned.
                if event not in ["categorise", "learn"]:
                    self.respond(status, "Status is not learnable", {"event": "error", "type": "learnable", "target": target_id})
                    return True

                if token == "cringe":
                    if was_cringe:
                        break
                    if was_based:
                        bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM])
                    else:
                        bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM])
                    self.set_cringe(target_id)
                    self.enqueue_deletion(target_id)
                    self.respond(status, "Learned as cringe", {"event": "learn", "target": target_id})
                elif token == "based":
                    if was_based:
                        break
                    if was_cringe:
                        bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM])
                    else:
                        bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM])
                    self.set_based(target_id)
                    self.unqueue_deletion(target_id)
                    self.respond(status, "Learned as based", {"event": "learn", "target": target_id})
                else:  # "unlearn"
                    if was_unsure:
                        break
                    if was_cringe:
                        bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM])
                    elif was_based:
                        bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM])
                    self.set_unsure(target_id)
                    self.unqueue_deletion(target_id)
                    self.respond(status, "Unlearned", {"event": "learn", "target": target_id})
                break
        except IndexError:
            self.respond(status, "Invalid command", {"event": "error", "type": "syntax", "target": target_id})
        except MastodonNotFoundError:
            self.respond(status, "Could not fetch target status", {"event": "error", "type": "fetch", "target": target_id})

        return True

    def on_status(self, status):
        """Classify a new status of this account, or run it as a command.

        Statuses from other accounts, boosts and the bot's own messages are
        ignored. Replies to bot messages are handled by process_commands().
        Anything else is classified with bogofilter, reported in a direct
        message and, if classified as spam, scheduled for deletion.
        """
        # Ignore statuses from other accounts
        if status["account"]["id"] != self.api.me()["id"]:
            return

        # Ignore statuses this account boosts
        if status["reblog"]:
            return

        status_id = status["id"]

        # Ignore bot's own statuses
        if status_id in self.state["own"]:
            return

        # Create faux HTML email of status
        mail_text = toot_dict_to_mail(status).format()

        # Format and log plain-text preview
        md_text = self.h2t.handle(status["content"])
        preview = toot_dict_to_mail(status)
        preview.body = md_text
        preview_text = preview.format()
        self.log()
        self.log(preview_text)
        self.log()

        # Process any commands
        if self.process_commands(status):
            return

        register = self.config["register"]
        actions = [bogofilter.CLASSIFY, bogofilter.REGISTER] if register else [bogofilter.CLASSIFY]
        result = bogofilter.run(mail_text, actions)
        bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score))

        # The classification is only recorded in the state when bogofilter
        # registered the message as well (config["register"]).
        if result.category == bogofilter.SPAM:
            self.log("CRINGE: Enqueuing status {} for deletion".format(status_id))
            if register:
                self.set_cringe(status_id)
            self.enqueue_deletion(status_id)
            self.respond(status, "Categorised as cringe\n{}".format(bogo_report), context={"event": "categorise", "target": status_id})
        elif result.category == bogofilter.HAM:
            self.log("BASED: Not enqueueing status {} for deletion".format(status_id))
            if register:
                self.set_based(status_id)
            self.respond(status, "Categorised as based\n{}".format(bogo_report), context={"event": "categorise", "target": status_id})
        else:
            self.log("UNSURE: Not enqueueing status {} for deletion".format(status_id))
            if register:
                self.set_unsure(status_id)
            self.respond(status, "Categorised as unsure\n{}".format(bogo_report), context={"event": "categorise", "target": status_id})

    def on_load_state(self):
        """Return the loaded state with defaults applied and deletions rescheduled.

        The classification lists are converted to sets, and every entry of
        the saved deletion queue is put back into the scheduler at its saved
        absolute time.
        """
        state = {
            "deletion_queue": {},
            "cringe": [],
            "based": [],
            "unsure": [],
            "own": {},
            **super().on_load_state(),
        }

        state["cringe"] = set(state["cringe"])
        state["based"] = set(state["based"])
        state["unsure"] = set(state["unsure"])

        for status_id, params in state["deletion_queue"].items():
            # Load deletion queue into scheduler
            due = datetime.fromisoformat(params["time"]).timestamp()
            params["scheduler_event"] = self.deletion_scheduler.enterabs(due, 1, self.queued_delete, argument=(status_id,))

        return state

    def on_save_state(self, state):
        """Convert ``state`` to a JSON-friendly form and pass it to the base class."""
        state["cringe"] = list(state["cringe"])
        state["based"] = list(state["based"])
        state["unsure"] = list(state["unsure"])

        # Transform deletion scheduler queue to a JSON friendly format
        state["deletion_queue"] = {
            event.argument[0]: {"time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()}
            for event in self.deletion_scheduler.queue
        }

        super().on_save_state(state)

    def deletion_report(self):
        """Log the number of deletions currently scheduled."""
        self.log("{} status(es) queued for deletion".format(len(self.deletion_scheduler.queue)))

    def enqueue_deletion(self, status_id, delay=None):
        """Schedule ``status_id`` for deletion in ``delay`` seconds.

        ``delay`` defaults to ``config["max_age"]`` minutes. If a deletion is
        already pending for the status, the two are merged into one event at
        the earlier deadline; keeping a single event per status is what lets
        unqueue_deletion() actually cancel the deletion.
        """
        if delay is None:
            delay = 60 * self.config["max_age"]
        due = time.time() + delay

        previous = self.state["deletion_queue"].get(status_id, {}).get("scheduler_event")
        if previous is not None:
            try:
                self.deletion_scheduler.cancel(previous)
            except ValueError:
                # The previous event is no longer queued (it already ran,
                # e.g. when a failed deletion is being retried).
                pass
            else:
                due = min(due, previous.time)

        event = self.deletion_scheduler.enterabs(due, 1, self.queued_delete, argument=(status_id,))
        self.state["deletion_queue"][status_id] = {"scheduler_event": event}
        self.deletion_report()

    def unqueue_deletion(self, status_id):
        """Cancel the pending deletion of ``status_id``, if there is one."""
        try:
            params = self.state["deletion_queue"].pop(status_id)
            self.deletion_scheduler.cancel(params["scheduler_event"])
        except (KeyError, ValueError):
            # KeyError: never queued; ValueError: the event already ran
            self.log("Cannot unqueue non-queued status {} for deletion".format(status_id))
        self.deletion_report()

    def queued_delete(self, status_id):
        """Delete ``status_id`` on the server; scheduler callback.

        A status that no longer exists on the server is treated like a
        successful deletion. Any other failure is logged and the deletion is
        retried in 300 seconds.
        """
        try:
            self.log("Deleting status {}".format(status_id))
            self.api.status_delete(status_id)
        except MastodonNotFoundError:
            self.log("Cannot find status {} on server".format(status_id))
        except Exception:
            # Best effort: keep the bot running and try again later
            self.log(traceback.format_exc())
            self.enqueue_deletion(status_id, 300)
            return

        # The status is gone either way, so forget everything about it
        self.set_discard(status_id)
        self.state["own"].pop(status_id, None)
        self.state["deletion_queue"].pop(status_id, None)
def toot_dict_to_mail(toot_dict):
    """Build a faux e-mail (``bogofilter.Mail``) from a status dict.

    Headers:
      From     -- the author's ``acct``
      X-Flags  -- visibility, plus "sensitive", "poll" and "attachments"
                  where they apply
      X-Time   -- season and time of day the status was posted
      Subject  -- the content warning, if the status has one
    The body is the status's HTML content.
    """
    flags = [toot_dict["visibility"]]
    if toot_dict["sensitive"]:
        flags.append("sensitive")
    if toot_dict["poll"]:
        flags.append("poll")
    if toot_dict["media_attachments"]:
        flags.append("attachments")

    # Derive the time tokens from the status itself, so that classifying,
    # learning and unlearning the same status always see the same mail.
    # NOTE(review): assumes the API client parses "created_at" into a
    # datetime; anything else falls back to the current time.
    posted = toot_dict.get("created_at")
    if isinstance(posted, datetime):
        posted = posted.astimezone()  # local time, as datetime.now() would be
    else:
        posted = datetime.now()
    time_tags = [SEASON[posted.month], TIME_OF_DAY[posted.hour]]

    headers = {
        "From": toot_dict["account"]["acct"],
        "X-Flags": ", ".join(flags),
        "X-Time": ", ".join(time_tags),
    }
    if toot_dict["spoiler_text"]:
        headers["Subject"] = toot_dict["spoiler_text"]

    return bogofilter.Mail(headers=headers, body=toot_dict["content"])
# Start-up: read the configuration from config.toml in the working directory
# and start the bot. A JSON configuration can be used instead:
#     with open("config.json") as json_file:
#         bot = Bot(CringeBotClient, json.load(json_file))
settings = toml.load("config.toml")
bot = Bot(CringeBotClient, settings)
bot.start()

# Idle forever in one-second naps -- presumably start() returns while the
# bot keeps working in the background; confirm against Bot.
while True:
    time.sleep(1)