A bot that tracks and auto-deletes statuses on Mastodon/Pleroma accounts after a set time if they are cringe enough
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

252 lines
10 KiB

3 years ago
import os
import sys
import time
import sched
3 years ago
from datetime import datetime, timezone, timedelta
import json
import bogofilter
import html2text
from collections import deque
3 years ago
from mastodon import Mastodon, MastodonNotFoundError
from bot import Bot, BotClient
3 years ago
def encode_time(dt):
return int(dt.strftime("%Y%m%d%H%M"))
def decode_time(value):
if len(value) == 12:
return dt.strptime(str(value), "%Y%m%d%H%M")
else:
return dt.strptime(str(value), "%Y%m%d%H")
class CringeBotClient(BotClient):
def __init__(self, bot, config):
self.h2t = html2text.HTML2Text()
self.h2t.ignore_links = True
self.deletion_scheduler = sched.scheduler(time.time, time.sleep)
super().__init__(bot, config)
3 years ago
def on_start(self):
self.deletion_report()
3 years ago
3 years ago
def respond(self, status, message):
self.log("Responded with:")
self.log(message)
reply = self.api.status_reply(status, "{}\n{}".format(message, self.config["tag"]), visibility = "direct", untag = True)
self.enqueue_deletion(reply["id"])
time.sleep(1)
def on_poll(self):
self.deletion_scheduler.run(blocking = False)
def on_status(self, status):
if status["account"]["id"] != self.api.me()["id"]:
return
if status["reblog"]:
return
md_text = self.h2t.handle(status["content"])
if self.config["tag"] in md_text.split():
return
mail_text = toot_dict_to_mail(status).format()
preview = toot_dict_to_mail(status)
preview.body = md_text
preview_text = preview.format()
self.log()
self.log(preview_text)
self.log()
replied_id = status.get("in_reply_to_id", None)
if replied_id:
try:
replied_status = self.api.status(replied_id)
replied_tokens = self.h2t.handle(replied_status["content"]).split()
if self.config["tag"] in replied_tokens:
self.enqueue_deletion(status["id"])
target_status_id = replied_status.get("in_reply_to_id", None)
if target_status_id:
try:
target_status = self.api.status(target_status_id)
target_mail_text = toot_dict_to_mail(target_status).format()
command = self.h2t.handle(status["content"]).strip()
tokens = deque(command.split())
self.log("Received command: {}".format(command))
try:
while True:
token = tokens.popleft()
if token == "learn":
token = tokens.popleft()
if token == "cringe":
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM])
self.enqueue_deletion(target_status_id)
self.respond(status, "Learned as cringe")
break
elif token == "based":
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM])
self.unqueue_deletion(target_status_id)
self.respond(status, "Learned as based")
break
elif token == "unlearn":
token = tokens.popleft()
if token == "cringe":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM])
self.unqueue_deletion(target_status_id)
self.respond(status, "Unlearned as cringe")
break
elif token == "based":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM])
self.unqueue_deletion(target_status_id)
self.respond(status, "Unlearned as cringe")
break
elif token == "relearn":
token = tokens.popleft()
if token == "cringe":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM])
self.enqueue_deletion(target_status_id)
self.respond(status, "Relearned as cringe")
break
elif token == "based":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM])
self.unqueue_deletion(target_status_id)
self.respond(status, "Relearned as as based")
break
except IndexError:
self.respond(status, "Invalid command")
except MastodonNotFoundError:
self.respond(status, "Original status is missing")
else:
self.respond(status, "Original status is missing")
return
except MastodonNotFoundError:
pass
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER])
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score))
if result.category == bogofilter.SPAM:
self.log("CRINGE: Enqueuing status {} for deletion".format(status["id"]))
self.respond(status, "Categorised as cringe\n{}".format(bogo_report))
self.enqueue_deletion(status["id"])
elif result.category == bogofilter.UNSURE:
self.log("UNSURE: Not enqueueing status {} for deletion".format(status["id"]))
self.respond(status, "Categorised as unsure\n{}".format(bogo_report))
else:
self.log("BASED: Not enqueueing status {} for deletion".format(status["id"]))
self.respond(status, "Categorised as based\n{}".format(bogo_report))
def on_load_state(self):
state = {"deletion_queue": {}, **super().on_load_state()}
3 years ago
for status_id, params in state["deletion_queue"].items():
params["scheduler_event"] = self.deletion_scheduler.enterabs(datetime.fromisoformat(params["time"]).timestamp(), 1, self.queued_delete, argument=(status_id,))
3 years ago
return state
3 years ago
def on_save_state(self, state):
state["deletion_queue"] = {event.argument[0]: {"time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()} for event in self.deletion_scheduler.queue}
super().on_save_state(state)
3 years ago
def deletion_report(self):
self.log("{} status(es) queued for deletion".format(len(self.deletion_scheduler.queue)))
3 years ago
def enqueue_deletion(self, status_id):
self.state["deletion_queue"][status_id] = {"scheduler_event": self.deletion_scheduler.enter(60 * self.config["max_age"], 1, self.queued_delete, argument=(status_id,), kwargs={})}
self.save_state()
self.deletion_report()
3 years ago
def unqueue_deletion(self, status_id):
try:
params = state["deletion_queue"].pop(status_id)
self.deletion_scheduler.cancel(params["scheduler_event"])
except KeyError:
self.log("Cannot unqueue non-queued status {} for deletion".format(status_id))
3 years ago
self.deletion_report()
3 years ago
def queued_delete(self, status_id):
try:
self.log("Deleting status {}".format(status_id))
self.api.status_delete(status_id)
except MastodonNotFoundError:
self.log("Cannot find status {} on server".format(status_id))
except Exception:
self.log(traceback.format_exc())
3 years ago
def toot_dict_to_mail(toot_dict):
flags = []
if toot_dict.get("sensitive", False):
flags.append("sensitive")
if toot_dict.get("poll", False):
flags.append("poll")
if toot_dict.get("reblog", False):
flags.append("reblog")
if toot_dict.get("reblogged", False):
flags.append("reblogged")
if toot_dict.get("favourited", False):
flags.append("favourited")
3 years ago
if toot_dict.get("bookmarked", False):
flags.append("bookmarked")
if toot_dict.get("pinned", False):
flags.append("pinned")
flags = ", ".join(flags)
headers = {}
if toot_dict.get("account") and toot_dict["account"].get("acct"):
headers["From"] = toot_dict["account"]["acct"]
if toot_dict.get("created_at"):
headers["Date"] = toot_dict["created_at"]
if toot_dict.get("visibility"):
headers["X-Visibility"] = toot_dict["visibility"]
if len(flags) > 0:
headers["X-Flags"] = flags
if toot_dict.get("spoiler_text"):
headers["Subject"] = toot_dict["spoiler_text"]
if toot_dict.get("replies_count", 0) > 0:
headers["X-Replies-Count"] = toot_dict["replies_count"]
if len(toot_dict.get("media_attachments", [])) > 0:
headers["X-Attachments-Count"] = len(toot_dict["media_attachments"])
if toot_dict.get("reblogs_count", 0) > 0:
headers["X-Reblogs-Count"] = toot_dict["reblogs_count"]
if toot_dict.get("favourites_count", 0) > 0:
headers["X-Favourites-Count"] = toot_dict["favourites_count"]
if toot_dict.get("content") and len(toot_dict["content"]) > 0:
body = toot_dict["content"]
else:
body = None
return bogofilter.Mail(headers = headers, body = body)
with open("config.json") as json_file:
bot = Bot(CringeBotClient, json.load(json_file))
bot.start()
3 years ago
while True:
time.sleep(1)