|
|
|
@ -38,78 +38,115 @@ class CringeBotClient(BotClient): |
|
|
|
|
# Perform any scheduled deletes |
|
|
|
|
self.deletion_scheduler.run(blocking = False) |
|
|
|
|
|
|
|
|
|
# Look for commands in the replies to the bot's notifications |
|
|
|
|
def set_cringe(self, status_id): |
|
|
|
|
self.state["cringe"].add(status_id) |
|
|
|
|
self.state["based"].discard(status_id) |
|
|
|
|
self.state["unsure"].discard(status_id) |
|
|
|
|
|
|
|
|
|
def set_based(self, status_id): |
|
|
|
|
self.state["cringe"].discard(status_id) |
|
|
|
|
self.state["based"].add(status_id) |
|
|
|
|
self.state["unsure"].discard(status_id) |
|
|
|
|
|
|
|
|
|
def set_unsure(self, status_id): |
|
|
|
|
self.state["cringe"].discard(status_id) |
|
|
|
|
self.state["based"].discard(status_id) |
|
|
|
|
self.state["unsure"].add(status_id) |
|
|
|
|
|
|
|
|
|
def set_discard(self, status_id): |
|
|
|
|
self.state["cringe"].discard(status_id) |
|
|
|
|
self.state["based"].discard(status_id) |
|
|
|
|
self.state["unsure"].discard(status_id) |
|
|
|
|
|
|
|
|
|
# Look for and process commands in the replies to the bot's notifications and return True if commands were processed |
|
|
|
|
def process_commands(self, status): |
|
|
|
|
# Check if status is a reply to another status |
|
|
|
|
replied_id = status.get("in_reply_to_id", None) |
|
|
|
|
if not replied_id: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
# Fetch replied-to status |
|
|
|
|
replied_status = self.api.status(replied_id) |
|
|
|
|
replied_tokens = self.h2t.handle(replied_status["content"]).split() |
|
|
|
|
|
|
|
|
|
# Check if it belongs to the bot |
|
|
|
|
if not self.config["tag"] in replied_tokens: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
self.enqueue_deletion(status["id"]) |
|
|
|
|
status_id = status["id"] |
|
|
|
|
|
|
|
|
|
# Enqueue user command status for deletion |
|
|
|
|
self.enqueue_deletion(status_id) |
|
|
|
|
|
|
|
|
|
# Find the intended target of the command (the status that the bot originally replied to with a classification) |
|
|
|
|
target_status_id = replied_status.get("in_reply_to_id", None) |
|
|
|
|
if not target_status_id: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
self.respond(status, "Target status is missing") |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
target_status = self.api.status(target_status_id) |
|
|
|
|
target_mail_text = toot_dict_to_mail(target_status).format() |
|
|
|
|
|
|
|
|
|
command = self.h2t.handle(status["content"]).strip() |
|
|
|
|
tokens = deque(command.split()) |
|
|
|
|
|
|
|
|
|
self.log("Received command: {}".format(command)) |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
while True: |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "learn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM]) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "unlearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "relearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Relearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Relearned as as based") |
|
|
|
|
break |
|
|
|
|
except IndexError: |
|
|
|
|
self.respond(status, "Invalid command") |
|
|
|
|
# Fetch the target status |
|
|
|
|
target_status = self.api.status(target_status_id) |
|
|
|
|
target_mail_text = toot_dict_to_mail(target_status).format() |
|
|
|
|
|
|
|
|
|
# Check if target status was previously classified |
|
|
|
|
was_cringe = target_status_id in self.state["cringe"] |
|
|
|
|
was_based = target_status_id in self.state["based"] |
|
|
|
|
was_unsure = target_status_id in self.state["unsure"] |
|
|
|
|
|
|
|
|
|
tokens = deque(command.split()) |
|
|
|
|
while True: |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
if was_cringe: |
|
|
|
|
break |
|
|
|
|
elif was_based: |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]) |
|
|
|
|
else: |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM]) |
|
|
|
|
|
|
|
|
|
self.set_cringe(target_status_id) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as cringe") |
|
|
|
|
|
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
elif token == "based": |
|
|
|
|
if was_based: |
|
|
|
|
break |
|
|
|
|
elif was_cringe: |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]) |
|
|
|
|
else: |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM]) |
|
|
|
|
|
|
|
|
|
self.set_based(target_status_id) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as based") |
|
|
|
|
|
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
elif token == "unlearn": |
|
|
|
|
if was_unsure: |
|
|
|
|
break |
|
|
|
|
elif was_cringe: |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM]) |
|
|
|
|
elif was_based: |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM]) |
|
|
|
|
|
|
|
|
|
self.set_unsure(target_status_id) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned") |
|
|
|
|
|
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
except IndexError: |
|
|
|
|
self.respond(status, "Invalid command") |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
self.respond(status, "Target status is missing") |
|
|
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
@ -150,26 +187,45 @@ class CringeBotClient(BotClient): |
|
|
|
|
|
|
|
|
|
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER]) |
|
|
|
|
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score)) |
|
|
|
|
status_id = status["id"] |
|
|
|
|
if result.category == bogofilter.SPAM: |
|
|
|
|
self.log("CRINGE: Enqueuing status {} for deletion".format(status["id"])) |
|
|
|
|
self.log("CRINGE: Enqueuing status {} for deletion".format(status_id)) |
|
|
|
|
self.set_cringe(status_id) |
|
|
|
|
self.enqueue_deletion(status_id) |
|
|
|
|
self.respond(status, "Categorised as cringe\n{}".format(bogo_report)) |
|
|
|
|
self.enqueue_deletion(status["id"]) |
|
|
|
|
elif result.category == bogofilter.UNSURE: |
|
|
|
|
self.log("UNSURE: Not enqueueing status {} for deletion".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as unsure\n{}".format(bogo_report)) |
|
|
|
|
else: |
|
|
|
|
self.log("BASED: Not enqueueing status {} for deletion".format(status["id"])) |
|
|
|
|
elif result.category == bogofilter.HAM: |
|
|
|
|
self.log("BASED: Not enqueueing status {} for deletion".format(status_id)) |
|
|
|
|
self.set_based(status_id) |
|
|
|
|
self.respond(status, "Categorised as based\n{}".format(bogo_report)) |
|
|
|
|
else: |
|
|
|
|
self.log("UNSURE: Not enqueueing status {} for deletion".format(status_id)) |
|
|
|
|
self.set_unsure(status_id) |
|
|
|
|
self.respond(status, "Categorised as unsure\n{}".format(bogo_report)) |
|
|
|
|
|
|
|
|
|
def on_load_state(self): |
|
|
|
|
state = {"deletion_queue": {}, **super().on_load_state()} |
|
|
|
|
state = { |
|
|
|
|
"deletion_queue": {}, |
|
|
|
|
"cringe": [], |
|
|
|
|
"based": [], |
|
|
|
|
"unsure": [], |
|
|
|
|
**super().on_load_state()} |
|
|
|
|
|
|
|
|
|
state["cringe"] = set(state["cringe"]) |
|
|
|
|
state["based"] = set(state["based"]) |
|
|
|
|
state["unsure"] = set(state["unsure"]) |
|
|
|
|
|
|
|
|
|
for status_id, params in state["deletion_queue"].items(): |
|
|
|
|
# Load deletion queue into scheduler |
|
|
|
|
params["scheduler_event"] = self.deletion_scheduler.enterabs(datetime.fromisoformat(params["time"]).timestamp(), 1, self.queued_delete, argument=(status_id,)) |
|
|
|
|
|
|
|
|
|
return state |
|
|
|
|
|
|
|
|
|
def on_save_state(self, state): |
|
|
|
|
state["cringe"] = list(state["cringe"]) |
|
|
|
|
state["based"] = list(state["based"]) |
|
|
|
|
state["unsure"] = list(state["unsure"]) |
|
|
|
|
|
|
|
|
|
# Transform deletion scheduler queue to a JSON friendly format |
|
|
|
|
state["deletion_queue"] = {event.argument[0]: {"time": datetime.fromtimestamp(event.time, timezone.utc).isoformat()} for event in self.deletion_scheduler.queue} |
|
|
|
|
|
|
|
|
|
super().on_save_state(state) |
|
|
|
@ -193,6 +249,7 @@ class CringeBotClient(BotClient): |
|
|
|
|
def queued_delete(self, status_id): |
|
|
|
|
try: |
|
|
|
|
self.log("Deleting status {}".format(status_id)) |
|
|
|
|
self.set_discard(status_id) |
|
|
|
|
self.api.status_delete(status_id) |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
self.log("Cannot find status {} on server".format(status_id)) |
|
|
|
|