A bot that tracks and auto-deletes statuses on Mastodon/Pleroma accounts after a set time if they are cringe enough
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

439 lines
16 KiB

import os
import sys
import time
from datetime import datetime, timezone, timedelta
import json
import pprint
import threading
import traceback
import bogofilter
import html2text
from mastodon import Mastodon, MastodonNotFoundError
def log_print(source, text = ""):
prefix = "{}: ".format(source)
text = (prefix + text.strip()).replace("\n", "\n" + prefix)
print(text)
def log_pprint(source, obj):
log_print(source, pprint.pformat(obj))
def encode_time(dt):
return int(dt.strftime("%Y%m%d%H%M"))
def decode_time(value):
if len(value) == 12:
return dt.strptime(str(value), "%Y%m%d%H%M")
else:
return dt.strptime(str(value), "%Y%m%d%H")
class Instance:
def __init__(self, name, config):
self.name = name
self.config = config
self.base_url = "https://{}".format(name)
self.client_file = "secret/{}.client".format(name)
self.user_file = "secret/{}.user".format(name)
self.state_file = "state/{}.state".format(name)
self.state_lock = threading.Lock()
self.spawner_thread = threading.Thread(
target = self.spawner,
name = self.name + " spawner",
args = (),
kwargs = {},
daemon = True)
self.tracker_thread = threading.Thread(
target = self.tracker,
name = self.name + " tracker",
args = (),
kwargs = {},
daemon = True)
self.purger_thread = threading.Thread(
target = self.purger,
name = self.name + " purger",
args = (),
kwargs = {},
daemon = True)
def setup(self):
if not os.path.exists(self.client_file):
Mastodon.create_app(
'MastodonDeleter',
api_base_url = self.base_url,
to_file = self.client_file)
if not os.path.exists(self.user_file):
api = Mastodon(
api_base_url = self.base_url,
client_id = self.client_file)
auth_url = api.auth_request_url()
print("Go to:")
print(auth_url)
print()
auth_code = input("Enter code: ")
print()
api.log_in(code = auth_code, to_file = self.user_file)
def start(self):
self.spawner_thread.start()
def spawner(self):
self.load_state()
self.api = Mastodon(
access_token = self.user_file,
api_base_url = self.base_url)
self.tracker_thread.start()
if not learning:
self.purger_thread.start()
while True:
self.tracker_report()
time.sleep(60)
def respond(self, status, message):
log_print(self.name, "Responded with: {}".format(message))
self.api.status_reply(status, "{}\n{}".format(message, config["tag"]), visibility = "direct", untag = True)
def tracker(self):
my_id = self.api.me()["id"]
while True:
try:
self.state_lock.acquire()
self.state_lock.release()
statuses = self.api.account_statuses(my_id, min_id = self.state["min_id"])
h2t = html2text.HTML2Text()
h2t.ignore_links = True
while not statuses is None and len(statuses) > 0:
log_print(self.name, "Found {} new status(es)".format(len(statuses)))
for status in sorted(statuses,
key = lambda status: status["created_at"]):
self.state["min_id"] = status["id"]
if status["reblog"]:
continue
md_text = h2t.handle(status["content"])
if config["tag"] in md_text:
continue
mail_text = toot_dict_to_mail(status).format()
if learning:
preview = toot_dict_to_mail(status)
preview.body = md_text
preview_text = preview.format()
print(preview_text)
print()
category = None
while not category in bogofilter.categories:
category = input("H(am), S(pam) or U(nknown)? ").upper()
if category != bogofilter.UNSURE:
bogofilter.run(mail_text, [category])
if category == bogofilter.SPAM:
self.track_status(status)
print()
self.save_state()
else:
replied_id = status.get("in_reply_to_id", None)
if replied_id:
try:
replied_status = self.api.status(replied_id)
replied_md = h2t.handle(replied_status["content"])
if config["tag"] in replied_md:
target_status_id = replied_status.get("in_reply_to_id", None)
if target_status_id:
try:
target_status = self.api.status(target_status_id)
target_timeslot_key = encode_time(status["created_at"])
target_mail_text = toot_dict_to_mail(target_status).format()
command = h2t.handle(status["content"]).strip()
log_print("Received command: {}".format(command))
if command == "learn spam":
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM])
self.track_status(target_status)
self.respond(status, "Learned as spam")
elif command == "unlearn spam":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM])
self.expire_status(target_timeslot_key, target_status_id)
self.respond(status, "Unlearned as spam")
elif command == "relearn spam":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM])
self.track_status(target_status)
self.respond(status, "Relearned as spam")
elif command == "learn ham":
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM])
self.expire_status(target_timeslot_key, target_status_id)
self.respond(status, "Learned as ham")
elif command == "unlearn ham":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM])
self.respond(status, "Unlearned as ham")
elif command == "relearn ham":
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM])
self.expire_status(target_timeslot_key, target_status_id)
self.respond(status, "Relearned as ham")
else:
self.respond(status, "Unknown command")
except MastodonNotFoundError:
self.respond(status, "Original status is missing")
else:
self.respond(status, "Original status not referenced")
continue
except MastodonNotFoundError:
pass
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER])
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score))
if result.category == bogofilter.SPAM:
log_print(self.name, "SPAM: Tracking status with ID {} as spam".format(status["id"]))
self.respond(status, "Tracked as spam\n{}".format(bogo_report))
time.sleep(1)
else:
log_print(self.name, "HAM: Not tracking status with ID {} as spam".format(status["id"]))
self.respond(status, "Tracked as ham\n{}".format(bogo_report))
time.sleep(1)
print()
print(mail_text)
self.track_status(status)
self.save_state()
statuses = self.api.fetch_previous(statuses)
# Rate limit (max 300 requests per 5 minutes, i.e. 1 per second)
time.sleep(1)
except:
log_print(self.name, traceback.format_exc())
time.sleep(5)
def purger(self):
while True:
try:
deleted = False
timeslot_key, status_id = self.next_expired()
if not timeslot_key is None:
try:
log_print(self.name, "Deleting status {} in timeslot {}".format(status_id, timeslot_key))
self.api.status_delete(status_id)
deleted = True
except MastodonNotFoundError:
log_print(self.name,
"Cannot find status {} on server".format(status_id))
self.expire_status(timeslot_key, status_id)
if deleted:
time.sleep(60)
else:
time.sleep(1)
except:
log_print(self.name, traceback.format_exc())
time.sleep(60)
def load_state(self):
self.state_lock.acquire()
if not os.path.exists(self.state_file):
self.state = dict(
min_id = "0",
timeslots = {})
else:
with open(self.state_file) as json_file:
self.state = json.load(json_file)
self.state["timeslots"] = dict(map(lambda kv: (int(kv[0]), set(kv[1])), self.state["timeslots"]))
self.state_lock.release()
def save_state(self):
self.state_lock.acquire()
json_state = self.state.copy()
json_state["timeslots"] = list(map(lambda kv: [kv[0], list(kv[1])], json_state["timeslots"].items()))
self.state_lock.release()
with open(self.state_file, "w") as json_file:
json.dump(json_state, json_file, indent = 4)
def tracker_report(self):
self.state_lock.acquire()
total_timeslots = len(self.state["timeslots"])
total_statuses = 0
for timeslot_key, status_ids in self.state["timeslots"].items():
total_statuses += len(status_ids)
self.state_lock.release()
log_print(self.name, "Tracking {} statuses across {} timeslots".format(
total_statuses, total_timeslots))
def track_status(self, status):
status_id = str(status["id"])
timeslot_key = encode_time(status["created_at"])
self.state_lock.acquire()
if status["reblog"] is None:
timeslots = self.state["timeslots"]
if not timeslot_key in timeslots:
timeslots[timeslot_key] = set()
timeslots[timeslot_key].add(status_id)
self.state_lock.release()
def next_expired(self):
now = datetime.now(timezone.utc)
min_timeslot_key = encode_time(now - timedelta(minutes = config["max_age"]))
self.state_lock.acquire()
timeslot_key, status_ids = next(iter(self.state["timeslots"].items()), (None, None))
if not timeslot_key is None and timeslot_key < min_timeslot_key:
status_id = next(iter(status_ids), None)
else:
timeslot_key = None
status_id = None
self.state_lock.release()
return (timeslot_key, status_id)
def expire_status(self, timeslot_key, status_id):
self.state_lock.acquire()
timeslots = self.state["timeslots"]
if timeslot_key in timeslots:
if status_id in timeslots[timeslot_key]:
log_print(self.name, "Expiring status {} from timeslot {}".format(status_id, timeslot_key))
timeslots[timeslot_key].remove(status_id)
else:
log_print(self.name, "Cannot expire missing status {} from timeslot {}".format(
status_id, timeslot_key))
if len(timeslots[timeslot_key]) == 0:
log_print(self.name, "Removing empty timeslot {}".format(timeslot_key))
del timeslots[timeslot_key]
else:
log_print(self.name, "Cannot expire status {} from missing timeslot {}".format(
status_id, timeslot_key))
self.state_lock.release()
self.save_state()
def toot_dict_to_mail(toot_dict):
#log_pprint("toot_dict_to_mail", toot_dict)
flags = []
if toot_dict.get("sensitive", False):
flags.append("sensitive")
if toot_dict.get("poll", False):
flags.append("poll")
if toot_dict.get("reblog", False):
flags.append("reblog")
if toot_dict.get("reblogged", False):
flags.append("reblogged")
#if toot_dict.get("favourited", False):
# flags.append("favourited")
if toot_dict.get("bookmarked", False):
flags.append("bookmarked")
if toot_dict.get("pinned", False):
flags.append("pinned")
flags = ", ".join(flags)
headers = {}
if toot_dict.get("account") and toot_dict["account"].get("acct"):
headers["From"] = toot_dict["account"]["acct"]
if toot_dict.get("created_at"):
headers["Date"] = toot_dict["created_at"]
if toot_dict.get("visibility"):
headers["X-Visibility"] = toot_dict["visibility"]
if len(flags) > 0:
headers["X-Flags"] = flags
if toot_dict.get("spoiler_text"):
headers["Subject"] = toot_dict["spoiler_text"]
if toot_dict.get("replies_count", 0) > 0:
headers["X-Replies-Count"] = toot_dict["replies_count"]
if len(toot_dict.get("media_attachments", [])) > 0:
headers["X-Attachments-Count"] = len(toot_dict["media_attachments"])
if toot_dict.get("reblogs_count", 0) > 0:
headers["X-Reblogs-Count"] = toot_dict["reblogs_count"]
if toot_dict.get("favourites_count", 0) > 0:
headers["X-Favourites-Count"] = toot_dict["favourites_count"]
if toot_dict.get("content") and len(toot_dict["content"]) > 0:
body = toot_dict["content"]
else:
body = None
return bogofilter.Mail(headers = headers, body = body)
learning = "-l" in sys.argv[1:]
with open("config.json") as json_file:
config = json.load(json_file)
instances = {}
for name in config["instances"]:
instances[name] = Instance(name = name, config = config)
instances[name].setup()
start_interval = 60.0 / len(config["instances"])
for instance in instances.values():
instance.start()
time.sleep(start_interval)
while True:
time.sleep(1)