ZFS Automatic Snapshot Daemon — priority-aware scheduling extensions for APScheduler.
import logging
from datetime import datetime
import six
from apscheduler.job import Job
from apscheduler.util import undefined
from apscheduler.schedulers.base import STATE_STOPPED
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_MAX_INSTANCES, EVENT_JOB_ERROR, EVENT_JOB_MISSED
class AsyncIOPriorityScheduler(AsyncIOScheduler):
    """AsyncIO scheduler whose jobs carry ``group`` and ``priority`` attributes.

    Jobs are created as ``_PriorityJob`` instances so that executors aware of
    groups/priorities (e.g. ``AsyncIOPriorityExecutor``) can de-duplicate
    run times that coincide within a group.
    """

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, group=0, priority=0, **trigger_args):
        """Schedule *func*.

        Mirrors ``BaseScheduler.add_job`` but additionally accepts ``group``
        and ``priority`` (lower ``priority`` value wins a contested time slot)
        and builds a ``_PriorityJob`` instead of a plain ``Job``.
        Returns the created job.
        """
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time,
            'group': group,
            'priority': priority,
        }
        # Drop options left at the ``undefined`` sentinel so that scheduler /
        # job defaults apply instead of the sentinel itself.
        job_kwargs = {key: value for key, value in job_kwargs.items()
                      if value is not undefined}
        job = _PriorityJob(self, **job_kwargs)

        # Don't really add jobs to job stores before the scheduler is up and running
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                self._pending_jobs.append((job, jobstore, replace_existing))
                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                                  'the scheduler starts')
            else:
                self._real_add_job(job, jobstore, replace_existing)
        return job

    def get_due_jobs(self, now=None):
        """Return the jobs from every job store that are due at *now*.

        *now* defaults to the current time in the scheduler's timezone.
        """
        if now is None:
            now = datetime.now(self.timezone)
        with self._jobstores_lock:
            jobs = []
            # ``six.iteritems`` was unnecessary: the file already relies on
            # Python 3 (argument-free ``super()``), so plain ``.values()``
            # suffices and the unused store alias is dropped.
            for store in self._jobstores.values():
                jobs.extend(store.get_due_jobs(now))
        return jobs

    def _process_jobs(self):
        """Run one scheduling pass, then flush executors that batch work.

        Returns the number of seconds to wait before the next pass, as
        computed by the superclass.
        """
        # Capture the due jobs *before* the superclass submits them, so we
        # know which executors received work during this pass.
        due_jobs = self.get_due_jobs()
        wait_seconds = super()._process_jobs()

        # Collect each distinct executor that got jobs, then commit once per
        # executor that supports batched submission.
        executors = []
        for job in due_jobs:
            executor = self._lookup_executor(job.executor)
            if executor not in executors:
                executors.append(executor)
        for executor in executors:
            if hasattr(executor, 'commit'):
                executor.commit()
        return wait_seconds
class _PriorityJob(Job):
    """APScheduler ``Job`` extended with ``priority`` and ``group`` attributes.

    A numerically lower ``priority`` displaces other jobs in the same
    ``group`` for a contested run time (see ``AsyncIOPriorityExecutor``).
    The extra attributes are threaded through ``modify`` and pickling so
    they survive job-store round trips.
    """

    def __init__(self, scheduler, priority=1, group=1, **kwargs):
        # Assign our extra attributes before delegating, so they already
        # exist if the superclass initializer touches the job's state.
        self.priority = priority
        self.group = group
        super().__init__(scheduler, **kwargs)

    def modify(self, **changes):
        """Apply *changes*; ``priority``/``group`` are handled locally, the
        rest is delegated to ``Job.modify``."""
        if 'priority' in changes:
            self.priority = changes.pop('priority')
        if 'group' in changes:
            self.group = changes.pop('group')
        super().modify(**changes)

    def __getstate__(self):
        # Include the extra attributes so they survive serialization.
        state = super().__getstate__()
        state['priority'] = self.priority
        state['group'] = self.group
        return state

    def __setstate__(self, state):
        # Pop our keys first -- ``Job.__setstate__`` would not accept them.
        priority = state.pop('priority')
        group = state.pop('group')
        super().__setstate__(state)
        self.priority = priority
        self.group = group
class AsyncIOPriorityExecutor(AsyncIOExecutor):
    """AsyncIO executor that de-duplicates run times between jobs of a group.

    Submissions are buffered in ``self.job_groups`` until ``commit()`` is
    called (done by ``AsyncIOPriorityScheduler._process_jobs`` after each
    scheduling pass).  Within a group, each run time is assigned to at most
    one job; a job with a numerically lower ``priority`` value displaces a
    higher-valued one for the same slot.

    ``job_groups`` layout::

        {group: {'times_for_job': {job_id: set_of_times},
                 'job_for_time': {time: job_id}}}
    """

    def __init__(self, *args, **kwargs):
        self.scheduler = None
        self.job_groups = {}
        super().__init__(*args, **kwargs)

    def start(self, scheduler, alias):
        # Store a reference to the scheduler and add job listener
        self.scheduler = scheduler
        self.scheduler.add_listener(
            self.on_job_executed,
            mask=EVENT_JOB_EXECUTED | EVENT_JOB_MAX_INSTANCES | EVENT_JOB_ERROR | EVENT_JOB_MISSED)
        super().start(scheduler, alias)

    def shutdown(self, *args):
        # Remove job listener
        self.scheduler.remove_listener(self.on_job_executed)
        super().shutdown(*args)

    def submit_job(self, job, run_times):
        """Buffer *job*'s *run_times*, resolving clashes within the job's group.

        Nothing is executed here; ``commit()`` later forwards the buffered
        work to ``AsyncIOExecutor.submit_job``.
        """
        run_times = set(run_times)
        logger = logging.getLogger('apscheduler')
        logger.debug('Incoming submission for job %s with run times:', job.id)
        for time in run_times:
            logger.debug(' %s', time)

        if job.group in self.job_groups:
            job_group = self.job_groups[job.group]
            if job.id in job_group['times_for_job']:
                # Fetch the existing time set for this job.
                times_for_job = job_group['times_for_job'][job.id]
            else:
                # Create and store a fresh time set.
                times_for_job = set()
                job_group['times_for_job'][job.id] = times_for_job
            # Map of times to the job currently holding them.
            job_for_time = job_group['job_for_time']
            # Filter out run times that coincide with higher-priority jobs
            # (lower number == higher priority).
            run_times = set(
                time for time in run_times
                if time not in job_for_time or
                job.priority < self.scheduler.get_job(job_for_time[time]).priority
            )
        else:
            # First submission for this group: create and store an empty group.
            times_for_job = set()
            job_for_time = {}
            job_group = {'times_for_job': {job.id: times_for_job},
                         'job_for_time': job_for_time}
            self.job_groups[job.group] = job_group

        # Add the surviving times to the stored time set for the current job.
        times_for_job |= run_times

        # Remove the claimed slots from the other jobs in the group --
        # necessary when the incoming job overwrites time slots that were
        # previously assigned to existing jobs.
        for job_id, times in list(job_group['times_for_job'].items()):
            # Iterate over a copy so ``times`` can be mutated in place.
            for time in set(times):
                # Does this slot coincide with the submitted job's time set?
                if time in times_for_job and job_id != job.id:
                    times.remove(time)
                    if len(times) == 0:
                        # Delete the time set once it becomes empty.
                        del job_group['times_for_job'][job_id]
                    else:
                        # Re-store the pruned set otherwise.
                        job_group['times_for_job'][job_id] = times

        logger.debug('Final time set for job %s:', job.id)
        for time in times_for_job:
            logger.debug(' %s', time)

        # Map the newly claimed run times to this job.
        for time in run_times:
            job_for_time[time] = job.id

    def commit(self):
        """Forward every buffered submission to the underlying executor."""
        logger = logging.getLogger('apscheduler')
        logger.debug('Committing jobs:')
        # Walk every group, then every job buffered within it.
        for group in self.job_groups.values():
            for job_id, times in group['times_for_job'].items():
                # Resolve the job ID and hand the batch to the real executor.
                job = self.scheduler.get_job(job_id)
                super().submit_job(job, list(times))
                logger.debug(' %s', job.id)
                for time in times:
                    logger.debug(' %s', time)

    def on_job_executed(self, event):
        """Listener: drop bookkeeping for a run time once its job finished.

        Fired for executed/missed/errored/max-instances events (the mask
        registered in ``start``).
        """
        time = event.scheduled_run_time
        logger = logging.getLogger('apscheduler')
        logger.debug('Job %s has executed', event.job_id)
        # Find the job for the event's job ID.
        # NOTE(review): assumes the job still exists and belongs to a tracked
        # group -- a stale event would raise KeyError here; confirm callers.
        job = self.scheduler.get_job(event.job_id)
        # Get the group bookkeeping for the job.
        job_group = self.job_groups[job.group]
        times_for_job = job_group['times_for_job']
        job_for_time = job_group['job_for_time']
        # Delete the executed slot from the time map...
        del job_for_time[time]
        # ...and delete the time from the job's own time set.
        times_for_job[job.id].remove(time)
        if len(times_for_job[job.id]) == 0:
            logger.debug('Deleting empty time set for job %s', job.id)
            del times_for_job[job.id]
        else:
            logger.debug('Remaining time set for job %s:', job.id)
            for time in times_for_job[job.id]:
                logger.debug(' %s', time)