A bot that tracks and auto-deletes statuses on Mastodon/Pleroma accounts after a set time if they are cringe enough
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

334 lines
13 KiB

import json
import os
import sys
import threading
import time
import traceback
from collections import deque
from datetime import datetime, timezone, timedelta

import html2text
from mastodon import Mastodon, MastodonNotFoundError

import bogofilter
from bot import Bot, BotClient
def encode_time(dt):
    """Encode a datetime as an integer timeslot key of the form YYYYMMDDHHMM.

    Timezone-aware datetimes are converted to UTC first, because the expiry
    cutoff these keys are compared against is computed from
    ``datetime.now(timezone.utc)``. Naive datetimes are encoded as-is.

    Args:
        dt: The ``datetime`` to encode (minute resolution; seconds are dropped).

    Returns:
        int: The 12-digit key, e.g. ``202103040506``.
    """
    if dt.tzinfo is not None and dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    return int(dt.strftime("%Y%m%d%H%M"))
def decode_time(value):
    """Decode a timeslot key back into a naive ``datetime``.

    Accepts both integer and string keys. A 12-character key is parsed as
    YYYYMMDDHHMM (the format ``encode_time`` produces); anything else is
    parsed as the hour-resolution form YYYYMMDDHH.

    Args:
        value: The key as an ``int`` or ``str``.

    Returns:
        datetime: The decoded, timezone-naive datetime.

    Raises:
        ValueError: If the key matches neither format.
    """
    # Keys are stored as ints, so measure the length of the text form.
    text = str(value)
    fmt = "%Y%m%d%H%M" if len(text) == 12 else "%Y%m%d%H"
    return datetime.strptime(text, fmt)
class CringeBotClient(BotClient):
    """Bot client that classifies the account's own statuses and purges spam.

    Every status posted by the bot's own account is run through bogofilter.
    Statuses categorised as spam are tracked in ``self.state["timeslots"]``
    (a dict mapping an ``encode_time`` key to a set of status-id strings) and
    deleted by the purger thread once they are older than
    ``config["max_age"]`` minutes.

    ``self.api``, ``self.config``, ``self.state``, ``self.state_lock`` and
    ``self.log`` are not defined here; they are presumably provided by
    ``BotClient`` -- confirm against the base class.
    """

    def __init__(self, bot, config):
        """Set up the HTML-to-text converter and the (not yet started) threads."""
        super().__init__(bot, config)
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True
        self.spawner_thread = threading.Thread(
            target=self.spawner,
            name=self.config["name"] + " spawner",
            daemon=True)
        self.purger_thread = threading.Thread(
            target=self.purger,
            name=self.config["name"] + " purger",
            daemon=True)

    def on_start(self):
        """Start the spawner thread."""
        self.spawner_thread.start()

    def spawner(self):
        """Start the purger (unless in learning mode), then report once a minute."""
        # `learning` is the module-level flag set from the "-l" argument.
        if not learning:
            self.purger_thread.start()
        while True:
            self.tracker_report()
            time.sleep(60)

    def respond(self, status, message):
        """Reply to ``status`` with ``message`` plus the bot tag, as a direct message."""
        self.log("Responded with:")
        self.log(message)
        self.api.status_reply(
            status,
            "{}\n{}".format(message, self.config["tag"]),
            visibility="direct",
            untag=True)
        time.sleep(1)

    def on_status(self, status):
        """Handle a status: learn interactively, run a command, or classify.

        Statuses from other accounts, reblogs, and statuses containing the
        bot tag (i.e. the bot's own responses) are ignored.
        """
        if status["account"]["id"] != self.api.me()["id"]:
            return
        if status["reblog"]:
            return
        md_text = self.h2t.handle(status["content"])
        if self.config["tag"] in md_text.split():
            return

        mail_text = toot_dict_to_mail(status).format()
        # The preview is the same mail, but with the body rendered as text.
        preview = toot_dict_to_mail(status)
        preview.body = md_text
        preview_text = preview.format()

        if learning:
            self._learn_interactively(status, mail_text, preview_text)
            return

        replied_id = status.get("in_reply_to_id", None)
        if replied_id:
            try:
                replied_status = self.api.status(replied_id)
                replied_tokens = self.h2t.handle(replied_status["content"]).split()
                if self.config["tag"] in replied_tokens:
                    # A reply to one of the bot's responses is a command.
                    self._handle_command(status, replied_status, md_text)
                    return
            except MastodonNotFoundError:
                # The replied-to status is gone; classify as an ordinary status.
                pass
        self._classify(status, mail_text, preview_text)

    def _learn_interactively(self, status, mail_text, preview_text):
        """Show the status on the log and ask the operator how to train on it."""
        self.log(preview_text)
        self.log()
        category = None
        while category not in bogofilter.categories:
            category = input("H(am), S(pam) or U(nknown)? ").upper()
        if category != bogofilter.UNSURE:
            bogofilter.run(mail_text, [category])
        if category == bogofilter.SPAM:
            self.track_status(status)
        self.log()

    def _handle_command(self, status, replied_status, md_text):
        """Apply the training command in ``status`` to the originally classified status.

        ``replied_status`` is the bot's response; the status it replied to is
        the target of the command.
        """
        target_status_id = replied_status.get("in_reply_to_id", None)
        if not target_status_id:
            self.respond(status, "Original status is missing")
            return
        try:
            target_status = self.api.status(target_status_id)
            target_timeslot_key = encode_time(target_status["created_at"])
            target_mail_text = toot_dict_to_mail(target_status).format()
            command = md_text.strip()
            self.log("Received command: {}".format(command))

            # (verb, kind) -> (bogofilter actions, track as spam?, reply text)
            actions = {
                ("learn", "spam"): (
                    [bogofilter.LEARN_SPAM], True, "Learned as spam"),
                ("learn", "ham"): (
                    [bogofilter.LEARN_HAM], False, "Learned as ham"),
                ("unlearn", "spam"): (
                    [bogofilter.UNLEARN_SPAM], False, "Unlearned as spam"),
                ("unlearn", "ham"): (
                    [bogofilter.UNLEARN_HAM], False, "Unlearned as ham"),
                ("relearn", "spam"): (
                    [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM], True,
                    "Relearned as spam"),
                ("relearn", "ham"): (
                    [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM], False,
                    "Relearned as ham"),
            }
            verbs = {verb for verb, _ in actions}

            tokens = deque(command.split())
            try:
                # Scan past unrelated tokens (e.g. mentions) until a verb
                # followed by a valid kind is found; running out of tokens
                # means the command was not understood.
                while True:
                    verb = tokens.popleft()
                    if verb not in verbs:
                        continue
                    kind = tokens.popleft()
                    action = actions.get((verb, kind))
                    if action is None:
                        continue
                    bogo_actions, track, reply = action
                    bogofilter.run(target_mail_text, bogo_actions)
                    if track:
                        self.track_status(target_status)
                    else:
                        self.expire_status(target_timeslot_key, target_status_id)
                    self.respond(status, reply)
                    break
            except IndexError:
                self.respond(status, "Invalid command")
        except MastodonNotFoundError:
            self.respond(status, "Original status is missing")

    def _classify(self, status, mail_text, preview_text):
        """Classify ``status`` with bogofilter, report the verdict, and track spam."""
        result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER])
        bogo_report = "Bogofilter: Category={}, Score={}".format(
            result.category, "{:.4f}".format(result.score))
        if result.category == bogofilter.SPAM:
            self.log("SPAM: Tracking status with ID {} as spam".format(status["id"]))
            self.respond(status, "Categorised as spam\n{}".format(bogo_report))
            self.track_status(status)
        elif result.category == bogofilter.UNSURE:
            self.log("UNSURE: Not tracking status with ID {} as spam".format(status["id"]))
            self.respond(status, "Categorised as unsure\n{}".format(bogo_report))
        else:
            self.log("HAM: Not tracking status with ID {} as spam".format(status["id"]))
            self.respond(status, "Categorised as ham\n{}".format(bogo_report))
        self.log()
        self.log(preview_text)
        self.log()

    def purger(self):
        """Delete expired tracked statuses, one per iteration, forever.

        Sleeps 60 seconds after a successful deletion or an unexpected error,
        and 1 second otherwise.
        """
        while True:
            try:
                deleted = False
                timeslot_key, status_id = self.next_expired()
                if timeslot_key is not None:
                    try:
                        self.log("Deleting status {} in timeslot {}".format(status_id, timeslot_key))
                        self.api.status_delete(status_id)
                        deleted = True
                    except MastodonNotFoundError:
                        self.log("Cannot find status {} on server".format(status_id))
                    # Deleted or already gone: stop tracking it either way.
                    self.expire_status(timeslot_key, status_id)
                time.sleep(60 if deleted else 1)
            except Exception:
                # Keep the purger alive; log the failure and back off.
                self.log(traceback.format_exc())
                time.sleep(60)

    def on_load_state(self):
        """Load state and rebuild ``timeslots`` as ``{int key: set of id strings}``.

        The saved form is a list of ``[key, [ids...]]`` pairs (see
        ``on_save_state``); a mapping is accepted as well.
        """
        state = super().on_load_state()
        saved = state.get("timeslots") or []
        pairs = saved.items() if isinstance(saved, dict) else saved
        state["timeslots"] = {
            int(key): {str(status_id) for status_id in status_ids}
            for key, status_ids in pairs
        }
        return state

    def on_save_state(self, state):
        """Convert ``timeslots`` to a list of ``[key, [ids...]]`` pairs and save."""
        # NOTE(review): this replaces state["timeslots"] in place; confirm the
        # caller passes a copy rather than the live self.state.
        state["timeslots"] = [
            [key, list(status_ids)]
            for key, status_ids in state["timeslots"].items()
        ]
        super().on_save_state(state)

    def tracker_report(self):
        """Log how many statuses are tracked and across how many timeslots."""
        with self.state_lock:
            timeslots = self.state["timeslots"]
            total_timeslots = len(timeslots)
            total_statuses = sum(len(status_ids) for status_ids in timeslots.values())
            self.log("Tracking {} statuses across {} timeslots".format(total_statuses, total_timeslots))

    def track_status(self, status):
        """Start tracking ``status`` for later deletion (reblogs are skipped)."""
        status_id = str(status["id"])
        timeslot_key = encode_time(status["created_at"])
        with self.state_lock:
            if status["reblog"] is None:
                self.state["timeslots"].setdefault(timeslot_key, set()).add(status_id)

    def next_expired(self):
        """Return ``(timeslot_key, status_id)`` for one expired status.

        Looks at the oldest tracked timeslot; if it is older than
        ``config["max_age"]`` minutes, one of its status ids is returned
        (``None`` if the slot is empty). Otherwise returns ``(None, None)``.
        """
        now = datetime.now(timezone.utc)
        min_timeslot_key = encode_time(now - timedelta(minutes=self.config["max_age"]))
        with self.state_lock:
            timeslots = self.state["timeslots"]
            if timeslots:
                # Insertion order is not age order (commands can track older
                # statuses late), so pick the smallest key explicitly.
                oldest_key = min(timeslots)
                if oldest_key < min_timeslot_key:
                    return (oldest_key, next(iter(timeslots[oldest_key]), None))
        return (None, None)

    def expire_status(self, timeslot_key, status_id):
        """Stop tracking ``status_id`` and drop its timeslot if it becomes empty."""
        # Tracked ids are stored as strings; ids taken from API results may not be.
        status_id = str(status_id)
        with self.state_lock:
            timeslots = self.state["timeslots"]
            if timeslot_key not in timeslots:
                self.log("Cannot expire status {} from missing timeslot {}".format(
                    status_id, timeslot_key))
                return
            if status_id in timeslots[timeslot_key]:
                self.log("Expiring status {} from timeslot {}".format(status_id, timeslot_key))
                timeslots[timeslot_key].remove(status_id)
            else:
                self.log("Cannot expire missing status {} from timeslot {}".format(
                    status_id, timeslot_key))
            if len(timeslots[timeslot_key]) == 0:
                self.log("Removing empty timeslot {}".format(timeslot_key))
                del timeslots[timeslot_key]
def toot_dict_to_mail(toot_dict):
    """Build a ``bogofilter.Mail`` whose headers and body describe a status.

    Args:
        toot_dict: The status as a dict (keys such as ``account``,
            ``created_at``, ``visibility``, ``content`` are read if present).

    Returns:
        A ``bogofilter.Mail`` with only the headers that apply to the status;
        the body is the status content, or ``None`` when it is empty.
    """
    # Boolean-ish status fields that are reported in the X-Flags header,
    # in the order they appear there.
    flag_names = (
        "sensitive",
        "poll",
        "reblog",
        "reblogged",
        "favourited",
        "bookmarked",
        "pinned",
    )
    flags = ", ".join(name for name in flag_names if toot_dict.get(name, False))

    headers = {}

    account = toot_dict.get("account")
    if account and account.get("acct"):
        headers["From"] = account["acct"]

    created_at = toot_dict.get("created_at")
    if created_at:
        headers["Date"] = created_at

    visibility = toot_dict.get("visibility")
    if visibility:
        headers["X-Visibility"] = visibility

    if flags:
        headers["X-Flags"] = flags

    spoiler_text = toot_dict.get("spoiler_text")
    if spoiler_text:
        headers["Subject"] = spoiler_text

    # Counters are only included when they are positive.
    replies_count = toot_dict.get("replies_count", 0)
    if replies_count > 0:
        headers["X-Replies-Count"] = replies_count

    attachment_count = len(toot_dict.get("media_attachments", []))
    if attachment_count > 0:
        headers["X-Attachments-Count"] = attachment_count

    reblogs_count = toot_dict.get("reblogs_count", 0)
    if reblogs_count > 0:
        headers["X-Reblogs-Count"] = reblogs_count

    favourites_count = toot_dict.get("favourites_count", 0)
    if favourites_count > 0:
        headers["X-Favourites-Count"] = favourites_count

    content = toot_dict.get("content")
    body = content if content and len(content) > 0 else None

    return bogofilter.Mail(headers=headers, body=body)
# Passing "-l" on the command line enables learning mode: statuses are
# categorised interactively and the purger thread is not started.
learning = "-l" in sys.argv[1:]

with open("config.json") as config_file:
    bot_config = json.load(config_file)

bot = Bot(CringeBotClient, bot_config)
bot.start()

# Idle in the main thread; the client's worker threads are daemon threads.
while True:
    time.sleep(1)