import sys from sys import stdout, stderr import signal import time from functools import partial, reduce from itertools import islice import asyncio from asyncio import get_event_loop from subprocess import run, PIPE from datetime import datetime, timezone, timedelta from apscheduler.triggers.cron import CronTrigger # type: ignore from apscheduler.triggers.interval import IntervalTrigger # type: ignore from zasd.apsched import AsyncIOPriorityScheduler, \ AsyncIOPriorityExecutor from zasd.config.loader import load_config import zasd.config as config from zasd.filesystem import FilesystemRegistry from zasd.filesystems.zfs import ZFSFilesystem from zasd.log import configure_logging, log from zasd.repl import repl async def main(): main.task = asyncio.current_task() log.info('sys.path = %s', sys.path) await load_config() configure_logging() if stdout.isatty(): asyncio.create_task(repl()) log.info('Processing jobs') # Load and activate snapshot schedules #launch_scheduler() try: while True: await asyncio.sleep(3600) except asyncio.CancelledError: pass def start(): try: code = asyncio.get_event_loop().run_until_complete(main()) finally: print('Terminating') def stop(): main.task.cancel() def signal_handler(signame): log.info('Received %s', signame) get_event_loop().stop() def launch_scheduler(): scheduler = AsyncIOPriorityScheduler( event_loop = get_event_loop(), executors = {'default': AsyncIOPriorityExecutor()}) for schedule in schedules(): if schedule['disabled']: continue tag = schedule['tag'] for fs in schedule['filesystems']: scheduler.add_job( lambda: snapshot_creation_task, trigger = schedule['trigger'], id = '{}:{}'.format(fs, tag), group = fs, priority = schedule['priority'], args = [schedule, fs]) # TODO: Set tag-modified flags on filesystems (always take snapshots on startup) scheduler.add_job( snapshot_destruction_task, trigger = config.get('destroy_trigger'), id = 'destroy', group = 'destroy') #scheduler.start() # Retrieve all schedules and merge with default schedule def schedules(): return ({**config.get('defaults'), **dict(s)} for s in config.get('schedules')) # # Snapshot scheduling functions # Create snapshot from a snapshot schedule async def snapshot_creation_task(schedule, fs): tag = schedule['tag'] recursive = schedule['recursive'] if not schedule['if_modified'] or fs.was_modifed(): # Clear tag-modified flags for this tag on filesystem fs.clear_modified(tag) log.info('Taking snapshot of filesystem %s on schedule %s', fs, tag) # Create stub snapshot record and take the snapshot snapshot = dict(dataset=fs, tag=tag) #await zfs.create_snapshot(snapshot, recursive=recursive) # Destroy all expired snapshots async def snapshot_destruction_task(): #snapshots = zfs.snapshots() for schedule in schedules(): if schedule['disabled']: continue # Find expired snapshots for schedule #tag = schedule['tag'] #expired = slice_snapshots(snapshots, tag, index=schedule['keep'], stop=None, reverse=True) #if len(expired) > 0: # log.info('Destroying snapsnots with tag %s:', tag) # for snapshot in expired: # log.info('%s%s', # config['tab_size'] * ' ', snapshot['name']) # await zfs.destroy_snapshot(snapshot) # Class for printing idle spinner async def spinner(): CHARS = ['|', '/', '-', '\\'] counter = 0 while True: print(CHARS[counter] + '\x1B[G', end='', file=stderr, flush=True) counter = (counter + 1) % len(CHARS) await asyncio.sleep(1)