You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
109 lines
3.1 KiB
109 lines
3.1 KiB
import os
import quopri
import re
import subprocess
from datetime import datetime, timezone
from email.utils import format_datetime
|
|
|
|
# Score cutoffs: MAX_HAM is the upper bound for ham, MIN_SPAM the lower bound
# for spam. Both are handed to bogofilter through its "-o" option below.
MAX_HAM = 1 / 4
MIN_SPAM = 3 / 4

# Values handed to bogofilter through its "-m" option (min_dev, robs, robx).
# MIN_DEV and ROBX are derived from the two cutoffs above.
MIN_DEV = 2 * (MIN_SPAM - MAX_HAM) * 0.375
ROBS = 0.0178
ROBX = (MAX_HAM + MIN_SPAM) / 2

# Command prefix used to create a word list; the target path is appended by
# the caller.
LOAD_COMMAND = ["bogoutil", "-l"]

# Base bogofilter invocation; database location and action flags are appended
# by the caller.
COMMAND = [
    "bogofilter",
    "-T",
    "-c", "/dev/null",
    "-o", f"{MIN_SPAM},{MAX_HAM}",
    "-m", f"{MIN_DEV},{ROBS},{ROBX}",
]
|
|
|
|
# Categories
# Single-letter codes identifying the class assigned to a message.
SPAM, HAM, UNSURE = "S", "H", "U"
categories = [SPAM, HAM, UNSURE]

# Actions
# Integer identifiers for the operations that can be requested.
(
    CLASSIFY,
    REGISTER,
    LEARN_SPAM,
    UNLEARN_SPAM,
    LEARN_HAM,
    UNLEARN_HAM,
) = range(6)

# Extra command-line flags contributed by each key. Both action identifiers
# and category codes are valid keys.
ACTIONS = {
    # Keyed by action identifier.
    CLASSIFY: [],
    REGISTER: ["-u"],
    LEARN_SPAM: ["-s"],
    UNLEARN_SPAM: ["-S"],
    LEARN_HAM: ["-n"],
    UNLEARN_HAM: ["-N"],
    # Keyed by category code.
    SPAM: ["-s"],
    HAM: ["-n"],
    UNSURE: [],
}
|
|
|
|
class BogofilterResult:
    """Outcome of one classification: a category code and its score."""

    def __init__(self, category, score):
        # Both values are stored exactly as given; nothing is validated or
        # converted here.
        self.category, self.score = category, score
|
|
|
|
class Mail:
    """A minimal e-mail message: a header mapping plus a list of body lines.

    The ``Content-Type`` header is always forced to
    ``text/html; charset="UTF-8"``, overriding any value supplied by the
    caller.
    """

    # Body lines matching this pattern get one more ">" prepended when the
    # message is rendered for an mbox file, so they cannot be mistaken for
    # the "From " line that starts a message.
    _FROM_LINE = re.compile(r">*From ")

    def __init__(self, headers=None, body=None):
        """Create a message.

        Args:
            headers: Mapping of header names to values; copied, never
                mutated. Defaults to no headers.
            body: Either a string (stripped and split on newlines) or an
                iterable of lines. Defaults to an empty body.
        """
        # None sentinels replace the former mutable default arguments.
        headers = {} if headers is None else headers
        self.headers = {**headers, "Content-Type": "text/html; charset=\"UTF-8\""}
        self.change_body([] if body is None else body)

    def get_body(self):
        """Return the body as a single newline-joined string."""
        return "\n".join(self.body)

    def change_body(self, body):
        """Replace the body.

        Args:
            body: A string (surrounding whitespace is stripped, then it is
                split on newlines) or an iterable of lines (copied).
        """
        if isinstance(body, str):
            self.body = body.strip().split("\n")
        else:
            self.body = list(body)

    def format(self, mbox=False):
        """Render the message as text.

        Args:
            mbox: When true, prefix the message with an mbox "From " line
                built from the ``From`` header and the current UTC time, and
                quote body lines that look like such a line.

        Returns:
            The headers, an empty line and the body, joined by newlines.

        Raises:
            KeyError: If ``mbox`` is true and there is no ``From`` header.
        """
        lines = []

        if mbox:
            timestamp = format_datetime(datetime.now(timezone.utc), usegmt=True)
            # No trailing newline here: the final join supplies it. An extra
            # one would put an empty line between the "From " line and the
            # headers, which makes a reader treat the headers as body text.
            lines.append("From {} {}".format(self.headers["From"], timestamp))

        for key, value in self.headers.items():
            if key == "Subject":
                # Wrap the subject as a quoted-printable UTF-8 encoded word.
                encoded = quopri.encodestring(value.encode("utf-8"), header=True)
                value = "=?utf-8?Q?{}?=".format(encoded.decode("utf-8"))
            lines.append("{}: {}".format(key, value))
        # Empty line separating headers from body.
        lines.append("")

        if mbox:
            lines.extend(
                ">" + line if self._FROM_LINE.match(line) else line
                for line in self.body
            )
        else:
            lines.extend(self.body)

        return "\n".join(lines)

    def deliver_to_mbox(self, path, mode="a+"):
        """Write the message, rendered in mbox form, to the file at ``path``.

        Args:
            path: Path of the mbox file.
            mode: Mode passed to ``open``; appends by default.
        """
        # A newline is written first when the file already has content, so
        # the new "From " line starts on a line of its own.
        empty = not os.path.exists(path) or os.path.getsize(path) == 0
        with open(path, mode, encoding="utf-8") as mbox_file:
            if not empty:
                mbox_file.write("\n")
            mbox_file.write(self.format(mbox=True))
|
|
|
|
def run(text, db_dir, actions=(CLASSIFY,), category=UNSURE):
    """Run bogofilter on ``text`` and parse its category/score output.

    If ``db_dir`` has no ``wordlist.db`` yet, the directory is created and
    ``LOAD_COMMAND`` is run with empty input before bogofilter is invoked.

    Args:
        text: Message text fed to bogofilter on standard input.
        db_dir: Directory passed to bogofilter with ``-d``.
        actions: Iterable of keys into ``ACTIONS``; the flags of every key
            are appended to the command line. Defaults to classification
            only.
        category: Currently unused; kept so existing callers keep working.

    Returns:
        A ``BogofilterResult`` when the output consists of exactly two
        space-separated fields (category and score), otherwise ``None``.
    """
    args = []
    for action in actions:
        args.extend(ACTIONS[action])

    if not os.path.exists(os.path.join(db_dir, "wordlist.db")):
        os.makedirs(db_dir, exist_ok=True)
        subprocess.run(LOAD_COMMAND + [db_dir], input=b'')

    cp = subprocess.run(
        COMMAND + ["-d", db_dir] + args,
        capture_output=True,
        encoding="utf-8",
        input=text,
    )

    fields = cp.stdout.strip().split(" ")
    if len(fields) == 2:
        # Distinct local names so the unused ``category`` parameter is not
        # silently overwritten.
        result_category, score = fields
        return BogofilterResult(result_category, float(score))

    if cp.returncode == 3:
        print("Bogofilter error:")
        # Test the stream that is actually printed. The previous check on
        # stdout dropped the diagnostics whenever stdout was empty.
        error_text = cp.stderr.strip()
        if error_text:
            print(error_text)
    return None
|
|
|
|
#mail = Mail({"From": "thor"}, "Hello, World\nHow are you?")
|
|
#print(mail.format())
|
|
|