import os
import sys
import time
import copy
import json
import pprint
import threading
import traceback

from mastodon import Mastodon


def log_obj_str(obj):
    """Return *obj* as stripped text; non-strings are pretty-printed first."""
    text = obj if isinstance(obj, str) else pprint.pformat(obj)
    return text.strip()


class BotClient:
    """One bot connection to a single server, polled on a daemon thread.

    Behaviour is customised by overriding the ``on_*`` hooks. Configured file
    paths are resolved relative to the directory of the running script
    (``sys.argv[0]``).
    """

    # State used when no state file exists yet.
    DEFAULT_STATE = {"min_status_id": "0"}

    def __init__(self, bot, config):
        """Build the effective config, load saved state and prepare the thread.

        Args:
            bot: Owning object; must provide ``log_str`` and ``log``.
            config: Client settings. ``"name"`` is required and is used to
                derive the default base URL and file locations; any key given
                here overrides the derived default of the same name.
        """
        self.bot = bot
        client_dir = os.path.join("clients", config["name"])
        self.config = {
            "base_url": "https://{}".format(config["name"]),
            "client_file": os.path.join(client_dir, "client.secret"),
            "user_file": os.path.join(client_dir, "user.secret"),
            "state_file": os.path.join(client_dir, "state.json"),
            "cringe_dir": os.path.join(client_dir, "cringe"),
            "based_dir": os.path.join(client_dir, "based"),
            "unsure_dir": os.path.join(client_dir, "unsure"),
            **config,
        }
        self.load_state()
        # The thread is only created here; start() launches it.
        self.poll_thread = threading.Thread(
            target=self.poll_loop,
            name="{} Poll Loop".format(self.config["name"]),
            daemon=True,
        )

    def _local_path(self, config_key):
        """Resolve the path stored under *config_key* against the script dir."""
        return os.path.join(os.path.dirname(sys.argv[0]), self.config[config_key])

    def log_str(self, obj="", infix=""):
        """Format *obj* via the owning bot, prefixed with this client's name."""
        return self.bot.log_str(
            obj, infix="{}: {}".format(self.config["name"], infix))

    def log(self, obj="", infix=""):
        """Log *obj* via the owning bot, prefixed with this client's name."""
        return self.bot.log(
            obj, infix="{}: {}".format(self.config["name"], infix))

    def setup(self):
        """Ensure credential files exist, then create ``self.api``.

        If the client secret file is missing, ``Mastodon.create_app`` is asked
        to write it. If the user secret file is missing, the authorisation URL
        is logged and the code is read interactively from stdin.
        """
        base_url = self.config["base_url"]

        client_file_path = self._local_path("client_file")
        os.makedirs(os.path.dirname(client_file_path), exist_ok=True)
        if not os.path.exists(client_file_path):
            # The original read an undefined ``self.app_name``. Honour such an
            # attribute if a subclass defines it, else use the configured name.
            app_name = getattr(self, "app_name", None) or self.config["app_name"]
            Mastodon.create_app(
                app_name,
                api_base_url=base_url,
                to_file=client_file_path)

        user_file_path = self._local_path("user_file")
        # Create the *user* file's directory (it may be configured separately
        # from the client file's directory).
        os.makedirs(os.path.dirname(user_file_path), exist_ok=True)
        if not os.path.exists(user_file_path):
            api = Mastodon(
                api_base_url=base_url,
                client_id=client_file_path)
            auth_url = api.auth_request_url()
            self.log("Go to:")
            self.log(auth_url)
            self.log()
            # log_str() strips its argument, so the separating space is
            # appended after formatting.
            auth_code = input(self.log_str("Enter code:") + " ")
            self.log()
            api.log_in(code=auth_code, to_file=user_file_path)

        self.api = Mastodon(
            access_token=user_file_path,
            api_base_url=base_url)

    def start(self):
        """Start the polling thread, then invoke the ``on_start`` hook."""
        self.poll_thread.start()
        self.on_start()

    def poll_loop(self):
        """Poll the timeline forever, dispatching statuses to the hooks.

        Any exception raised during an iteration is passed to
        ``on_poll_exception`` and polling resumes after ``retry_rate`` seconds.
        """
        while True:
            try:
                statuses = self.api.timeline(
                    min_id=self.state["min_status_id"])
                if not statuses:
                    self.on_poll()
                    time.sleep(self.config["poll_interval"])
                    continue

                self.on_wake()
                # Truthiness (rather than len()) also ends the loop cleanly if
                # the pager returns None instead of an empty page, so the
                # state below is still saved.
                while statuses:
                    self.on_status_page(statuses)
                    for status in sorted(
                            statuses, key=lambda item: item["created_at"]):
                        self.on_status(status)
                        self.state["min_status_id"] = status["id"]
                    # Throttle before requesting the next page.
                    time.sleep(self.config["rate_limit"])
                    statuses = self.api.fetch_previous(statuses)
                self.save_state()
            except Exception as exc:
                # Boundary of the worker thread: report and keep polling.
                self.on_poll_exception(exc)
                time.sleep(self.config["retry_rate"])

    def load_state(self):
        """Set ``self.state`` from the ``on_load_state`` hook."""
        self.state = self.on_load_state()

    def save_state(self):
        """Pass a shallow copy of ``self.state`` to the ``on_save_state`` hook."""
        self.on_save_state(self.state.copy())

    def on_start(self):
        """Hook: called by ``start`` after the poll thread is launched."""

    def on_poll(self):
        """Hook: called when a poll returns no statuses."""

    def on_poll_exception(self, exc):
        """Hook: called from the ``except`` clause of ``poll_loop``.

        The default logs the current traceback.
        """
        self.log(traceback.format_exc())

    def on_wake(self):
        """Hook: called when a poll returns at least one status."""

    def on_status_page(self, statuses):
        """Hook: called once per fetched page, before its statuses."""

    def on_status(self, status):
        """Hook: called for each status, ordered by ``created_at``."""

    def on_load_state(self):
        """Return the state from the state file, or a copy of DEFAULT_STATE."""
        state_file_path = self._local_path("state_file")
        if os.path.exists(state_file_path):
            with open(state_file_path, encoding="utf-8") as json_file:
                return json.load(json_file)
        return copy.deepcopy(self.DEFAULT_STATE)

    def on_save_state(self, state):
        """Write *state* to the state file as indented JSON."""
        state_file_path = self._local_path("state_file")
        os.makedirs(os.path.dirname(state_file_path), exist_ok=True)
        with open(state_file_path, "w", encoding="utf-8") as json_file:
            json.dump(state, json_file, indent=4)


class Bot:
    """Owns a set of clients (one per configured entry) and shared logging."""

    DEFAULT_CONFIG = {
        "name": "generic-bot",
        "defaults": {
            "app_name": "Generic Bot",
            "rate_limit": 1,
            "retry_rate": 60,
            "poll_interval": 10,
        },
        "clients": {
            "mastodon.social": {},
        },
    }

    def __init__(self, client_type=BotClient, config=None):
        """Merge *config* over DEFAULT_CONFIG.

        Args:
            client_type: Callable used by ``on_init_client`` to build clients;
                called as ``client_type(bot, client_config)``.
            config: Optional overrides. Top-level keys replace the defaults;
                the ``"defaults"`` mapping is merged key by key.
        """
        # None instead of a shared mutable default argument.
        config = {} if config is None else config
        self.clients = {}
        self.client_type = client_type
        self.config = {**self.DEFAULT_CONFIG, **config}
        self.config["defaults"] = {
            **self.DEFAULT_CONFIG["defaults"],
            **config.get("defaults", {}),
        }

    def log_str(self, obj, infix=""):
        """Return *obj* as text with ``"<name>: <infix>"`` before every line."""
        prefix = "{}: {}".format(self.config["name"], infix)
        return prefix + log_obj_str(obj).replace("\n", "\n" + prefix)

    def log(self, *args, **kwargs):
        """Print the result of ``log_str`` called with the same arguments."""
        print(self.log_str(*args, **kwargs))

    def start(self):
        """Create and start all configured clients, then notify the hook."""
        self.clients = self.on_start_clients(self.config["clients"])
        self.on_clients_started(self.clients)

    def on_start_clients(self, client_configs):
        """Build, set up and start one client per entry of *client_configs*.

        Each client's config is the bot-wide defaults, then its name, then its
        own settings. Starts are spread evenly across one ``poll_interval``.

        Returns:
            dict mapping client name to client instance.
        """
        clients = {}
        for client_name, overrides in client_configs.items():
            client_config = {
                **self.config["defaults"],
                "name": client_name,
                **overrides,
            }
            client = self.on_init_client(client_name, client_config)
            client.setup()
            clients[client_config["name"]] = client

        if not clients:
            # Nothing to start; also avoids dividing by zero below.
            return clients

        start_interval = self.config["defaults"]["poll_interval"] / len(clients)
        for client in clients.values():
            client.start()
            time.sleep(start_interval)
        return clients

    def on_init_client(self, client_name, client_config):
        """Hook: construct a client; the default calls ``self.client_type``."""
        return self.client_type(self, client_config)

    def on_clients_started(self, clients):
        """Hook: called by ``start`` once every client has been started."""