|
|
|
import os
|
|
|
|
import sys
|
|
|
|
import time
|
|
|
|
import toml
|
|
|
|
import random
|
|
|
|
import re
|
|
|
|
import sched
|
|
|
|
import math
|
|
|
|
import string
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
|
|
|
|
from mastodon import Mastodon, MastodonNotFoundError
|
|
|
|
from fedbot.bot import Bot, BotClient
|
|
|
|
|
|
|
|
POST_INTERVAL = timedelta(seconds = 15)
|
|
|
|
TEST = "test" in sys.argv[1:]
|
|
|
|
PORT_PCT = 67
|
|
|
|
|
|
|
|
def next_dt():
|
|
|
|
dt = datetime.now(timezone.utc)
|
|
|
|
dt -= timedelta(hours = 0,
|
|
|
|
minutes = (dt.minute % 15) - 15,
|
|
|
|
seconds = dt.second,
|
|
|
|
microseconds = dt.microsecond)
|
|
|
|
return dt
|
|
|
|
|
|
|
|
|
|
|
|
config_path = os.path.join(os.path.dirname(sys.argv[0]), "config.toml")
|
|
|
|
loaded_config = {
|
|
|
|
"name": "portmanteaubot",
|
|
|
|
**toml.load(config_path)}
|
|
|
|
|
|
|
|
SUFFIXES = [
|
|
|
|
'ly$',
|
|
|
|
'ing$',
|
|
|
|
'[bdklmptw]?est$',
|
|
|
|
'[^ious]s$',
|
|
|
|
'ted$',
|
|
|
|
'[ei]ty$']
|
|
|
|
|
|
|
|
def is_suffixed(word):
|
|
|
|
return any(re.fullmatch(suf, word) for suf in SUFFIXES)
|
|
|
|
|
|
|
|
def overlap_words(left_word, right_word):
|
|
|
|
if left_word == right_word:
|
|
|
|
return None
|
|
|
|
|
|
|
|
offset = 2
|
|
|
|
attempts = []
|
|
|
|
while offset + 2 <= len(left_word):
|
|
|
|
if right_word.lower().startswith(left_word.lower()[offset : offset + 2]):
|
|
|
|
attempts.append(left_word[:offset] + right_word)
|
|
|
|
#break
|
|
|
|
offset += 1
|
|
|
|
|
|
|
|
offset = len(right_word) - 2
|
|
|
|
while offset >= 0:
|
|
|
|
if left_word.lower().endswith(right_word.lower()[offset : offset + 2]):
|
|
|
|
attempts.append(left_word + right_word[offset + 2:])
|
|
|
|
#break
|
|
|
|
offset -= 1
|
|
|
|
|
|
|
|
attempts = sorted(attempts, key = lambda w: len(w), reverse = True)
|
|
|
|
|
|
|
|
if len(attempts) == 0:
|
|
|
|
return None
|
|
|
|
|
|
|
|
return pick_one_word(attempts)
|
|
|
|
|
|
|
|
def word_weight(index, length, power = 2):
|
|
|
|
a = pow((index + 1) / length, 2)
|
|
|
|
return int(100000 * a)
|
|
|
|
|
|
|
|
def pick_one_word(words, power = 2, max_len = 12):
|
|
|
|
words = list(filter(lambda w: len(w) <= max_len, words))
|
|
|
|
|
|
|
|
if len(words) == 0:
|
|
|
|
return None
|
|
|
|
|
|
|
|
weights = [word_weight(i, len(words), power = power) for i in range(0, len(words))]
|
|
|
|
return random.choices(words, weights = weights)[0]
|
|
|
|
|
|
|
|
class WordMaker:
|
|
|
|
def __init__(self):
|
|
|
|
print("Loading dictionaries")
|
|
|
|
illegal = set(ch for ch in (string.ascii_uppercase + string.punctuation + string.digits + string.whitespace))
|
|
|
|
with open ("mhyph.txt", "r", encoding = "mac-roman") as f:
|
|
|
|
lines = [l.strip() for l in f.readlines()]
|
|
|
|
lines = filter(lambda w: len(w) > 0 and not any(ch in illegal for ch in w), lines)
|
|
|
|
words = [l.replace("•", "") for l in lines]
|
|
|
|
self.all_words = words
|
|
|
|
words = list(set(sorted(words, key = lambda w: len(w), reverse = True)))
|
|
|
|
self.first_words = list(filter(lambda w: not is_suffixed(w), words))
|
|
|
|
self.next_words = words
|
|
|
|
|
|
|
|
with open("porthyph.txt", "r") as f:
|
|
|
|
lines = [line.strip() for line in f.readlines()]
|
|
|
|
words = list(filter(lambda l: len(l) > 0, lines))
|
|
|
|
self.all_words = list(set(sorted([w.lower() for w in [*self.all_words, *words]], key = lambda w: len(w), reverse = True)))
|
|
|
|
self.port_words = list(set(sorted(words, key = lambda w: len(w), reverse = True)))
|
|
|
|
|
|
|
|
def extend_word2(self, prev_word):
|
|
|
|
port_dict = random.randint(0, 100) < PORT_PCT
|
|
|
|
if port_dict:
|
|
|
|
next_dict = self.port_words
|
|
|
|
else:
|
|
|
|
next_dict = self.next_words
|
|
|
|
|
|
|
|
new_words = [overlap_words(prev_word, w) for w in next_dict if overlap_words(prev_word, w)]
|
|
|
|
|
|
|
|
while len(new_words) > 0:
|
|
|
|
new_word = pick_one_word(new_words, power = 2 if port_dict else 4)
|
|
|
|
if not new_word:
|
|
|
|
return None
|
|
|
|
new_words.remove(new_word)
|
|
|
|
|
|
|
|
if new_word.lower() not in self.all_words:
|
|
|
|
return new_word
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
def get_portmanteau(self):
|
|
|
|
target_times = 1
|
|
|
|
port_dict = random.randint(0, 100) < PORT_PCT
|
|
|
|
if port_dict:
|
|
|
|
words = self.port_words
|
|
|
|
else:
|
|
|
|
words = self.first_words
|
|
|
|
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
word = pick_one_word(words, power = 2 if port_dict else 4)
|
|
|
|
|
|
|
|
times = target_times
|
|
|
|
while times > 0:
|
|
|
|
ext_word = self.extend_word2(word)
|
|
|
|
if ext_word is None:
|
|
|
|
break
|
|
|
|
|
|
|
|
word = ext_word
|
|
|
|
times -= 1
|
|
|
|
|
|
|
|
if times == 0:
|
|
|
|
break
|
|
|
|
|
|
|
|
if len(word) < 15:
|
|
|
|
break
|
|
|
|
|
|
|
|
word = word.lower()
|
|
|
|
|
|
|
|
print(word)
|
|
|
|
|
|
|
|
return word
|
|
|
|
|
|
|
|
def get_portmanteaus(self, count = 10):
|
|
|
|
return [self.get_portmanteau() for x in range(0, count)]
|
|
|
|
|
|
|
|
class PortBotClient(BotClient):
|
|
|
|
def __init__(self, bot, config):
|
|
|
|
config = {
|
|
|
|
"app_name": "PortmanteuBot",
|
|
|
|
"rate_limit": 3,
|
|
|
|
"retry_rate": 60,
|
|
|
|
"poll_interval": 15,
|
|
|
|
**config}
|
|
|
|
|
|
|
|
super().__init__(bot, config)
|
|
|
|
|
|
|
|
self.my_id = None
|
|
|
|
|
|
|
|
def on_start(self):
|
|
|
|
self.log("Starting")
|
|
|
|
self.my_id = self.api.me()["id"]
|
|
|
|
pass
|
|
|
|
|
|
|
|
def on_poll(self):
|
|
|
|
pass
|
|
|
|
|
|
|
|
def on_status(self, status):
|
|
|
|
if status["account"]["id"] != self.my_id:
|
|
|
|
return
|
|
|
|
|
|
|
|
#if status["created_at"] < datetime.now(timezone.utc) - timedelta(hours = 24) and status["reblogs_count"] == 0 and status["favourites_count"] == 0:
|
|
|
|
# try:
|
|
|
|
# print("Deleting", status["created_at"], status["content"])
|
|
|
|
# self.api.status_delete(status["id"])
|
|
|
|
# time.sleep(2)
|
|
|
|
# except MastodonNotFoundError:
|
|
|
|
# pass
|
|
|
|
|
|
|
|
def post():
|
|
|
|
for client_name, client in bot.clients.items():
|
|
|
|
words = wm.get_portmanteaus(3)
|
|
|
|
print()
|
|
|
|
if random.randint(0, 100) <= 100:
|
|
|
|
visibility = "public"
|
|
|
|
else:
|
|
|
|
visibility = "unlisted"
|
|
|
|
|
|
|
|
dt = next_dt()
|
|
|
|
|
|
|
|
if not TEST:
|
|
|
|
client.api.status_post("\n".join(words), visibility = visibility)
|
|
|
|
print("Scheduling at", dt)
|
|
|
|
|
|
|
|
if TEST:
|
|
|
|
scheduler.enter(1, 1, post)
|
|
|
|
else:
|
|
|
|
scheduler.enterabs(dt.timestamp(), 1, post)
|
|
|
|
|
|
|
|
wm = WordMaker()
|
|
|
|
scheduler = sched.scheduler(timefunc = time.time, delayfunc = time.sleep)
|
|
|
|
|
|
|
|
bot = Bot(PortBotClient, loaded_config)
|
|
|
|
del loaded_config
|
|
|
|
bot.start()
|
|
|
|
|
|
|
|
print("Running")
|
|
|
|
|
|
|
|
dt = next_dt()
|
|
|
|
if TEST:
|
|
|
|
scheduler.enter(1, 1, post)
|
|
|
|
else:
|
|
|
|
print("Scheduling at", dt)
|
|
|
|
scheduler.enterabs(dt.timestamp(), 1, post)
|
|
|
|
scheduler.run()
|