"""Helpers for classifying and training mail with the ``bogofilter`` CLI.

The module builds the bogofilter command line from a set of tuning
constants, offers a minimal ``Mail`` container that can render itself as a
plain message or as an mbox entry, and exposes ``run`` to invoke bogofilter
on a piece of text.
"""

import os
import quopri
import re
import subprocess
from datetime import datetime, timezone
from email.utils import format_datetime

# Classification cutoffs and tuning values passed to bogofilter via -o / -m.
MAX_HAM = 1.0 / 4.0
MIN_SPAM = 3.0 / 4.0
MIN_DEV = 2 * (MIN_SPAM - MAX_HAM) * 0.375
ROBS = 0.0178
ROBX = (MAX_HAM + MIN_SPAM) / 2

# Command used to create an (empty) word list in a database directory.
LOAD_COMMAND = ["bogoutil", "-l"]
# Base bogofilter invocation; "-d <db_dir>" and action flags are appended.
COMMAND = [
    "bogofilter",
    "-T",
    "-c", "/dev/null",
    "-o", "{},{}".format(MIN_SPAM, MAX_HAM),
    "-m", "{},{},{}".format(MIN_DEV, ROBS, ROBX),
]

# Categories
SPAM = "S"
HAM = "H"
UNSURE = "U"
categories = [SPAM, HAM, UNSURE]

# Actions
CLASSIFY = 0
REGISTER = 1
LEARN_SPAM = 2
UNLEARN_SPAM = 3
LEARN_HAM = 4
UNLEARN_HAM = 5

# Extra command-line flags contributed by each action (or category).
ACTIONS = {
    CLASSIFY: [],
    REGISTER: ["-u"],
    LEARN_SPAM: ["-s"],
    UNLEARN_SPAM: ["-S"],
    LEARN_HAM: ["-n"],
    UNLEARN_HAM: ["-N"],
    SPAM: ["-s"],
    HAM: ["-n"],
    UNSURE: [],
}

# Matches body lines that must be ">"-quoted inside an mbox file.
_MBOX_FROM_RE = re.compile(r">*From ")


class BogofilterResult:
    """Category and score parsed from bogofilter's two-field output."""

    def __init__(self, category, score):
        self.category = category
        self.score = score


class Mail:
    """A minimal message: a header mapping plus a list of body lines."""

    def __init__(self, headers=None, body=()):
        """Create a message.

        Args:
            headers: Mapping of header names to values. It is copied, and a
                ``Content-Type`` header is always set (overriding any value
                supplied by the caller).
            body: Either a string (stripped and split on newlines) or an
                iterable of lines.
        """
        self.headers = {
            **(headers or {}),
            "Content-Type": "text/html; charset=\"UTF-8\"",
        }
        self.change_body(body)

    def get_body(self):
        """Return the body as a single newline-joined string."""
        return "\n".join(self.body)

    def change_body(self, body):
        """Replace the body with ``body`` (a string or an iterable of lines)."""
        if isinstance(body, str):
            self.body = body.strip().split("\n")
        else:
            self.body = list(body)

    def format(self, mbox=False):
        """Render the message as text.

        Args:
            mbox: When true, prepend an mbox "From " separator line (this
                requires a ``From`` header) and ">"-quote body lines that
                would otherwise look like a separator.

        Returns:
            The headers, an empty line, and the body, joined with newlines.
        """
        lines = []
        if mbox:
            # No trailing newline here: the join below supplies it. An extra
            # empty line after the separator would end the header section
            # before any header had been read.
            lines.append("From {} {}".format(
                self.headers["From"],
                format_datetime(datetime.now(timezone.utc), usegmt=True),
            ))
        for key, value in self.headers.items():
            if key == "Subject":
                # Wrap the subject as a quoted-printable UTF-8 encoded word.
                encoded = quopri.encodestring(
                    bytes(value, "utf-8"), header=True
                ).decode("utf-8")
                value = "=?utf-8?Q?{}?=".format(encoded)
            lines.append("{}: {}".format(key, value))
        lines.append("")
        if mbox:
            lines.extend(
                ">" + line if _MBOX_FROM_RE.match(line) else line
                for line in self.body
            )
        else:
            lines.extend(self.body)
        return "\n".join(lines)

    def deliver_to_mbox(self, path, mode="a+"):
        """Write this message, in mbox form, to the file at ``path``.

        Args:
            path: Path of the mbox file; created if it does not exist.
            mode: Mode passed to ``open``; appends by default.
        """
        empty = not os.path.exists(path) or os.path.getsize(path) == 0
        with open(path, mode, encoding="utf-8") as mbox_file:
            if not empty:
                # Terminate the previous message's last line first.
                mbox_file.write("\n")
            mbox_file.write(self.format(mbox=True))


def run(text, db_dir, actions=(CLASSIFY,), category=UNSURE):
    """Run bogofilter on ``text`` using the word list in ``db_dir``.

    If ``db_dir`` has no ``wordlist.db`` yet, the directory is created and an
    empty word list is loaded into it with ``bogoutil`` first.

    Args:
        text: Message text fed to bogofilter on standard input.
        db_dir: Directory holding (or to hold) the bogofilter word list.
        actions: Iterable of action constants; each one contributes its
            flags from ``ACTIONS`` to the command line.
        category: Accepted for interface compatibility; it is not currently
            used to build the command line.

    Returns:
        A ``BogofilterResult`` when bogofilter prints exactly two
        space-separated fields (category and score), otherwise ``None``.
    """
    args = []
    for action in actions:
        args.extend(ACTIONS[action])

    if not os.path.exists(os.path.join(db_dir, "wordlist.db")):
        os.makedirs(db_dir, exist_ok=True)
        subprocess.run(LOAD_COMMAND + [db_dir], input=b'')

    cp = subprocess.run(
        COMMAND + ["-d", db_dir] + args,
        capture_output=True,
        encoding="utf-8",
        input=text,
    )

    fields = cp.stdout.strip().split(" ")
    if len(fields) == 2:
        result_category, score = fields
        return BogofilterResult(result_category, float(score))

    # Exit status 3 is the one this wrapper treats as a bogofilter failure.
    if cp.returncode == 3:
        print("Bogofilter error:")
        # Show the diagnostics only when there are any to show.
        error_text = cp.stderr.strip()
        if error_text:
            print(error_text)
    return None


# Example usage:
#mail = Mail({"From": "thor"}, "Hello, World\nHow are you?")
#print(mail.format())