ZFS Automatic Snapshot Daemon
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
5.6 KiB

''' Configuration subsystem '''
from typing import MutableMapping, Any
4 years ago
from sys import argv, executable, stdout
from os import environ, getcwd, sep as psep
from os.path import dirname, abspath, join as joinpath, \
expanduser, splitdrive, isfile
from copy import deepcopy
import logging
import pprint
import asyncio
from apscheduler.triggers.cron import CronTrigger # type: ignore
from apscheduler.triggers.interval import IntervalTrigger # type: ignore
from zasd.log import log
from zasd.util import ifenv, uniq, find_file
import zasd.config as _config
#
# Constants
_CONFIG_BASENAME = 'zasd.conf.py'
_CONFIG_FILENAMES = [_CONFIG_BASENAME, '.' + _CONFIG_BASENAME]
4 years ago
_WARNING_WAIT = 3
#_WARNING_WAIT = 10
# Default configuration
_DEFAULT_CONFIG = dict(
zfs_path = '/usr/local/bin/zfs',
fswatch_path = '/usr/local/bin/fswatch',
tab_size = 2,
log_level = logging.INFO,
log_format = '%(asctime)s %(name)s [%(levelname)-8s]: %(message)s',
log_date_format = '%a, %d %b %Y, %H:%M:%S',
separator = ':',
destroy_trigger = CronTrigger.from_crontab('* * * * *'),
# Defaults will take a snapshot every 12 hours
# and keep them for a week, so if the config file
# should disappear for some reason, there will
# at least be _some_ recoverable snapshots
defaults = dict(
disabled = False,
filesystems = ['tank'],
recursive = True,
tag = 'zasd',
trigger = IntervalTrigger(hours=12),
if_modified = False,
priority = 1,
keep = 14
),
schedules = [{}]
)
async def load_config():
''' Load configuration file '''
if len(argv) > 1:
# Configuration pathname given as first argument
if isfile(argv[1]):
config_pathname = argv[1]
else:
log.warning('Could not find configuration file %s', argv[1])
return await _warn_default()
else:
# No configuration pathname given; attempt to find it:
# Get root of system partition
sys_root_path = environ.get('SystemDrive', splitdrive(executable)[0] + psep)
# Get system configuration directory
sys_conf_path = joinpath(
*ifenv(
'SystemRoot',
lambda path: [path, 'System32', 'drivers', 'etc'],
lambda: [sys_root_path, 'etc']))
# Get user home directory
user_home_path = expanduser('~')
# Get path of this Python file
if '__file__' in globals():
script_path = dirname(abspath(__file__))
else:
script_path = dirname(abspath(argv[0]))
# Build list of configuration file pathnames to search
config_paths = uniq([
getcwd(),
user_home_path,
joinpath(user_home_path, '.config'),
joinpath(user_home_path, '.local', 'share'),
sys_conf_path,
script_path])
config_pathnames = list(joinpath(p, f) for p in config_paths for f in _CONFIG_FILENAMES)
# Attempt to find a config file
config_pathname = find_file(config_pathnames)
if config_pathname is None:
log.warning('Unable to find a config file at:')
for pathname in config_pathnames:
log.warning(' %s', pathname)
await _warn_default()
return
with open(config_pathname, 'rt', encoding='utf-8') as f:
config_source = f.read()
# Create configuration file scopes
global_scope = dict(
CRITICAL = logging.CRITICAL,
ERROR = logging.ERROR,
WARNING = logging.WARNING,
INFO = logging.INFO,
DEBUG = logging.DEBUG,
NOTSET = logging.NOTSET,
crontab = CronTrigger.from_crontab,
cron = CronTrigger.from_crontab,
interval = IntervalTrigger,
every = IntervalTrigger)
local_scope: MutableMapping[str, Any] = dict()
# Execute configuration file
exec(config_source, global_scope, local_scope)
# Merge configuration with default configuration
_config.change(_merge_configs(_DEFAULT_CONFIG, local_scope))
log.debug('Loaded configuration')
4 years ago
if _config.get('log_level') <= logging.DEBUG:
log.debug('')
for line in pprint.pformat(_config).split('\n'):
logging.debug(_config.get('tab_size') * ' ' + line)
log.debug('')
async def _warn_default():
log.warning('')
4 years ago
log.warning(
'Waiting %s seconds before loading default configuration...',
_WARNING_WAIT)
log.warning('')
4 years ago
await asyncio.sleep(_WARNING_WAIT)
stdout.write('\r')
log.warning('Loading default configuration')
log.warning('')
4 years ago
_config.change(deepcopy(_DEFAULT_CONFIG))
# pylint: disable=dangerous-default-value
def _merge_configs(base, diff, path=[]):
''' Deep merge configuration dictionaries '''
base = base if len(path) == 0 else deepcopy(base)
for key, value in diff.items():
if not key in base:
base[key] = value
elif not isinstance(value, type(base[key])):
log.error('Cannot merge diff type %s with base %s type at %s.%s',
type(value), type(base[key]), '.'.join(path), key)
return None
elif isinstance(value, dict):
merged = _merge_configs(base[key], value, path + [key])
if merged is None:
return None
base[key] = merged
else:
base[key] = value
return base