diff --git a/.pylintrc b/.pylintrc
new file mode 100644
index 0000000..feb6528
--- /dev/null
+++ b/.pylintrc
@@ -0,0 +1,5 @@
+[MESSAGES CONTROL]
+
+disable=
+    invalid-name,
+    bad-whitespace
\ No newline at end of file
diff --git a/src/zasd.py b/src/zasd.py
index f4eaad9..36de56b 100755
--- a/src/zasd.py
+++ b/src/zasd.py
@@ -7,7 +7,6 @@
 from itertools import islice
 import asyncio
 from subprocess import run, PIPE
 from datetime import datetime, timezone, timedelta
-import logging
 
 from apscheduler.triggers.cron import CronTrigger
 from apscheduler.triggers.interval import IntervalTrigger
@@ -15,342 +14,125 @@ from apscheduler.triggers.interval import IntervalTrigger
 from zasd.apscheduler import *
 from zasd.asyncio import *
 from zasd.config import *
+from zasd.filesystem.zfs import *
+from zasd.scheduler import *
+from zasd.log import *
 
-#
-# Constants
+class ZASD():
+    def __init__(self):
+        self.event_loop = asyncio.get_event_loop()
 
-DATASET_COLS = ['type', 'name', 'creation', 'mountpoint']
+        self.event_loop.add_signal_handler(signal.SIGINT,
+            partial(self.signal_handler, 'SIGINT'))
+        self.event_loop.add_signal_handler(signal.SIGTERM,
+            partial(self.signal_handler, 'SIGTERM'))
 
-#
-# Functions for running subprocesses with tabulated output
+        global config
+        config = load_config()
+        configure_logging()
 
-# Run program and convert tabulated output to nested lists
-def run_for_table(args):
-    result = run(args, check=True, stdout=PIPE, encoding='utf-8')
-    return str_to_table(result.stdout)
+        self.zfs = ZFS(config['zfs_path'])
 
-# Run program and convert tabulated output to list of dictionaries with given column names as keys
-def run_for_dicts(args, column_list):
-    return table_to_dicts(run_for_table(args), column_list)
+        log.info('Processing jobs')
 
-#
-# Functions for converting multi-line tabulated strings to data structures
-
-# Convert tabulated multi-line string to nested lists
-def str_to_table(string, sep='\t'):
-    return list(line.split(sep) for line in string.splitlines())
-
-# Convert table to list of dictionaries with given column names as keys
-def table_to_dicts(table, column_list):
-    return list(row_to_dict(row, column_list) for row in table)
-
-# Convert table row to dictionary with given column names as keys
-def row_to_dict(row, column_list):
-    return ({ column_list[i]: row[i] for i in range(len(row)) })
-
-#
-# ZFS functions
-
-# Get list of snapshots (dataset dictionaries)
-def zfs_get_snapshots():
-    return zfs_get_datasets('snapshot')
-
-# Get list of filesystems (dataset dictionaries)
-def zfs_get_filesystems():
-    return zfs_get_datasets('filesystem')
-
-# Get list of datasets
-def zfs_get_datasets(dataset_type='all'):
-    global config
-    return zfs_dicts_to_datasets(run_for_dicts(
-        [config['zfs_path'],
-         'list',
-         '-Hp',
-         '-t', dataset_type,
-         '-o', ','.join(DATASET_COLS)], DATASET_COLS))
-
-# Transform list of ZFS dictionaries to list of datasets
-def zfs_dicts_to_datasets(dicts):
-    return list(zfs_dict_to_dataset(d) for d in dicts)
-
-# Transform dictionary to dataset (pool, filesystem)
-def zfs_dict_to_dataset(zfs_dict):
-    name = zfs_dict['name']
-    dataset = dict(zfs_dict)
-
-    # Separate dataset and snapshot names out to extra fields
-    if '@' in name:
-        fields = name.split('@')
-        dataset['dataset'] = fields[0]
-        dataset['snapshot'] = fields[1]
-
-    return dataset
-
-# Create one or more snapshots
-async def zfs_create_snapshot(*snapshots, recursive=False):
-    global config
-
-    args = [config['zfs_path'], 'snapshot']
-    if recursive:
-        args.append('-r')
-
-    for snapshot in snapshots:
-        sargs = args + [get_snapshot_zfs_name(snapshot)]
-        await asyncio.create_subprocess_exec(*sargs)
-
-# Destroy one or more snapshots
-async def zfs_destroy_snapshot(*snapshots, recursive=False):
-    global config
-
-    args = [config['zfs_path'], 'destroy']
-    if recursive:
-        args.append('-r')
-
-    for snapshot in snapshots:
-        sargs = args + [get_snapshot_zfs_name(snapshot)]
-        await asyncio.create_subprocess_exec(*sargs)
-
-# Generate ZFS identifier string for snapshot
-def get_snapshot_zfs_name(snapshot):
-    if 'tag' in snapshot:
-        return make_snapshot_zfs_name(snapshot['dataset'], snapshot['tag'], snapshot.get('serial', None))
-    elif 'snapshot' in snapshot:
-        return make_snapshot_zfs_name(snapshot['dataset'], snapshot['snapshot'])
-    else:
-        raise KeyError('Snapshot has no name or tag')
-
-# Generate ZFS identifier string from arguments
-def make_snapshot_zfs_name(dataset, tag_or_snapshot, serial=None):
-    if serial is None:
-        return '{}@{}'.format(dataset, tag_or_snapshot)
-    else:
-        return '{}@{}:{}'.format(dataset, tag_or_snapshot, serial)
-
-#
-# Configuration functions
-
-# Retrieve all schedules and merge with default schedule
-def get_schedules():
-    global config
-    schedules = ({**config['defaults'], **dict(s)} for s in config['schedules'])
-    return schedules
-
-# Get dictionary of tag-modified flags on filesystem
-def get_fs_flags(name):
-    global fs_modified
-
-    if not name in fs_modified:
-        fs_modified[name] = dict()
-
-    return fs_modified[name]
-
-# Get tag-modified flag for specific tag on filesystem
-def get_fs_flag(name, tag):
-    flags = get_fs_flags(name)
-
-    if not tag in flags:
-        flags[tag] = False
-
-    return flags[tag]
-
-# Set specific tag-modified flag on filesystem
-def set_fs_flag(name, tag):
-    flags = get_fs_flags(name)
-    flags[tag] = True
-
-# Set all tag-modified flags on filesystem
-def set_all_fs_flags(name):
-    flags = get_fs_flags(name)
-    for tag in flags.keys():
-        set_fs_flag(name, tag)
-
-# Clear specific tag-modified flag on filesystem
-def clear_fs_flag(name, tag):
-    flags = get_fs_flags(name)
-    flags[tag] = False
-
-#
-# fswatch subprocess protocol for asyncio
-
-class FSWatchProtocol(LineBufferedProtocol):
-    def __init__(self, fs):
-        LineBufferedProtocol.__init__(self, 'utf-8')
-        self.fs = fs
-
-    def pipe_line_received(self, line):
-        global logger
-
-        # Ignore empty lines and NOOPs
-        if len(line) == 0 or int(line) == 0:
-            return
-
-        logger.info('Detected change on filesystem %s', self.fs['name'])
-
-        # Set all tag-modified flags on filesystem
-        set_all_fs_flags(self.fs['name'])
-
-#
-# Snapshot scheduling functions
-
-# Create snapshot from a snapshot schedule
-async def snapshot_creation_task(schedule, fs):
-    global logger
-
-    tag = schedule['tag']
-    serial = make_snapshot_serial()
-    recursive = schedule['recursive']
-
-    if not schedule['if_modified'] or get_fs_flag(fs, tag):
-        # Clear tag-modified flags for this tag on filesystem
-        clear_fs_flag(fs, tag)
-
-        logger.info('Taking snapshot of filesystem %s on schedule %s', fs, tag)
-
-        # Create stub snapshot record and take the snapshot
-        snapshot = dict(dataset=fs, tag=tag, serial=serial)
-        await zfs_create_snapshot(snapshot, recursive=recursive)
+        # Load and activate snapshot schedules
+        self.scheduler = AsyncIOPriorityScheduler(
+            event_loop = self.event_loop,
+            executors = {'default': AsyncIOPriorityExecutor()})
+        self.load_schedules()
+        self.scheduler.start()
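+
+        # The old main_task() also watched every filesystem mountpoint so
+        # that 'if_modified' schedules could skip idle filesystems; a
+        # sketch of the equivalent wiring with the new PathWatcher (the
+        # set_modified callback is hypothetical, not part of this change):
+        #
+        #     for fs in self.zfs.filesystems():
+        #         PathWatcher(fs['name'], fs['mountpoint'],
+        #             self.set_modified)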
+
+        # Only animate the idle spinner when attached to a terminal
+        if stderr.isatty():
+            spinner = Spinner()
+            self.event_loop.create_task(spinner.spin())
+
+        try:
+            self.event_loop.run_forever()
+        finally:
+            log.info('Terminating')
+            print(file=stderr)
+            self.event_loop.close()
+
+    def signal_handler(self, signame):
+        log.info('Received %s', signame)
+        self.event_loop.stop()
+
+    def load_schedules(self):
+        for schedule in self.schedules():
+            if schedule['disabled']:
+                continue
+
+            tag = schedule['tag']
+            for fs in schedule['filesystems']:
+                self.scheduler.add_job(self.snapshot_creation_task,
+                    trigger  = schedule['trigger'],
+                    id       = '{}:{}'.format(fs, tag),
+                    group    = fs,
+                    priority = schedule['priority'],
+                    args     = [schedule, fs])
 
-# Generate time-based 8-character hexadecimal snapshot serial number
-def make_snapshot_serial():
-    return ('%x' % int(time.time()))[-8:]
+            # Set tag-modified flags on filesystems (always take
+            # snapshots on startup)
+            for name in schedule['filesystems']:
+                filesystem = self.zfs.filesystems(name)
+                filesystem.modified(tag, True)
 
-# Destroy all expired snapshots
-async def snapshot_destruction_task():
-    global config, logger
+        self.scheduler.add_job(self.snapshot_destruction_task,
+            trigger = config['destroy_trigger'],
+            id      = 'destroy',
+            group   = 'destroy')
 
-    snapshots = zfs_get_snapshots()
-
-    for schedule in get_schedules():
-        if schedule['disabled']:
-            continue
-
-        # Find expired snapshots for schedule
-        tag = schedule['tag']
-        expired = slice_snapshots(snapshots, tag, index=schedule['keep'], stop=None, reverse=True)
-
-        if len(expired) > 0:
-            logger.info('Destroying snapsnots with tag %s:', tag)
-            for snapshot in expired:
-                logger.info('%s%s', config['tab_size'] * ' ', snapshot['name'])
-                await zfs_destroy_snapshot(snapshot)
-
-# Check if snapshot matches tag
-def snapshot_matches_tag(snapshot, tag):
-    return get_snapshot_tag(snapshot) == tag
-
-# Get tag of snapshot
-def get_snapshot_tag(snapshot):
-    (tag, serial) = get_snapshot_fields(snapshot)
-    return tag
-
-# Get serial number of snapshot
-def get_snapshot_serial(snapshot):
-    (tag, serial) = get_snapshot_fields(snapshot)
-    return serial
-
-# Get tuple of fields in snapshot name
-def get_snapshot_fields(snapshot):
-    global config
-    return tuple(snapshot['snapshot'].split(config['separator']))
-
-# Group 'snapshots' list using 'key' function, enumerate groups (in reverse if 'reverse' is
-# True), slice by 'index' and 'stop', and return slice as flat list of snapshots
-#
-# If 'stop' is not specified, assume that 'index' is the index to shop at; otherwise, slice
-# beginning at 'index' and ending at 'stop'
-#
-def slice_snapshots(snapshots, tag, index, stop=0, reverse=False, key=get_snapshot_serial):
-    # Find matching snapshots
-    matches = list(s for s in snapshots if snapshot_matches_tag(s, tag))
-
-    # Make ordered set of serials present in matched snapshots
-    ordered_set = list(sorted(set(key(s) for s in matches), reverse=reverse))
-
-    # Slice n serials from ordered set of serials
-    serials = ordered_set[:index] if stop == 0 else ordered_set[index:stop]
-
-    # Intersect matching snapshots with sliced set of serials
-    result = list(s for s in matches if get_snapshot_serial(s) in set(serials))
-
-    return result
+    # Retrieve all schedules and merge with default schedule
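+    # (a plain dictionary merge, so schedule keys override defaults;
+    # e.g. defaults {'keep': 14} and schedule {'tag': 'hourly', 'keep': 24}
+    # merge to {'keep': 24, 'tag': 'hourly'})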
+    def schedules(self):
+        schedules = ({**config['defaults'], **dict(s)} for
+            s in config['schedules'])
+        return schedules
+
+    #
+    # Snapshot scheduling functions
+
+    # Create snapshot from a snapshot schedule
+    async def snapshot_creation_task(self, schedule, fs):
+        tag = schedule['tag']
+        recursive = schedule['recursive']
+
+        # NOTE: was_modified()/clear_modified() are assumed to be
+        # provided by the concrete Filesystem implementation
+        filesystem = self.zfs.filesystems(fs)
+        if not schedule['if_modified'] or filesystem.was_modified(tag):
+            # Clear tag-modified flag for this tag on filesystem
+            filesystem.clear_modified(tag)
+
+            log.info('Taking snapshot of filesystem %s on schedule %s',
+                fs, tag)
+
+            # Create stub snapshot record and take the snapshot
+            snapshot = ZFSSnapshot(filesystem, tag, dataset=fs)
+            await self.zfs.take_snapshot(snapshot.name(),
+                recursive=recursive)
 
-# Load and activate snapshot schedules
-def load_snapshot_schedules():
-    global config, scheduler, fs_modified, logger
-    fs_modified = dict()
-
-    for schedule in get_schedules():
-        if schedule['disabled']:
-            continue
-
-        tag = schedule['tag']
-        for fs in schedule['filesystems']:
-            scheduler.add_job(snapshot_creation_task,
-                trigger  = schedule['trigger'],
-                id       = make_snapshot_zfs_name(fs, tag),
-                group    = fs,
-                priority = schedule['priority'],
-                args     = [schedule, fs])
-
-        # Set tag-modified flags on filesystems (always take snapshots on startup)
-        for name in schedule['filesystems']:
-            set_fs_flag(name, tag)
-
-    scheduler.add_job(snapshot_destruction_task,
-        trigger = config['destroy_trigger'],
-        group   = 'destroy')
+    # Destroy all expired snapshots
+    async def snapshot_destruction_task(self):
+        # Wrap the dataset dictionaries in ZFSSnapshot objects, skipping
+        # snapshots that were not created by this program
+        snapshots = [ZFSSnapshot(None, None, name=ds['name'])
+            for ds in self.zfs.snapshots() if is_snapshot(ds['name'])]
+
+        for schedule in self.schedules():
+            if schedule['disabled']:
+                continue
+
+            # Find expired snapshots for schedule
+            tag = schedule['tag']
+            expired = slice_snapshots(snapshots, tag,
+                start=schedule['keep'])
+
+            if len(expired) > 0:
+                log.info('Destroying snapshots with tag %s:', tag)
+                for snapshot in expired:
+                    log.info('%s%s',
+                        config['tab_size'] * ' ', snapshot.name())
+                    await self.zfs.destroy_snapshot(snapshot.name())
 
-async def main_task():
-    global config, event_loop, scheduler, fs_listeners
-
-    scheduler = AsyncIOPriorityScheduler(
-        event_loop = event_loop,
-        executors = {'default': AsyncIOPriorityExecutor()})
-
-    # Watch file system mountpoints
-    fs_listeners = dict()
-    for fs in zfs_get_filesystems():
-        await event_loop.subprocess_exec(
-            lambda: FSWatchProtocol(fs), config['fswatch_path'], '-o', fs['mountpoint'], stdout=PIPE)
-
-    load_snapshot_schedules()
-
-    scheduler.start()
+# Class for printing idle spinner
+class Spinner():
+    CHARS = ['|', '/', '-', '\\']
+    counter = 0
 
-    if stdout.isatty():
+    async def spin(self):
         while True:
-            print_spinner()
+            print(self.CHARS[self.counter] + '\x1B[G',
+                end='', file=stderr, flush=True)
+            self.counter = (self.counter + 1) % len(self.CHARS)
             await asyncio.sleep(1)
 
-# Print idle spinner
-def print_spinner():
-    print(print_spinner.chars[print_spinner.index] + '\x1B[G', end='', file=stderr, flush=True)
-    print_spinner.index = (print_spinner.index + 1) % len(print_spinner.chars)
-print_spinner.index = 0
-print_spinner.chars = ['|', '/', '-', '\\']
-
-def signal_handler(signame):
-    global logger, event_loop
-    logger.info('Received %s', signame)
-    event_loop.stop()
-
-#
-# Program
-
-config = load_config()
-configure_logging(config)
-
-logger.info('Processing jobs')
-
-event_loop = asyncio.get_event_loop()
-event_loop.add_signal_handler(signal.SIGINT, partial(signal_handler, 'SIGINT'))
-event_loop.add_signal_handler(signal.SIGTERM, partial(signal_handler, 'SIGTERM'))
-
-event_loop.create_task(main_task())
-
-try:
-    event_loop.run_forever()
-finally:
-    logger.info('Terminating')
-    print(file=stderr)
-    event_loop.close()
+
+# Instantiating ZASD runs the program: it loads the configuration,
+# schedules the jobs and blocks in the event loop until terminated
+if __name__ == '__main__':
+    ZASD()
diff --git a/src/zasd/apscheduler.py b/src/zasd/apscheduler.py
index 9063e36..f69d930 100644
--- a/src/zasd/apscheduler.py
+++ b/src/zasd/apscheduler.py
@@ -84,7 +84,7 @@ class _PriorityJob(Job):
         if 'group' in changes:
             self.group = changes.pop('group')
 
-        super().modify(self, **changes)
+        super().modify(**changes)
 
     def __getstate__(self):
         state = super().__getstate__()
@@ -196,7 +196,7 @@ class AsyncIOPriorityExecutor(AsyncIOExecutor):
         logger.debug('Committing jobs:')
 
         # Look at every group
-        for group_id, group in self.job_groups.items():
+        for group in self.job_groups.values():
             # Look at every job in group
             for job_id, times in group['times_for_job'].items():
                 # Find job for ID and submit to scheduler superclass
diff --git a/src/zasd/asyncio.py b/src/zasd/asyncio.py
index 90bebf0..82f4823 100644
--- a/src/zasd/asyncio.py
+++ b/src/zasd/asyncio.py
@@ -4,14 +4,14 @@ class LineBufferedProtocol(asyncio.SubprocessProtocol):
     def __init__(self, encoding):
         self.encoding = encoding
         self.buffer = bytearray()
-    
+
     def pipe_data_received(self, fd, data):
         self.buffer.extend(data)
 
         try:
             while True:
                 length = self.buffer.index(b'\n')
-                
-                line = self.buffer[0: length].decode()
+
+                line = self.buffer[0: length].decode(self.encoding)
                 del self.buffer[0: length + 1]
@@ -24,7 +24,7 @@ class LineBufferedProtocol(asyncio.SubprocessProtocol):
             pass
 
     def process_exited(self):
-        if(len(self.buffer) > 0):
-            line = self.buffer.decode()
+        if len(self.buffer) > 0:
+            line = self.buffer.decode(self.encoding)
             self.buffer = bytearray()
             self.pipe_line_received(line)
diff --git a/src/zasd/config.py b/src/zasd/config.py
index a094827..c4f8cd2 100644
--- a/src/zasd/config.py
+++ b/src/zasd/config.py
@@ -13,8 +13,8 @@ from apscheduler.triggers.interval import IntervalTrigger
 import logging
 import pprint
 
-from zasd.logging import *
-from zasd.util import *
+from zasd.log import log
+from zasd.util import *
@@ -22,20 +22,25 @@ from zasd.util import *
 CONFIG_BASENAME = 'zasd.conf.py'
 CONFIG_FILENAMES = [CONFIG_BASENAME, '.' + CONFIG_BASENAME]
 
+#
+# Globals
+
+config = {}
+
 # Default configuration
 DEFAULT_CONFIG = dict(
     zfs_path = '/usr/local/bin/zfs',
     fswatch_path = '/usr/local/bin/fswatch',
-    
+
     tab_size = 2,
     log_level = logging.INFO,
     log_format = '%(asctime)s %(name)s [%(levelname)-8s]: %(message)s',
    log_date_format = '%a, %d %b %Y, %H:%M:%S',
-    
+
     separator = ':',
     destroy_trigger = CronTrigger.from_crontab('* * * * *'),
-    
+
     # Defaults will take a snapshot every 12 hours
     # and keep them for a week, so if the config file
     # should disappear for some reason, there will
@@ -56,18 +61,16 @@ DEFAULT_CONFIG = dict(
 
 # Load configuration
 def load_config():
-    global logger
-
     if len(argv) > 1:
         # Configuration pathname given as first argument
         if isfile(argv[1]):
             config_pathname = argv[1]
         else:
-            logger.warning('Could not find configuration file %s', argv[1])
+            log.warning('Could not find configuration file %s', argv[1])
             return _warn_load_default()
     else:
         # No configuration pathname given; attempt to find it:
-        
+
         # Get root of system partition
         sys_root_path = environ.get('SystemDrive',
             splitdrive(executable)[0] + psep)
@@ -75,16 +78,16 @@ def load_config():
         sys_conf_path = joinpath(*ifenv('SystemRoot',
             lambda path: [path, 'System32', 'drivers', 'etc'],
             lambda: [sys_root_path, 'etc']))
-        
+
         # Get user home directory
         user_home_path = expanduser('~')
-        
+
         # Get path of this Python file
         if '__file__' in globals():
            script_path = dirname(abspath(__file__))
         else:
            script_path = dirname(abspath(argv[0]))
-        
+
         # Build list of configuration file pathnames to search
         config_paths = uniq([
             getcwd(),
@@ -99,9 +102,9 @@ def load_config():
         config_pathname = find_file(config_pathnames)
 
         if config_pathname is None:
-            logger.warning('Unable to find a config file at:')
+            log.warning('Unable to find a config file at:')
             for pathname in config_pathnames:
-                logger.warning('  ' + pathname)
+                log.warning('  ' + pathname)
 
             return _warn_load_default()
@@ -120,7 +123,7 @@ def load_config():
 
         crontab  = CronTrigger.from_crontab,
         cron     = CronTrigger.from_crontab,
-        
+
         interval = IntervalTrigger,
         every    = IntervalTrigger)
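+
+    # The configuration file is plain Python executed with these trigger
+    # helpers in scope; a minimal zasd.conf.py might contain
+    # (hypothetical values):
+    #
+    #     schedules = [dict(tag = 'hourly',
+    #                       filesystems = ['tank/data'],
+    #                       trigger = every(hours = 1),
+    #                       keep = 24)]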
@@ -130,27 +133,27 @@ def load_config():
 
     exec(config_source, global_scope, local_scope)
 
     # Merge configuration with default configuration
-    config = merge_configs(DEFAULT_CONFIG, local_scope)
-    
-    logger.debug('Loaded configuration')
-    
+    # Update the shared dictionary in place (rather than rebinding the
+    # name) so that modules which imported it see the loaded values
+    config.update(merge_configs(deepcopy(DEFAULT_CONFIG), local_scope))
+
+    log.debug('Loaded configuration')
+
     if config['log_level'] <= logging.DEBUG:
-        logger.debug('')
+        log.debug('')
         for line in pprint.pformat(config).split('\n'):
             logging.debug(config['tab_size'] * ' ' + line)
-        logger.debug('')
+        log.debug('')
 
     return config
 
 def _warn_load_default():
     global DEFAULT_CONFIG
-    logger.warning('')
-    logger.warning('Waiting 10 seconds before loading default configuration...')
-    logger.warning('')
+    log.warning('')
+    log.warning('Waiting 10 seconds before loading default configuration...')
+    log.warning('')
     sleep(10)
-    logger.warning('Loading default configuration')
-    logger.warning('')
-    return deepcopy(DEFAULT_CONFIG)
+    log.warning('Loading default configuration')
+    log.warning('')
+    config.update(deepcopy(DEFAULT_CONFIG))
+    return config
 
 def merge_configs(base, diff, path=[]):
@@ -159,7 +162,7 @@ def merge_configs(base, diff, path=[]):
         if not key in base:
             base[key] = value
         elif not isinstance(value, type(base[key])):
-            logger.error('Cannot merge diff type %s with base %s type at %s.%s',
+            log.error('Cannot merge diff type %s with base %s type at %s.%s',
                 type(value), type(base[key]), '.'.join(path), key)
             return None
         elif isinstance(value, dict):
diff --git a/src/zasd/filesystem/__init__.py b/src/zasd/filesystem/__init__.py
new file mode 100644
index 0000000..1c0f7f6
--- /dev/null
+++ b/src/zasd/filesystem/__init__.py
@@ -0,0 +1,117 @@
+filesystems = {}
+
+class Filesystem:
+    ''' Base filesystem class inherited by filesystem implementations '''
+
+    def __init__(self, name, props=None, **kwprops):
+        # Set common properties
+        self._name = name
+
+        # Process arguments to extract all properties
+        props = process_args(props, kwprops)
+
+        # Delegate further initialisation to the subclass
+        self.initialise(props)
+
+    def initialise(self, props):
+        ''' Perform initialisation. This method is called in the constructor
+            once the property arguments have been processed and merged into a
+            single property dictionary. Subclasses typically use this method
+            for transforming property values and assigning them to private
+            attributes on the Filesystem object. '''
+
+    async def snapshots(self):
+        ''' Retrieve an {ordinal: snapshot} dictionary of the snapshots that
+            are managed by this Filesystem object. '''
+
+    async def mountpoint(self):
+        ''' Retrieve the mountpoint path this Filesystem object is managing. '''
+
+    @classmethod
+    async def filesystems(cls):
+        ''' Retrieve a {name: filesystem} dictionary of the mounted filesystems
+            that are managed by this Filesystem class. '''
+        raise NotImplementedError()
+
+    @classmethod
+    async def expose(cls):
+        ''' Retrieve an object to be exposed on the global scope of the
+            configuration file. The identifier of this object will equal the
+            registered name of the filesystem. This may be used to expose
+            variables and methods that the user will need in order to configure
+            the filesystem. The method is called whenever the configuration
+            file is reloaded. '''
+
+class Snapshot:
+    ''' Base snapshot class inherited by filesystem implementations '''
+
+    def __init__(self, filesystem, tag, props=None, **kwprops):
+        # pylint: disable=unused-argument
+
+        # Set common properties
+        self._filesystem = filesystem
+        self._tag = tag
+
+        # Process arguments to extract all properties
+        props = process_args(props, kwprops)
+
+        # Delegate further initialisation to the subclass
+        self.initialise(props)
+
+    def initialise(self, props):
+        ''' Perform initialisation. This method is called in the constructor
+            once the property arguments have been processed and merged into a
+            single property dictionary. Subclasses typically use this method
+            for transforming property values and assigning them to private
+            attributes on the Snapshot object. '''
+
+    def filesystem(self):
+        ''' Get parent Filesystem object. This method need not be overridden
+            in subclasses. '''
+        return self._filesystem
+
+    def tag(self):
+        ''' Get tag name. Each configured schedule in ZASD has a unique tag
+            name. The tag name is passed in the constructor of the snapshot.
+            A tag name paired with an ordinal uniquely identifies a snapshot.
+            This method need not be overridden in subclasses. '''
+        return self._tag
+
+    def ordinal(self):
+        ''' Get ordinal. Ordinals are sortable, hashable timestamps that are
+            unique per tag name. They need not be numbers, but they must sort
+            in a strictly ascending chronological order. The ordinal is
+            typically derived from the UNIX timestamp the snapshot was taken
+            at. Before the snapshot is taken, this method returns None. '''
+        raise NotImplementedError()
+
+    async def take(self):
+        ''' Take snapshot and return the ordinal. This method is never called
+            more than once per snapshot object. '''
+
+    def taken(self):
+        ''' This method returns True if the snapshot has been taken. '''
+
+    async def delete(self):
+        ''' Delete this snapshot. No further method calls are made on the
+            snapshot once it has been deleted. '''
+
+def process_args(props, kwprops):
+    ''' Merge 'props' and 'kwprops' into single dictionary '''
+    if props:
+        if not isinstance(props, dict):
+            raise TypeError('Properties argument is not a dictionary')
+        props = dict(props)
+    else:
+        props = {}
+
+    # Update properties dictionary with keyword arguments
+    props.update(kwprops)
+
+    return props
+
+def register(name, klass):
+    ''' Register a filesystem. If a filesystem needs to be configured by the
+        user, it must retrieve its configuration from a dictionary at
+        config['filesystems'][name].
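+
+        Example (hypothetical; nothing in this changeset calls register()
+        yet):
+
+            from zasd.filesystem.zfs import ZFS
+            register('zfs', ZFS)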
+    '''
+    filesystems[name] = klass
\ No newline at end of file
diff --git a/src/zasd/filesystem/zfs.py b/src/zasd/filesystem/zfs.py
new file mode 100644
index 0000000..05b22ea
--- /dev/null
+++ b/src/zasd/filesystem/zfs.py
@@ -0,0 +1,142 @@
+from subprocess import run, PIPE
+import asyncio
+import locale
+from time import time
+import re
+
+from zasd.config import config
+from zasd.filesystem import Filesystem, Snapshot
+
+#
+# Constants
+
+_DATASET_COLS = ['type', 'name', 'creation', 'mountpoint']
+
+class ZFS(Filesystem):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def snapshots(self, name=None):
+        datasets = self.datasets('snapshot')
+        if name is None:
+            return datasets
+        return next(ds for ds in datasets if ds['name'] == name)
+
+    @classmethod
+    def filesystems(cls, name=None):
+        datasets = cls.datasets('filesystem')
+        if name is None:
+            return datasets
+        return next(ds for ds in datasets if ds['name'] == name)
+
+    @classmethod
+    def datasets(cls, dataset_type, *columns):
+        '''Get list of datasets'''
+        # Default to the standard column set when no columns are given
+        columns = columns or _DATASET_COLS
+        return _run_for_dicts(
+            [config['zfs_path'],
+             'list', '-Hp',
+             '-t', dataset_type,
+             '-o', ','.join(columns)], columns)
+
+    async def take_snapshot(self, name, recursive=False):
+        '''Create ZFS snapshot and, optionally, include the children'''
+        args = [config['zfs_path'], 'snapshot']
+        if recursive:
+            args.append('-r')
+        args.append(name)
+
+        return await asyncio.create_subprocess_exec(*args)
+
+    async def destroy_snapshot(self, name, recursive=False):
+        '''Destroy ZFS snapshot and, optionally, include the children'''
+        args = [config['zfs_path'], 'destroy']
+        if recursive:
+            args.append('-r')
+        args.append(name)
+
+        return await asyncio.create_subprocess_exec(*args)
+
+class ZFSSnapshot(Snapshot):
+    def initialise(self, props):
+        # Check if 'name' property exists and looks like a snapshot name
+        name = props.get('name')
+
+        if name:
+            # Split 'dataset@tag<separator>serial' name into field list
+            dataset, _, rest = name.partition('@')
+            fields = [dataset] + rest.split(config['separator'])
+
+            # Get any missing properties from field list
+            if not props.get('dataset'):
+                props['dataset'] = fields[0]
+            if not props.get('tag'):
+                props['tag'] = fields[1]
+            if not props.get('serial'):
+                props['serial'] = fields[2]
+
+        # Assign properties to object, falling back to the tag passed to
+        # the constructor and to a time-based serial
+        self._dataset = props.get('dataset')
+        self._tag = props.get('tag', self._tag)
+        self._serial = props.get('serial', ('%x' % int(time()))[-8:])
+        self._mountpoint = props.get('mountpoint')
+
+    def dataset(self):
+        ''' Get dataset name '''
+        return self._dataset
+
+    def name(self):
+        ''' Get ZFS name of snapshot, e.g. 'tank/data@hourly:5e4c1a2b' '''
+        return '{}@{}{}{}'.format(
+            self._dataset, self._tag, config['separator'], self._serial)
+
+    def tag(self):
+        ''' Get tag name '''
+        return self._tag
+
+    def serial(self):
+        ''' Get serial '''
+        return self._serial
+
+    def mountpoint(self):
+        ''' Get mountpoint '''
+        return self._mountpoint
+
+def is_snapshot(arg):
+    '''
+    Check if 'arg' looks like a ZASD snapshot name, meaning a string
+    consisting of a character sequence as follows:
+
+    1. At least one character that isn't '@'
+    2. Character '@'
+    3. At least one character that isn't a separator
+    4. Separator character
+    5. At least one character that isn't a separator
+    6. Go to #4 and repeat until the string ends
+
+    NOTE: The validity of 'arg' as a ZFS snapshot name is not checked!
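+
+    For example, with the default separator ':':
+
+        is_snapshot('tank/data@hourly:5e4c1a2b')  # True
+        is_snapshot('tank/data@hourly')           # False (no serial)
+        is_snapshot('tank/data')                  # False (no tag)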
+    '''
+    sep = re.escape(config['separator'])
+    return isinstance(arg, str) and bool(
+        re.match('^[^@]+@[^%s]+(%s[^%s]+)+$' % (sep, sep, sep), arg))
+
+def _run_for_dicts(args, column_list):
+    ''' Run program and convert tabulated output to list
+        of dictionaries with given column names as keys '''
+    return _table_to_dicts(_run_for_table(args), column_list)
+
+def _run_for_table(args):
+    '''Run program and convert tabulated output to nested lists'''
+    result = run(args, check=True, stdout=PIPE,
+        encoding=locale.getpreferredencoding())
+    return _str_to_table(result.stdout)
+
+def _str_to_table(string, sep='\t'):
+    '''Convert tabulated multi-line string to nested lists'''
+    return [line.split(sep) for line in string.splitlines()]
+
+def _table_to_dicts(table, column_list):
+    '''Convert table to list of dictionaries with given column names as keys'''
+    return [_row_to_dict(row, column_list) for row in table]
+
+def _row_to_dict(row, column_list):
+    '''Convert table row to dictionary with given column names as keys'''
+    return {column_list[i]: row[i] for i in range(len(row))}
diff --git a/src/zasd/logging.py b/src/zasd/log.py
similarity index 73%
rename from src/zasd/logging.py
rename to src/zasd/log.py
index 8c97de6..b169780 100644
--- a/src/zasd/logging.py
+++ b/src/zasd/log.py
@@ -1,32 +1,29 @@
 import logging
 
+log = logging.getLogger('zasd')
+
 # Bootstrap logging using a hardcoded configuration
 def bootstrap_logging():
-    global logger
-
     logging.basicConfig(
         level=logging.NOTSET,
         format='%(asctime)s %(name)s [%(levelname)-8s]: %(message)s',
         datefmt='%a, %d %b %Y, %H:%M:%S')
 
-    logger = logging.getLogger('zasd')
-    logger.setLevel(logging.NOTSET)
-
+    log.setLevel(logging.NOTSET)
     logging.getLogger('apscheduler').setLevel(logging.NOTSET)
 
 # Configure logging using a loaded configuration
-def configure_logging(config):
-    global logger
-
+def configure_logging():
+    # Imported here rather than at module level to avoid a circular
+    # import between zasd.config and zasd.log
+    from zasd.config import config
+
     level = config['log_level']
 
     logging.basicConfig(
         format=config['log_format'],
         datefmt=config['log_date_format'])
 
-    logger = logging.getLogger('zasd')
-    logger.setLevel(level)
+    log.setLevel(level)
 
     logging.getLogger('asyncio').setLevel(logging.WARN)
     logging.getLogger('apscheduler').setLevel(logging.WARN)
 
-bootstrap_logging()
+bootstrap_logging()
\ No newline at end of file
diff --git a/src/zasd/pathwatcher.py b/src/zasd/pathwatcher.py
new file mode 100644
index 0000000..2c16c8b
--- /dev/null
+++ b/src/zasd/pathwatcher.py
@@ -0,0 +1,31 @@
+import asyncio
+from subprocess import PIPE
+import locale
+
+from zasd.config import config
+from zasd.asyncio import LineBufferedProtocol
+from zasd.log import log
+
+class PathWatcher:
+    def __init__(self, name, path, callback):
+        self._name = name
+        self._path = path
+
+        # Watch path. subprocess_exec() is a coroutine and takes a
+        # protocol factory, so schedule it on the event loop instead of
+        # calling it directly
+        asyncio.ensure_future(asyncio.get_event_loop().subprocess_exec(
+            lambda: PathWatcherProtocol(name, callback),
+            config['fswatch_path'], '-o', path, stdout=PIPE))
+
+class PathWatcherProtocol(LineBufferedProtocol):
+    def __init__(self, name, callback):
+        LineBufferedProtocol.__init__(self, locale.getpreferredencoding())
+        self._name = name
+        self._callback = callback
+
+    def pipe_line_received(self, line):
+        # Ignore empty lines and NOOPs
+        if len(line) == 0 or int(line) == 0:
+            return
+
+        log.info('Detected change on filesystem %s', self._name)
+        self._callback(self._name)
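+
+# Example (hypothetical): watch the mountpoint of 'tank/data' and log a
+# message whenever fswatch reports a change:
+#
+#     PathWatcher('tank/data', '/mnt/tank/data',
+#         lambda name: log.info('%s was modified', name))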
diff --git a/src/zasd/scheduler.py b/src/zasd/scheduler.py
new file mode 100644
index 0000000..e223e4c
--- /dev/null
+++ b/src/zasd/scheduler.py
@@ -0,0 +1,25 @@
+def slice_snapshots(snapshots, tag, start=0, stop=None, reverse=True,
+        key=lambda snapshot: snapshot.serial()):
+    '''
+    Take a list of 'snapshots' having 'tag'. Take the set of 'key's from
+    the list, sort the set (optionally in 'reverse'), slice it from
+    'start' to 'stop', and return the list of snapshots whose keys fall
+    in the slice. If 'stop' is not specified, slice from 'start' to the
+    end of the set. Called with the first three arguments only, this
+    returns the snapshots older than the 'start' newest ones for the tag.
+    '''
+
+    # Find snapshots having 'tag' (a list, since it is traversed twice)
+    matches = [s for s in snapshots if s.tag() == tag]
+
+    # Take ordered set of keys from matching snapshots
+    ordered_set = sorted(set(key(s) for s in matches), reverse=reverse)
+
+    # Take slice from ordered set of keys
+    keys = set(ordered_set[start:] if stop is None
+        else ordered_set[start:stop])
+
+    # Filter matches by keys in sliced set; a list so callers can len() it
+    return [s for s in matches if key(s) in keys]
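+
+# For example, selecting everything but the 24 newest 'hourly' snapshots
+# (the expired ones, as snapshot_destruction_task does with
+# schedule['keep']):
+#
+#     expired = slice_snapshots(snapshots, 'hourly', start=24)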