LGPL2.1. See file COPYING.
"""
import cephfs
-import errno
import rados
from contextlib import contextmanager
-from mgr_util import CephfsClient, CephfsConnectionException, \
- open_filesystem
+from mgr_util import CephfsClient, open_filesystem
from collections import OrderedDict
from datetime import datetime, timezone
import logging
-from threading import Timer
+from threading import Timer, Lock
+from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
+ Tuple, TypeVar, Union, Type
+from types import TracebackType
import sqlite3
-from .schedule import Schedule, parse_retention
+from .schedule import Schedule
import traceback
log = logging.getLogger(__name__)
+CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
+
+
@contextmanager
-def open_ioctx(self, pool):
+def open_ioctx(self: CephfsClientT,
+ pool: Union[int, str]) -> Iterator[rados.Ioctx]:
try:
if type(pool) is int:
with self.mgr.rados.open_ioctx2(pool) as ioctx:
raise
-def updates_schedule_db(func):
- def f(self, fs, schedule_or_path, *args):
- func(self, fs, schedule_or_path, *args)
+FuncT = TypeVar('FuncT', bound=Callable[..., None])
+
+
+# Decorator for SnapSchedClient methods that mutate the schedule DB:
+# after the wrapped method runs, re-arm the snapshot timers for the
+# affected path so in-memory scheduling reflects the DB change.
+def updates_schedule_db(func: FuncT) -> FuncT:
+ def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
+ ret = func(self, fs, schedule_or_path, *args)
+ # second argument may be a Schedule object or a plain path string
path = schedule_or_path
if isinstance(schedule_or_path, Schedule):
path = schedule_or_path.path
self.refresh_snap_timers(fs, path)
- return f
+ return ret
+ return cast(FuncT, f)
-def get_prune_set(candidates, retention):
+def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
+ retention: Dict[str, int]) -> Set:
PRUNING_PATTERNS = OrderedDict([
# n is for keep last n snapshots, uses the snapshot name timestamp
# format for lowest granularity
if snap_ts != last:
last = snap_ts
if snap not in keep:
- log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
+ log.debug((f'keeping {snap[0].d_name} due to '
+ f'{period_count}{period}'))
keep.append(snap)
kept_for_this_period += 1
if kept_for_this_period == period_count:
f'{period_count}{period}'))
break
if len(keep) > MAX_SNAPS_PER_PATH:
- log.info(f'Would keep more then {MAX_SNAPS_PER_PATH}, pruning keep set')
+ log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
+ 'pruning keep set'))
keep = keep[:MAX_SNAPS_PER_PATH]
return candidates - set(keep)
+# Pairs a per-filesystem sqlite connection with the Lock that serializes
+# transactions on it; instances are handed out via DBConnectionManager.
+class DBInfo():
+ def __init__(self, fs: str, db: sqlite3.Connection):
+ self.fs: str = fs
+ self.lock: Lock = Lock()
+ self.db: sqlite3.Connection = db
+
+
+# context manager for serializing db connection usage
+class DBConnectionManager():
+ def __init__(self, info: DBInfo):
+ self.dbinfo: DBInfo = info
+
+ # using string as return type hint since __future__.annotations is not
+ # available with Python 3.6; it's available starting from Python 3.7
+ def __enter__(self) -> 'DBConnectionManager':
+ log.debug(f'locking db connection for {self.dbinfo.fs}')
+ self.dbinfo.lock.acquire()
+ log.debug(f'locked db connection for {self.dbinfo.fs}')
+ return self
+
+ # releases the lock unconditionally; exception info is ignored, so any
+ # exception raised in the with-body propagates to the caller
+ def __exit__(self,
+ exception_type: Optional[Type[BaseException]],
+ exception_value: Optional[BaseException],
+ traceback: Optional[TracebackType]) -> None:
+ log.debug(f'unlocking db connection for {self.dbinfo.fs}')
+ self.dbinfo.lock.release()
+ log.debug(f'unlocked db connection for {self.dbinfo.fs}')
+
+
class SnapSchedClient(CephfsClient):
- def __init__(self, mgr):
+ def __init__(self, mgr: Any) -> None:
super(SnapSchedClient, self).__init__(mgr)
# TODO maybe iterate over all fs instance in fsmap and load snap dbs?
- self.sqlite_connections = {}
- self.active_timers = {}
+ #
+ # Each db connection is now guarded by a Lock; this is required to
+ # avoid concurrent DB transactions when more than one paths in a
+ # file-system are scheduled at the same interval eg. 1h; without the
+ # lock, there are races to use the same connection, causing nested
+ # transactions to be aborted
+ self.sqlite_connections: Dict[str, DBInfo] = {}
+ # (fs, path) -> pending threading.Timer objects for that path
+ self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
+ self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections
@property
- def allow_minute_snaps(self):
+ # NOTE(review): returns the 'allow_m_granularity' module option value,
+ # so the '-> None' annotation looks inaccurate — confirm intended type.
+ def allow_minute_snaps(self) -> None:
return self.mgr.get_module_option('allow_m_granularity')
- def get_schedule_db(self, fs):
+ @property
+ # NOTE(review): returns the 'dump_on_update' module option value; the
+ # '-> None' annotation appears inaccurate — confirm intended type.
+ def dump_on_update(self) -> None:
+ return self.mgr.get_module_option('dump_on_update')
+
+ def get_schedule_db(self, fs: str) -> DBConnectionManager:
+ # Return a DBConnectionManager for fs, creating and caching the
+ # sqlite connection on first use; the caller must enter the
+ # returned object as a context manager before touching the db.
+ dbinfo = None
+ self.conn_lock.acquire()
+ # NOTE(review): conn_lock is not released if anything below raises
+ # (e.g. the assert on poolid) — a try/finally would be safer; confirm.
if fs not in self.sqlite_connections:
- self.sqlite_connections[fs] = sqlite3.connect(
- ':memory:',
- check_same_thread=False)
- with self.sqlite_connections[fs] as con:
- con.row_factory = sqlite3.Row
- con.execute("PRAGMA FOREIGN_KEYS = 1")
- pool = self.get_metadata_pool(fs)
- with open_ioctx(self, pool) as ioctx:
- try:
- size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
- db = ioctx.read(SNAP_DB_OBJECT_NAME,
- size).decode('utf-8')
- con.executescript(db)
- except rados.ObjectNotFound:
- log.debug(f'No schedule DB found in {fs}, creating one.')
- con.executescript(Schedule.CREATE_TABLES)
- return self.sqlite_connections[fs]
-
- def store_schedule_db(self, fs):
- # only store db is it exists, otherwise nothing to do
- metadata_pool = self.get_metadata_pool(fs)
- if not metadata_pool:
- raise CephfsConnectionException(
- -errno.ENOENT, "Filesystem {} does not exist".format(fs))
- if fs in self.sqlite_connections:
- db_content = []
- db = self.sqlite_connections[fs]
- with db:
- for row in db.iterdump():
- db_content.append(row)
- with open_ioctx(self, metadata_pool) as ioctx:
- ioctx.write_full(SNAP_DB_OBJECT_NAME,
- '\n'.join(db_content).encode('utf-8'))
-
- def _is_allowed_repeat(self, exec_row, path):
+ poolid = self.get_metadata_pool(fs)
+ assert poolid, f'fs "{fs}" not found'
+ # sqlite URI targeting the 'ceph' VFS (libcephsqlite); the DB
+ # pages live in the fs metadata pool identified by poolid
+ uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
+ log.debug(f"using uri {uri}")
+ db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+ db.execute('PRAGMA FOREIGN_KEYS = 1')
+ db.execute('PRAGMA JOURNAL_MODE = PERSIST')
+ db.execute('PRAGMA PAGE_SIZE = 65536')
+ db.execute('PRAGMA CACHE_SIZE = 256')
+ db.row_factory = sqlite3.Row
+ # check for legacy dump store
+ pool_param = cast(Union[int, str], poolid)
+ with open_ioctx(self, pool_param) as ioctx:
+ try:
+ # migrate the old RADOS-object SQL dump into the new DB,
+ # then delete the legacy object
+ size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
+ dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
+ db.executescript(dump)
+ ioctx.remove(SNAP_DB_OBJECT_NAME)
+ except rados.ObjectNotFound:
+ log.debug(f'No legacy schedule DB found in {fs}')
+ db.executescript(Schedule.CREATE_TABLES)
+ self.sqlite_connections[fs] = DBInfo(fs, db)
+ dbinfo = self.sqlite_connections[fs]
+ self.conn_lock.release()
+ return DBConnectionManager(dbinfo)
+
+ def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
+ # Only minute ('M') repeats can be disallowed; they are permitted
+ # iff the allow_m_granularity module option is set. All other
+ # repeat granularities are always allowed.
if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
if self.allow_minute_snaps:
- log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}')
+ log.debug(('Minute repeats allowed, '
+ f'scheduling snapshot on path {path}'))
return True
else:
- log.info(f'Minute repeats disabled, skipping snapshot on path {path}')
+ log.info(('Minute repeats disabled, '
+ f'skipping snapshot on path {path}'))
return False
else:
return True
+ def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
+ # Return at most one row for path: EXEC_QUERY results are filtered
+ # through _is_allowed_repeat and [0:1] keeps the list shape even
+ # when nothing matches. Caller must hold the connection's lock.
with db:
if self.dump_on_update:
+ # optional debugging aid: log the whole DB as SQL text
dump = [line for line in db.iterdump()]
dump = "\n".join(dump)
log.debug(f"db dump:\n{dump}")
cur = db.execute(Schedule.EXEC_QUERY, (path,))
all_rows = cur.fetchall()
rows = [r for r in all_rows
if self._is_allowed_repeat(r, path)][0:1]
return rows
- def refresh_snap_timers(self, fs, path):
+ # Re-read the schedules for path and cancel any pending timers.
+ # NOTE(review): only cancellation of old timers is visible in this
+ # hunk; re-arming of new Timers from 'rows' presumably follows in
+ # omitted lines — confirm against the full source.
+ def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
try:
- log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer')
- db = self.get_schedule_db(fs)
+ log.debug((f'SnapDB on {fs} changed for {path}, '
+ 'updating next Timer'))
rows = []
- with db:
- cur = db.execute(Schedule.EXEC_QUERY, (path,))
- all_rows = cur.fetchall()
- rows = [r for r in all_rows if self._is_allowed_repeat(r, path)][0:1]
+ # olddb is passed in the case where we land here without a timer
+ # the lock on the db connection has already been taken
+ if olddb:
+ rows = self.fetch_schedules(olddb, path)
+ else:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ rows = self.fetch_schedules(db, path)
timers = self.active_timers.get((fs, path), [])
for timer in timers:
timer.cancel()
except Exception:
self._log_exception('refresh_snap_timers')
- def _log_exception(self, fct):
+ # Log the named function's failure with the current traceback; used
+ # where exceptions would otherwise vanish (e.g. Timer threads).
+ def _log_exception(self, fct: str) -> None:
log.error(f'{fct} raised an exception:')
log.error(traceback.format_exc())
- def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
+ # Timer callback: create the scheduled snapshot directory for path,
+ # record the snapshot time in the DB, then re-arm timers and prune.
+ def create_scheduled_snapshot(self,
+ fs_name: str,
+ path: str,
+ retention: str,
+ start: str,
+ repeat: str) -> None:
log.debug(f'Scheduled snapshot of {path} triggered')
try:
- db = self.get_schedule_db(fs_name)
- sched = Schedule.get_db_schedules(path,
- db,
- fs_name,
- repeat=repeat,
- start=start)[0]
- time = datetime.now(timezone.utc)
- with open_filesystem(self, fs_name) as fs_handle:
- snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
- snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
- fs_handle.mkdir(snap_name, 0o755)
- log.info(f'created scheduled snapshot of {path}')
- log.debug(f'created scheduled snapshot {snap_name}')
- sched.update_last(time, db)
- except cephfs.Error:
- self._log_exception('create_scheduled_snapshot')
- sched.set_inactive(db)
- except Exception:
- # catch all exceptions cause otherwise we'll never know since this
- # is running in a thread
- self._log_exception('create_scheduled_snapshot')
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ try:
+ sched = Schedule.get_db_schedules(path,
+ db,
+ fs_name,
+ repeat=repeat,
+ start=start)[0]
+ time = datetime.now(timezone.utc)
+ with open_filesystem(self, fs_name) as fs_handle:
+ snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
+ snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+ fs_handle.mkdir(snap_name, 0o755)
+ log.info(f'created scheduled snapshot of {path}')
+ log.debug(f'created scheduled snapshot {snap_name}')
+ sched.update_last(time, db)
+ except cephfs.Error:
+ # mkdir failed: deactivate this schedule so it stops firing
+ self._log_exception('create_scheduled_snapshot')
+ sched.set_inactive(db)
+ except Exception:
+ # catch all exceptions cause otherwise we'll never know since this
+ # is running in a thread
+ self._log_exception('create_scheduled_snapshot')
finally:
- self.refresh_snap_timers(fs_name, path)
+ with self.get_schedule_db(fs_name) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ self.refresh_snap_timers(fs_name, path, db)
+ # NOTE(review): 'sched' is unbound here when get_db_schedules
+ # itself raised above — this line would raise NameError; confirm
+ # whether a guard is needed in the full source.
self.prune_snapshots(sched)
- def prune_snapshots(self, sched):
+ def prune_snapshots(self, sched: Schedule) -> None:
try:
log.debug('Pruning snapshots')
ret = sched.retention
log.debug(f'rmdir on {dirname}')
fs_handle.rmdir(f'{path}/.snap/{dirname}')
if to_prune:
- sched.update_pruned(time, self.get_schedule_db(sched.fs),
- len(to_prune))
+ with self.get_schedule_db(sched.fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched.update_pruned(time, db, len(to_prune))
except Exception:
self._log_exception('prune_snapshots')
- def get_snap_schedules(self, fs, path):
- db = self.get_schedule_db(fs)
- return Schedule.get_db_schedules(path, db, fs)
+ # Look up the Schedule objects stored for this exact path.
+ def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ return Schedule.get_db_schedules(path, db, fs)
- def list_snap_schedules(self, fs, path, recursive):
- db = self.get_schedule_db(fs)
- return Schedule.list_schedules(path, db, fs, recursive)
+ # List schedules under path; 'recursive' semantics are defined by
+ # Schedule.list_schedules.
+ def list_snap_schedules(self,
+ fs: str,
+ path: str,
+ recursive: bool) -> List[Schedule]:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ return Schedule.list_schedules(path, db, fs, recursive)
@updates_schedule_db
# TODO improve interface
- def store_snap_schedule(self, fs, path_, args):
+ # Persist a new Schedule built from 'args'. Raises ValueError for
+ # minute-granularity schedules when allow_m_granularity is off.
+ def store_snap_schedule(self,
+ fs: str, path_: str,
+ args: Tuple[str, str, str, str,
+ Optional[str], Optional[str]]) -> None:
sched = Schedule(*args)
log.debug(f'repeat is {sched.repeat}')
if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
log.error('not allowed')
raise ValueError('no minute snaps allowed')
log.debug(f'attempting to add schedule {sched}')
- db = self.get_schedule_db(fs)
- sched.store_schedule(db)
- self.store_schedule_db(sched.fs)
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ sched.store_schedule(db)
@updates_schedule_db
- def rm_snap_schedule(self, fs, path, schedule, start):
- db = self.get_schedule_db(fs)
- Schedule.rm_schedule(db, path, schedule, start)
+ # Remove schedule(s) for path; schedule/start narrow the match when
+ # given (semantics defined by Schedule.rm_schedule).
+ def rm_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.rm_schedule(db, path, schedule, start)
@updates_schedule_db
+ # Add a retention spec for path; when retention_count is given it is
+ # prepended to the period string (e.g. '7' + 'd' -> '7d').
def add_retention_spec(self,
- fs,
- path,
- retention_spec_or_period,
- retention_count):
+ fs: str,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str]) -> None:
retention_spec = retention_spec_or_period
if retention_count:
retention_spec = retention_count + retention_spec
- db = self.get_schedule_db(fs)
- Schedule.add_retention(db, path, retention_spec)
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.add_retention(db, path, retention_spec)
@updates_schedule_db
+ # Remove a retention spec for path; mirrors add_retention_spec's
+ # count + period composition.
def rm_retention_spec(self,
- fs,
- path,
- retention_spec_or_period,
- retention_count):
+ fs: str,
+ path: str,
+ retention_spec_or_period: str,
+ retention_count: Optional[str]) -> None:
retention_spec = retention_spec_or_period
if retention_count:
retention_spec = retention_count + retention_spec
- db = self.get_schedule_db(fs)
- Schedule.rm_retention(db, path, retention_spec)
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ Schedule.rm_retention(db, path, retention_spec)
@updates_schedule_db
- def activate_snap_schedule(self, fs, path, schedule, start):
- db = self.get_schedule_db(fs)
- schedules = Schedule.get_db_schedules(path, db, fs,
- schedule=schedule,
- start=start)
- [s.set_active(db) for s in schedules]
+ # Mark every matching schedule active; the decorator then re-arms
+ # the timers for this path.
+ def activate_snap_schedule(self,
+ fs: str,
+ path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ schedules = Schedule.get_db_schedules(path, db, fs,
+ schedule=schedule,
+ start=start)
+ for s in schedules:
+ s.set_active(db)
@updates_schedule_db
- def deactivate_snap_schedule(self, fs, path, schedule, start):
- db = self.get_schedule_db(fs)
- schedules = Schedule.get_db_schedules(path, db, fs,
- schedule=schedule,
- start=start)
- [s.set_inactive(db) for s in schedules]
+ # Mark every matching schedule inactive; the decorator then re-arms
+ # (effectively cancelling) the timers for this path.
+ def deactivate_snap_schedule(self,
+ fs: str, path: str,
+ schedule: Optional[str],
+ start: Optional[str]) -> None:
+ with self.get_schedule_db(fs) as conn_mgr:
+ db = conn_mgr.dbinfo.db
+ schedules = Schedule.get_db_schedules(path, db, fs,
+ schedule=schedule,
+ start=start)
+ for s in schedules:
+ s.set_inactive(db)