You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
205 lines
6.7 KiB
205 lines
6.7 KiB
import os
|
|
import sys
|
|
import time
|
|
import copy
|
|
import json
|
|
import pprint
|
|
import threading
|
|
import traceback
|
|
|
|
from mastodon import Mastodon
|
|
|
|
def log_obj_str(obj):
    """Return a whitespace-trimmed string form of *obj* for log output.

    Strings are used as they are; any other object is rendered with
    ``pprint.pformat`` first. Leading and trailing whitespace is removed
    in both cases.
    """
    rendered = obj if isinstance(obj, str) else pprint.pformat(obj)
    return rendered.strip()
|
|
|
|
class BotClient:
    """Client for a single Mastodon instance, polled on a background thread.

    The client keeps a small persistent state dict (by default just the id of
    the last status it handled), authorises itself against the instance in
    ``setup()`` and, once ``start()`` is called, repeatedly fetches the
    timeline in ``poll_loop()``.  Subclasses customise behaviour by overriding
    the ``on_*`` hook methods.
    """

    # State used when no state file exists yet.
    DEFAULT_STATE = {"min_status_id": "0"}

    def __init__(self, bot, config):
        """Build the client configuration, load state and create the poll thread.

        Args:
            bot: Owning bot; its ``log`` / ``log_str`` methods are used for
                all log output.
            config: Client settings.  Must contain ``"name"`` (the instance
                host name).  Any key given here overrides the derived
                defaults below.
        """
        self.bot = bot

        client_dir = os.path.join("clients", config["name"])
        self.config = {
            "base_url": "https://{}".format(config["name"]),
            "client_file": os.path.join(client_dir, "client.secret"),
            "user_file": os.path.join(client_dir, "user.secret"),
            "state_file": os.path.join(client_dir, "state.json"),
            "cringe_dir": os.path.join(client_dir, "cringe"),
            "based_dir": os.path.join(client_dir, "based"),
            "unsure_dir": os.path.join(client_dir, "unsure"),
            **config,
        }

        self.load_state()

        # Daemon thread: it never blocks interpreter shutdown.
        self.poll_thread = threading.Thread(
            target=self.poll_loop,
            name="{} Poll Loop".format(self.config["name"]),
            daemon=True,
        )

    def _resolve_path(self, key):
        """Return ``self.config[key]`` resolved against the running script's directory."""
        return os.path.join(os.path.dirname(sys.argv[0]), self.config[key])

    def log_str(self, obj="", infix=str()):
        """Format *obj* through the bot's ``log_str``, tagged with this client's name."""
        return self.bot.log_str(obj, infix="{}: {}".format(self.config["name"], infix))

    def log(self, obj="", infix=str()):
        """Log *obj* through the bot's ``log``, tagged with this client's name."""
        return self.bot.log(obj, infix="{}: {}".format(self.config["name"], infix))

    def setup(self):
        """Register the app and authorise the user if needed, then create ``self.api``.

        Credentials are cached in the client and user secret files, so the
        registration and the interactive authorisation prompt only run when
        the corresponding file does not exist yet.
        """
        client_file_path = self._resolve_path("client_file")
        os.makedirs(os.path.dirname(client_file_path), exist_ok=True)
        if not os.path.exists(client_file_path):
            # The app name normally comes from the configuration; an
            # ``app_name`` attribute supplied by a subclass takes precedence
            # so existing subclasses keep working.
            app_name = getattr(self, "app_name", None) or self.config["app_name"]
            Mastodon.create_app(
                app_name,
                api_base_url=self.config["base_url"],
                to_file=client_file_path)

        user_file_path = self._resolve_path("user_file")
        # Create the directory of the *user* file; it may have been
        # configured to live somewhere other than the client file.
        os.makedirs(os.path.dirname(user_file_path), exist_ok=True)
        if not os.path.exists(user_file_path):
            api = Mastodon(
                api_base_url=self.config["base_url"],
                client_id=client_file_path)

            auth_url = api.auth_request_url()

            self.log("Go to:")
            self.log(auth_url)
            self.log()

            # log_str() strips trailing whitespace, so re-add the separator
            # between the prompt and the typed code.
            auth_code = input(self.log_str("Enter code:") + " ")

            self.log()

            api.log_in(code=auth_code, to_file=user_file_path)

        self.api = Mastodon(
            access_token=user_file_path,
            api_base_url=self.config["base_url"])

    def start(self):
        """Start the polling thread, then run the ``on_start`` hook."""
        self.poll_thread.start()
        self.on_start()

    def poll_loop(self):
        """Poll the timeline forever, dispatching statuses to the hooks.

        Each cycle fetches statuses newer than the stored ``min_status_id``.
        An empty result triggers ``on_poll`` and a ``poll_interval`` sleep;
        otherwise every page is handed to ``on_status_page`` and each status
        (oldest first by ``created_at``) to ``on_status``.  State is saved at
        the end of every successful cycle.  Any ``Exception`` is passed to
        ``on_poll_exception`` and followed by a ``retry_rate`` sleep.
        """
        while True:
            try:
                statuses = self.api.timeline(min_id=self.state["min_status_id"])

                if not statuses:
                    self.on_poll()
                    time.sleep(self.config["poll_interval"])
                else:
                    self.on_wake()

                # fetch_previous() returns None once there are no more pages,
                # so test truthiness rather than calling len() on the result.
                while statuses:
                    self.on_status_page(statuses)

                    for status in sorted(statuses, key=lambda status: status["created_at"]):
                        self.on_status(status)
                        # Advance the cursor after each status so an error in
                        # a later one does not replay the earlier ones.
                        self.state["min_status_id"] = status["id"]

                    time.sleep(self.config["rate_limit"])
                    statuses = self.api.fetch_previous(statuses)

                self.save_state()

            except Exception as exc:
                self.on_poll_exception(exc)
                time.sleep(self.config["retry_rate"])

    def load_state(self):
        """Set ``self.state`` from the ``on_load_state`` hook."""
        self.state = self.on_load_state()

    def save_state(self):
        """Pass a shallow copy of ``self.state`` to the ``on_save_state`` hook."""
        state = self.state.copy()
        self.on_save_state(state)

    def on_start(self):
        """Hook: called by ``start()`` after the poll thread was started."""
        pass

    def on_poll(self):
        """Hook: called when a poll returned no statuses."""
        pass

    def on_poll_exception(self, exc):
        """Hook: called from the poll loop's ``except`` block; logs the traceback."""
        self.log(traceback.format_exc())

    def on_wake(self):
        """Hook: called when a poll returned at least one status."""
        pass

    def on_status_page(self, statuses):
        """Hook: called once per fetched page of statuses."""
        pass

    def on_status(self, status):
        """Hook: called once per status, oldest first within a page."""
        pass

    def on_load_state(self):
        """Return the state stored in the state file, or a copy of ``DEFAULT_STATE``.

        Returns:
            The parsed JSON content of the state file if it exists, otherwise
            a deep copy of ``DEFAULT_STATE``.
        """
        state_file_path = self._resolve_path("state_file")
        if os.path.exists(state_file_path):
            with open(state_file_path, encoding="utf-8") as json_file:
                return json.load(json_file)

        return copy.deepcopy(self.DEFAULT_STATE)

    def on_save_state(self, state):
        """Write *state* as indented JSON to the state file, creating its directory."""
        state_file_path = self._resolve_path("state_file")
        os.makedirs(os.path.dirname(state_file_path), exist_ok=True)
        with open(state_file_path, "w", encoding="utf-8") as json_file:
            json.dump(state, json_file, indent=4)
|
|
|
|
class Bot:
    """Owns the bot-wide configuration and one client per configured instance.

    ``start()`` builds, sets up and starts a client for every entry in
    ``config["clients"]``.  Subclasses customise behaviour through the
    ``on_*`` hook methods.
    """

    # Baseline configuration; values passed to __init__ are layered on top.
    DEFAULT_CONFIG = {
        "name": "generic-bot",
        "defaults": {
            "app_name": "Generic Bot",
            "rate_limit": 1,
            "retry_rate": 60,
            "poll_interval": 10,
        },
        "clients": {
            "mastodon.social": {},
        },
    }

    def __init__(self, client_type=BotClient, config=None):
        """Merge *config* over ``DEFAULT_CONFIG``.

        Args:
            client_type: Callable invoked as ``client_type(bot, client_config)``
                to build each client.
            config: Optional overrides for ``DEFAULT_CONFIG``.  Top-level keys
                replace the defaults wholesale, except ``"defaults"``, which
                is merged key by key.  ``None`` means no overrides.
        """
        config = {} if config is None else config

        self.clients = {}

        self.client_type = client_type

        # Deep-copy the class-level defaults so that mutating this
        # instance's configuration never leaks into DEFAULT_CONFIG.
        self.config = {**copy.deepcopy(self.DEFAULT_CONFIG), **config}
        self.config["defaults"] = {
            **self.DEFAULT_CONFIG["defaults"],
            **config.get("defaults", {}),
        }

    def log_str(self, obj, infix=str()):
        """Return *obj* as log text with every line prefixed by the bot name and *infix*."""
        prefix = "{}: {}".format(self.config["name"], infix)
        return prefix + log_obj_str(obj).replace("\n", "\n" + prefix)

    def log(self, *args, **kwargs):
        """Print the result of ``log_str`` called with the same arguments."""
        print(self.log_str(*args, **kwargs))

    def start(self):
        """Start all configured clients, then run the ``on_clients_started`` hook."""
        self.clients = self.on_start_clients(self.config["clients"])
        self.on_clients_started(self.clients)

    def on_start_clients(self, client_configs):
        """Create, set up and start one client per entry of *client_configs*.

        Args:
            client_configs: Mapping of client name to per-client overrides.
                Each client's configuration is the bot's ``"defaults"``,
                plus ``"name"``, plus its own overrides (later wins).

        Returns:
            Dict mapping each client's configured name to the client object.
        """
        clients = {}
        for client_name, client_config in client_configs.items():
            merged_config = {
                **self.config["defaults"],
                "name": client_name,
                **client_config,
            }
            client = self.on_init_client(client_name, merged_config)
            client.setup()
            clients[merged_config["name"]] = client

        # With no clients there is nothing to start, and the interval
        # computation below would divide by zero.
        if clients:
            # Spread the client start times over one poll interval.
            start_interval = self.config["defaults"]["poll_interval"] / len(client_configs)
            for client in clients.values():
                client.start()
                time.sleep(start_interval)

        return clients

    def on_init_client(self, client_name, client_config):
        """Hook: build the client object for *client_name* from *client_config*."""
        return self.client_type(self, client_config)

    def on_clients_started(self, clients):
        """Hook: called by ``start()`` once every client has been started."""
        pass
|
|
|