]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/snap_schedule/fs/schedule_client.py
486582074767ed303dac712ceac7481dd01cb8cf
[ceph.git] / ceph / src / pybind / mgr / snap_schedule / fs / schedule_client.py
1 """
2 Copyright (C) 2020 SUSE
3
4 LGPL2.1. See file COPYING.
5 """
6 import cephfs
7 import rados
8 from contextlib import contextmanager
9 from mgr_util import CephfsClient, open_filesystem
10 from collections import OrderedDict
11 from datetime import datetime, timezone
12 import logging
13 from threading import Timer, Lock
14 from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
15 Tuple, TypeVar, Union, Type
16 from types import TracebackType
17 import sqlite3
18 from .schedule import Schedule
19 import traceback
20
21
MAX_SNAPS_PER_PATH = 50
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
SNAP_DB_VERSION = '0'
SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
# scheduled snapshots are tz suffixed
SNAPSHOT_TS_FORMAT_TZ = '%Y-%m-%d-%H_%M_%S_%Z'
# for backward compat snapshot name parsing
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
# length of a timestamp rendered with SNAPSHOT_TS_FORMAT (i.e. without the tz
# suffix), e.g. scheduled-2022-04-19-05_39_00_UTC -> len("2022-04-19-05_39_00")
# == 19. Derived from the format itself so the two cannot drift apart.
SNAPSHOT_TS_FORMAT_LEN = len(datetime(2000, 1, 1).strftime(SNAPSHOT_TS_FORMAT))
SNAPSHOT_PREFIX = 'scheduled'

log = logging.getLogger(__name__)
38
39
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)


@contextmanager
def open_ioctx(self: CephfsClientT,
               pool: Union[int, str]) -> Iterator[rados.Ioctx]:
    """Open an I/O context on ``pool`` in the snap-schedule namespace.

    :param self: client whose ``mgr.rados`` handle is used.
    :param pool: pool id (opened with ``open_ioctx2``) or pool name
        (opened with ``open_ioctx``).
    :yields: the ioctx, with its namespace set to
        ``SNAP_SCHEDULE_NAMESPACE``; it is closed when the ``with`` exits.
    :raises rados.ObjectNotFound: if the pool cannot be opened (logged
        before re-raising).
    """
    # Guard only the open itself. The try previously wrapped the yield, so
    # an ObjectNotFound raised by the caller's with-body (e.g. a missing
    # object) was thrown back in here and misreported as a missing pool.
    try:
        if isinstance(pool, int):
            ioctx = self.mgr.rados.open_ioctx2(pool)
        else:
            ioctx = self.mgr.rados.open_ioctx(pool)
    except rados.ObjectNotFound:
        log.error("Failed to locate pool {}".format(pool))
        raise
    with ioctx:
        ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
        yield ioctx
58
59
FuncT = TypeVar('FuncT', bound=Callable[..., None])


def updates_schedule_db(func: FuncT) -> FuncT:
    """Decorate a schedule mutator so the path's timers are refreshed after it.

    The decorated method is called as ``func(self, fs, schedule_or_path,
    ...)``. After it returns, ``self.refresh_snap_timers(fs, path)`` is
    called, where ``path`` is ``schedule_or_path.path`` if a ``Schedule``
    was passed and ``schedule_or_path`` itself otherwise. If ``func``
    raises, no refresh happens and the exception propagates. The wrapped
    method's return value is passed through.
    """
    import functools

    # wraps() keeps the public method's __name__/__doc__ (and __wrapped__).
    @functools.wraps(func)
    def f(self: 'SnapSchedClient', fs: str, schedule_or_path: Any,
          *args: Any, **kwargs: Any) -> None:
        ret = func(self, fs, schedule_or_path, *args, **kwargs)
        path = schedule_or_path
        if isinstance(schedule_or_path, Schedule):
            path = schedule_or_path.path
        self.refresh_snap_timers(fs, path)
        return ret
    return cast(FuncT, f)
72
73
def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
                  retention: Dict[str, int],
                  max_snaps_to_retain: int = MAX_SNAPS_PER_PATH) -> Set:
    """Return the subset of ``candidates`` that should be removed.

    :param candidates: ``(dir_entry, timestamp)`` pairs for the scheduled
        snapshots of one path.
    :param retention: maps a period key (``n``, ``M``, ``h``, ``d``, ``w``,
        ``m``, ``y``) to how many snapshots to keep for that period. Within
        a period, the first snapshot seen for each distinct bucket is kept.
        A bucket is the timestamp formatted with that period's pattern.
        Snapshots are visited in descending ``d_name`` order. An empty
        retention falls back to ``{'n': max_snaps_to_retain}``.
    :param max_snaps_to_retain: cap on the combined keep set; defaults to
        ``MAX_SNAPS_PER_PATH``. When exceeded, the earliest-selected entries
        are kept. Periods are processed in the order listed above.
    :returns: ``candidates`` minus the keep set.
    """
    PRUNING_PATTERNS = OrderedDict([
        # n is for keep last n snapshots, uses the snapshot name timestamp
        # format for lowest granularity
        # NOTE: prune set has tz suffix stripped out.
        ("n", SNAPSHOT_TS_FORMAT),
        # TODO remove M for release
        ("M", '%Y-%m-%d-%H_%M'),
        ("h", '%Y-%m-%d-%H'),
        ("d", '%Y-%m-%d'),
        ("w", '%G-%V'),
        ("m", '%Y-%m'),
        ("y", '%Y'),
    ])
    if not retention:
        log.info(f'no retention set, assuming n: {max_snaps_to_retain}')
        retention = {'n': max_snaps_to_retain}
    # The visiting order does not depend on the period, so sort only once.
    ordered = sorted(candidates, key=lambda x: x[0].d_name, reverse=True)
    keep = []  # kept snapshots, in selection order (matters for the cap)
    keep_lookup = set()  # same members as keep, for O(1) membership tests
    for period, date_pattern in PRUNING_PATTERNS.items():
        log.debug(f'compiling keep set for period {period}')
        period_count = retention.get(period, 0)
        if not period_count:
            continue
        last = None
        kept_for_this_period = 0
        for snap in ordered:
            snap_ts = snap[1].strftime(date_pattern)
            if snap_ts == last:
                continue  # same bucket as the previous snapshot
            last = snap_ts
            if snap in keep_lookup:
                continue  # already kept by an earlier period
            log.debug((f'keeping {snap[0].d_name} due to '
                       f'{period_count}{period}'))
            keep.append(snap)
            keep_lookup.add(snap)
            kept_for_this_period += 1
            if kept_for_this_period == period_count:
                log.debug(('found enough snapshots for '
                           f'{period_count}{period}'))
                break
    if len(keep) > max_snaps_to_retain:
        log.info((f'Would keep more then {max_snaps_to_retain}, '
                  'pruning keep set'))
        keep = keep[:max_snaps_to_retain]
    return candidates - set(keep)
119
def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
    """Extract the timestamp from a scheduled snapshot name, tz suffix removed.

    e.g. ``scheduled-2022-04-19-05_39_00_UTC`` -> ``2022-04-19-05_39_00``.
    """
    # str.lstrip() strips a *set of characters*, not a prefix; it only
    # happened to work because the timestamp starts with a digit. Remove
    # the literal prefix instead.
    prefix = f'{SNAPSHOT_PREFIX}-'
    ts = scheduled_snap_name
    if ts.startswith(prefix):
        ts = ts[len(prefix):]
    return ts[:SNAPSHOT_TS_FORMAT_LEN]
124
class DBInfo:
    """Per-file-system sqlite connection plus the lock serializing its use."""

    fs: str  # file system name the connection belongs to
    lock: Lock  # taken by DBConnectionManager around every use of db
    db: sqlite3.Connection

    def __init__(self, fs: str, db: sqlite3.Connection):
        self.fs = fs
        self.db = db
        self.lock = Lock()
130
131
# context manager serializing use of a file system's db connection
class DBConnectionManager():
    """Hold ``info.lock`` for the duration of a ``with`` block.

    Entering blocks until the lock is free and returns this manager, so
    callers reach the connection via ``.dbinfo.db``. Exiting releases the
    lock and returns None, so exceptions from the body propagate.
    """

    def __init__(self, info: DBInfo):
        self.dbinfo: DBInfo = info

    # The return hint is a string because `from __future__ import
    # annotations` is unavailable on Python 3.6 (it arrived in Python 3.7).
    def __enter__(self) -> 'DBConnectionManager':
        fs_name = self.dbinfo.fs
        log.debug(f'locking db connection for {fs_name}')
        self.dbinfo.lock.acquire()
        log.debug(f'locked db connection for {fs_name}')
        return self

    def __exit__(self,
                 exception_type: Optional[Type[BaseException]],
                 exception_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        info = self.dbinfo
        log.debug(f'unlocking db connection for {info.fs}')
        info.lock.release()
        log.debug(f'unlocked db connection for {info.fs}')
152
153
class SnapSchedClient(CephfsClient):
    """Manage snapshot schedules and their timers for CephFS file systems.

    Schedules are kept in one sqlite database per file system, opened
    through the ``ceph`` sqlite VFS in that file system's metadata pool.
    For each ``(fs, path)`` at most one ``threading.Timer`` is stored in
    ``active_timers``. When it fires, ``create_scheduled_snapshot`` takes
    the snapshot, re-arms the timer and prunes old snapshots.
    """

    def __init__(self, mgr: Any) -> None:
        super(SnapSchedClient, self).__init__(mgr)
        # Each db connection is guarded by a Lock; this is required to
        # avoid concurrent DB transactions when more than one path in a
        # file system is scheduled at the same interval (e.g. 1h). Without
        # the lock, there are races to use the same connection, causing
        # nested transactions to be aborted.
        self.sqlite_connections: Dict[str, DBInfo] = {}
        self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
        self.conn_lock: Lock = Lock()  # protects add/lookup of db connections

        # restart old schedules
        for fs_name in self.get_all_filesystems():
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                sched_list = Schedule.list_all_schedules(db, fs_name)
                for sched in sched_list:
                    self.refresh_snap_timers(fs_name, sched.path, db)

    @property
    def allow_minute_snaps(self) -> bool:
        """Value of the ``allow_m_granularity`` module option."""
        return self.mgr.get_module_option('allow_m_granularity')

    @property
    def dump_on_update(self) -> bool:
        """Value of the ``dump_on_update`` module option."""
        return self.mgr.get_module_option('dump_on_update')

    def get_schedule_db(self, fs: str) -> DBConnectionManager:
        """Return a lock-holding manager for ``fs``'s schedule DB connection.

        The connection is opened and initialised on first use and cached
        in ``sqlite_connections``.
        """
        # `with` guarantees conn_lock is released even if opening the DB
        # raises; the former manual acquire()/release() pair leaked the lock
        # on any exception and deadlocked every later caller.
        with self.conn_lock:
            dbinfo = self.sqlite_connections.get(fs)
            if dbinfo is None:
                dbinfo = DBInfo(fs, self._open_schedule_db(fs))
                self.sqlite_connections[fs] = dbinfo
        return DBConnectionManager(dbinfo)

    def _open_schedule_db(self, fs: str) -> sqlite3.Connection:
        """Open ``fs``'s schedule DB, import any legacy dump, create tables."""
        poolid = self.get_metadata_pool(fs)
        # `is not None`: pool id 0 is a valid id but falsy.
        assert poolid is not None, f'fs "{fs}" not found'
        uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
        log.debug(f"using uri {uri}")
        db = sqlite3.connect(uri, check_same_thread=False, uri=True)
        try:
            db.execute('PRAGMA FOREIGN_KEYS = 1')
            db.execute('PRAGMA JOURNAL_MODE = PERSIST')
            db.execute('PRAGMA PAGE_SIZE = 65536')
            db.execute('PRAGMA CACHE_SIZE = 256')
            db.row_factory = sqlite3.Row
            # check for legacy dump store
            pool_param = cast(Union[int, str], poolid)
            with open_ioctx(self, pool_param) as ioctx:
                try:
                    size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
                    dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
                    db.executescript(dump)
                    ioctx.remove_object(SNAP_DB_OBJECT_NAME)
                except rados.ObjectNotFound:
                    log.debug(f'No legacy schedule DB found in {fs}')
            db.executescript(Schedule.CREATE_TABLES)
        except Exception:
            # don't leak a half-initialised connection
            db.close()
            raise
        return db

    def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
        """Return False for minute ('M') repeats unless they are enabled."""
        if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
            if self.allow_minute_snaps:
                log.debug(('Minute repeats allowed, '
                           f'scheduling snapshot on path {path}'))
                return True
            else:
                log.info(('Minute repeats disabled, '
                          f'skipping snapshot on path {path}'))
                return False
        else:
            return True

    def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
        """Return at most one allowed due-schedule row for ``path``.

        Runs ``Schedule.EXEC_QUERY`` inside a transaction on ``db``. The
        caller must already hold the connection's lock.
        """
        with db:
            if self.dump_on_update:
                dump = "\n".join(db.iterdump())
                log.debug(f"db dump:\n{dump}")
            cur = db.execute(Schedule.EXEC_QUERY, (path,))
            all_rows = cur.fetchall()
            # every row is checked (and logged) before taking the first one
            rows = [r for r in all_rows
                    if self._is_allowed_repeat(r, path)][0:1]
            return rows

    def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
        """Cancel the timers of ``(fs, path)`` and arm a new one if due.

        Errors are logged, not raised.

        :param olddb: connection whose lock the caller already holds; if
            omitted, the connection is looked up and locked here.
        """
        try:
            log.debug((f'SnapDB on {fs} changed for {path}, '
                       'updating next Timer'))
            rows = []
            # olddb is passed when the caller already holds the lock on the
            # db connection (the Lock is not reentrant)
            if olddb is not None:
                rows = self.fetch_schedules(olddb, path)
            else:
                with self.get_schedule_db(fs) as conn_mgr:
                    db = conn_mgr.dbinfo.db
                    rows = self.fetch_schedules(db, path)
            timers = self.active_timers.get((fs, path), [])
            for timer in timers:
                timer.cancel()
            timers = []
            for row in rows:
                log.debug(f'Creating new snapshot timer for {path}')
                # row[1] is the delay in seconds until the next snapshot
                t = Timer(row[1],
                          self.create_scheduled_snapshot,
                          args=[fs, path, row[0], row[2], row[3]])
                t.start()
                timers.append(t)
                log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
            self.active_timers[(fs, path)] = timers
        except Exception:
            self._log_exception('refresh_snap_timers')

    def _log_exception(self, fct: str) -> None:
        """Log that ``fct`` raised, followed by the current traceback."""
        log.error(f'{fct} raised an exception:')
        log.error(traceback.format_exc())

    def create_scheduled_snapshot(self,
                                  fs_name: str,
                                  path: str,
                                  retention: str,
                                  start: str,
                                  repeat: str) -> None:
        """Timer callback: snapshot ``path``, then re-arm its timer and prune.

        The snapshot is ``<path>/<client_snapdir>/scheduled-<UTC timestamp>``.
        A cephfs error deactivates the schedule. Errors while taking the
        snapshot are logged, not raised, because this runs in a Timer
        thread. ``retention`` is not used here; pruning reads the retention
        of the schedule loaded from the DB.
        """
        log.debug(f'Scheduled snapshot of {path} triggered')
        # Stays None if the schedule cannot be loaded (e.g. it was removed
        # after this timer was armed); previously the finally block then
        # hit an UnboundLocalError.
        sched = None
        try:
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                try:
                    sched = Schedule.get_db_schedules(path,
                                                      db,
                                                      fs_name,
                                                      repeat=repeat,
                                                      start=start)[0]
                    time = datetime.now(timezone.utc)
                    with open_filesystem(self, fs_name) as fs_handle:
                        snap_ts = time.strftime(SNAPSHOT_TS_FORMAT_TZ)
                        snap_dir = self.mgr.rados.conf_get('client_snapdir')
                        snap_name = f'{path}/{snap_dir}/{SNAPSHOT_PREFIX}-{snap_ts}'
                        fs_handle.mkdir(snap_name, 0o755)
                        log.info(f'created scheduled snapshot of {path}')
                        log.debug(f'created scheduled snapshot {snap_name}')
                        sched.update_last(time, db)
                except cephfs.Error:
                    self._log_exception('create_scheduled_snapshot')
                    if sched is not None:
                        sched.set_inactive(db)
        except Exception:
            # catch all exceptions cause otherwise we'll never know since this
            # is running in a thread
            self._log_exception('create_scheduled_snapshot')
        finally:
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                self.refresh_snap_timers(fs_name, path, db)
            if sched is not None:
                self.prune_snapshots(sched)

    def prune_snapshots(self, sched: Schedule) -> None:
        """Remove scheduled snapshots of ``sched.path`` not kept by retention.

        Directory entries that carry the scheduled prefix but whose
        timestamp cannot be parsed are skipped and never removed. Errors
        are logged, not raised.
        """
        try:
            log.debug('Pruning snapshots')
            ret = sched.retention
            path = sched.path
            prune_candidates = set()
            time = datetime.now(timezone.utc)
            with open_filesystem(self, sched.fs) as fs_handle:
                snap_dir = self.mgr.rados.conf_get('client_snapdir')
                with fs_handle.opendir(f'{path}/{snap_dir}') as d_handle:
                    dir_ = fs_handle.readdir(d_handle)
                    while dir_:
                        dir_name = dir_.d_name.decode('utf-8')
                        if dir_name.startswith(f'{SNAPSHOT_PREFIX}-'):
                            try:
                                ts = datetime.strptime(
                                    snap_name_to_timestamp(dir_name),
                                    SNAPSHOT_TS_FORMAT)
                            except ValueError:
                                # One malformed name used to abort pruning
                                # for this path on every run; skip it instead.
                                log.warning(f'skipping {dir_.d_name}: '
                                            'unparseable snapshot timestamp')
                            else:
                                log.debug(f'add {dir_.d_name} to pruning')
                                prune_candidates.add((dir_, ts))
                        else:
                            log.debug(f'skipping dir entry {dir_.d_name}')
                        dir_ = fs_handle.readdir(d_handle)
                to_prune = get_prune_set(prune_candidates, ret)
                for k in to_prune:
                    dirname = k[0].d_name.decode('utf-8')
                    log.debug(f'rmdir on {dirname}')
                    fs_handle.rmdir(f'{path}/{snap_dir}/{dirname}')
                if to_prune:
                    with self.get_schedule_db(sched.fs) as conn_mgr:
                        db = conn_mgr.dbinfo.db
                        sched.update_pruned(time, db, len(to_prune))
        except Exception:
            self._log_exception('prune_snapshots')

    def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
        """Return the schedules stored for exactly ``path`` on ``fs``."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            return Schedule.get_db_schedules(path, db, fs)

    def list_snap_schedules(self,
                            fs: str,
                            path: str,
                            recursive: bool) -> List[Schedule]:
        """Return schedules for ``path`` on ``fs`` via Schedule.list_schedules."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            return Schedule.list_schedules(path, db, fs, recursive)

    @updates_schedule_db
    # TODO improve interface
    def store_snap_schedule(self,
                            fs: str, path_: str,
                            args: Tuple[str, str, str, str,
                                        Optional[str], Optional[str]]) -> None:
        """Store a new schedule built from ``args``.

        :raises ValueError: for minute schedules when those are not allowed.
        """
        sched = Schedule(*args)
        log.debug(f'repeat is {sched.repeat}')
        if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
            log.error('not allowed')
            raise ValueError('no minute snaps allowed')
        log.debug(f'attempting to add schedule {sched}')
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            sched.store_schedule(db)

    @updates_schedule_db
    def rm_snap_schedule(self,
                         fs: str, path: str,
                         schedule: Optional[str],
                         start: Optional[str]) -> None:
        """Remove the matching schedule(s) of ``path``."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.rm_schedule(db, path, schedule, start)

    @updates_schedule_db
    def add_retention_spec(self,
                           fs: str,
                           path: str,
                           retention_spec_or_period: str,
                           retention_count: Optional[str]) -> None:
        """Add a retention spec; a given count is prefixed to the period."""
        retention_spec = retention_spec_or_period
        if retention_count:
            retention_spec = retention_count + retention_spec
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.add_retention(db, path, retention_spec)

    @updates_schedule_db
    def rm_retention_spec(self,
                          fs: str,
                          path: str,
                          retention_spec_or_period: str,
                          retention_count: Optional[str]) -> None:
        """Remove a retention spec; a given count is prefixed to the period."""
        retention_spec = retention_spec_or_period
        if retention_count:
            retention_spec = retention_count + retention_spec
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.rm_retention(db, path, retention_spec)

    @updates_schedule_db
    def activate_snap_schedule(self,
                               fs: str,
                               path: str,
                               schedule: Optional[str],
                               start: Optional[str]) -> None:
        """Mark the matching schedule(s) of ``path`` active."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            schedules = Schedule.get_db_schedules(path, db, fs,
                                                  schedule=schedule,
                                                  start=start)
            for s in schedules:
                s.set_active(db)

    @updates_schedule_db
    def deactivate_snap_schedule(self,
                                 fs: str, path: str,
                                 schedule: Optional[str],
                                 start: Optional[str]) -> None:
        """Mark the matching schedule(s) of ``path`` inactive."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            schedules = Schedule.get_db_schedules(path, db, fs,
                                                  schedule=schedule,
                                                  start=start)
            for s in schedules:
                s.set_inactive(db)