master
Thor 3 years ago
parent 3eb003833c
commit 842aae3db1
  1. 41
      bogofilter.py
  2. 67
      cringefilter.py

@ -6,9 +6,31 @@ import quopri
BOGOFILTER_DB_DIR = "."
BOGOFILTER_COMMAND = ["bogofilter", "-T", "-d", BOGOFILTER_DB_DIR]
# Categories
SPAM = "S"
HAM = "H"
UNSURE = "U"
categories = [SPAM, HAM, UNSURE]
# Actions
CLASSIFY = []
REGISTER = ["-u"]
LEARN_SPAM = ["-s"]
UNLEARN_SPAM = ["-S"]
LEARN_HAM = ["-n"]
UNLEARN_HAM = ["-N"]
ACTIONS = {
REGISTER: ["-u"],
LEARN_SPAM: ["-s"],
UNLEARN_SPAM: ["-S"],
LEARN_HAM: ["-n"],
UNLEARN_HAM: ["-N"],
SPAM: ["-s"],
HAM: ["-n"],
UNSURE: []
}
class BogofilterResult:
def __init__(self, category, score):
@ -42,20 +64,11 @@ class Mail:
return text
# If run with category == UNSURE, the message is classified
# If run with category == HAM | SPAM, the message is learned
# If learn = True, messages are learned as they are categorised
def run(text, category = UNSURE, learn = False):
if category == SPAM:
args = ["-s"]
elif category == HAM:
args = ["-n"]
else:
args = []
if learn:
args.append("-u")
def run(text, actions = [CLASSIFY], category = UNSURE):
args = []
for action in actions:
args.extend(ACTIONS[action])
cp = subprocess.run(BOGOFILTER_COMMAND + args, capture_output = True, encoding = "utf-8", input = text)
arr = cp.stdout.strip().split(" ")
if len(arr) == 2:

@ -108,7 +108,6 @@ class Instance:
while True:
try:
self.state_lock.acquire()
self.state_lock.release()
@ -144,21 +143,73 @@ class Instance:
print()
category = None
while not category in ["H", "S", "U"]:
while not category in bogofilter.categories:
category = input("H(am), S(pam) or U(nknown)? ").upper()
if category != "U":
bogofilter.run(mail_text, category)
if category != bogofilter.UNSURE:
bogofilter.run(mail_text, [category])
if category == "S":
if category == bogofilter.SPAM:
self.track_status(status)
print()
self.save_state()
else:
result = bogofilter.run(mail_text, learn = True)
if result.category == "S":
replied_id = status.get("in_reply_to_id", None)
log_pprint(self.name, status)
if replied_id:
try:
replied_status = self.api.status(replied_id)
replied_md = h2t.handle(replied_status["content"])
if "#cringefilter" in replied_md:
target_status_id = status.get("in_reply_to_id", None)
if target_status_id:
try:
target_status = self.api.status(target_status_id)
target_timeslot_key = encode_time(status["created_at"])
target_mail_text = toot_dict_to_mail(target_status).format()
command = h2t.handle(status["content"])
if "learn spam" in command:
bogofilter.run(target_mail_text, [bogofilter.LEARN_SPAM])
self.track_status(target_status)
self.api.status_reply(status, "@{} Learned as spam\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
elif "unlearn spam" in command:
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM])
self.expire_status(target_timeslot_key, target_status_id)
self.api.status_reply(status, "@{} Unlearned as spam\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
elif "relearn spam" in command:
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM, bogofilter.LEARM_SPAM])
self.track_status(target_status)
self.api.status_reply(status, "@{} Relearned as spam\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
if "learn ham" in command:
bogofilter.run(target_mail_text, [bogofilter.LEARN_HAM])
self.expire_status(target_timeslot_key, target_status_id)
self.api.status_reply(status, "@{} Learned as ham\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
elif "unlearn ham" in command:
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_HAM])
self.api.status_reply(status, "@{} Unlearned as ham\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
elif "relearn ham" in command:
bogofilter.run(target_mail_text, [bogofilter.UNLEARN_SPAM, bogofilter.LEARM_HAM])
self.expire_status(target_timeslot_key, target_status_id)
self.api.status_reply(status, "@{} Relearned as ham\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
else:
self.api.status_reply(status, "@{} Unknown command\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
continue
except MastodonNotFoundError:
self.api.status_reply(status, "@{} Original status is missing\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
continue
else:
self.api.status_reply(status, "@{} Original status not referenced\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
continue
except MastodonNotFoundError:
log_print(self.name,
"Cannot find replied-to status {} on server".format(status_id))
result = bogofilter.run(mail_text, [bogofilter.CLASSIFY, bogofilter.REGISTER])
if result.category == bogofilter.SPAM:
log_print(self.name, "SPAM: Tracking status with ID {} as spam".format(status["id"]))
self.api.status_reply(status, "@{} Tracked as spam\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
time.sleep(1)
@ -183,7 +234,7 @@ class Instance:
except:
log_print(self.name, traceback.format_exc())
time.sleep(60)
time.sleep(5)
def purger(self):
while True:

Loading…
Cancel
Save