git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/snap_schedule/fs/schedule_client.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / snap_schedule / fs / schedule_client.py
index bb5bcf17f792718bae51db4497ef11368e5f3660..cf1ba78aac7322e0b5d3975cab3cbafc47d5d2ae 100644 (file)
@@ -4,17 +4,18 @@ Copyright (C) 2020 SUSE
 LGPL2.1.  See file COPYING.
 """
 import cephfs
-import errno
 import rados
 from contextlib import contextmanager
-from mgr_util import CephfsClient, CephfsConnectionException, \
-        open_filesystem
+from mgr_util import CephfsClient, open_filesystem
 from collections import OrderedDict
 from datetime import datetime, timezone
 import logging
-from threading import Timer
+from threading import Timer, Lock
+from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
+    Tuple, TypeVar, Union, Type
+from types import TracebackType
 import sqlite3
-from .schedule import Schedule, parse_retention
+from .schedule import Schedule
 import traceback
 
 
@@ -30,8 +31,12 @@ SNAPSHOT_PREFIX = 'scheduled'
 log = logging.getLogger(__name__)
 
 
+CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
+
+
 @contextmanager
-def open_ioctx(self, pool):
+def open_ioctx(self: CephfsClientT,
+               pool: Union[int, str]) -> Iterator[rados.Ioctx]:
     try:
         if type(pool) is int:
             with self.mgr.rados.open_ioctx2(pool) as ioctx:
@@ -46,17 +51,22 @@ def open_ioctx(self, pool):
         raise
 
 
-def updates_schedule_db(func):
-    def f(self, fs, schedule_or_path, *args):
-        func(self, fs, schedule_or_path, *args)
+FuncT = TypeVar('FuncT', bound=Callable[..., None])
+
+
+def updates_schedule_db(func: FuncT) -> FuncT:
+    def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
+        ret = func(self, fs, schedule_or_path, *args)
         path = schedule_or_path
         if isinstance(schedule_or_path, Schedule):
             path = schedule_or_path.path
         self.refresh_snap_timers(fs, path)
-    return f
+        return ret
+    return cast(FuncT, f)
 
 
-def get_prune_set(candidates, retention):
+def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
+                  retention: Dict[str, int]) -> Set:
     PRUNING_PATTERNS = OrderedDict([
         # n is for keep last n snapshots, uses the snapshot name timestamp
         # format for lowest granularity
@@ -86,7 +96,8 @@ def get_prune_set(candidates, retention):
             if snap_ts != last:
                 last = snap_ts
                 if snap not in keep:
-                    log.debug(f'keeping {snap[0].d_name} due to {period_count}{period}')
+                    log.debug((f'keeping {snap[0].d_name} due to '
+                               f'{period_count}{period}'))
                     keep.append(snap)
                     kept_for_this_period += 1
                     if kept_for_this_period == period_count:
@@ -94,80 +105,132 @@ def get_prune_set(candidates, retention):
                                    f'{period_count}{period}'))
                         break
     if len(keep) > MAX_SNAPS_PER_PATH:
-        log.info(f'Would keep more then {MAX_SNAPS_PER_PATH}, pruning keep set')
+        log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
+                  'pruning keep set'))
         keep = keep[:MAX_SNAPS_PER_PATH]
     return candidates - set(keep)
 
 
+class DBInfo():
+    def __init__(self, fs: str, db: sqlite3.Connection):
+        self.fs: str = fs
+        self.lock: Lock = Lock()
+        self.db: sqlite3.Connection = db
+
+
+# context manager for serializing db connection usage
+class DBConnectionManager():
+    def __init__(self, info: DBInfo):
+        self.dbinfo: DBInfo = info
+
+    # using string as return type hint since __future__.annotations is not
+    # available with Python 3.6; it's available starting from Python 3.7
+    def __enter__(self) -> 'DBConnectionManager':
+        log.debug(f'locking db connection for {self.dbinfo.fs}')
+        self.dbinfo.lock.acquire()
+        log.debug(f'locked db connection for {self.dbinfo.fs}')
+        return self
+
+    def __exit__(self,
+                 exception_type: Optional[Type[BaseException]],
+                 exception_value: Optional[BaseException],
+                 traceback: Optional[TracebackType]) -> None:
+        log.debug(f'unlocking db connection for {self.dbinfo.fs}')
+        self.dbinfo.lock.release()
+        log.debug(f'unlocked db connection for {self.dbinfo.fs}')
+
+
 class SnapSchedClient(CephfsClient):
 
-    def __init__(self, mgr):
+    def __init__(self, mgr: Any) -> None:
         super(SnapSchedClient, self).__init__(mgr)
         # TODO maybe iterate over all fs instance in fsmap and load snap dbs?
-        self.sqlite_connections = {}
-        self.active_timers = {}
+        #
+        # Each db connection is now guarded by a Lock; this is required to
+        # avoid concurrent DB transactions when more than one path in a
+        # file-system is scheduled at the same interval, e.g. 1h; without the
+        # lock, there are races to use the same connection, causing nested
+        # transactions to be aborted
+        self.sqlite_connections: Dict[str, DBInfo] = {}
+        self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
+        self.conn_lock: Lock = Lock()  # lock to protect add/lookup db connections
 
     @property
-    def allow_minute_snaps(self):
+    def allow_minute_snaps(self) -> None:
         return self.mgr.get_module_option('allow_m_granularity')
 
-    def get_schedule_db(self, fs):
+    @property
+    def dump_on_update(self) -> None:
+        return self.mgr.get_module_option('dump_on_update')
+
+    def get_schedule_db(self, fs: str) -> DBConnectionManager:
+        dbinfo = None
+        self.conn_lock.acquire()
         if fs not in self.sqlite_connections:
-            self.sqlite_connections[fs] = sqlite3.connect(
-                ':memory:',
-                check_same_thread=False)
-            with self.sqlite_connections[fs] as con:
-                con.row_factory = sqlite3.Row
-                con.execute("PRAGMA FOREIGN_KEYS = 1")
-                pool = self.get_metadata_pool(fs)
-                with open_ioctx(self, pool) as ioctx:
-                    try:
-                        size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
-                        db = ioctx.read(SNAP_DB_OBJECT_NAME,
-                                        size).decode('utf-8')
-                        con.executescript(db)
-                    except rados.ObjectNotFound:
-                        log.debug(f'No schedule DB found in {fs}, creating one.')
-                        con.executescript(Schedule.CREATE_TABLES)
-        return self.sqlite_connections[fs]
-
-    def store_schedule_db(self, fs):
-        # only store db is it exists, otherwise nothing to do
-        metadata_pool = self.get_metadata_pool(fs)
-        if not metadata_pool:
-            raise CephfsConnectionException(
-                -errno.ENOENT, "Filesystem {} does not exist".format(fs))
-        if fs in self.sqlite_connections:
-            db_content = []
-            db = self.sqlite_connections[fs]
-            with db:
-                for row in db.iterdump():
-                    db_content.append(row)
-        with open_ioctx(self, metadata_pool) as ioctx:
-            ioctx.write_full(SNAP_DB_OBJECT_NAME,
-                             '\n'.join(db_content).encode('utf-8'))
-
-    def _is_allowed_repeat(self, exec_row, path):
+            poolid = self.get_metadata_pool(fs)
+            assert poolid, f'fs "{fs}" not found'
+            uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
+            log.debug(f"using uri {uri}")
+            db = sqlite3.connect(uri, check_same_thread=False, uri=True)
+            db.execute('PRAGMA FOREIGN_KEYS = 1')
+            db.execute('PRAGMA JOURNAL_MODE = PERSIST')
+            db.execute('PRAGMA PAGE_SIZE = 65536')
+            db.execute('PRAGMA CACHE_SIZE = 256')
+            db.row_factory = sqlite3.Row
+            # check for legacy dump store
+            pool_param = cast(Union[int, str], poolid)
+            with open_ioctx(self, pool_param) as ioctx:
+                try:
+                    size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
+                    dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
+                    db.executescript(dump)
+                    ioctx.remove(SNAP_DB_OBJECT_NAME)
+                except rados.ObjectNotFound:
+                    log.debug(f'No legacy schedule DB found in {fs}')
+            db.executescript(Schedule.CREATE_TABLES)
+            self.sqlite_connections[fs] = DBInfo(fs, db)
+        dbinfo = self.sqlite_connections[fs]
+        self.conn_lock.release()
+        return DBConnectionManager(dbinfo)
+
+    def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
         if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
             if self.allow_minute_snaps:
-                log.debug(f'Minute repeats allowed, scheduling snapshot on path {path}')
+                log.debug(('Minute repeats allowed, '
+                           f'scheduling snapshot on path {path}'))
                 return True
             else:
-                log.info(f'Minute repeats disabled, skipping snapshot on path {path}')
+                log.info(('Minute repeats disabled, '
+                          f'skipping snapshot on path {path}'))
                 return False
         else:
             return True
 
+    def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
+        with db:
+            if self.dump_on_update:
+                dump = [line for line in db.iterdump()]
+                dump = "\n".join(dump)
+                log.debug(f"db dump:\n{dump}")
+            cur = db.execute(Schedule.EXEC_QUERY, (path,))
+            all_rows = cur.fetchall()
+            rows = [r for r in all_rows
+                    if self._is_allowed_repeat(r, path)][0:1]
+            return rows
 
-    def refresh_snap_timers(self, fs, path):
+    def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
         try:
-            log.debug(f'SnapDB on {fs} changed for {path}, updating next Timer')
-            db = self.get_schedule_db(fs)
+            log.debug((f'SnapDB on {fs} changed for {path}, '
+                       'updating next Timer'))
             rows = []
-            with db:
-                cur = db.execute(Schedule.EXEC_QUERY, (path,))
-                all_rows = cur.fetchall()
-                rows = [r for r in all_rows if self._is_allowed_repeat(r, path)][0:1]
+            # olddb is passed in the case where we land here without a timer
+            # the lock on the db connection has already been taken
+            if olddb:
+                rows = self.fetch_schedules(olddb, path)
+            else:
+                with self.get_schedule_db(fs) as conn_mgr:
+                    db = conn_mgr.dbinfo.db
+                    rows = self.fetch_schedules(db, path)
             timers = self.active_timers.get((fs, path), [])
             for timer in timers:
                 timer.cancel()
@@ -184,39 +247,48 @@ class SnapSchedClient(CephfsClient):
         except Exception:
             self._log_exception('refresh_snap_timers')
 
-    def _log_exception(self, fct):
+    def _log_exception(self, fct: str) -> None:
         log.error(f'{fct} raised an exception:')
         log.error(traceback.format_exc())
 
-    def create_scheduled_snapshot(self, fs_name, path, retention, start, repeat):
+    def create_scheduled_snapshot(self,
+                                  fs_name: str,
+                                  path: str,
+                                  retention: str,
+                                  start: str,
+                                  repeat: str) -> None:
         log.debug(f'Scheduled snapshot of {path} triggered')
         try:
-            db = self.get_schedule_db(fs_name)
-            sched = Schedule.get_db_schedules(path,
-                                              db,
-                                              fs_name,
-                                              repeat=repeat,
-                                              start=start)[0]
-            time = datetime.now(timezone.utc)
-            with open_filesystem(self, fs_name) as fs_handle:
-                snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
-                snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
-                fs_handle.mkdir(snap_name, 0o755)
-            log.info(f'created scheduled snapshot of {path}')
-            log.debug(f'created scheduled snapshot {snap_name}')
-            sched.update_last(time, db)
-        except cephfs.Error:
-            self._log_exception('create_scheduled_snapshot')
-            sched.set_inactive(db)
-        except Exception:
-            # catch all exceptions cause otherwise we'll never know since this
-            # is running in a thread
-            self._log_exception('create_scheduled_snapshot')
+            with self.get_schedule_db(fs_name) as conn_mgr:
+                db = conn_mgr.dbinfo.db
+                try:
+                    sched = Schedule.get_db_schedules(path,
+                                                      db,
+                                                      fs_name,
+                                                      repeat=repeat,
+                                                      start=start)[0]
+                    time = datetime.now(timezone.utc)
+                    with open_filesystem(self, fs_name) as fs_handle:
+                        snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
+                        snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
+                        fs_handle.mkdir(snap_name, 0o755)
+                    log.info(f'created scheduled snapshot of {path}')
+                    log.debug(f'created scheduled snapshot {snap_name}')
+                    sched.update_last(time, db)
+                except cephfs.Error:
+                    self._log_exception('create_scheduled_snapshot')
+                    sched.set_inactive(db)
+                except Exception:
+                    # catch all exceptions cause otherwise we'll never know since this
+                    # is running in a thread
+                    self._log_exception('create_scheduled_snapshot')
         finally:
-            self.refresh_snap_timers(fs_name, path)
+            with self.get_schedule_db(fs_name) as conn_mgr:
+                db = conn_mgr.dbinfo.db
+                self.refresh_snap_timers(fs_name, path, db)
             self.prune_snapshots(sched)
 
-    def prune_snapshots(self, sched):
+    def prune_snapshots(self, sched: Schedule) -> None:
         try:
             log.debug('Pruning snapshots')
             ret = sched.retention
@@ -242,73 +314,99 @@ class SnapSchedClient(CephfsClient):
                     log.debug(f'rmdir on {dirname}')
                     fs_handle.rmdir(f'{path}/.snap/{dirname}')
                 if to_prune:
-                    sched.update_pruned(time, self.get_schedule_db(sched.fs),
-                                        len(to_prune))
+                    with self.get_schedule_db(sched.fs) as conn_mgr:
+                        db = conn_mgr.dbinfo.db
+                        sched.update_pruned(time, db, len(to_prune))
         except Exception:
             self._log_exception('prune_snapshots')
 
-    def get_snap_schedules(self, fs, path):
-        db = self.get_schedule_db(fs)
-        return Schedule.get_db_schedules(path, db, fs)
+    def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            return Schedule.get_db_schedules(path, db, fs)
 
-    def list_snap_schedules(self, fs, path, recursive):
-        db = self.get_schedule_db(fs)
-        return Schedule.list_schedules(path, db, fs, recursive)
+    def list_snap_schedules(self,
+                            fs: str,
+                            path: str,
+                            recursive: bool) -> List[Schedule]:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            return Schedule.list_schedules(path, db, fs, recursive)
 
     @updates_schedule_db
     # TODO improve interface
-    def store_snap_schedule(self, fs, path_, args):
+    def store_snap_schedule(self,
+                            fs: str, path_: str,
+                            args: Tuple[str, str, str, str,
+                                        Optional[str], Optional[str]]) -> None:
         sched = Schedule(*args)
         log.debug(f'repeat is {sched.repeat}')
         if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
             log.error('not allowed')
             raise ValueError('no minute snaps allowed')
         log.debug(f'attempting to add schedule {sched}')
-        db = self.get_schedule_db(fs)
-        sched.store_schedule(db)
-        self.store_schedule_db(sched.fs)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            sched.store_schedule(db)
 
     @updates_schedule_db
-    def rm_snap_schedule(self, fs, path, schedule, start):
-        db = self.get_schedule_db(fs)
-        Schedule.rm_schedule(db, path, schedule, start)
+    def rm_snap_schedule(self,
+                         fs: str, path: str,
+                         schedule: Optional[str],
+                         start: Optional[str]) -> None:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.rm_schedule(db, path, schedule, start)
 
     @updates_schedule_db
     def add_retention_spec(self,
-                           fs,
-                           path,
-                           retention_spec_or_period,
-                           retention_count):
+                           fs: str,
+                           path: str,
+                           retention_spec_or_period: str,
+                           retention_count: Optional[str]) -> None:
         retention_spec = retention_spec_or_period
         if retention_count:
             retention_spec = retention_count + retention_spec
-        db = self.get_schedule_db(fs)
-        Schedule.add_retention(db, path, retention_spec)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.add_retention(db, path, retention_spec)
 
     @updates_schedule_db
     def rm_retention_spec(self,
-                          fs,
-                          path,
-                          retention_spec_or_period,
-                          retention_count):
+                          fs: str,
+                          path: str,
+                          retention_spec_or_period: str,
+                          retention_count: Optional[str]) -> None:
         retention_spec = retention_spec_or_period
         if retention_count:
             retention_spec = retention_count + retention_spec
-        db = self.get_schedule_db(fs)
-        Schedule.rm_retention(db, path, retention_spec)
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            Schedule.rm_retention(db, path, retention_spec)
 
     @updates_schedule_db
-    def activate_snap_schedule(self, fs, path, schedule, start):
-        db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs,
-                                              schedule=schedule,
-                                              start=start)
-        [s.set_active(db) for s in schedules]
+    def activate_snap_schedule(self,
+                               fs: str,
+                               path: str,
+                               schedule: Optional[str],
+                               start: Optional[str]) -> None:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            schedules = Schedule.get_db_schedules(path, db, fs,
+                                                  schedule=schedule,
+                                                  start=start)
+            for s in schedules:
+                s.set_active(db)
 
     @updates_schedule_db
-    def deactivate_snap_schedule(self, fs, path, schedule, start):
-        db = self.get_schedule_db(fs)
-        schedules = Schedule.get_db_schedules(path, db, fs,
-                                              schedule=schedule,
-                                              start=start)
-        [s.set_inactive(db) for s in schedules]
+    def deactivate_snap_schedule(self,
+                                 fs: str, path: str,
+                                 schedule: Optional[str],
+                                 start: Optional[str]) -> None:
+        with self.get_schedule_db(fs) as conn_mgr:
+            db = conn_mgr.dbinfo.db
+            schedules = Schedule.get_db_schedules(path, db, fs,
+                                                  schedule=schedule,
+                                                  start=start)
+            for s in schedules:
+                s.set_inactive(db)