|
|
|
@ -46,12 +46,11 @@ class CringeBotClient(BotClient): |
|
|
|
|
self.spawner_thread.start() |
|
|
|
|
|
|
|
|
|
def spawner(self): |
|
|
|
|
if not learning: |
|
|
|
|
self.purger_thread.start() |
|
|
|
|
self.purger_thread.start() |
|
|
|
|
|
|
|
|
|
while True: |
|
|
|
|
self.tracker_report() |
|
|
|
|
time.sleep(60) |
|
|
|
|
while True: |
|
|
|
|
self.tracker_report() |
|
|
|
|
time.sleep(60) |
|
|
|
|
|
|
|
|
|
def respond(self, status, message): |
|
|
|
|
self.log("Responded with:") |
|
|
|
@ -76,105 +75,89 @@ class CringeBotClient(BotClient): |
|
|
|
|
preview.body = md_text |
|
|
|
|
preview_text = preview.format() |
|
|
|
|
|
|
|
|
|
if learning: |
|
|
|
|
self.log(preview_text) |
|
|
|
|
self.log() |
|
|
|
|
|
|
|
|
|
category = None |
|
|
|
|
while not category in bogofilter.categories: |
|
|
|
|
category = input("H(am), S(pam) or U(nknown)? ").upper() |
|
|
|
|
|
|
|
|
|
if category != bogofilter.UNSURE: |
|
|
|
|
bogofilter.run(mail_text, [category]) |
|
|
|
|
|
|
|
|
|
if category == bogofilter.SPAM: |
|
|
|
|
self.track_status(status) |
|
|
|
|
|
|
|
|
|
self.log() |
|
|
|
|
else: |
|
|
|
|
replied_id = status.get("in_reply_to_id", None) |
|
|
|
|
if replied_id: |
|
|
|
|
try: |
|
|
|
|
replied_status = self.api.status(replied_id) |
|
|
|
|
replied_tokens = self.h2t.handle(replied_status["content"]).split() |
|
|
|
|
|
|
|
|
|
if self.config["tag"] in replied_tokens: |
|
|
|
|
self.track_status(status) |
|
|
|
|
target_status_id = replied_status.get("in_reply_to_id", None) |
|
|
|
|
if target_status_id: |
|
|
|
|
replied_id = status.get("in_reply_to_id", None) |
|
|
|
|
if replied_id: |
|
|
|
|
try: |
|
|
|
|
replied_status = self.api.status(replied_id) |
|
|
|
|
replied_tokens = self.h2t.handle(replied_status["content"]).split() |
|
|
|
|
|
|
|
|
|
if self.config["tag"] in replied_tokens: |
|
|
|
|
self.track_status(status) |
|
|
|
|
target_status_id = replied_status.get("in_reply_to_id", None) |
|
|
|
|
if target_status_id: |
|
|
|
|
try: |
|
|
|
|
target_status = self.api.status(target_status_id) |
|
|
|
|
target_timeslot_key = encode_time(target_status["created_at"]) |
|
|
|
|
target_mail_text = toot_dict_to_mail(target_status).format() |
|
|
|
|
|
|
|
|
|
command = self.h2t.handle(status["content"]).strip() |
|
|
|
|
tokens = deque(command.split()) |
|
|
|
|
self.log("Received command: {}".format(command)) |
|
|
|
|
try: |
|
|
|
|
target_status = self.api.status(target_status_id) |
|
|
|
|
target_timeslot_key = encode_time(target_status["created_at"]) |
|
|
|
|
target_mail_text = toot_dict_to_mail(target_status).format() |
|
|
|
|
|
|
|
|
|
command = self.h2t.handle(status["content"]).strip() |
|
|
|
|
tokens = deque(command.split()) |
|
|
|
|
self.log("Received command: {}".format(command)) |
|
|
|
|
try: |
|
|
|
|
while True: |
|
|
|
|
while True: |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "learn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "learn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM]) |
|
|
|
|
self.track_status(target_status) |
|
|
|
|
self.respond(status, "Learned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Learned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "unlearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "relearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]) |
|
|
|
|
self.track_status(target_status) |
|
|
|
|
self.respond(status, "Relearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Relearned as as based") |
|
|
|
|
break |
|
|
|
|
except IndexError: |
|
|
|
|
self.respond(status, "Invalid command") |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
else: |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM]) |
|
|
|
|
self.track_status(target_status) |
|
|
|
|
self.respond(status, "Learned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Learned as based") |
|
|
|
|
break |
|
|
|
|
elif token == "unlearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Unlearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "relearn": |
|
|
|
|
token = tokens.popleft() |
|
|
|
|
if token == "cringe": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARN_SPAM]) |
|
|
|
|
self.track_status(target_status) |
|
|
|
|
self.respond(status, "Relearned as cringe") |
|
|
|
|
break |
|
|
|
|
elif token == "based": |
|
|
|
|
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARN_HAM]) |
|
|
|
|
self.expire_status(target_timeslot_key, target_status_id) |
|
|
|
|
self.respond(status, "Relearned as as based") |
|
|
|
|
break |
|
|
|
|
except IndexError: |
|
|
|
|
self.respond(status, "Invalid command") |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
return |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER]) |
|
|
|
|
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score)) |
|
|
|
|
if result.category == bogofilter.SPAM: |
|
|
|
|
self.log("CRINGE: Tracking status with ID {} as cringe".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as cringe\n{}".format(bogo_report)) |
|
|
|
|
self.track_status(status) |
|
|
|
|
elif result.category == bogofilter.UNSURE: |
|
|
|
|
self.log("UNSURE: Not tracking status with ID {} as cringe".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as unsure\n{}".format(bogo_report)) |
|
|
|
|
else: |
|
|
|
|
self.log("BASED: Not tracking status with ID {} as cringe".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as based\n{}".format(bogo_report)) |
|
|
|
|
|
|
|
|
|
self.log() |
|
|
|
|
self.log(preview_text) |
|
|
|
|
self.log() |
|
|
|
|
else: |
|
|
|
|
self.respond(status, "Original status is missing") |
|
|
|
|
return |
|
|
|
|
except MastodonNotFoundError: |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER]) |
|
|
|
|
bogo_report = "Bogofilter: Category={}, Score={}".format(result.category, "{:.4f}".format(result.score)) |
|
|
|
|
if result.category == bogofilter.SPAM: |
|
|
|
|
self.log("CRINGE: Tracking status with ID {} as cringe".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as cringe\n{}".format(bogo_report)) |
|
|
|
|
self.track_status(status) |
|
|
|
|
elif result.category == bogofilter.UNSURE: |
|
|
|
|
self.log("UNSURE: Not tracking status with ID {} as cringe".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as unsure\n{}".format(bogo_report)) |
|
|
|
|
else: |
|
|
|
|
self.log("BASED: Not tracking status with ID {} as cringe".format(status["id"])) |
|
|
|
|
self.respond(status, "Categorised as based\n{}".format(bogo_report)) |
|
|
|
|
|
|
|
|
|
self.log() |
|
|
|
|
self.log(preview_text) |
|
|
|
|
self.log() |
|
|
|
|
|
|
|
|
|
def purger(self): |
|
|
|
|
while True: |
|
|
|
@ -326,8 +309,6 @@ def toot_dict_to_mail(toot_dict): |
|
|
|
|
|
|
|
|
|
return bogofilter.Mail(headers = headers, body = body) |
|
|
|
|
|
|
|
|
|
learning = "-l" in sys.argv[1:] |
|
|
|
|
|
|
|
|
|
with open("config.json") as json_file: |
|
|
|
|
bot = Bot(CringeBotClient, json.load(json_file)) |
|
|
|
|
bot.start() |
|
|
|
|