Automatic status deletion script for Mastodon and Pleroma
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

270 lines
8.2 KiB

import os
import time
from datetime import datetime, timezone, timedelta
import json
import pprint
import threading
import traceback
from mastodon import Mastodon, MastodonNotFoundError
def log_print(source, text):
prefix = "{}: ".format(source)
text = (prefix + text.strip()).replace("\n", "\n" + prefix)
print(text)
def log_pprint(source, obj):
log(source, pprint.pformat(obj))
# Floor datetime to nearest hour
#def floor_dt(dt):
# return dt - timedelta(
# minutes = dt.minute,
# seconds = dt.second,
# microseconds = dt.microsecond)
def encode_time(dt):
return int(dt.strftime("%Y%m%d%H"))
def decode_time(value):
return dt.strptime(str(value), "%Y%m%d%H")
class Instance:
def __init__(self, name, config):
self.name = name
self.config = config
self.base_url = "https://{}".format(name)
self.client_file = "secret/{}.client".format(name)
self.user_file = "secret/{}.user".format(name)
self.state_file = "state/{}.state".format(name)
self.state_lock = threading.Lock()
self.spawner_thread = threading.Thread(
target = self.spawner,
name = self.name + " spawner",
args = (),
kwargs = {},
daemon = True)
self.tracker_thread = threading.Thread(
target = self.tracker,
name = self.name + " tracker",
args = (),
kwargs = {},
daemon = True)
self.purger_thread = threading.Thread(
target = self.purger,
name = self.name + " purger",
args = (),
kwargs = {},
daemon = True)
def setup(self):
if not os.path.exists(self.client_file):
Mastodon.create_app(
'MastodonDeleter',
api_base_url = self.base_url,
to_file = self.client_file)
if not os.path.exists(self.user_file):
api = Mastodon(
api_base_url = self.base_url,
client_id = self.client_file)
auth_url = api.auth_request_url()
print("Go to:")
print(auth_url)
print()
auth_code = input("Enter code: ")
print()
api.log_in(code = auth_code, to_file = self.user_file)
def start(self):
self.spawner_thread.start()
def spawner(self):
self.load_state()
self.api = Mastodon(
access_token = self.user_file,
api_base_url = self.base_url)
self.tracker_thread.start()
self.purger_thread.start()
while True:
self.tracker_report()
time.sleep(60)
def tracker(self):
my_id = self.api.me()["id"]
while True:
try:
self.state_lock.acquire()
min_id = self.state["min_id"]
self.state_lock.release()
statuses = self.api.account_statuses(my_id, min_id = min_id)
while not statuses is None and len(statuses) > 0:
log_print(self.name, "Found {} new status(es)".format(len(statuses)))
for status in sorted(statuses,
key = lambda status: status["created_at"]):
self.track_status(status)
self.save_state()
statuses = self.api.fetch_previous(statuses)
# Rate limit (300 requests per 5 minutes, i.e. 1 per second)
time.sleep(1)
except:
log_print(self.name, traceback.format_exc())
time.sleep(60)
def purger(self):
while True:
try:
timeslot_key, status_id = self.next_expired()
if not timeslot_key is None:
log_print(self.name, "Deleting status {} in timeslot {}".format(status_id, timeslot_key))
try:
self.api.status_delete(status_id)
except MastodonNotFoundError:
log_print(self.name,
"Cannot delete missing status {} from server".format(status_id))
self.expire_status(timeslot_key, status_id)
time.sleep(60)
else:
time.sleep(1)
except:
log_print(self.name, traceback.format_exc())
time.sleep(60)
def load_state(self):
self.state_lock.acquire()
if not os.path.exists(self.state_file):
self.state = dict(
min_id = "0",
timeslots = {})
else:
with open(self.state_file) as json_file:
self.state = json.load(json_file)
self.state["timeslots"] = dict(map(lambda kv: (int(kv[0]), set(kv[1])), self.state["timeslots"]))
self.state_lock.release()
def save_state(self):
self.state_lock.acquire()
json_state = self.state.copy()
json_state["timeslots"] = list(map(lambda kv: [kv[0], list(kv[1])], json_state["timeslots"].items()))
self.state_lock.release()
with open(self.state_file, "w") as json_file:
json.dump(json_state, json_file, indent = 4)
def tracker_report(self):
self.state_lock.acquire()
total_timeslots = len(self.state["timeslots"])
total_statuses = 0
for timeslot_key, status_ids in self.state["timeslots"].items():
total_statuses += len(status_ids)
self.state_lock.release()
log_print(self.name, "Tracking {} statuses across {} timeslots".format(
total_statuses, total_timeslots))
def track_status(self, status):
status_id = str(status["id"])
timeslot_key = encode_time(status["created_at"])
self.state_lock.acquire()
if status["reblog"] is None:
timeslots = self.state["timeslots"]
if not timeslot_key in timeslots:
timeslots[timeslot_key] = set()
timeslots[timeslot_key].add(status_id)
if status_id > self.state["min_id"]:
self.state["min_id"] = status_id
self.state_lock.release()
def next_expired(self):
now = datetime.now(timezone.utc)
min_timeslot_key = encode_time(now - timedelta(hours = self.config["max_age"]))
self.state_lock.acquire()
timeslot_key, status_ids = next(iter(self.state["timeslots"].items()), (None, None))
if not timeslot_key is None and timeslot_key < min_timeslot_key:
status_id = next(iter(status_ids), None)
else:
timeslot_key = None
status_id = None
self.state_lock.release()
return (timeslot_key, status_id)
def expire_status(self, timeslot_key, status_id):
self.state_lock.acquire()
timeslots = self.state["timeslots"]
if timeslot_key in timeslots:
if status_id in timeslots[timeslot_key]:
log_print(self.name, "Expiring status {} from timeslot {}".format(status_id, timeslot_key))
timeslots[timeslot_key].remove(status_id)
else:
log_print(self.name, "Cannot expire missing status {} from timeslot {}".format(
status_id, timeslot_key))
if len(timeslots[timeslot_key]) == 0:
log_print(self.name, "Removing empty timeslot {}".format(timeslot_key))
del timeslots[timeslot_key]
else:
log_print(self.name, "Cannot expire status {} from missing timeslot {}".format(
status_id, timeslot_key))
self.state_lock.release()
self.save_state()
with open("config.json") as json_file:
config = json.load(json_file)
instances = {}
for name in config["instances"]:
instances[name] = Instance(name = name, config = config)
instances[name].setup()
start_interval = 60.0 / len(config["instances"])
for instance in instances.values():
instance.start()
time.sleep(start_interval)
while True:
time.sleep(1)