You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
368 lines
15 KiB
368 lines
15 KiB
import os
import re
import sched
import sys
import time
import traceback
import urllib
import urllib.request
from collections import deque
from datetime import datetime, timezone, timedelta

import toml

import bogofilter
import html2text
|
|
|
|
# The configuration file is looked up next to the script being run.
config_path = os.path.join(
    os.path.dirname(sys.argv[0]),
    "config.toml",
)
loaded_config = toml.load(config_path)

# OCR of image attachments is opt-in via the "ocr" config key (off when absent).
OCR = loaded_config.get("ocr", False)

# The OCR dependencies are only imported when the feature is switched on.
if OCR:
    from PIL import Image
    import pytesseract
|
|
|
|
from mastodon import Mastodon, MastodonNotFoundError
|
|
|
|
from bot import Bot, BotClient
|
|
|
|
# Month number (1-12) -> season name, used to tag a status with the time of year.
SEASON = {
    **dict.fromkeys(range(3, 6), "spring"),
    **dict.fromkeys(range(6, 9), "summer"),
    **dict.fromkeys(range(9, 12), "autumn"),
    **dict.fromkeys((12, 1, 2), "winter"),
}
|
|
|
|
# Hour of day (0-23) -> coarse label, used to tag a status with the time of day.
TIME_OF_DAY = {
    hour: label
    for label, hours in (
        ("night", range(0, 4)),
        ("early", range(4, 8)),
        ("morning", range(8, 12)),
        ("afternoon", range(12, 18)),
        ("evening", range(18, 24)),
    )
    for hour in hours
}
|
|
|
|
class CringeBotClient(BotClient):
    """Bot client that classifies the account's own statuses with bogofilter.

    Statuses classified as spam ("cringe") are queued for deferred deletion.
    The account owner can correct the classifier by replying to the bot's
    direct messages with ``cringe``, ``based`` or ``unlearn``.

    ``self.api``, ``self.state``, ``self.config`` and ``self.log`` are not
    defined here; presumably they are provided by ``BotClient`` (verify).
    """

    # Keywords recognised in a reply to one of the bot's own messages
    _COMMANDS = ("cringe", "based", "unlearn")

    # Context events whose target status may be (re)learned
    _LEARNABLE_EVENTS = ("categorise", "learn")

    def __init__(self, bot, config):
        """Set up paths, the HTML converter and the deletion scheduler.

        ``config`` is merged over the defaults below, so supplied keys win.
        """
        config = {
            "db_dir": ".",
            "cringe_dir": "data/cringe",
            "based_dir": "data/based",
            "unsure_dir": "data/unsure",
            "register": False,
            **config,
        }

        self.db_dir = os.path.join(os.path.dirname(sys.argv[0]), config["db_dir"])

        # Initialise HTML-to-Markdown converter
        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True

        # Create scheduler for deferred deletion of posts
        self.deletion_scheduler = sched.scheduler(time.time, time.sleep)

        # Kept last, as in the original: presumably the base initialiser
        # triggers on_load_state(), which needs the scheduler (verify).
        super().__init__(bot, config)

    def respond(self, status, message, context):
        """Send a direct-message reply to ``status`` and queue it for deletion.

        ``context`` is remembered under the reply's id in ``state["own"]`` so a
        later reply to this message can be interpreted as a command. Sleeps
        for ``config["rate_limit"]`` seconds afterwards.
        """
        self.log("Responding with:")
        self.log(message)
        self.log()

        reply = self.api.status_reply(status, message, visibility="direct", untag=True)

        self.state["own"][reply["id"]] = context
        self.enqueue_deletion(reply["id"])

        time.sleep(self.config["rate_limit"])

    def on_start(self):
        """Log how many statuses are currently queued for deletion."""
        self.deletion_report()

    def on_poll(self):
        """Run any deletions that have become due, without blocking."""
        self.deletion_scheduler.run(blocking=False)

    def _category_path(self, dir_key, status_id):
        """Return the file path for ``status_id`` in the directory configured under ``dir_key``."""
        # str() keeps os.path.join working should an id not be a string
        return os.path.join(os.path.dirname(sys.argv[0]), self.config[dir_key], str(status_id))

    @staticmethod
    def _existing_path(path):
        """Return ``path`` if it is an existing file, otherwise None."""
        return path if os.path.isfile(path) else None

    def get_cringe_path(self, status_id):
        """Return where the status would be stored when learned as cringe."""
        return self._category_path("cringe_dir", status_id)

    def get_based_path(self, status_id):
        """Return where the status would be stored when learned as based."""
        return self._category_path("based_dir", status_id)

    def get_unsure_path(self, status_id):
        """Return where the status would be stored when marked unsure."""
        return self._category_path("unsure_dir", status_id)

    def is_cringe(self, status_id):
        """Return the cringe file path if the status is stored as cringe, else None."""
        return self._existing_path(self.get_cringe_path(status_id))

    def is_based(self, status_id):
        """Return the based file path if the status is stored as based, else None."""
        return self._existing_path(self.get_based_path(status_id))

    def is_unsure(self, status_id):
        """Return the unsure file path if the status is stored as unsure, else None."""
        return self._existing_path(self.get_unsure_path(status_id))

    def get_category_path(self, status_id):
        """Return the path of the file currently storing the status, or None."""
        return self.is_cringe(status_id) or self.is_based(status_id) or self.is_unsure(status_id)

    def delete_and_write(self, delete_path, write_path, text, mode="a+"):
        """Remove ``delete_path`` (if it is a file) and write ``text`` to ``write_path``.

        The target directory is created when missing.
        """
        if delete_path and os.path.isfile(delete_path):
            os.remove(delete_path)
        os.makedirs(os.path.dirname(write_path), exist_ok=True)
        with open(write_path, mode, encoding="utf-8") as text_file:
            text_file.write(text)

    def make_cringe(self, status_id, text):
        """Move the stored status into the cringe directory."""
        self.delete_and_write(self.get_category_path(status_id), self.get_cringe_path(status_id), text + "\n")

    def make_based(self, status_id, text):
        """Move the stored status into the based directory."""
        self.delete_and_write(self.get_category_path(status_id), self.get_based_path(status_id), text + "\n")

    def make_unsure(self, status_id, text):
        """Move the stored status into the unsure directory."""
        self.delete_and_write(self.get_category_path(status_id), self.get_unsure_path(status_id), text + "\n")

    def process_commands(self, status):
        """Handle ``status`` as a command if it replies to one of the bot's messages.

        Returns False when the parent status is not one of the bot's own
        messages, True otherwise (whether or not the command succeeded).
        """
        status_id = status["id"]
        parent_id = status["in_reply_to_id"]

        # Check if bot owns the parent status
        if parent_id not in self.state["own"]:
            return False

        context = self.state["own"][parent_id]
        event = context["event"]
        target_id = context["target"]

        # Enqueue command status for deletion
        self.enqueue_deletion(status_id)

        try:
            command = self.h2t.handle(status["content"]).strip()
            self.log("Received command: {}".format(command))

            # Fetch the target status
            target_status = self.api.status(target_id)
            target_mail = toot_dict_to_mail(target_status)
            target_mail_text = target_mail.format()

            # The first recognised keyword wins and other words are skipped.
            # popleft() raises IndexError once the words run out, which is
            # reported as an invalid command below.
            tokens = deque(command.split())
            token = tokens.popleft()
            while token not in self._COMMANDS:
                token = tokens.popleft()

            self._apply_command(token, status, event, target_id, target_mail, target_mail_text)
        except IndexError:
            self.respond(status, "Invalid command", {"event": "error", "type": "syntax", "target": target_id})
        except MastodonNotFoundError:
            self.respond(status, "Could not fetch target status", {"event": "error", "type": "fetch", "target": target_id})

        return True

    def _apply_command(self, keyword, status, event, target_id, target_mail, target_mail_text):
        """Carry out one recognised command keyword against the target status."""

        def error(message, error_type):
            self.respond(status, message, {"event": "error", "type": error_type, "target": target_id})

        # Checks shared by all three commands
        if len(target_mail.body) == 0:
            error("Status has no content to classify", "empty")
            return
        if event not in self._LEARNABLE_EVENTS:
            error("Status is not learnable", "learnable")
            return

        if keyword == "cringe":
            if self.is_cringe(target_id):
                error("Status was already learned as cringe", "redundant")
                return
            # Undo a previous "based" registration before learning as spam
            if self.is_based(target_id):
                actions = [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]
            else:
                actions = [bogofilter.LEARN_SPAM]
            bogofilter.run(target_mail_text, db_dir=self.db_dir, actions=actions)

            self.make_cringe(target_id, target_mail_text)
            self.enqueue_deletion(target_id)
            self.respond(status, "Learned as cringe", {"event": "learn", "target": target_id})

        elif keyword == "based":
            if self.is_based(target_id):
                error("Status was already learned as based", "redundant")
                return
            # Undo a previous "cringe" registration before learning as ham
            if self.is_cringe(target_id):
                actions = [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]
            else:
                actions = [bogofilter.LEARN_HAM]
            bogofilter.run(target_mail_text, db_dir=self.db_dir, actions=actions)

            self.make_based(target_id, target_mail_text)
            self.unqueue_deletion(target_id)
            self.respond(status, "Learned as based", {"event": "learn", "target": target_id})

        else:  # "unlearn"
            if self.is_unsure(target_id):
                error("Status was already unsure", "redundant")
                return
            # Only statuses stored as cringe/based have anything to unlearn
            if self.is_cringe(target_id):
                bogofilter.run(target_mail_text, db_dir=self.db_dir, actions=[bogofilter.UNLEARN_SPAM])
            elif self.is_based(target_id):
                bogofilter.run(target_mail_text, db_dir=self.db_dir, actions=[bogofilter.UNLEARN_HAM])

            self.make_unsure(target_id, target_mail_text)
            self.unqueue_deletion(target_id)
            self.respond(status, "Unlearned", {"event": "learn", "target": target_id})

    def on_status(self, status):
        """Classify a new status of this account, or treat it as a command."""
        # Ignore statuses from other accounts
        if status["account"]["id"] != self.api.me()["id"]:
            return

        # Ignore statuses this account boosts
        if status["reblog"]:
            return

        status_id = status["id"]

        # Ignore bot's own statuses
        if status_id in self.state["own"]:
            return

        # Create faux HTML email of status
        mail = toot_dict_to_mail(status)
        mail_text = mail.format()

        self.log()
        self.log(mail_text)
        self.log()

        if len(mail.body) == 0:
            self.log("Not classifying {} because it has no content".format(status_id))
            return

        # Process any commands
        if self.process_commands(status):
            return

        register = self.config["register"]
        actions = [bogofilter.CLASSIFY, bogofilter.REGISTER] if register else [bogofilter.CLASSIFY]
        result = bogofilter.run(mail_text, db_dir=self.db_dir, actions=actions)
        bogo_report = "Bogofilter: Category={}, Score={:.4f}".format(result.category, result.score)

        if result.category == bogofilter.SPAM:
            self.log("CRINGE: Enqueuing status {} for deletion".format(status_id))
            if register:
                self.make_cringe(status_id, mail_text)
            self.enqueue_deletion(status_id)
            self.respond(status, "Categorised as cringe\n{}".format(bogo_report), context={"event": "categorise", "target": status_id})
        elif result.category == bogofilter.HAM:
            self.log("BASED: Not enqueueing status {} for deletion".format(status_id))
            if register:
                self.make_based(status_id, mail_text)
            self.respond(status, "Categorised as based\n{}".format(bogo_report), context={"event": "categorise", "target": status_id})
        else:
            self.log("UNSURE: Not enqueueing status {} for deletion".format(status_id))
            if register:
                self.make_unsure(status_id, mail_text)
            self.respond(status, "Categorised as unsure\n{}".format(bogo_report), context={"event": "categorise", "target": status_id})

    def on_load_state(self):
        """Return the loaded state with defaults, re-arming saved deletions."""
        state = {
            "deletion_queue": {},
            "own": {},
            **super().on_load_state(),
        }

        # Load deletion queue into scheduler
        for status_id, params in state["deletion_queue"].items():
            due = datetime.fromisoformat(params["time"]).timestamp()
            params["scheduler_event"] = self.deletion_scheduler.enterabs(due, 1, self.queued_delete, argument=(status_id,))

        return state

    def on_save_state(self, state):
        """Store the pending deletions as ISO timestamps before saving."""
        # Transform deletion scheduler queue to a JSON friendly format
        state["deletion_queue"] = {
            event.argument[0]: {"time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()}
            for event in self.deletion_scheduler.queue
        }

        super().on_save_state(state)

    def deletion_report(self):
        """Log the number of deletions currently scheduled."""
        self.log("{} status(es) queued for deletion".format(len(self.deletion_scheduler.queue)))

    def enqueue_deletion(self, status_id, delay=None):
        """Schedule ``status_id`` for deletion after ``delay`` seconds.

        Without ``delay`` the status is deleted after ``60 * config["max_age"]``
        seconds.
        """
        if delay is None:
            delay = 60 * self.config["max_age"]
        event = self.deletion_scheduler.enter(delay, 1, self.queued_delete, argument=(status_id,))
        self.state["deletion_queue"][status_id] = {"scheduler_event": event}
        self.deletion_report()

    def unqueue_deletion(self, status_id):
        """Cancel every pending deletion of ``status_id``."""
        tracked = self.state["deletion_queue"].pop(status_id, None)

        # Cancel by scanning the scheduler rather than via the stored handle:
        # the same status can have been enqueued more than once, and the
        # stored handle may be missing or refer to an event that already ran
        # (scheduler.cancel raises ValueError for those).
        pending = [
            event for event in self.deletion_scheduler.queue
            if event.argument and event.argument[0] == status_id
        ]
        for event in pending:
            self.deletion_scheduler.cancel(event)

        if tracked is None and not pending:
            self.log("Cannot unqueue non-queued status {} for deletion".format(status_id))

        self.deletion_report()

    def queued_delete(self, status_id):
        """Delete ``status_id`` from the server; retry in 300 seconds on failure."""
        # This event has fired, so the status is no longer pending
        self.state["deletion_queue"].pop(status_id, None)

        try:
            self.log("Deleting status {}".format(status_id))
            self.api.status_delete(status_id)
            if status_id in self.state["own"]:
                del self.state["own"][status_id]
        except MastodonNotFoundError:
            self.log("Cannot find status {} on server".format(status_id))
        except Exception:
            # Any other failure is logged and the deletion is retried later
            self.log(traceback.format_exc())
            self.enqueue_deletion(status_id, 300)
|
|
|
|
def toot_dict_to_mail(toot_dict):
    """Convert a status dict into a faux e-mail for bogofilter.

    The headers carry the author (``From``), the visibility and content flags
    (``X-Flags``), the current season and time of day (``X-Time``) and, when
    present, the content warning (``Subject``). The body is the status
    content; with OCR enabled, the words recognised in image attachments are
    appended as ``ocr_``-prefixed lower-case tokens.

    Returns a ``bogofilter.Mail`` built from those headers and that body.
    """
    flags = [toot_dict["visibility"]]
    if toot_dict["sensitive"]:
        flags.append("sensitive")
    if toot_dict["poll"]:
        flags.append("poll")
    if toot_dict["media_attachments"]:
        flags.append("attachments")

    # NOTE(review): this is the moment of conversion, not the status's creation
    # time, so converting the same status again later can give a different
    # X-Time header - confirm this is intended.
    now = datetime.now()
    time_tags = [SEASON[now.month], TIME_OF_DAY[now.hour]]

    headers = {
        "From": toot_dict["account"]["acct"],
        "X-Flags": ", ".join(flags),
        "X-Time": ", ".join(time_tags),
    }
    if toot_dict["spoiler_text"]:
        headers["Subject"] = toot_dict["spoiler_text"]

    body = toot_dict["content"]

    if OCR:
        for media in toot_dict["media_attachments"]:
            if media["type"] != "image":
                continue
            # OCR is best effort: a failing attachment is reported and skipped
            try:
                with urllib.request.urlopen(media["url"]) as image:
                    ocr_text = pytesseract.image_to_string(Image.open(image))
                words = re.findall(r"\w+", ocr_text)
                tokens = ["ocr_" + word.lower() for word in words]
                body += "\n\n" + " ".join(tokens)
            except Exception:
                print("Skipping OCR on attachment due to exception")
                print(traceback.format_exc())

    return bogofilter.Mail(headers=headers, body=body)
|
|
|
|
# Build the bot around the client class and hand it the loaded configuration.
bot = Bot(CringeBotClient, loaded_config)

# The module-level reference to the configuration is dropped once handed over.
del loaded_config

bot.start()

# Keep the main thread alive; presumably the bot does its work in the
# background once started (verify against Bot.start).
while True:
    time.sleep(1)
|
|
|