From 7506c9395c2caeafdf0c986c30fd85db35f594e6 Mon Sep 17 00:00:00 2001
From: Thor Harald Johansen
Date: Tue, 26 May 2020 17:40:50 +0200
Subject: [PATCH] Initial commit

---
 .gitignore                 |   1 +
 bin/zasctl                 | 126 +++++++++++++
 log/.empty                 |   0
 src/.gitignore             |   2 +
 src/__init__.py            |   0
 src/config.py              |  72 ++++++++
 src/zasd.config.example.py |  69 +++++++
 src/zasd.py                | 357 +++++++++++++++++++++++++++++++++++++
 src/zasd/__init__.py       |   0
 src/zasd/apscheduler.py    | 236 ++++++++++++++++++++++++
 src/zasd/asyncio.py        |  30 ++++
 src/zasd/config.py         | 171 ++++++++++++++++++
 src/zasd/logging.py        |  32 ++++
 src/zasd/util.py           |  25 +++
 14 files changed, 1121 insertions(+)
 create mode 100644 .gitignore
 create mode 100755 bin/zasctl
 create mode 100644 log/.empty
 create mode 100644 src/.gitignore
 create mode 100644 src/__init__.py
 create mode 100644 src/config.py
 create mode 100644 src/zasd.config.example.py
 create mode 100644 src/zasd.py
 create mode 100644 src/zasd/__init__.py
 create mode 100644 src/zasd/apscheduler.py
 create mode 100644 src/zasd/asyncio.py
 create mode 100644 src/zasd/config.py
 create mode 100644 src/zasd/logging.py
 create mode 100644 src/zasd/util.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1377554
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+*.swp
diff --git a/bin/zasctl b/bin/zasctl
new file mode 100755
index 0000000..e340745
--- /dev/null
+++ b/bin/zasctl
@@ -0,0 +1,126 @@
+#!/usr/local/bin/bash
+
+install() {
+    uninstall
+
+    cat >"$launchd_file" << EOF
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>Label</key>
+    <string>$launchd_name</string>
+    <key>ProgramArguments</key>
+    <array>
+        <string>`which python3`</string>
+        <string>$zasd_root/src/zasd.py</string>
+    </array>
+    <key>KeepAlive</key>
+    <$keep_alive/>
+    <key>StandardOutPath</key>
+    <string>$log_root/zasd.log</string>
+    <key>StandardErrorPath</key>
+    <string>$log_root/zasd.err</string>
+</dict>
+</plist>
+EOF
+
+    load
+}
+
+uninstall() {
+    if is_installed; then
+        unload
+    fi
+
+    rm -f "$launchd_file"
+}
+
+load() {
+    launchctl load -w "$launchd_file"
+}
+
+unload() {
+    launchctl unload -w "$launchd_file"
+}
+
+start() {
+    launchctl kickstart -k $domain/$launchd_name
+}
+
+stop() {
+    launchctl stop $launchd_name
+}
+
+is_installed() {
+    launchctl list | fgrep -q $launchd_name
+}
+
+initialise() {
+    if [ $UID == 0 ]; then
+        launchd_file="/Library/LaunchDaemons/$launchd_name.plist"
+        domain=system
+        log_root="/var/log"
+    else
+        launchd_file="$HOME/Library/LaunchAgents/$launchd_name.plist"
+        domain=gui/$UID
+        log_root="$zasd_root/log"
+    fi
+}
+
+zasd_root="$(dirname "$(greadlink -f "$0")")/.."
+
+launchd_name=no.thj.zasd
+
+keep_alive=true
+
+# Parse arguments
+while (($#)); do
+    case $1 in
+        -K|--keep-alive)
+            keep_alive=true
+            ;;
+
+        -k|--no-keep-alive)
+            keep_alive=false
+            ;;
+
+        *)
+            command=$1
+            ;;
+    esac
+
+    shift
+done
+
+initialise
+
+case $command in
+    install)
+        install
+        ;;
+
+    uninstall)
+        uninstall
+        ;;
+
+    load)
+        load
+        ;;
+
+    unload)
+        unload
+        ;;
+
+    start|restart)
+        start
+        ;;
+
+    stop)
+        stop
+        ;;
+
+    *)
+        echo "./zasctl [options] <command>"
+        ;;
+esac
diff --git a/log/.empty b/log/.empty
new file mode 100644
index 0000000..e69de29
diff --git a/src/.gitignore b/src/.gitignore
new file mode 100644
index 0000000..8ea5427
--- /dev/null
+++ b/src/.gitignore
@@ -0,0 +1,2 @@
+**/__pycache__
+zasd.config.py
diff --git a/src/__init__.py b/src/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/config.py b/src/config.py
new file mode 100644
index 0000000..c26f42f
--- /dev/null
+++ b/src/config.py
@@ -0,0 +1,72 @@
+from apscheduler.triggers.cron import CronTrigger
+import logging
+
+config = {
+#   'zfs_path': '/usr/local/bin/zfs',
+#   'fswatch_path': '/usr/local/bin/fswatch',
+
+#   'tab_size': 2,
+
+#   'log_level': logging.INFO,
+#   'log_format': '%(asctime)s %(name)s [%(levelname)-8s]: %(message)s',
+#   'log_date_format': '%a, %d %b %Y, %H:%M:%S',
+
+#   'separator': ':',
+#   'destroy_trigger': CronTrigger.from_crontab('* * * * *'),
+
+    'defaults': {
+#       'disabled': False,
+#       'filesystems': ['Media', 'Volatile'],
+        'recursive': True,
+        'tag': '',
+        'trigger': CronTrigger.from_crontab('0 0 * * *'),
+        'priority': 1,
+        'keep': 6
+    },
+
+    'schedules': [
+        {
+            'disabled': True,
+            'tag': '1mi',
+            'trigger': CronTrigger.from_crontab('* * * * *'),
+            'priority': 7,
+            'keep': 4
+        },
+        {
+            'tag': '5mi',
+            'trigger': CronTrigger.from_crontab('*/5 * * * *'),
+            'priority': 6,
+            'keep': 4
+        },
+        {
+            'tag': '20m',
+            'trigger': CronTrigger.from_crontab('*/20 * * * *'),
+            'priority': 5,
+            'keep': 3
+        },
+        {
+            'tag': '1hr',
+            'trigger': CronTrigger.from_crontab('0 * * * *'),
+            'priority': 4,
+            'keep': 6
+        },
+        {
+            'tag': '6hr',
+            'trigger': CronTrigger.from_crontab('0 */6 * * *'),
+            'priority': 3,
+            'keep': 4
+        },
+        {
+            'tag': '1dy',
+            'trigger': CronTrigger.from_crontab('0 0 * * *'),
+            'priority': 2,
+            'keep': 7
+        },
+        {
+            'tag': '1wk',
+            'trigger': CronTrigger.from_crontab('0 0 * * mon'),
+            'priority': 1,
+            'keep': 4
+        }
+    ]
+}
diff --git a/src/zasd.config.example.py b/src/zasd.config.example.py
new file mode 100644
index 0000000..f4278d3
--- /dev/null
+++ b/src/zasd.config.example.py
@@ -0,0 +1,69 @@
+#zfs_path = '/usr/local/bin/zfs'
+#fswatch_path = '/usr/local/bin/fswatch'
+
+# Tab size for indented log messages
+#tab_size = 2
+
+# Log format
+#log_level = INFO
+#log_format = '%(asctime)s %(name)s [%(levelname)-8s]: %(message)s'
+#log_date_format = '%a, %d %b %Y, %H:%M:%S'
+
+# Field separator in snapshot names (':' gives 'dataset@tag:serial')
+#separator = ':'
+
+# How frequently to prune expired snapshots
+#destroy_trigger = cron('* * * * *')
+
+# Default settings for a schedule
+#defaults = dict(
+#    tag = 'zasd',
+#    disabled = False,
+#    filesystems = ['tank'],
+#    recursive = True,
+#    trigger = every(hours=12),
+#    priority = 1,
+#    keep = 14)
+
+# The simplest possible schedule list sets up a single
+# schedule with the default settings
+#schedules = [{}]
+
+# More advanced example schedule
+#
+schedules = [
+    dict(
+        tag = '5mi',
+        trigger = cron('*/5 * * * *'),
+        priority = 6,
+        keep = 4),
+
+    dict(
+        tag = '20m',
+        trigger = cron('*/20 * * * *'),
+        priority = 5,
+        keep = 3),
+
+    dict(
+        tag = '1hr',
+        trigger = cron('0 * * * *'),
+        priority = 4,
+        keep = 6),
+
+    dict(
+        tag = '6hr',
+        trigger = cron('0 */6 * * *'),
+        priority = 3,
+        keep = 4),
+
+    dict(
+        tag = '1dy',
+        trigger = cron('0 0 * * *'),
+        priority = 2,
+        keep = 7),
+
+    dict(
+        tag = '1wk',
+        trigger = cron('0 0 * * mon'),
+        priority = 1,
+        keep = 4)]
diff --git a/src/zasd.py b/src/zasd.py
new file mode 100644
index 0000000..39d386e
--- /dev/null
+++ b/src/zasd.py
@@ -0,0 +1,357 @@
+from sys import stdout, stderr
+
+import signal
+import time
+from functools import partial, reduce
+from itertools import islice
+import asyncio
+from subprocess import run, PIPE
+from datetime import datetime, timezone, timedelta
+import logging
+
+from apscheduler.triggers.cron import CronTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+
+from zasd.apscheduler import *
+from zasd.asyncio import *
+from zasd.config import *
+
+#
+# Constants
+
+DATASET_COLS = ['type', 'name', 'creation', 'mountpoint']
+
+#
+# Functions for running subprocesses with tabulated output
+
+# Run program and convert tabulated output to nested lists
+def run_for_table(args):
+    result = run(args, check=True, stdout=PIPE, encoding='utf-8')
+    return str_to_table(result.stdout)
+
+# Run program and convert tabulated output to list of dictionaries
+# with given column names as keys
+def run_for_dicts(args, column_list):
+    return table_to_dicts(run_for_table(args), column_list)
+
+#
+# Functions for converting multi-line tabulated strings to data structures
+
+# Convert tabulated multi-line string to nested lists
+def str_to_table(string, sep='\t'):
+    return list(line.split(sep) for line in string.splitlines())
+
+# Convert table to list of dictionaries with given column names as keys
+def table_to_dicts(table, column_list):
+    return list(row_to_dict(row, column_list) for row in table)
+
+# Convert table row to dictionary with given column names as keys
+def row_to_dict(row, column_list):
+    return { column_list[i]: row[i] for i in range(len(row)) }
+
+#
+# ZFS functions
+
+# Get list of snapshots (dataset dictionaries)
+def zfs_get_snapshots():
+    return zfs_get_datasets('snapshot')
+
+# Get list of filesystems (dataset dictionaries)
+def zfs_get_filesystems():
+    return zfs_get_datasets('filesystem')
+
+# Get list of datasets
+def zfs_get_datasets(dataset_type='all'):
+    global config
+    return zfs_dicts_to_datasets(run_for_dicts(
+        [config['zfs_path'],
+         'list',
+         '-Hp',
+         '-t', dataset_type,
+         '-o', ','.join(DATASET_COLS)], DATASET_COLS))
+
+# Transform list of ZFS dictionaries to list of datasets
+def zfs_dicts_to_datasets(dicts):
+    return list(zfs_dict_to_dataset(d) for d in dicts)
+
+# Transform dictionary to dataset (pool, filesystem)
+def zfs_dict_to_dataset(zfs_dict):
+    name = zfs_dict['name']
+    dataset = dict(zfs_dict)
+
+    # Separate dataset and snapshot names out to extra fields
+    if '@' in name:
+        fields = name.split('@')
+        dataset['dataset'] = fields[0]
+        dataset['snapshot'] = fields[1]
+
+    return dataset
+
+# Create one or more snapshots
+async def zfs_create_snapshot(*snapshots, recursive=False):
+    global config
+
+    args = [config['zfs_path'], 'snapshot']
+    if recursive:
+        args.append('-r')
+
+    for snapshot in snapshots:
+        sargs = args + [get_snapshot_zfs_name(snapshot)]
+        await asyncio.create_subprocess_exec(*sargs)
+
+# Destroy one or more snapshots
+async def zfs_destroy_snapshot(*snapshots, recursive=False):
+    global config
+
+    args = [config['zfs_path'], 'destroy']
+    if recursive:
+        args.append('-r')
+
+    for snapshot in snapshots:
+        sargs = args + [get_snapshot_zfs_name(snapshot)]
+        await asyncio.create_subprocess_exec(*sargs)
+
+# Generate ZFS identifier string for snapshot
+def get_snapshot_zfs_name(snapshot):
+    if 'tag' in snapshot:
+        return make_snapshot_zfs_name(snapshot['dataset'], snapshot['tag'], snapshot.get('serial', None))
+    elif 'snapshot' in snapshot:
+        return make_snapshot_zfs_name(snapshot['dataset'], snapshot['snapshot'])
+    else:
+        raise KeyError('Snapshot has no name or tag')
+
+# Generate ZFS identifier string from arguments
+def make_snapshot_zfs_name(dataset, tag_or_snapshot, serial=None):
+    if serial is None:
+        return '{}@{}'.format(dataset, tag_or_snapshot)
+    else:
+        return '{}@{}:{}'.format(dataset, tag_or_snapshot, serial)
+
+#
+# Configuration functions
+
+# Retrieve all schedules and merge with default schedule
+def get_schedules():
+    global config
+    schedules = ({**config['defaults'], **dict(s)} for s in config['schedules'])
+    return schedules
+
+# Get dictionary of tag-modified flags on filesystem
+def get_fs_flags(name):
+    global fs_modified
+
+    if not name in fs_modified:
+        fs_modified[name] = dict()
+
+    return fs_modified[name]
+
+# Get tag-modified flag for specific tag on filesystem
+def get_fs_flag(name, tag):
+    flags = get_fs_flags(name)
+
+    if not tag in flags:
+        flags[tag] = False
+
+    return flags[tag]
+
+# Set specific tag-modified flag on filesystem
+def set_fs_flag(name, tag):
+    flags = get_fs_flags(name)
+    flags[tag] = True
+
+# Set all tag-modified flags on filesystem
+def set_all_fs_flags(name):
+    flags = get_fs_flags(name)
+    for tag in flags.keys():
+        set_fs_flag(name, tag)
+
+# Clear specific tag-modified flag on filesystem
+def clear_fs_flag(name, tag):
+    flags = get_fs_flags(name)
+    flags[tag] = False
+
+#
+# fswatch subprocess protocol for asyncio
+
+class FSWatchProtocol(LineBufferedProtocol):
+    def __init__(self, fs):
+        LineBufferedProtocol.__init__(self, 'utf-8')
+        self.fs = fs
+
+    def pipe_line_received(self, line):
+        global logger
+
+        # Ignore empty lines and NOOPs
+        if len(line) == 0 or int(line) == 0:
+            return
+
+        logger.info('Detected change on filesystem %s', self.fs['name'])
+
+        # Set all tag-modified flags on filesystem
+        set_all_fs_flags(self.fs['name'])
+
+#
+# Snapshot scheduling functions
+
+# Create snapshot from a snapshot schedule
+async def snapshot_creation_task(schedule, fs):
+    global logger
+
+    tag = schedule['tag']
+    serial = make_snapshot_serial()
+    recursive = schedule['recursive']
+
+    if get_fs_flag(fs, tag):
+        # Clear tag-modified flag for this tag on filesystem
+        clear_fs_flag(fs, tag)
+
+        logger.info('Taking snapshot of filesystem %s on schedule %s', fs, tag)
+
+        # Create stub snapshot record and take the snapshot
+        snapshot = dict(dataset=fs, tag=tag, serial=serial)
+        await zfs_create_snapshot(snapshot, recursive=recursive)
+
+# Generate time-based 8-character hexadecimal snapshot serial number
+def make_snapshot_serial():
+    return ('%x' % int(time.time()))[-8:]
+
+# Destroy all expired snapshots
+async def snapshot_destruction_task():
+    global config, logger
+
+    snapshots = zfs_get_snapshots()
+
+    for schedule in get_schedules():
+        if schedule['disabled']:
+            continue
+
+        # Find expired snapshots for schedule
+        tag = schedule['tag']
+        expired = slice_snapshots(snapshots, tag, index=schedule['keep'], stop=None, reverse=True)
+
+        if len(expired) > 0:
+            logger.info('Destroying snapshots with tag %s:', tag)
+            for snapshot in expired:
+                logger.info('%s%s', config['tab_size'] * ' ', snapshot['name'])
+                await zfs_destroy_snapshot(snapshot)
+
+# Check if snapshot matches tag
+def snapshot_matches_tag(snapshot, tag):
+    return get_snapshot_tag(snapshot) == tag
+
+# Get tag of snapshot
+def get_snapshot_tag(snapshot):
+    (tag, serial) = get_snapshot_fields(snapshot)
+    return tag
+
+# Get serial number of snapshot
+def get_snapshot_serial(snapshot):
+    (tag, serial) = get_snapshot_fields(snapshot)
+    return serial
+
+# Get tuple of fields in snapshot name
+def get_snapshot_fields(snapshot):
+    global config
+    return tuple(snapshot['snapshot'].split(config['separator']))
+
+# Group 'snapshots' list using 'key' function, enumerate groups (in reverse if 'reverse' is
+# True), slice by 'index' and 'stop', and return slice as flat list of snapshots
+#
+# If 'stop' is not specified, assume that 'index' is the index to stop at; otherwise, slice
+# beginning at 'index' and ending at 'stop'
+#
+def slice_snapshots(snapshots, tag, index, stop=0, reverse=False, key=get_snapshot_serial):
+    # Find matching snapshots
+    matches = list(s for s in snapshots if snapshot_matches_tag(s, tag))
+
+    # Make ordered set of serials present in matched snapshots
+    ordered_set = list(sorted(set(key(s) for s in matches), reverse=reverse))
+
+    # Slice n serials from ordered set of serials
+    serials = ordered_set[:index] if stop == 0 else ordered_set[index:stop]
+
+    # Intersect matching snapshots with sliced set of serials
+    result = list(s for s in matches if get_snapshot_serial(s) in set(serials))
+
+    return result
+
+# Load and activate snapshot schedules
+def load_snapshot_schedules():
+    global config, scheduler, fs_modified, logger
+
+    fs_modified = dict()
+
+    for schedule in get_schedules():
+        if schedule['disabled']:
+            continue
+
+        tag = schedule['tag']
+        for fs in schedule['filesystems']:
+            scheduler.add_job(snapshot_creation_task,
+                trigger = schedule['trigger'],
+                id = make_snapshot_zfs_name(fs, tag),
+                group = fs,
+                priority = schedule['priority'],
+                args = [schedule, fs])
+
+        # Set tag-modified flags on filesystems (always take snapshots on startup)
+        for name in schedule['filesystems']:
+            set_fs_flag(name, tag)
+
+    scheduler.add_job(snapshot_destruction_task,
+        trigger = config['destroy_trigger'],
+        group = 'destroy')
+
+async def main_task():
+    global config, event_loop, scheduler, fs_listeners
+
+    scheduler = AsyncIOPriorityScheduler(
+        event_loop = event_loop,
+        executors = {'default': AsyncIOPriorityExecutor()})
+
+    # Watch file system mountpoints
+    fs_listeners = dict()
+    for fs in zfs_get_filesystems():
+        await event_loop.subprocess_exec(
+            lambda: FSWatchProtocol(fs), config['fswatch_path'], '-o', fs['mountpoint'],
+            stdout=PIPE)
+
+    load_snapshot_schedules()
+
+    scheduler.start()
+
+    if stdout.isatty():
+        while True:
+            print_spinner()
+            await asyncio.sleep(1)
+
+# Print idle spinner
+def print_spinner():
+    print(print_spinner.chars[print_spinner.index] + '\x1B[G',
+        end='', file=stderr, flush=True)
+    print_spinner.index = (print_spinner.index + 1) % len(print_spinner.chars)
+print_spinner.index = 0
+print_spinner.chars = ['|', '/', '-', '\\']
+
+def signal_handler(signame):
+    global logger, event_loop
+    logger.info('Received %s', signame)
+    event_loop.stop()
+
+#
+# Program
+
+config = load_config()
+configure_logging(config)
+
+logger.info('Processing jobs')
+
+event_loop = asyncio.get_event_loop()
+event_loop.add_signal_handler(signal.SIGINT, partial(signal_handler, 'SIGINT'))
+event_loop.add_signal_handler(signal.SIGTERM, partial(signal_handler, 'SIGTERM'))
+
+event_loop.create_task(main_task())
+
+try:
+    event_loop.run_forever()
+finally:
+    logger.info('Terminating')
+    print(file=stderr)
+    event_loop.close()
diff --git a/src/zasd/__init__.py b/src/zasd/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/zasd/apscheduler.py b/src/zasd/apscheduler.py
new file mode 100644
index 0000000..f7367de
--- /dev/null
+++ b/src/zasd/apscheduler.py
@@ -0,0 +1,236 @@
+import logging
+from datetime import datetime
+
+import six
+from apscheduler.job import Job
+from apscheduler.util import undefined
+from apscheduler.schedulers.base import STATE_STOPPED
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.executors.asyncio import AsyncIOExecutor
+from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
+
+class AsyncIOPriorityScheduler(AsyncIOScheduler):
+    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
+                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
+                next_run_time=undefined, jobstore='default', executor='default',
+                replace_existing=False, group=0, priority=0, **trigger_args):
+        job_kwargs = {
+            'trigger': self._create_trigger(trigger, trigger_args),
+            'executor': executor,
+            'func': func,
+            'args': tuple(args) if args is not None else (),
+            'kwargs': dict(kwargs) if kwargs is not None else {},
+            'id': id,
+            'name': name,
+            'misfire_grace_time': misfire_grace_time,
+            'coalesce': coalesce,
+            'max_instances': max_instances,
+            'next_run_time': next_run_time,
+            'group': group,
+            'priority': priority
+        }
+        job_kwargs = dict((key, value) for key, value in job_kwargs.items() if
+                          value is not undefined)
+        job = _PriorityJob(self, **job_kwargs)
+
+        # Don't really add jobs to job stores before the scheduler is up and running
+        with self._jobstores_lock:
+            if self.state == STATE_STOPPED:
+                self._pending_jobs.append((job, jobstore, replace_existing))
+                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
+                                  'the scheduler starts')
+            else:
+                self._real_add_job(job, jobstore, replace_existing)
+
+        return job
+
+    def get_due_jobs(self, now=None):
+        if now is None:
+            now = datetime.now(self.timezone)
+
+        with self._jobstores_lock:
+            jobs = []
+            for alias, store in six.iteritems(self._jobstores):
+                jobs.extend(store.get_due_jobs(now))
+
+        return jobs
+
+    def _process_jobs(self):
+        due_jobs = self.get_due_jobs()
+        wait_seconds = super()._process_jobs()
+
+        executors = []
+        for job in due_jobs:
+            executor = self._lookup_executor(job.executor)
+            if not executor in executors:
+                executors.append(executor)
+
+        for executor in executors:
+            if hasattr(executor, 'commit'):
+                executor.commit()
+
+        return wait_seconds
+
+class _PriorityJob(Job):
+    def __init__(self, scheduler, priority=1, group=1, **kwargs):
+        self.priority = priority
+        self.group = group
+        super().__init__(scheduler, **kwargs)
+
+    def modify(self, **changes):
+        if 'priority' in changes:
+            self.priority = changes.pop('priority')
+
+        if 'group' in changes:
+            self.group = changes.pop('group')
+
+        super().modify(**changes)
+
+    def __getstate__(self):
+        state = super().__getstate__()
+
+        state['priority'] = self.priority
+        state['group'] = self.group
+
+        return state
+
+    def __setstate__(self, state):
+        priority = state.pop('priority')
+        group = state.pop('group')
+
+        super().__setstate__(state)
+
+        self.priority = priority
+        self.group = group
+
+class AsyncIOPriorityExecutor(AsyncIOExecutor):
+    def __init__(self, *args, **kwargs):
+        self.scheduler = None
+        self.job_groups = {}
+
+        super().__init__(*args, **kwargs)
+
+    def start(self, scheduler, alias):
+        # Store a reference to the scheduler and add job listener
+        self.scheduler = scheduler
+        self.scheduler.add_listener(self.on_job_executed,
+            mask=EVENT_JOB_EXECUTED|EVENT_JOB_MAX_INSTANCES|EVENT_JOB_ERROR|EVENT_JOB_MISSED)
+
+        super().start(scheduler, alias)
+
+    def shutdown(self, *args):
+        # Remove job listener
+        self.scheduler.remove_listener(self.on_job_executed)
+
+        super().shutdown(*args)
+
+    def submit_job(self, job, run_times):
+        run_times = set(run_times)
+
+        logger = logging.getLogger('zasd')
+        logger.debug('Incoming submission for job %s with run times:', job.id)
+        for time in run_times:
+            logger.debug(' %s', time)
+
+        if job.group in self.job_groups:
+            job_group = self.job_groups[job.group]
+
+            if job.id in job_group['times_for_job']:
+                # Fetch time set
+                times_for_job = job_group['times_for_job'][job.id]
+            else:
+                # Create and store time set
+                times_for_job = set()
+                job_group['times_for_job'][job.id] = times_for_job
+
+            # Fetch map of times to their jobs
+            job_for_time = job_group['job_for_time']
+
+            # Filter out run times that coincide with higher-priority jobs
+            run_times = set(
+                time for time in run_times if
+                not time in job_for_time or
+                job.priority < self.scheduler.get_job(job_for_time[time]).priority
+            )
+
+        else:
+            # Create and store empty job group
+
+            times_for_job = set()
+            job_for_time = {}
+            job_group = { 'times_for_job': { job.id: times_for_job }, 'job_for_time': job_for_time }
+
+            self.job_groups[job.group] = job_group
+
+        # Add new times to stored time set for current job
+        times_for_job |= run_times
+
+        # Remove jobs in time set from other jobs in group
+        # Necessary when incoming job overwrites time slots for existing jobs
+        #
+        # Look at time sets for all jobs in group
+        for job_id, times in list(job_group['times_for_job'].items()):
+            # Look at time set for this job
+            for time in set(times):
+                # Does time in set coincide with time set of job being submitted?
+                if time in times_for_job and job_id != job.id:
+                    # Remove time from time set
+                    times.remove(time)
+
+            if len(times) == 0:
+                # Delete time set if empty
+                del job_group['times_for_job'][job_id]
+            else:
+                # Update time set if not
+                job_group['times_for_job'][job_id] = times
+
+        logger.debug('Final time set for job %s:', job.id)
+        for time in times_for_job:
+            logger.debug(' %s', time)
+
+        # Map run times to jobs
+        for time in run_times:
+            job_for_time[time] = job.id
+
+    def commit(self):
+        logger = logging.getLogger('zasd')
+
+        logger.debug('Committing jobs:')
+
+        # Look at every group
+        for group_id, group in self.job_groups.items():
+            # Look at every job in group
+            for job_id, times in group['times_for_job'].items():
+                # Find job for ID and submit to scheduler superclass
+                job = self.scheduler.get_job(job_id)
+                super().submit_job(job, list(times))
+
+                logger.debug(' %s', job.id)
+                for time in times:
+                    logger.debug('   %s', time)
+
+    def on_job_executed(self, event):
+        time = event.scheduled_run_time
+
+        logger = logging.getLogger('zasd')
+        logger.debug('Job %s has executed', event.job_id)
+
+        # Find job for job ID
+        job = self.scheduler.get_job(event.job_id)
+
+        # Get group and times for job
+        job_group = self.job_groups[job.group]
+        times_for_job = job_group['times_for_job']
+        job_for_time = job_group['job_for_time']
+
+        # Delete job from time map
+        del job_for_time[time]
+
+        # Delete time from job time set
+        times_for_job[job.id].remove(time)
+
+        if len(times_for_job[job.id]) == 0:
+            logger.debug('Deleting empty time set for job %s', job.id)
+            del times_for_job[job.id]
+        else:
+            logger.debug('Remaining time set for job %s:', job.id)
+            for time in times_for_job[job.id]:
+                logger.debug(' %s', time)
diff --git a/src/zasd/asyncio.py b/src/zasd/asyncio.py
new file mode 100644
index 0000000..90bebf0
--- /dev/null
+++ b/src/zasd/asyncio.py
@@ -0,0 +1,30 @@
+import asyncio
+
+class LineBufferedProtocol(asyncio.SubprocessProtocol):
+    def __init__(self, encoding):
+        self.encoding = encoding
+        self.buffer = bytearray()
+
+    def pipe_data_received(self, fd, data):
+        self.buffer.extend(data)
+
+        try:
+            while True:
+                length = self.buffer.index(b'\n')
+
+                line = self.buffer[0: length].decode(self.encoding)
+                del self.buffer[0: length + 1]
+
+                self.pipe_line_received(line)
+
+        except ValueError:
+            pass
+
+    def pipe_line_received(self, line):
+        pass
+
+    def process_exited(self):
+        if len(self.buffer) > 0:
+            line = self.buffer.decode(self.encoding)
+            self.buffer = bytearray()
+            self.pipe_line_received(line)
diff --git a/src/zasd/config.py b/src/zasd/config.py
new file mode 100644
index 0000000..332aa01
--- /dev/null
+++ b/src/zasd/config.py
@@ -0,0 +1,171 @@
+from sys import argv, executable
+from os import environ, getcwd, sep as psep
+from os.path import dirname, abspath, join as joinpath, \
+    expanduser, splitdrive, isfile
+
+from time import sleep
+
+from copy import deepcopy
+
+from apscheduler.triggers.cron import CronTrigger
+from apscheduler.triggers.interval import IntervalTrigger
+
+import logging
+import pprint
+
+from zasd.logging import *
+from zasd.util import *
+
+#
+# Constants
+
+CONFIG_BASENAME = 'zasd.conf.py'
+CONFIG_FILENAMES = [CONFIG_BASENAME, '.' + CONFIG_BASENAME]
+
+# Default configuration
+DEFAULT_CONFIG = {
+    'zfs_path': '/usr/local/bin/zfs',
+    'fswatch_path': '/usr/local/bin/fswatch',
+
+    'tab_size': 2,
+
+    'log_level': logging.INFO,
+    'log_format': '%(asctime)s %(name)s [%(levelname)-8s]: %(message)s',
+    'log_date_format': '%a, %d %b %Y, %H:%M:%S',
+
+    'separator': ':',
+    'destroy_trigger': CronTrigger.from_crontab('* * * * *'),
+
+    # Defaults will take a snapshot every 12 hours
+    # and keep them for a week, so if the config file
+    # should disappear for some reason, there will
+    # at least be _some_ recoverable snapshots
+    'defaults': {
+        'disabled': False,
+        'filesystems': ['tank'],
+        'recursive': True,
+        'tag': 'zasd',
+        'trigger': IntervalTrigger(hours=12),
+        'priority': 1,
+        'keep': 14
+    },
+
+    'schedules': [{}]
+}
+
+# Load configuration
+def load_config():
+    global logger
+
+    if len(argv) > 1:
+        # Configuration pathname given as first argument
+        if isfile(argv[1]):
+            config_pathname = argv[1]
+        else:
+            logger.warning('Could not find configuration file %s', argv[1])
+            return _warn_load_default()
+    else:
+        # No configuration pathname given; attempt to find it:
+
+        # Get root of system partition
+        sys_root_path = environ.get('SystemDrive', splitdrive(executable)[0] + psep)
+
+        # Get system configuration directory
+        sys_conf_path = joinpath(*ifenv('SystemRoot',
+            lambda path: [path, 'System32', 'drivers', 'etc'],
+            lambda: [sys_root_path, 'etc']))
+
+        # Get user home directory
+        user_home_path = expanduser('~')
+
+        # Get path of this Python file
+        if '__file__' in globals():
+            script_path = dirname(abspath(__file__))
+        else:
+            script_path = dirname(abspath(argv[0]))
+
+        # Build list of configuration file pathnames to search
+        config_paths = uniq([
+            getcwd(),
+            user_home_path,
+            joinpath(user_home_path, '.config'),
+            joinpath(user_home_path, '.local', 'share'),
+            sys_conf_path,
+            script_path])
+        config_pathnames = list(joinpath(p, f) for p in config_paths for f in CONFIG_FILENAMES)
+
+        # Attempt to find a config file
+        config_pathname = find_file(config_pathnames)
+
+        if config_pathname is None:
+            logger.warning('Unable to find a config file at:')
+            for pathname in config_pathnames:
+                logger.warning(' ' + pathname)
+
+            return _warn_load_default()
+
+    with open(config_pathname, 'rt', encoding='utf-8') as f:
+        config_source = f.read()
+
+    # Create configuration file scopes
+
+    global_scope = dict(
+        CRITICAL = logging.CRITICAL,
+        ERROR = logging.ERROR,
+        WARNING = logging.WARNING,
+        INFO = logging.INFO,
+        DEBUG = logging.DEBUG,
+        NOTSET = logging.NOTSET,
+
+        crontab = CronTrigger.from_crontab,
+        cron = CronTrigger.from_crontab,
+
+        interval = IntervalTrigger,
+        every = IntervalTrigger)
+
+    local_scope = dict()
+
+    # Execute configuration file
+    exec(config_source, global_scope, local_scope)
+
+    # Merge configuration with default configuration
+    config = merge_configs(DEFAULT_CONFIG, local_scope)
+
+    logger.debug('Loaded configuration')
+
+    if config['log_level'] <= logging.DEBUG:
+        logger.debug('')
+        for line in pprint.pformat(config).split('\n'):
+            logging.debug(config['tab_size'] * ' ' + line)
+        logger.debug('')
+
+    return config
+
+def _warn_load_default():
+    global DEFAULT_CONFIG
+
+    logger.warning('')
+    logger.warning('Waiting 10 seconds before loading default configuration...')
+    logger.warning('')
+    sleep(10)
+    logger.warning('Loading default configuration')
+    logger.warning('')
+    return deepcopy(DEFAULT_CONFIG)
+
+def merge_configs(base, diff, path=[]):
+    base = base if len(path) == 0 else deepcopy(base)
+    for key, value in diff.items():
+        if not key in base:
+            base[key] = value
+        elif not isinstance(value, type(base[key])):
+            logger.error('Cannot merge diff type %s with base type %s at %s.%s',
+                type(value), type(base[key]), '.'.join(path), key)
+            return None
+        elif isinstance(value, dict):
+            merged = merge_configs(base[key], value, path + [key])
+            if merged is None:
+                return None
+            base[key] = merged
+        else:
+            base[key] = value
+    return base
diff --git a/src/zasd/logging.py b/src/zasd/logging.py
new file mode 100644
index 0000000..8c97de6
--- /dev/null
+++ b/src/zasd/logging.py
@@ -0,0 +1,32 @@
+import logging
+
+# Bootstrap logging using a hardcoded configuration
+def bootstrap_logging():
+    global logger
+
+    logging.basicConfig(
+        level=logging.NOTSET,
+        format='%(asctime)s %(name)s [%(levelname)-8s]: %(message)s',
+        datefmt='%a, %d %b %Y, %H:%M:%S')
+
+    logger = logging.getLogger('zasd')
+    logger.setLevel(logging.NOTSET)
+
+    logging.getLogger('apscheduler').setLevel(logging.NOTSET)
+
+# Configure logging using a loaded configuration
+def configure_logging(config):
+    global logger
+
+    level = config['log_level']
+
+    logging.basicConfig(
+        format=config['log_format'],
+        datefmt=config['log_date_format'])
+
+    logger = logging.getLogger('zasd')
+    logger.setLevel(level)
+
+    logging.getLogger('asyncio').setLevel(logging.WARN)
+    logging.getLogger('apscheduler').setLevel(logging.WARN)
+
+bootstrap_logging()
diff --git a/src/zasd/util.py b/src/zasd/util.py
new file mode 100644
index 0000000..62f9dce
--- /dev/null
+++ b/src/zasd/util.py
@@ -0,0 +1,25 @@
+from os import environ
+from os.path import isfile
+
+# Return unique items in iterable
+def uniq(iterable):
+    items = set()
+    for item in iterable:
+        if item not in items:
+            items.add(item)
+            yield item
+
+# Ternary operator for environment variables
+def ifenv(name, true_func, false_func):
+    return true_func(environ[name]) if name in environ else false_func()
+
+# Search for file in pathnames, return the first pathname
+# that exists as a file, or None if no files were found
+def find_file(pathnames):
+    try:
+        # Find the first pathname that exists as a file
+        pathname = next(p for p in pathnames if isfile(p))
+    except StopIteration:
+        pathname = None
+    return pathname