Initial commit

master
Thor 3 years ago
commit 3eb003833c
  1. 5
      .gitignore
  2. 46
      README.md
  3. 68
      bogofilter.py
  4. 6
      config.json.example
  5. 387
      cringefilter.py
  6. 13
      cringefilter.service
  7. 2
      secret/.gitignore
  8. 2
      state/.gitignore

5
.gitignore vendored

@ -0,0 +1,5 @@
config.json
wordlist.db
__pycache__
.*
!.gitignore

@ -0,0 +1,46 @@
# cringefilter
`cringefilter` is a bot that tracks and auto-deletes on Mastodon/Pleroma accounts according to a set maximum age, if they are bad enough. Tracking multiple instances simultaneously is possible. Once a status is flagged for retention, the bot will not check it again.
## Installation (Linux)
#### Obtain root login shell
```
sudo -i
```
#### Download cringefilter and make user account
```
git clone https://git.thj.no/thor/cringefilter.git /usr/local/lib/cringefilter
useradd -d /usr/local/lib/cringefilter -M -s /bin/bash -U cringefilter
chown -R cringefilter.cringefilter /usr/local/lib/cringefilter
```
#### Create configuration
```
cd /usr/local/lib/cringefilter
cp config.example.json config.json
editor config.json
```
#### Install systemd service
```
ln -s cringefilter.service /etc/systemd/system/cringefilter.service
systemctl enable cringefilter
systemctl start cringefilter
```
#### Management
```
# Restart service
systemctl restart cringefilter
# Check service status
systemctl status cringefilter
# Monitor log output
journalctl -t cringefilter -f
```
## Maintainer
The maintainer can be contacted at `@thor@pl.thj.no`.

@ -0,0 +1,68 @@
import subprocess
from email.utils import format_datetime
from datetime import datetime
import quopri
BOGOFILTER_DB_DIR = "."
BOGOFILTER_COMMAND = ["bogofilter", "-T", "-d", BOGOFILTER_DB_DIR]
SPAM = "S"
HAM = "H"
UNSURE = "U"
class BogofilterResult:
def __init__(self, category, score):
self.category = category
self.score = score
class Mail:
def __init__(self, headers = {}, body = None):
self.headers = {
"Date": datetime.now(),
"Content-Type": "text/html; charset=\"UTF-8\""} | headers
self.body = body
def format(self):
text = str()
for key, value in self.headers.items():
if key == "Subject":
value = "=?utf-8?Q?{}?=".format(quopri.encodestring(bytes(value, "utf-8"), header = True).decode("utf-8"))
if key == "Date":
value = format_datetime(value)
text += "{key}: {value}\n".format(key = key, value = value)
text += "\n"
if self.body:
text += self.body
text += "\n"
return text
# If run with category == UNSURE, the message is classified
# If run with category == HAM | SPAM, the message is learned
# If learn = True, messages are learned as they are categorised
def run(text, category = UNSURE, learn = False):
if category == SPAM:
args = ["-s"]
elif category == HAM:
args = ["-n"]
else:
args = []
if learn:
args.append("-u")
cp = subprocess.run(BOGOFILTER_COMMAND + args, capture_output = True, encoding = "utf-8", input = text)
arr = cp.stdout.strip().split(" ")
if len(arr) == 2:
(category, score) = arr
return BogofilterResult(category, score)
else:
if cp.stderr:
print("Bogofilter:")
print(cp.stderr.strip())
return None

@ -0,0 +1,6 @@
{
"max_age": 90,
"instances": [
"mastodon.social"
]
}

@ -0,0 +1,387 @@
import os
import sys
import time
from datetime import datetime, timezone, timedelta
import json
import pprint
import threading
import traceback
import bogofilter
import html2text
from mastodon import Mastodon, MastodonNotFoundError
def log_print(source, text = ""):
prefix = "{}: ".format(source)
text = (prefix + text.strip()).replace("\n", "\n" + prefix)
print(text)
def log_pprint(source, obj):
log_print(source, pprint.pformat(obj))
def encode_time(dt):
return int(dt.strftime("%Y%m%d%H%M"))
def decode_time(value):
if len(value) == 12:
return dt.strptime(str(value), "%Y%m%d%H%M")
else:
return dt.strptime(str(value), "%Y%m%d%H")
class Instance:
def __init__(self, name, config):
self.name = name
self.config = config
self.base_url = "https://{}".format(name)
self.client_file = "secret/{}.client".format(name)
self.user_file = "secret/{}.user".format(name)
self.state_file = "state/{}.state".format(name)
self.state_lock = threading.Lock()
self.spawner_thread = threading.Thread(
target = self.spawner,
name = self.name + " spawner",
args = (),
kwargs = {},
daemon = True)
self.tracker_thread = threading.Thread(
target = self.tracker,
name = self.name + " tracker",
args = (),
kwargs = {},
daemon = True)
self.purger_thread = threading.Thread(
target = self.purger,
name = self.name + " purger",
args = (),
kwargs = {},
daemon = True)
def setup(self):
if not os.path.exists(self.client_file):
Mastodon.create_app(
'MastodonDeleter',
api_base_url = self.base_url,
to_file = self.client_file)
if not os.path.exists(self.user_file):
api = Mastodon(
api_base_url = self.base_url,
client_id = self.client_file)
auth_url = api.auth_request_url()
print("Go to:")
print(auth_url)
print()
auth_code = input("Enter code: ")
print()
api.log_in(code = auth_code, to_file = self.user_file)
def start(self):
self.spawner_thread.start()
def spawner(self):
self.load_state()
self.api = Mastodon(
access_token = self.user_file,
api_base_url = self.base_url)
self.tracker_thread.start()
if not learning:
self.purger_thread.start()
while True:
self.tracker_report()
time.sleep(60)
def tracker(self):
my_id = self.api.me()["id"]
while True:
try:
self.state_lock.acquire()
self.state_lock.release()
statuses = self.api.account_statuses(my_id, min_id = self.state["min_id"])
h2t = html2text.HTML2Text()
h2t.ignore_links = True
while not statuses is None and len(statuses) > 0:
log_print(self.name, "Found {} new status(es)".format(len(statuses)))
for status in sorted(statuses,
key = lambda status: status["created_at"]):
self.state["min_id"] = status["id"]
if status["reblog"]:
continue
md_text = h2t.handle(status["content"])
if "#cringefilter" in md_text:
continue
mail_text = toot_dict_to_mail(status).format()
if learning:
preview = toot_dict_to_mail(status)
preview.body = md_text
preview_text = preview.format()
print(preview_text)
print()
category = None
while not category in ["H", "S", "U"]:
category = input("H(am), S(pam) or U(nknown)? ").upper()
if category != "U":
bogofilter.run(mail_text, category)
if category == "S":
self.track_status(status)
print()
self.save_state()
else:
result = bogofilter.run(mail_text, learn = True)
if result.category == "S":
log_print(self.name, "SPAM: Tracking status with ID {} as spam".format(status["id"]))
self.api.status_reply(status, "@{} Tracked as spam\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
time.sleep(1)
else:
log_print(self.name, "HAM: Not tracking status with ID {} as spam".format(status["id"]))
self.api.status_reply(status, "@{} Tracked as ham\n#cringefilter".format(self.api.me()["username"]), visibility = "direct", untag = True)
time.sleep(1)
log_print(self.name, "Bogofilter: Category={}, Score={}".format(result.category, result.score))
print()
print(mail_text)
self.track_status(status)
self.save_state()
statuses = self.api.fetch_previous(statuses)
# Rate limit (max 300 requests per 5 minutes, i.e. 1 per second)
time.sleep(1)
except:
log_print(self.name, traceback.format_exc())
time.sleep(60)
def purger(self):
while True:
try:
deleted = False
timeslot_key, status_id = self.next_expired()
if not timeslot_key is None:
try:
log_print(self.name, "Deleting status {} in timeslot {}".format(status_id, timeslot_key))
self.api.status_delete(status_id)
deleted = True
except MastodonNotFoundError:
log_print(self.name,
"Cannot find status {} on server".format(status_id))
self.expire_status(timeslot_key, status_id)
if deleted:
time.sleep(60)
else:
time.sleep(1)
except:
log_print(self.name, traceback.format_exc())
time.sleep(60)
def load_state(self):
self.state_lock.acquire()
if not os.path.exists(self.state_file):
self.state = dict(
min_id = "0",
timeslots = {})
else:
with open(self.state_file) as json_file:
self.state = json.load(json_file)
self.state["timeslots"] = dict(map(lambda kv: (int(kv[0]), set(kv[1])), self.state["timeslots"]))
self.state_lock.release()
def save_state(self):
self.state_lock.acquire()
json_state = self.state.copy()
json_state["timeslots"] = list(map(lambda kv: [kv[0], list(kv[1])], json_state["timeslots"].items()))
self.state_lock.release()
with open(self.state_file, "w") as json_file:
json.dump(json_state, json_file, indent = 4)
def tracker_report(self):
self.state_lock.acquire()
total_timeslots = len(self.state["timeslots"])
total_statuses = 0
for timeslot_key, status_ids in self.state["timeslots"].items():
total_statuses += len(status_ids)
self.state_lock.release()
log_print(self.name, "Tracking {} statuses across {} timeslots".format(
total_statuses, total_timeslots))
def track_status(self, status):
status_id = str(status["id"])
timeslot_key = encode_time(status["created_at"])
self.state_lock.acquire()
if status["reblog"] is None:
timeslots = self.state["timeslots"]
if not timeslot_key in timeslots:
timeslots[timeslot_key] = set()
timeslots[timeslot_key].add(status_id)
self.state_lock.release()
def next_expired(self):
now = datetime.now(timezone.utc)
min_timeslot_key = encode_time(now - timedelta(minutes = config["max_age"]))
self.state_lock.acquire()
timeslot_key, status_ids = next(iter(self.state["timeslots"].items()), (None, None))
if not timeslot_key is None and timeslot_key < min_timeslot_key:
status_id = next(iter(status_ids), None)
else:
timeslot_key = None
status_id = None
self.state_lock.release()
return (timeslot_key, status_id)
def expire_status(self, timeslot_key, status_id):
self.state_lock.acquire()
timeslots = self.state["timeslots"]
if timeslot_key in timeslots:
if status_id in timeslots[timeslot_key]:
log_print(self.name, "Expiring status {} from timeslot {}".format(status_id, timeslot_key))
timeslots[timeslot_key].remove(status_id)
else:
log_print(self.name, "Cannot expire missing status {} from timeslot {}".format(
status_id, timeslot_key))
if len(timeslots[timeslot_key]) == 0:
log_print(self.name, "Removing empty timeslot {}".format(timeslot_key))
del timeslots[timeslot_key]
else:
log_print(self.name, "Cannot expire status {} from missing timeslot {}".format(
status_id, timeslot_key))
self.state_lock.release()
self.save_state()
def toot_dict_to_mail(toot_dict):
#log_pprint("toot_dict_to_mail", toot_dict)
flags = []
if toot_dict.get("sensitive", False):
flags.append("sensitive")
if toot_dict.get("poll", False):
flags.append("poll")
if toot_dict.get("reblog", False):
flags.append("reblog")
if toot_dict.get("reblogged", False):
flags.append("reblogged")
#if toot_dict.get("favourited", False):
# flags.append("favourited")
if toot_dict.get("bookmarked", False):
flags.append("bookmarked")
if toot_dict.get("pinned", False):
flags.append("pinned")
flags = ", ".join(flags)
headers = {}
if toot_dict.get("account") and toot_dict["account"].get("acct"):
headers["From"] = toot_dict["account"]["acct"]
if toot_dict.get("created_at"):
headers["Date"] = toot_dict["created_at"]
if toot_dict.get("visibility"):
headers["X-Visibility"] = toot_dict["visibility"]
if len(flags) > 0:
headers["X-Flags"] = flags
if toot_dict.get("spoiler_text"):
headers["Subject"] = toot_dict["spoiler_text"]
if toot_dict.get("replies_count", 0) > 0:
headers["X-Replies-Count"] = toot_dict["replies_count"]
if len(toot_dict.get("media_attachments", [])) > 0:
headers["X-Attachments-Count"] = len(toot_dict["media_attachments"])
if toot_dict.get("reblogs_count", 0) > 0:
headers["X-Reblogs-Count"] = toot_dict["reblogs_count"]
if toot_dict.get("favourites_count", 0) > 0:
headers["X-Favourites-Count"] = toot_dict["favourites_count"]
if toot_dict.get("content") and len(toot_dict["content"]) > 0:
body = toot_dict["content"]
else:
body = None
return bogofilter.Mail(headers = headers, body = body)
learning = "-l" in sys.argv[1:]
with open("config.json") as json_file:
config = json.load(json_file)
instances = {}
for name in config["instances"]:
instances[name] = Instance(name = name, config = config)
instances[name].setup()
start_interval = 60.0 / len(config["instances"])
for instance in instances.values():
instance.start()
time.sleep(start_interval)
while True:
time.sleep(1)

@ -0,0 +1,13 @@
[Unit]
Description=Fediverse Cringe Filter
[Service]
User=cringefilter
WorkingDirectory=/usr/local/lib/cringefilter
ExecStart=/usr/bin/python3 /usr/local/lib/cringefilter/cringefilter.py
Environment=PYTHONUNBUFFERED=1
SyslogIdentifier=cringefilter
Restart=always
[Install]
WantedBy=multi-user.target

2
secret/.gitignore vendored

@ -0,0 +1,2 @@
*
!.gitignore

2
state/.gitignore vendored

@ -0,0 +1,2 @@
*
!.gitignore
Loading…
Cancel
Save