|
|
|
@ -38,6 +38,84 @@ class CringeBotClient(BotClient): |
|
|
|
|
# Perform any scheduled deletes |
|
|
|
|
self.deletion_scheduler.run(blocking = False) |
|
|
|
|
|
|
|
|
|
# Look for commands in the replies to the bot's notifications |
|
|
|
|
def process_commands(self, status): |
|
|
|
|
replied_id = status.get("in_reply_to_id", None) |
|
|
|
|
if not replied_id: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
replied_status = self.api.status(replied_id) |
|
|
|
|
replied_tokens = self.h2t.handle(replied_status["content"]).split() |
|
|
|
|
|
|
|
|
|
if not self.config["tag"] in replied_tokens: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
self.enqueue_deletion(status["id"]) |
|
|
|
|
|
|
|
|
|
target_status_id = replied_status.get("in_reply_to_id", None) |
|
|
|
|
if not target_status_id: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
target_status = self.api.status(target_status_id) |
|
|
|
|
target_mail_text = toot_dict_to_mail(target_status).format() |
|
|
|
|
|
|
|
|
|
command = self.h2t.handle(status["content"]).strip() |
|
|
|
|
tokens = deque(command.split()) |
|
|
|
|
|
|
|
|
|
self.log("Received command: {}".format(command)) |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
while True: |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "learn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM]) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "unlearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "relearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Relearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Relearned as as based") |
|
|
|
|
break |
|
|
|
|
except IndexError: |
|
|
|
|
self.respond(status, "Invalid command") |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
return False |
|
|
|
|
|
|
|
|
|
def on_status(self, status): |
|
|
|
|
# Ignore statuses from other accounts |
|
|
|
|
if status["account"]["id"] != self.api.me()["id"]: |
|
|
|
@ -58,7 +136,6 @@ class CringeBotClient(BotClient): |
|
|
|
|
mail_text = toot_dict_to_mail(status).format() |
|
|
|
|
|
|
|
|
|
# Format and log plain-text preview |
|
|
|
|
|
|
|
|
|
preview = toot_dict_to_mail(status) |
|
|
|
|
preview.body = md_text |
|
|
|
|
preview_text = preview.format() |
|
|
|
@ -67,73 +144,9 @@ class CringeBotClient(BotClient): |
|
|
|
|
self.log(preview_text) |
|
|
|
|
self.log() |
|
|
|
|
|
|
|
|
|
# Look for commands in the replies to the bot's notifications |
|
|
|
|
|
|
|
|
|
replied_id = status.get("in_reply_to_id", None) |
|
|
|
|
if replied_id: |
|
|
|
|
try: |
|
|
|
|
replied_status = self.api.status(replied_id) |
|
|
|
|
replied_tokens = self.h2t.handle(replied_status["content"]).split() |
|
|
|
|
|
|
|
|
|
if self.config["tag"] in replied_tokens: |
|
|
|
|
self.enqueue_deletion(status["id"]) |
|
|
|
|
target_status_id = replied_status.get("in_reply_to_id", None) |
|
|
|
|
if target_status_id: |
|
|
|
|
try: |
|
|
|
|
target_status = self.api.status(target_status_id) |
|
|
|
|
target_mail_text = toot_dict_to_mail(target_status).format() |
|
|
|
|
|
|
|
|
|
command = self.h2t.handle(status["content"]).strip() |
|
|
|
|
tokens = deque(command.split()) |
|
|
|
|
self.log("Received command: {}".format(command)) |
|
|
|
|
try: |
|
|
|
|
while True: |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "learn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM]) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Learned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "unlearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "relearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]) |
|
|
|
|
self.enqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Relearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]) |
|
|
|
|
self.unqueue_deletion(target_status_id) |
|
|
|
|
self.respond(status, "Relearned as as based") |
|
|
|
|
break |
|
|
|
|
except IndexError: |
|
|
|
|
self.respond(status, "Invalid command") |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
else: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
return |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
pass |
|
|
|
|
# Process any commands |
|
|
|
|
if self.process_commands(status): |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER]) |
|
|
|
|
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score)) |
|
|
|
|