"""Track a Mastodon account's statuses and delete them once they are too old.

For every instance named in ``config.json`` an :class:`Instance` runs three
daemon threads: a *spawner* (loads state, logs in, starts the other two and
prints a periodic report), a *tracker* (records newly posted statuses) and a
*purger* (deletes tracked statuses older than ``config["max_age"]`` minutes,
unless the account has favourited them).
"""

import os
import time
from datetime import datetime, timezone, timedelta
import json
import pprint
import threading
import traceback

from mastodon import Mastodon, MastodonNotFoundError


def log_print(source, text):
    """Print *text* with every line prefixed by ``"<source>: "``.

    Leading and trailing whitespace of *text* is stripped first.
    """
    prefix = "{}: ".format(source)
    text = (prefix + text.strip()).replace("\n", "\n" + prefix)
    print(text)


def log_pprint(source, obj):
    """Pretty-print *obj* through :func:`log_print`."""
    log_print(source, pprint.pformat(obj))


def encode_time(dt):
    """Encode datetime *dt* as the integer ``YYYYMMDDHHMM`` (minute resolution)."""
    return int(dt.strftime("%Y%m%d%H%M"))


def decode_time(value):
    """Decode a timeslot key back into a naive ``datetime``.

    *value* may be an ``int`` or a ``str``. A 12-digit value is parsed as
    ``YYYYMMDDHHMM``; anything else is parsed as ``YYYYMMDDHH``.

    Raises:
        ValueError: if *value* matches neither format.
    """
    # encode_time() returns an int, so normalise to text before measuring it.
    text = str(value)
    if len(text) == 12:
        return datetime.strptime(text, "%Y%m%d%H%M")
    return datetime.strptime(text, "%Y%m%d%H")


class Instance:
    """Tracker/purger for the account on one Mastodon instance.

    ``self.state`` is a dict with two entries: ``"min_id"`` (the id of the
    last status seen by the tracker) and ``"timeslots"`` (a dict mapping an
    :func:`encode_time` key to the set of status-id strings created in that
    minute). All access to it is serialised through ``self.state_lock``.
    """

    def __init__(self, name, config):
        """Prepare file paths, the state lock and the (not yet started) threads.

        Args:
            name: host name of the instance; also used to build the base URL
                and the credential/state file names.
            config: the parsed configuration; ``config["max_age"]`` (minutes)
                is read by :meth:`next_expired`.
        """
        self.name = name
        self.config = config
        self.base_url = "https://{}".format(name)
        self.client_file = "secret/{}.client".format(name)
        self.user_file = "secret/{}.user".format(name)
        self.state_file = "state/{}.state".format(name)
        # threading.Lock is not reentrant: never call save_state() (which
        # takes the lock itself) while already holding it.
        self.state_lock = threading.Lock()
        self.spawner_thread = threading.Thread(
            target=self.spawner,
            name=self.name + " spawner",
            daemon=True)
        self.tracker_thread = threading.Thread(
            target=self.tracker,
            name=self.name + " tracker",
            daemon=True)
        self.purger_thread = threading.Thread(
            target=self.purger,
            name=self.name + " purger",
            daemon=True)

    def setup(self):
        """Create the client and user credential files if they do not exist.

        Creating the user file is interactive: an authorisation URL is
        printed and the resulting code is read from standard input.
        """
        if not os.path.exists(self.client_file):
            Mastodon.create_app(
                'MastodonDeleter',
                api_base_url=self.base_url,
                to_file=self.client_file)
        if not os.path.exists(self.user_file):
            api = Mastodon(
                api_base_url=self.base_url,
                client_id=self.client_file)
            auth_url = api.auth_request_url()
            print("Go to:")
            print(auth_url)
            print()
            auth_code = input("Enter code: ")
            print()
            api.log_in(code=auth_code, to_file=self.user_file)

    def start(self):
        """Start the spawner thread, which in turn starts the other threads."""
        self.spawner_thread.start()

    def spawner(self):
        """Load state, create the API client, start workers, then report forever."""
        self.load_state()
        self.api = Mastodon(
            access_token=self.user_file,
            api_base_url=self.base_url)
        self.tracker_thread.start()
        self.purger_thread.start()
        while True:
            self.tracker_report()
            time.sleep(60)

    def tracker(self):
        """Poll for statuses newer than ``state["min_id"]`` and track them.

        Any exception raised while polling is logged and followed by a
        60-second pause before the next attempt.
        """
        my_id = self.api.me()["id"]
        while True:
            try:
                with self.state_lock:
                    min_id = self.state["min_id"]
                statuses = self.api.account_statuses(my_id, min_id=min_id)
                while statuses is not None and len(statuses) > 0:
                    log_print(self.name, "Found {} new status(es)".format(len(statuses)))
                    for status in sorted(statuses, key=lambda item: item["created_at"]):
                        self.track_status(status)
                        with self.state_lock:
                            self.state["min_id"] = status["id"]
                        self.save_state()
                    statuses = self.api.fetch_previous(statuses)
                # Rate limit (max 300 requests per 5 minutes, i.e. 1 per second)
                time.sleep(1)
            except Exception:
                log_print(self.name, traceback.format_exc())
                time.sleep(60)

    def purger(self):
        """Delete expired statuses one at a time, keeping favourited ones.

        Whatever the outcome (deleted, kept because favourited, or not found
        on the server) the status is removed from the tracked state. The loop
        pauses 60 seconds after a deletion and 1 second otherwise; any other
        exception is logged and followed by a 60-second pause.
        """
        while True:
            try:
                deleted = False
                timeslot_key, status_id = self.next_expired()
                if timeslot_key is not None:
                    try:
                        log_print(self.name, "Inspecting status {} in timeslot {}".format(status_id, timeslot_key))
                        status = self.api.status(status_id)
                        if status["favourited"]:
                            log_print(self.name, "Keeping favourited status {} in timeslot {}".format(status_id, timeslot_key))
                        else:
                            log_print(self.name, "Deleting status {} in timeslot {}".format(status_id, timeslot_key))
                            self.api.status_delete(status_id)
                            deleted = True
                    except MastodonNotFoundError:
                        log_print(self.name, "Cannot find status {} on server".format(status_id))
                    self.expire_status(timeslot_key, status_id)
                if deleted:
                    time.sleep(60)
                else:
                    time.sleep(1)
            except Exception:
                log_print(self.name, traceback.format_exc())
                time.sleep(60)

    def load_state(self):
        """Load ``self.state`` from the state file, or start with an empty state.

        On disk the timeslots are a list of ``[key, [status_id, ...]]`` pairs
        (see :meth:`save_state`); in memory they become a dict of sets with
        integer keys.
        """
        with self.state_lock:
            if not os.path.exists(self.state_file):
                self.state = dict(min_id="0", timeslots={})
            else:
                with open(self.state_file) as json_file:
                    self.state = json.load(json_file)
                self.state["timeslots"] = {
                    int(key): set(status_ids)
                    for key, status_ids in self.state["timeslots"]
                }

    def save_state(self):
        """Write ``self.state`` to the state file as JSON.

        Must not be called while ``self.state_lock`` is already held. The
        file is written under the lock so that the tracker and purger
        threads cannot interleave their writes to the same file.
        """
        with self.state_lock:
            json_state = self.state.copy()
            # Sets are not JSON-serialisable; store pairs of [key, list].
            json_state["timeslots"] = [
                [key, list(status_ids)]
                for key, status_ids in json_state["timeslots"].items()
            ]
            with open(self.state_file, "w") as json_file:
                json.dump(json_state, json_file, indent=4)

    def tracker_report(self):
        """Log how many statuses and timeslots are currently tracked."""
        with self.state_lock:
            timeslots = self.state["timeslots"]
            total_timeslots = len(timeslots)
            total_statuses = sum(len(status_ids) for status_ids in timeslots.values())
        log_print(self.name, "Tracking {} statuses across {} timeslots".format(
            total_statuses, total_timeslots))

    def track_status(self, status):
        """Record *status* in the timeslot of its creation time.

        Reblogs and statuses with ``"direct"`` visibility are not tracked.
        """
        status_id = str(status["id"])
        timeslot_key = encode_time(status["created_at"])
        # Compare by value: identity comparison against a string literal is
        # unreliable and let direct messages through.
        if status["reblog"] is not None or status["visibility"] == "direct":
            return
        with self.state_lock:
            self.state["timeslots"].setdefault(timeslot_key, set()).add(status_id)

    def next_expired(self):
        """Return ``(timeslot_key, status_id)`` for one expired status.

        A timeslot is expired when its key is older than now (UTC) minus
        ``config["max_age"]`` minutes. Returns ``(None, None)`` when the
        oldest tracked timeslot is not expired or nothing is tracked.
        """
        now = datetime.now(timezone.utc)
        min_timeslot_key = encode_time(now - timedelta(minutes=self.config["max_age"]))
        with self.state_lock:
            timeslots = self.state["timeslots"]
            # Pick the oldest key explicitly instead of relying on the dict's
            # insertion order happening to be chronological.
            timeslot_key = min(timeslots, default=None)
            if timeslot_key is not None and timeslot_key < min_timeslot_key:
                status_id = next(iter(timeslots[timeslot_key]), None)
                return (timeslot_key, status_id)
        return (None, None)

    def expire_status(self, timeslot_key, status_id):
        """Stop tracking *status_id*, drop its timeslot if now empty, and save."""
        with self.state_lock:
            timeslots = self.state["timeslots"]
            if timeslot_key in timeslots:
                if status_id in timeslots[timeslot_key]:
                    log_print(self.name, "Expiring status {} from timeslot {}".format(status_id, timeslot_key))
                    timeslots[timeslot_key].remove(status_id)
                else:
                    log_print(self.name, "Cannot expire missing status {} from timeslot {}".format(
                        status_id, timeslot_key))
                if len(timeslots[timeslot_key]) == 0:
                    log_print(self.name, "Removing empty timeslot {}".format(timeslot_key))
                    del timeslots[timeslot_key]
            else:
                log_print(self.name, "Cannot expire status {} from missing timeslot {}".format(
                    status_id, timeslot_key))
        # save_state() takes the lock itself, so call it after releasing.
        self.save_state()


def main():
    """Set up and start every configured instance, then idle forever.

    Instance start-up is staggered evenly across one minute. The worker
    threads are daemons, so the main thread has to stay alive.
    """
    with open("config.json") as json_file:
        config = json.load(json_file)

    instances = {}
    for name in config["instances"]:
        instances[name] = Instance(name=name, config=config)
        instances[name].setup()

    start_interval = 60.0 / len(config["instances"])
    for instance in instances.values():
        instance.start()
        time.sleep(start_interval)

    while True:
        time.sleep(1)


if __name__ == "__main__":
    main()