You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
336 lines
13 KiB
336 lines
13 KiB
import json
import os
import sys
import threading
import time
import traceback
from collections import deque
from datetime import datetime, timezone, timedelta

import html2text
from mastodon import Mastodon, MastodonNotFoundError

import bogofilter
from bot import Bot, BotClient
|
|
|
|
def encode_time(dt):
    """Encode a datetime as an integer timeslot key of the form YYYYMMDDHHMM.

    The key has minute resolution; seconds and smaller units are dropped.
    No timezone conversion is performed: the fields of ``dt`` are used as-is.
    """
    stamp = dt.strftime("%Y%m%d%H%M")
    return int(stamp)
|
|
|
|
def decode_time(value):
    """Decode a timeslot key back into a naive ``datetime``.

    ``value`` may be an int (as produced by ``encode_time``) or its string
    form. A 12-digit key is parsed as YYYYMMDDHHMM; anything else is parsed
    as the hour-resolution form YYYYMMDDHH.

    Raises:
        ValueError: if the key does not match the selected format.
    """
    # Keys are ints, so convert before measuring the length; calling len()
    # on the raw int would raise TypeError.
    text = str(value)
    if len(text) == 12:
        return datetime.strptime(text, "%Y%m%d%H%M")
    return datetime.strptime(text, "%Y%m%d%H")
|
|
|
|
class CringeBotClient(BotClient):
    """Bot client that classifies the account's own statuses with bogofilter.

    Statuses judged to be spam are tracked in ``self.state["timeslots"]``, a
    dict mapping an integer minute key (see ``encode_time``) to a set of
    status ID strings. A background thread deletes tracked statuses whose
    timeslot is older than ``config["max_age"]`` minutes.

    ``self.api``, ``self.config``, ``self.state``, ``self.state_lock`` and
    ``self.log`` are not defined here; presumably they come from
    ``BotClient`` (TODO confirm against the base class).
    """

    def __init__(self, bot, config):
        """Set up the HTML-to-text converter and the (unstarted) worker threads."""
        super().__init__(bot, config)

        self.h2t = html2text.HTML2Text()
        self.h2t.ignore_links = True

        self.spawner_thread = threading.Thread(
            target=self.spawner,
            name=self.config["name"] + " spawner",
            args=(),
            kwargs={},
            daemon=True)

        self.purger_thread = threading.Thread(
            target=self.purger,
            name=self.config["name"] + " purger",
            args=(),
            kwargs={},
            daemon=True)

    def on_start(self):
        """Start the spawner thread."""
        self.spawner_thread.start()

    def spawner(self):
        """Start the purger (unless in learning mode), then report once a minute."""
        if not learning:
            self.purger_thread.start()

        while True:
            self.tracker_report()
            time.sleep(60)

    def respond(self, status, message):
        """Reply to ``status`` with ``message`` plus the configured tag.

        The reply is sent with direct visibility and is itself tracked, so it
        is eventually purged like any other tracked status.
        """
        self.log("Responded with:")
        self.log(message)
        reply = self.api.status_reply(
            status,
            "{}\n{}".format(message, self.config["tag"]),
            visibility="direct",
            untag=True)
        self.track_status(reply)
        # Brief pause after each reply (presumably to go easy on the server).
        time.sleep(1)

    def on_status(self, status):
        """Handle an incoming status.

        Only the bot account's own, non-reblog statuses that do not carry the
        configured tag are processed. In learning mode the operator is asked
        to categorise the status; otherwise the status is either executed as
        a command (when it replies to a tagged bot response) or classified.
        """
        if status["account"]["id"] != self.api.me()["id"]:
            return

        if status["reblog"]:
            return

        md_text = self.h2t.handle(status["content"])
        # Statuses carrying the tag are the bot's own responses; skip them.
        if self.config["tag"] in md_text.split():
            return
        mail_text = toot_dict_to_mail(status).format()

        # The preview uses the plain-text body instead of the raw HTML.
        preview = toot_dict_to_mail(status)
        preview.body = md_text
        preview_text = preview.format()

        if learning:
            self._learn_interactively(status, mail_text, preview_text)
            return

        if self._handle_command(status):
            return

        self._classify(status, mail_text, preview_text)

    def _learn_interactively(self, status, mail_text, preview_text):
        """Show the status on the log and train bogofilter from operator input."""
        self.log(preview_text)
        self.log()

        category = None
        while category not in bogofilter.categories:
            category = input("H(am), S(pam) or U(nknown)? ").upper()

        if category != bogofilter.UNSURE:
            bogofilter.run(mail_text, [category])

        if category == bogofilter.SPAM:
            self.track_status(status)

        self.log()

    def _handle_command(self, status):
        """Treat ``status`` as a command if it replies to a tagged bot response.

        Returns True when the status was a reply to a status containing the
        configured tag (whether or not the command itself was valid), and
        False when it should go through normal classification instead.
        """
        replied_id = status.get("in_reply_to_id", None)
        if not replied_id:
            return False

        try:
            replied_status = self.api.status(replied_id)
            replied_tokens = self.h2t.handle(replied_status["content"]).split()

            if self.config["tag"] not in replied_tokens:
                return False

            # The command status itself is tracked so it is purged later.
            self.track_status(status)

            # The bot response replied to the status the command is about.
            target_status_id = replied_status.get("in_reply_to_id", None)
            if target_status_id:
                try:
                    target_status = self.api.status(target_status_id)
                    self._run_command(status, target_status, target_status_id)
                except MastodonNotFoundError:
                    self.respond(status, "Original status is missing")
            else:
                self.respond(status, "Original status is missing")
            return True
        except MastodonNotFoundError:
            # The replied-to status is gone; fall back to classification.
            return False

    def _run_command(self, status, target_status, target_status_id):
        """Parse and execute a learn/unlearn/relearn command from ``status``.

        The command text is scanned for the first ``<verb> <kind>`` pair where
        verb is ``learn``, ``unlearn`` or ``relearn`` and kind is ``spam`` or
        ``ham``. If none is found, "Invalid command" is sent back.
        """
        target_timeslot_key = encode_time(target_status["created_at"])
        target_mail_text = toot_dict_to_mail(target_status).format()

        command = self.h2t.handle(status["content"]).strip()
        tokens = deque(command.split())
        self.log("Received command: {}".format(command))

        # (verb, kind) -> (bogofilter actions, track target as spam?, reply)
        commands = {
            ("learn", "spam"): (
                [bogofilter.LEARN_SPAM], True, "Learned as spam"),
            ("learn", "ham"): (
                [bogofilter.LEARN_HAM], False, "Learned as ham"),
            ("unlearn", "spam"): (
                [bogofilter.UNLEARN_SPAM], False, "Unlearned as spam"),
            ("unlearn", "ham"): (
                [bogofilter.UNLEARN_HAM], False, "Unlearned as ham"),
            ("relearn", "spam"): (
                [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM], True,
                "Relearned as spam"),
            ("relearn", "ham"): (
                [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM], False,
                "Relearned as ham"),
        }
        verbs = {verb for verb, _ in commands}

        while tokens:
            verb = tokens.popleft()
            if verb not in verbs or not tokens:
                continue

            # The token after a verb is consumed even when it is not a kind.
            kind = tokens.popleft()
            entry = commands.get((verb, kind))
            if entry is None:
                continue

            actions, is_spam, message = entry
            bogofilter.run(target_mail_text, actions)
            if is_spam:
                self.track_status(target_status)
            else:
                self.expire_status(target_timeslot_key, target_status_id)
            self.respond(status, message)
            return

        self.respond(status, "Invalid command")

    def _classify(self, status, mail_text, preview_text):
        """Classify ``status`` with bogofilter, reply with the verdict and log it."""
        result = bogofilter.run(
            mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER])
        bogo_report = "Bogofilter: Category={}, Score={:.4f}".format(
            result.category, result.score)

        if result.category == bogofilter.SPAM:
            self.log("SPAM: Tracking status with ID {} as spam".format(status["id"]))
            self.respond(status, "Categorised as spam\n{}".format(bogo_report))
            self.track_status(status)
        elif result.category == bogofilter.UNSURE:
            self.log("UNSURE: Not tracking status with ID {} as spam".format(status["id"]))
            self.respond(status, "Categorised as unsure\n{}".format(bogo_report))
        else:
            self.log("HAM: Not tracking status with ID {} as spam".format(status["id"]))
            self.respond(status, "Categorised as ham\n{}".format(bogo_report))

        self.log()
        self.log(preview_text)
        self.log()

    def purger(self):
        """Delete expired tracked statuses, one at a time, forever.

        Sleeps 60 seconds after a successful deletion or after an unexpected
        error, and 1 second when there was nothing to delete.
        """
        while True:
            try:
                deleted = False
                timeslot_key, status_id = self.next_expired()

                if timeslot_key is not None:
                    try:
                        self.log("Deleting status {} in timeslot {}".format(status_id, timeslot_key))
                        self.api.status_delete(status_id)
                        deleted = True
                    except MastodonNotFoundError:
                        self.log("Cannot find status {} on server".format(status_id))

                    # Stop tracking the status whether or not it still existed.
                    self.expire_status(timeslot_key, status_id)

                if deleted:
                    time.sleep(60)
                else:
                    time.sleep(1)
            except Exception:
                # Keep the purger alive; log the failure and back off.
                self.log(traceback.format_exc())
                time.sleep(60)

    def on_load_state(self):
        """Load the state and rebuild ``timeslots`` as ``{int: set(str)}``.

        The persisted form is a list of ``[key, [status_id, ...]]`` pairs (see
        ``on_save_state``); a dict is accepted as well.
        """
        state = super().on_load_state()
        stored = state.get("timeslots") or []
        pairs = stored.items() if isinstance(stored, dict) else stored
        # IDs are kept as strings, matching what track_status stores.
        state["timeslots"] = {
            int(key): {str(status_id) for status_id in status_ids}
            for key, status_ids in pairs
        }
        return state

    def on_save_state(self, state):
        """Convert ``timeslots`` to JSON-friendly lists and delegate saving."""
        state["timeslots"] = [
            [key, list(status_ids)]
            for key, status_ids in state["timeslots"].items()
        ]
        super().on_save_state(state)

    def tracker_report(self):
        """Log how many statuses are tracked and across how many timeslots."""
        with self.state_lock:
            timeslots = self.state["timeslots"]
            total_timeslots = len(timeslots)
            total_statuses = sum(len(ids) for ids in timeslots.values())

        self.log("Tracking {} statuses across {} timeslots".format(total_statuses, total_timeslots))

    def track_status(self, status):
        """Start tracking ``status`` for later deletion (reblogs are ignored)."""
        status_id = str(status["id"])
        timeslot_key = encode_time(status["created_at"])

        with self.state_lock:
            if status["reblog"] is None:
                self.state["timeslots"].setdefault(timeslot_key, set()).add(status_id)

    def next_expired(self):
        """Return ``(timeslot_key, status_id)`` for one expired status.

        The oldest tracked timeslot is examined; it is expired when its key is
        older than ``config["max_age"]`` minutes before the current UTC time.
        Returns ``(None, None)`` when nothing has expired. ``status_id`` is
        ``None`` if the expired timeslot happens to be empty.
        """
        now = datetime.now(timezone.utc)
        min_timeslot_key = encode_time(now - timedelta(minutes=self.config["max_age"]))

        with self.state_lock:
            timeslots = self.state["timeslots"]
            # Use the smallest key rather than the first-inserted one: old
            # statuses can be tracked late (e.g. via a "learn spam" command)
            # and must not wait behind newer timeslots.
            timeslot_key = min(timeslots, default=None)

            if timeslot_key is not None and timeslot_key < min_timeslot_key:
                status_id = next(iter(timeslots[timeslot_key]), None)
                return (timeslot_key, status_id)

        return (None, None)

    def expire_status(self, timeslot_key, status_id):
        """Stop tracking ``status_id`` in ``timeslot_key`` and drop empty timeslots."""
        # track_status stores IDs as strings; normalise so IDs taken straight
        # from API responses match the stored form.
        status_id = str(status_id)

        with self.state_lock:
            timeslots = self.state["timeslots"]
            if timeslot_key in timeslots:
                if status_id in timeslots[timeslot_key]:
                    self.log("Expiring status {} from timeslot {}".format(status_id, timeslot_key))
                    timeslots[timeslot_key].remove(status_id)
                else:
                    self.log("Cannot expire missing status {} from timeslot {}".format(
                        status_id, timeslot_key))

                if len(timeslots[timeslot_key]) == 0:
                    self.log("Removing empty timeslot {}".format(timeslot_key))
                    del timeslots[timeslot_key]
            else:
                self.log("Cannot expire status {} from missing timeslot {}".format(
                    status_id, timeslot_key))
|
|
|
|
def toot_dict_to_mail(toot_dict):
    """Convert a status dict into a ``bogofilter.Mail``.

    Selected status fields become mail headers (only when present and
    non-empty / greater than zero), and the status ``content`` becomes the
    body (``None`` when there is no content).
    """
    flag_names = (
        "sensitive",
        "poll",
        "reblog",
        "reblogged",
        "favourited",
        "bookmarked",
        "pinned",
    )
    flags = ", ".join(name for name in flag_names if toot_dict.get(name, False))

    headers = {}

    account = toot_dict.get("account")
    if account and account.get("acct"):
        headers["From"] = account["acct"]

    # Fields copied verbatim when truthy.
    for key, header in (("created_at", "Date"), ("visibility", "X-Visibility")):
        if toot_dict.get(key):
            headers[header] = toot_dict[key]

    if flags:
        headers["X-Flags"] = flags

    if toot_dict.get("spoiler_text"):
        headers["Subject"] = toot_dict["spoiler_text"]

    if toot_dict.get("replies_count", 0) > 0:
        headers["X-Replies-Count"] = toot_dict["replies_count"]

    if len(toot_dict.get("media_attachments", [])) > 0:
        headers["X-Attachments-Count"] = len(toot_dict["media_attachments"])

    # Counters that are only reported when positive.
    for key, header in (
        ("reblogs_count", "X-Reblogs-Count"),
        ("favourites_count", "X-Favourites-Count"),
    ):
        if toot_dict.get(key, 0) > 0:
            headers[header] = toot_dict[key]

    content = toot_dict.get("content")
    body = toot_dict["content"] if content and len(content) > 0 else None

    return bogofilter.Mail(headers=headers, body=body)
|
|
|
|
# Learning mode is enabled with the -l command line flag; the client class
# reads this module-level flag.
learning = "-l" in sys.argv[1:]

# Build the bot from config.json and start it.
with open("config.json") as config_file:
    bot = Bot(CringeBotClient, json.load(config_file))
    bot.start()

# Keep the main thread alive; the client's worker threads are daemon threads.
while True:
    time.sleep(1)
|
|
|