]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/snap_schedule/fs/schedule_client.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / snap_schedule / fs / schedule_client.py
CommitLineData
f67539c2
TL
1"""
2Copyright (C) 2020 SUSE
3
4LGPL2.1. See file COPYING.
5"""
6import cephfs
f67539c2
TL
7import rados
8from contextlib import contextmanager
20effc67 9from mgr_util import CephfsClient, open_filesystem
f67539c2
TL
10from collections import OrderedDict
11from datetime import datetime, timezone
12import logging
20effc67
TL
13from threading import Timer, Lock
14from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
15 Tuple, TypeVar, Union, Type
16from types import TracebackType
f67539c2 17import sqlite3
20effc67 18from .schedule import Schedule
f67539c2
TL
19import traceback
20
21
# hard cap on retained scheduled snapshots per path; also the fallback
# retention when a path has no retention spec (see get_prune_set)
MAX_SNAPS_PER_PATH = 50
# rados object namespace holding the schedule DB (and legacy dump) objects
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
SNAP_DB_VERSION = '0'
SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
# scheduled snapshots are tz suffixed
SNAPSHOT_TS_FORMAT_TZ = '%Y-%m-%d-%H_%M_%S_%Z'
# for backward compat snapshot name parsing
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
# length of timestamp format (without tz suffix)
# e.g.: scheduled-2022-04-19-05_39_00_UTC (len = "2022-04-19-05_39_00")
SNAPSHOT_TS_FORMAT_LEN = 19
# snapshot dir entries are named '<SNAPSHOT_PREFIX>-<timestamp>'
SNAPSHOT_PREFIX = 'scheduled'

log = logging.getLogger(__name__)


# bound TypeVar so module-level helpers can type their `self` parameter
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
41
42
@contextmanager
def open_ioctx(self: CephfsClientT,
               pool: Union[int, str]) -> Iterator[rados.Ioctx]:
    """Yield a rados ioctx for *pool*, bound to the snap-schedule namespace.

    *pool* may be a pool id (int, opened via open_ioctx2) or a pool name
    (str, opened via open_ioctx).

    Raises:
        rados.ObjectNotFound: if the pool cannot be located (re-raised
            after logging).
    """
    try:
        # isinstance instead of `type(pool) is int`; the two original
        # branches only differed in which open call they used, so pick the
        # opener and share the namespace/yield logic.
        if isinstance(pool, int):
            ctx = self.mgr.rados.open_ioctx2(pool)
        else:
            ctx = self.mgr.rados.open_ioctx(pool)
        with ctx as ioctx:
            ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
            yield ioctx
    except rados.ObjectNotFound:
        log.error("Failed to locate pool {}".format(pool))
        raise
59
FuncT = TypeVar('FuncT', bound=Callable[..., None])


def updates_schedule_db(func: FuncT) -> FuncT:
    """Decorator: after *func* changes the schedule DB, refresh the path's timers."""
    def wrapper(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
        result = func(self, fs, schedule_or_path, *args)
        # the decorated method receives either a Schedule or a bare path
        if isinstance(schedule_or_path, Schedule):
            path = schedule_or_path.path
        else:
            path = schedule_or_path
        self.refresh_snap_timers(fs, path)
        return result
    return cast(FuncT, wrapper)
f67539c2
TL
72
73
20effc67
TL
def get_prune_set(candidates: Set[Tuple['cephfs.DirEntry', datetime]],
                  retention: Dict[str, int]) -> Set:
    """Return the subset of *candidates* that should be pruned.

    candidates: set of (dir entry, parsed snapshot timestamp) tuples; the
        timestamps already have the tz suffix stripped.
    retention: mapping period-spec -> count (e.g. {'d': 7, 'w': 4});
        defaults to {'n': MAX_SNAPS_PER_PATH} when empty.
    Returns the candidates NOT covered by any retention period.
    """
    PRUNING_PATTERNS = OrderedDict([
        # n is for keep last n snapshots, uses the snapshot name timestamp
        # format for lowest granularity
        # NOTE: prune set has tz suffix stripped out.
        ("n", SNAPSHOT_TS_FORMAT),
        # TODO remove M for release
        ("M", '%Y-%m-%d-%H_%M'),
        ("h", '%Y-%m-%d-%H'),
        ("d", '%Y-%m-%d'),
        ("w", '%G-%V'),
        ("m", '%Y-%m'),
        ("y", '%Y'),
    ])
    keep: List[Tuple['cephfs.DirEntry', datetime]] = []
    kept_set: Set = set()  # mirrors `keep` for O(1) membership tests
    if not retention:
        log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
        retention = {'n': MAX_SNAPS_PER_PATH}
    # sort once, newest first (was re-sorted for every retention period);
    # candidates are not modified below, so the order stays valid
    ordered = sorted(candidates, key=lambda x: x[0].d_name, reverse=True)
    for period, date_pattern in PRUNING_PATTERNS.items():
        log.debug(f'compiling keep set for period {period}')
        period_count = retention.get(period, 0)
        if not period_count:
            continue
        last = None
        kept_for_this_period = 0
        for snap in ordered:
            snap_ts = snap[1].strftime(date_pattern)
            if snap_ts != last:
                last = snap_ts
                if snap not in kept_set:
                    log.debug((f'keeping {snap[0].d_name} due to '
                               f'{period_count}{period}'))
                    keep.append(snap)
                    kept_set.add(snap)
                kept_for_this_period += 1
                if kept_for_this_period == period_count:
                    log.debug(('found enough snapshots for '
                               f'{period_count}{period}'))
                    break
    if len(keep) > MAX_SNAPS_PER_PATH:
        log.info((f'Would keep more than {MAX_SNAPS_PER_PATH}, '
                  'pruning keep set'))
        keep = keep[:MAX_SNAPS_PER_PATH]
    return candidates - set(keep)
119
2a845540
TL
def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
    """Extract the timestamp from a scheduled snapshot name (tz suffix dropped).

    'scheduled-2022-04-19-05_39_00_UTC' -> '2022-04-19-05_39_00'

    BUGFIX: the previous `lstrip(f'{SNAPSHOT_PREFIX}-')` treats its argument
    as a *character set*, not a prefix, and would also strip any leading
    timestamp characters that happen to be in that set; slice the prefix
    (plus the '-' separator) off explicitly instead.
    """
    ts = scheduled_snap_name[len(SNAPSHOT_PREFIX) + 1:]
    return ts[0:SNAPSHOT_TS_FORMAT_LEN]
f67539c2 124
20effc67
TL
class DBInfo():
    """Bundle a per-filesystem sqlite connection with its serializing lock."""

    def __init__(self, fs: str, db: sqlite3.Connection):
        self.db: sqlite3.Connection = db  # cached sqlite connection
        self.fs: str = fs                 # owning file system name
        self.lock: Lock = Lock()          # serializes transactions on `db`
130
131
132# context manager for serializing db connection usage
class DBConnectionManager():
    """Context manager serializing use of a DBInfo's sqlite connection.

    Entering acquires the DBInfo lock, exiting releases it, so only one
    thread at a time runs transactions on the shared connection.
    """

    def __init__(self, info: DBInfo):
        self.dbinfo: DBInfo = info

    # string return hint: __future__.annotations is not available on
    # Python 3.6; it exists starting from Python 3.7
    def __enter__(self) -> 'DBConnectionManager':
        log.debug('locking db connection for %s', self.dbinfo.fs)
        self.dbinfo.lock.acquire()
        log.debug('locked db connection for %s', self.dbinfo.fs)
        return self

    def __exit__(self,
                 exception_type: Optional[Type[BaseException]],
                 exception_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        log.debug('unlocking db connection for %s', self.dbinfo.fs)
        self.dbinfo.lock.release()
        log.debug('unlocked db connection for %s', self.dbinfo.fs)
152
153
f67539c2
TL
154class SnapSchedClient(CephfsClient):
155
20effc67 156 def __init__(self, mgr: Any) -> None:
f67539c2 157 super(SnapSchedClient, self).__init__(mgr)
20effc67
TL
158 # Each db connection is now guarded by a Lock; this is required to
159 # avoid concurrent DB transactions when more than one paths in a
160 # file-system are scheduled at the same interval eg. 1h; without the
161 # lock, there are races to use the same connection, causing nested
162 # transactions to be aborted
163 self.sqlite_connections: Dict[str, DBInfo] = {}
164 self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
165 self.conn_lock: Lock = Lock() # lock to protect add/lookup db connections
f67539c2 166
1d09f67e
TL
167 # restart old schedules
168 for fs_name in self.get_all_filesystems():
169 with self.get_schedule_db(fs_name) as conn_mgr:
170 db = conn_mgr.dbinfo.db
171 sched_list = Schedule.list_all_schedules(db, fs_name)
172 for sched in sched_list:
173 self.refresh_snap_timers(fs_name, sched.path, db)
174
    @property
    def allow_minute_snaps(self) -> Any:
        """Value of the 'allow_m_granularity' module option.

        Used as a boolean gate for minute-granularity schedules (presumably
        a bool option — confirm against the module's option declarations).
        The previous `-> None` annotation was wrong: a value is returned.
        """
        return self.mgr.get_module_option('allow_m_granularity')
178
20effc67
TL
    @property
    def dump_on_update(self) -> Any:
        """Value of the 'dump_on_update' module option.

        When truthy, fetch_schedules logs a full DB dump on every query
        (debug aid). The previous `-> None` annotation was wrong: a value
        is returned.
        """
        return self.mgr.get_module_option('dump_on_update')
182
183 def get_schedule_db(self, fs: str) -> DBConnectionManager:
184 dbinfo = None
185 self.conn_lock.acquire()
f67539c2 186 if fs not in self.sqlite_connections:
20effc67
TL
187 poolid = self.get_metadata_pool(fs)
188 assert poolid, f'fs "{fs}" not found'
189 uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
190 log.debug(f"using uri {uri}")
191 db = sqlite3.connect(uri, check_same_thread=False, uri=True)
192 db.execute('PRAGMA FOREIGN_KEYS = 1')
193 db.execute('PRAGMA JOURNAL_MODE = PERSIST')
194 db.execute('PRAGMA PAGE_SIZE = 65536')
195 db.execute('PRAGMA CACHE_SIZE = 256')
196 db.row_factory = sqlite3.Row
197 # check for legacy dump store
198 pool_param = cast(Union[int, str], poolid)
199 with open_ioctx(self, pool_param) as ioctx:
200 try:
201 size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
202 dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
203 db.executescript(dump)
2a845540 204 ioctx.remove_object(SNAP_DB_OBJECT_NAME)
20effc67
TL
205 except rados.ObjectNotFound:
206 log.debug(f'No legacy schedule DB found in {fs}')
207 db.executescript(Schedule.CREATE_TABLES)
208 self.sqlite_connections[fs] = DBInfo(fs, db)
209 dbinfo = self.sqlite_connections[fs]
210 self.conn_lock.release()
211 return DBConnectionManager(dbinfo)
212
213 def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
f67539c2
TL
214 if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
215 if self.allow_minute_snaps:
20effc67
TL
216 log.debug(('Minute repeats allowed, '
217 f'scheduling snapshot on path {path}'))
f67539c2
TL
218 return True
219 else:
20effc67
TL
220 log.info(('Minute repeats disabled, '
221 f'skipping snapshot on path {path}'))
f67539c2
TL
222 return False
223 else:
224 return True
225
20effc67
TL
226 def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
227 with db:
228 if self.dump_on_update:
229 dump = [line for line in db.iterdump()]
230 dump = "\n".join(dump)
231 log.debug(f"db dump:\n{dump}")
232 cur = db.execute(Schedule.EXEC_QUERY, (path,))
233 all_rows = cur.fetchall()
234 rows = [r for r in all_rows
235 if self._is_allowed_repeat(r, path)][0:1]
236 return rows
f67539c2 237
20effc67 238 def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
f67539c2 239 try:
20effc67
TL
240 log.debug((f'SnapDB on {fs} changed for {path}, '
241 'updating next Timer'))
f67539c2 242 rows = []
20effc67
TL
243 # olddb is passed in the case where we land here without a timer
244 # the lock on the db connection has already been taken
245 if olddb:
246 rows = self.fetch_schedules(olddb, path)
247 else:
248 with self.get_schedule_db(fs) as conn_mgr:
249 db = conn_mgr.dbinfo.db
250 rows = self.fetch_schedules(db, path)
f67539c2
TL
251 timers = self.active_timers.get((fs, path), [])
252 for timer in timers:
253 timer.cancel()
254 timers = []
255 for row in rows:
256 log.debug(f'Creating new snapshot timer for {path}')
257 t = Timer(row[1],
258 self.create_scheduled_snapshot,
259 args=[fs, path, row[0], row[2], row[3]])
260 t.start()
261 timers.append(t)
262 log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
263 self.active_timers[(fs, path)] = timers
264 except Exception:
265 self._log_exception('refresh_snap_timers')
266
20effc67 267 def _log_exception(self, fct: str) -> None:
f67539c2
TL
268 log.error(f'{fct} raised an exception:')
269 log.error(traceback.format_exc())
270
20effc67
TL
271 def create_scheduled_snapshot(self,
272 fs_name: str,
273 path: str,
274 retention: str,
275 start: str,
276 repeat: str) -> None:
f67539c2
TL
277 log.debug(f'Scheduled snapshot of {path} triggered')
278 try:
20effc67
TL
279 with self.get_schedule_db(fs_name) as conn_mgr:
280 db = conn_mgr.dbinfo.db
281 try:
282 sched = Schedule.get_db_schedules(path,
283 db,
284 fs_name,
285 repeat=repeat,
286 start=start)[0]
287 time = datetime.now(timezone.utc)
288 with open_filesystem(self, fs_name) as fs_handle:
2a845540
TL
289 snap_ts = time.strftime(SNAPSHOT_TS_FORMAT_TZ)
290 snap_dir = self.mgr.rados.conf_get('client_snapdir')
291 snap_name = f'{path}/{snap_dir}/{SNAPSHOT_PREFIX}-{snap_ts}'
20effc67
TL
292 fs_handle.mkdir(snap_name, 0o755)
293 log.info(f'created scheduled snapshot of {path}')
294 log.debug(f'created scheduled snapshot {snap_name}')
295 sched.update_last(time, db)
296 except cephfs.Error:
297 self._log_exception('create_scheduled_snapshot')
298 sched.set_inactive(db)
299 except Exception:
300 # catch all exceptions cause otherwise we'll never know since this
301 # is running in a thread
302 self._log_exception('create_scheduled_snapshot')
f67539c2 303 finally:
20effc67
TL
304 with self.get_schedule_db(fs_name) as conn_mgr:
305 db = conn_mgr.dbinfo.db
306 self.refresh_snap_timers(fs_name, path, db)
f67539c2
TL
307 self.prune_snapshots(sched)
308
20effc67 309 def prune_snapshots(self, sched: Schedule) -> None:
f67539c2
TL
310 try:
311 log.debug('Pruning snapshots')
312 ret = sched.retention
313 path = sched.path
314 prune_candidates = set()
315 time = datetime.now(timezone.utc)
316 with open_filesystem(self, sched.fs) as fs_handle:
2a845540
TL
317 snap_dir = self.mgr.rados.conf_get('client_snapdir')
318 with fs_handle.opendir(f'{path}/{snap_dir}') as d_handle:
f67539c2
TL
319 dir_ = fs_handle.readdir(d_handle)
320 while dir_:
321 if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'):
322 log.debug(f'add {dir_.d_name} to pruning')
323 ts = datetime.strptime(
2a845540 324 snap_name_to_timestamp(dir_.d_name.decode('utf-8')), SNAPSHOT_TS_FORMAT)
f67539c2
TL
325 prune_candidates.add((dir_, ts))
326 else:
327 log.debug(f'skipping dir entry {dir_.d_name}')
328 dir_ = fs_handle.readdir(d_handle)
329 to_prune = get_prune_set(prune_candidates, ret)
330 for k in to_prune:
331 dirname = k[0].d_name.decode('utf-8')
332 log.debug(f'rmdir on {dirname}')
2a845540 333 fs_handle.rmdir(f'{path}/{snap_dir}/{dirname}')
f67539c2 334 if to_prune:
20effc67
TL
335 with self.get_schedule_db(sched.fs) as conn_mgr:
336 db = conn_mgr.dbinfo.db
337 sched.update_pruned(time, db, len(to_prune))
f67539c2
TL
338 except Exception:
339 self._log_exception('prune_snapshots')
340
20effc67
TL
341 def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
342 with self.get_schedule_db(fs) as conn_mgr:
343 db = conn_mgr.dbinfo.db
344 return Schedule.get_db_schedules(path, db, fs)
f67539c2 345
20effc67
TL
346 def list_snap_schedules(self,
347 fs: str,
348 path: str,
349 recursive: bool) -> List[Schedule]:
350 with self.get_schedule_db(fs) as conn_mgr:
351 db = conn_mgr.dbinfo.db
352 return Schedule.list_schedules(path, db, fs, recursive)
f67539c2
TL
353
354 @updates_schedule_db
355 # TODO improve interface
20effc67
TL
356 def store_snap_schedule(self,
357 fs: str, path_: str,
358 args: Tuple[str, str, str, str,
359 Optional[str], Optional[str]]) -> None:
f67539c2
TL
360 sched = Schedule(*args)
361 log.debug(f'repeat is {sched.repeat}')
362 if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
363 log.error('not allowed')
364 raise ValueError('no minute snaps allowed')
365 log.debug(f'attempting to add schedule {sched}')
20effc67
TL
366 with self.get_schedule_db(fs) as conn_mgr:
367 db = conn_mgr.dbinfo.db
368 sched.store_schedule(db)
f67539c2
TL
369
370 @updates_schedule_db
20effc67
TL
371 def rm_snap_schedule(self,
372 fs: str, path: str,
373 schedule: Optional[str],
374 start: Optional[str]) -> None:
375 with self.get_schedule_db(fs) as conn_mgr:
376 db = conn_mgr.dbinfo.db
377 Schedule.rm_schedule(db, path, schedule, start)
f67539c2
TL
378
379 @updates_schedule_db
380 def add_retention_spec(self,
20effc67
TL
381 fs: str,
382 path: str,
383 retention_spec_or_period: str,
384 retention_count: Optional[str]) -> None:
f67539c2
TL
385 retention_spec = retention_spec_or_period
386 if retention_count:
387 retention_spec = retention_count + retention_spec
20effc67
TL
388 with self.get_schedule_db(fs) as conn_mgr:
389 db = conn_mgr.dbinfo.db
390 Schedule.add_retention(db, path, retention_spec)
f67539c2
TL
391
392 @updates_schedule_db
393 def rm_retention_spec(self,
20effc67
TL
394 fs: str,
395 path: str,
396 retention_spec_or_period: str,
397 retention_count: Optional[str]) -> None:
f67539c2
TL
398 retention_spec = retention_spec_or_period
399 if retention_count:
400 retention_spec = retention_count + retention_spec
20effc67
TL
401 with self.get_schedule_db(fs) as conn_mgr:
402 db = conn_mgr.dbinfo.db
403 Schedule.rm_retention(db, path, retention_spec)
f67539c2
TL
404
405 @updates_schedule_db
20effc67
TL
406 def activate_snap_schedule(self,
407 fs: str,
408 path: str,
409 schedule: Optional[str],
410 start: Optional[str]) -> None:
411 with self.get_schedule_db(fs) as conn_mgr:
412 db = conn_mgr.dbinfo.db
413 schedules = Schedule.get_db_schedules(path, db, fs,
414 schedule=schedule,
415 start=start)
416 for s in schedules:
417 s.set_active(db)
f67539c2
TL
418
419 @updates_schedule_db
20effc67
TL
420 def deactivate_snap_schedule(self,
421 fs: str, path: str,
422 schedule: Optional[str],
423 start: Optional[str]) -> None:
424 with self.get_schedule_db(fs) as conn_mgr:
425 db = conn_mgr.dbinfo.db
426 schedules = Schedule.get_db_schedules(path, db, fs,
427 schedule=schedule,
428 start=start)
429 for s in schedules:
430 s.set_inactive(db)