]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/snap_schedule/fs/schedule_client.py
a22f658dffed94d0562276c7123b5bc270a9b6ff
[ceph.git] / ceph / src / pybind / mgr / snap_schedule / fs / schedule_client.py
1 """
2 Copyright (C) 2020 SUSE
3
4 LGPL2.1. See file COPYING.
5 """
6 import cephfs
7 import rados
8 from contextlib import contextmanager
9 from mgr_util import CephfsClient, open_filesystem
10 from collections import OrderedDict
11 from datetime import datetime, timezone
12 import logging
13 from threading import Timer, Lock
14 from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \
15 Tuple, TypeVar, Union, Type
16 from types import TracebackType
17 import sqlite3
18 from .schedule import Schedule
19 import traceback
20
21
# Hard upper bound on retained snapshots per path, applied on top of any
# configured retention spec (see get_prune_set).
MAX_SNAPS_PER_PATH = 50
# RADOS object namespace holding the schedule DB / legacy dump objects.
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
SNAP_DB_VERSION = '0'
SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
# timestamp suffix format used in scheduled snapshot directory names
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
SNAPSHOT_PREFIX = 'scheduled'

log = logging.getLogger(__name__)


# TypeVar bound to CephfsClient so open_ioctx works for any client subclass.
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
35
36
@contextmanager
def open_ioctx(self: CephfsClientT,
               pool: Union[int, str]) -> Iterator[rados.Ioctx]:
    """Yield an ioctx for *pool* with the snap-schedule namespace set.

    *pool* may be a pool id (int) or a pool name (str); re-raises
    rados.ObjectNotFound after logging when the pool does not exist.
    """
    try:
        # open_ioctx2 resolves pools by id, open_ioctx by name
        opener = (self.mgr.rados.open_ioctx2 if type(pool) is int
                  else self.mgr.rados.open_ioctx)
        with opener(pool) as ioctx:
            ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
            yield ioctx
    except rados.ObjectNotFound:
        log.error("Failed to locate pool {}".format(pool))
        raise
52
53
FuncT = TypeVar('FuncT', bound=Callable[..., None])


def updates_schedule_db(func: FuncT) -> FuncT:
    """Decorator for SnapSchedClient mutators: after *func* changes the
    schedule DB, refresh the snapshot timers for the affected path."""
    def wrapper(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
        result = func(self, fs, schedule_or_path, *args)
        # a Schedule object carries its own path; otherwise the second
        # positional argument already is the path
        if isinstance(schedule_or_path, Schedule):
            path = schedule_or_path.path
        else:
            path = schedule_or_path
        self.refresh_snap_timers(fs, path)
        return result
    return cast(FuncT, wrapper)
66
67
def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
                  retention: Dict[str, int]) -> Set:
    """Compute which of *candidates* should be pruned.

    Walks the retention periods from finest to coarsest granularity,
    collecting the newest snapshot of each distinct period bucket into a
    keep set (capped at MAX_SNAPS_PER_PATH), and returns the candidates
    that did not make it into the keep set.
    """
    PRUNING_PATTERNS = OrderedDict([
        # n is for keep last n snapshots, uses the snapshot name timestamp
        # format for lowest granularity
        ("n", SNAPSHOT_TS_FORMAT),
        # TODO remove M for release
        ("M", '%Y-%m-%d-%H_%M'),
        ("h", '%Y-%m-%d-%H'),
        ("d", '%Y-%m-%d'),
        ("w", '%G-%V'),
        ("m", '%Y-%m'),
        ("y", '%Y'),
    ])
    if not retention:
        log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
        retention = {'n': MAX_SNAPS_PER_PATH}
    keep = []
    # newest first; ordering is by directory-entry name, which embeds the
    # snapshot timestamp
    newest_first = sorted(candidates, key=lambda c: c[0].d_name, reverse=True)
    for period, date_pattern in PRUNING_PATTERNS.items():
        log.debug(f'compiling keep set for period {period}')
        period_count = retention.get(period, 0)
        if not period_count:
            continue
        last = None
        kept_for_this_period = 0
        for snap in newest_first:
            snap_ts = snap[1].strftime(date_pattern)
            if snap_ts == last:
                # same period bucket as the previous snapshot, skip
                continue
            last = snap_ts
            if snap not in keep:
                log.debug((f'keeping {snap[0].d_name} due to '
                           f'{period_count}{period}'))
                keep.append(snap)
                kept_for_this_period += 1
                if kept_for_this_period == period_count:
                    log.debug(('found enough snapshots for '
                               f'{period_count}{period}'))
                    break
    if len(keep) > MAX_SNAPS_PER_PATH:
        log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
                  'pruning keep set'))
        keep = keep[:MAX_SNAPS_PER_PATH]
    return candidates - set(keep)
112
113
class DBInfo():
    """Bundle a file-system name, its schedule DB connection and the lock
    that serializes transactions on that connection (acquired through
    DBConnectionManager)."""

    def __init__(self, fs: str, db: sqlite3.Connection):
        self.fs: str = fs
        self.db: sqlite3.Connection = db
        # one lock per connection; guards all transactions on self.db
        self.lock: Lock = Lock()
119
120
# context manager for serializing db connection usage
class DBConnectionManager():
    """Serialize use of a DBInfo's sqlite connection.

    Entering acquires the per-connection lock stored in the DBInfo and
    exiting releases it, so only one thread at a time runs transactions
    on a given schedule DB.
    """
    def __init__(self, info: DBInfo):
        self.dbinfo: DBInfo = info

    # using string as return type hint since __future__.annotations is not
    # available with Python 3.6; it is available starting from Python 3.7
    def __enter__(self) -> 'DBConnectionManager':
        """Block until the connection lock is held, then return self."""
        log.debug(f'locking db connection for {self.dbinfo.fs}')
        self.dbinfo.lock.acquire()
        log.debug(f'locked db connection for {self.dbinfo.fs}')
        return self

    def __exit__(self,
                 exception_type: Optional[Type[BaseException]],
                 exception_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        """Release the connection lock; exceptions are not suppressed."""
        log.debug(f'unlocking db connection for {self.dbinfo.fs}')
        self.dbinfo.lock.release()
        log.debug(f'unlocked db connection for {self.dbinfo.fs}')
141
142
class SnapSchedClient(CephfsClient):
    """Manage the per-filesystem snap-schedule DBs and snapshot Timers.

    One cached sqlite connection per file-system (each guarded by a Lock,
    see DBInfo/DBConnectionManager); mutating operations are wrapped with
    @updates_schedule_db so timers get re-armed after every DB change.
    """

    def __init__(self, mgr: Any) -> None:
        super(SnapSchedClient, self).__init__(mgr)
        # Each db connection is guarded by a Lock; this is required to
        # avoid concurrent DB transactions when more than one path in a
        # file-system is scheduled at the same interval eg. 1h; without the
        # lock, there are races to use the same connection, causing nested
        # transactions to be aborted
        self.sqlite_connections: Dict[str, DBInfo] = {}
        self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
        self.conn_lock: Lock = Lock()  # lock to protect add/lookup db connections

        # restart old schedules
        for fs_name in self.get_all_filesystems():
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                sched_list = Schedule.list_all_schedules(db, fs_name)
                for sched in sched_list:
                    self.refresh_snap_timers(fs_name, sched.path, db)

    @property
    def allow_minute_snaps(self) -> bool:
        # module option gating 'M' (minute) repeat granularity
        # (annotation fixed: this property returns the option value, not None)
        return self.mgr.get_module_option('allow_m_granularity')

    @property
    def dump_on_update(self) -> bool:
        # module option: when set, dump the whole DB to the debug log on
        # every schedule query (debugging aid)
        return self.mgr.get_module_option('dump_on_update')

    def get_schedule_db(self, fs: str) -> DBConnectionManager:
        """Return a DBConnectionManager wrapping the schedule DB of *fs*.

        The sqlite connection is created lazily and cached. On first open
        a legacy schedule dump object is imported from RADOS if present,
        otherwise the schema tables are created.
        """
        dbinfo = None
        self.conn_lock.acquire()
        try:
            if fs not in self.sqlite_connections:
                poolid = self.get_metadata_pool(fs)
                assert poolid, f'fs "{fs}" not found'
                uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
                log.debug(f"using uri {uri}")
                db = sqlite3.connect(uri, check_same_thread=False, uri=True)
                db.execute('PRAGMA FOREIGN_KEYS = 1')
                db.execute('PRAGMA JOURNAL_MODE = PERSIST')
                db.execute('PRAGMA PAGE_SIZE = 65536')
                db.execute('PRAGMA CACHE_SIZE = 256')
                db.row_factory = sqlite3.Row
                # check for legacy dump store
                pool_param = cast(Union[int, str], poolid)
                with open_ioctx(self, pool_param) as ioctx:
                    try:
                        size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
                        dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
                        db.executescript(dump)
                        ioctx.remove(SNAP_DB_OBJECT_NAME)
                    except rados.ObjectNotFound:
                        log.debug(f'No legacy schedule DB found in {fs}')
                        db.executescript(Schedule.CREATE_TABLES)
                self.sqlite_connections[fs] = DBInfo(fs, db)
            dbinfo = self.sqlite_connections[fs]
        finally:
            # BUGFIX: release unconditionally; previously any exception in
            # the body (e.g. the poolid assert or a connect failure) left
            # conn_lock held and deadlocked every later get_schedule_db()
            self.conn_lock.release()
        return DBConnectionManager(dbinfo)

    def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
        """Return False for minute ('M') repeats when they are disabled."""
        if Schedule.parse_schedule(exec_row['schedule'])[1] == 'M':
            if self.allow_minute_snaps:
                log.debug(('Minute repeats allowed, '
                           f'scheduling snapshot on path {path}'))
                return True
            else:
                log.info(('Minute repeats disabled, '
                          f'skipping snapshot on path {path}'))
                return False
        else:
            return True

    def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
        """Return at most one allowed exec-query row for *path*.

        The caller must already hold the connection's lock (via
        DBConnectionManager).
        """
        with db:
            if self.dump_on_update:
                # join directly instead of rebinding 'dump' list -> str
                dump = "\n".join(db.iterdump())
                log.debug(f"db dump:\n{dump}")
            cur = db.execute(Schedule.EXEC_QUERY, (path,))
            all_rows = cur.fetchall()
            rows = [r for r in all_rows
                    if self._is_allowed_repeat(r, path)][0:1]
            return rows

    def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
        """Cancel and re-arm the snapshot Timer(s) for (fs, path)."""
        try:
            log.debug((f'SnapDB on {fs} changed for {path}, '
                       'updating next Timer'))
            rows = []
            # olddb is passed in the case where we land here without a timer
            # the lock on the db connection has already been taken
            if olddb:
                rows = self.fetch_schedules(olddb, path)
            else:
                with self.get_schedule_db(fs) as conn_mgr:
                    db = conn_mgr.dbinfo.db
                    rows = self.fetch_schedules(db, path)
            timers = self.active_timers.get((fs, path), [])
            for timer in timers:
                timer.cancel()
            timers = []
            for row in rows:
                log.debug(f'Creating new snapshot timer for {path}')
                # row fields come from Schedule.EXEC_QUERY; row[1] appears
                # to be the delay in seconds until the next snapshot --
                # confirm against Schedule.EXEC_QUERY's column order
                t = Timer(row[1],
                          self.create_scheduled_snapshot,
                          args=[fs, path, row[0], row[2], row[3]])
                t.start()
                timers.append(t)
                log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
            self.active_timers[(fs, path)] = timers
        except Exception:
            self._log_exception('refresh_snap_timers')

    def _log_exception(self, fct: str) -> None:
        """Log the currently-handled exception with traceback, attributed
        to function *fct*."""
        log.error(f'{fct} raised an exception:')
        log.error(traceback.format_exc())

    def create_scheduled_snapshot(self,
                                  fs_name: str,
                                  path: str,
                                  retention: str,
                                  start: str,
                                  repeat: str) -> None:
        """Timer callback: create the scheduled snapshot, then re-arm the
        timer and prune old snapshots.

        Runs in a Timer thread, so every exception is caught and logged.
        """
        log.debug(f'Scheduled snapshot of {path} triggered')
        # BUGFIX: pre-initialize so the error and prune paths below cannot
        # hit an unbound local when the schedule lookup itself fails
        sched: Optional[Schedule] = None
        try:
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                try:
                    sched = Schedule.get_db_schedules(path,
                                                      db,
                                                      fs_name,
                                                      repeat=repeat,
                                                      start=start)[0]
                    time = datetime.now(timezone.utc)
                    with open_filesystem(self, fs_name) as fs_handle:
                        snap_ts = time.strftime(SNAPSHOT_TS_FORMAT)
                        snap_name = f'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
                        fs_handle.mkdir(snap_name, 0o755)
                    log.info(f'created scheduled snapshot of {path}')
                    log.debug(f'created scheduled snapshot {snap_name}')
                    sched.update_last(time, db)
                except cephfs.Error:
                    self._log_exception('create_scheduled_snapshot')
                    if sched is not None:
                        sched.set_inactive(db)
        except Exception:
            # catch all exceptions cause otherwise we'll never know since this
            # is running in a thread
            self._log_exception('create_scheduled_snapshot')
        finally:
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                self.refresh_snap_timers(fs_name, path, db)
        if sched is not None:
            self.prune_snapshots(sched)

    def prune_snapshots(self, sched: Schedule) -> None:
        """Remove scheduled snapshots under sched.path that exceed the
        schedule's retention spec, and record the prune in the DB."""
        try:
            log.debug('Pruning snapshots')
            ret = sched.retention
            path = sched.path
            prune_candidates = set()
            time = datetime.now(timezone.utc)
            with open_filesystem(self, sched.fs) as fs_handle:
                with fs_handle.opendir(f'{path}/.snap') as d_handle:
                    dir_ = fs_handle.readdir(d_handle)
                    while dir_:
                        if dir_.d_name.decode('utf-8').startswith(f'{SNAPSHOT_PREFIX}-'):
                            log.debug(f'add {dir_.d_name} to pruning')
                            # BUGFIX: slice off the "<prefix>-" prefix;
                            # str.lstrip() strips a *character set*, so it
                            # could also eat leading timestamp characters
                            ts = datetime.strptime(
                                dir_.d_name.decode('utf-8')[len(SNAPSHOT_PREFIX) + 1:],
                                SNAPSHOT_TS_FORMAT)
                            prune_candidates.add((dir_, ts))
                        else:
                            log.debug(f'skipping dir entry {dir_.d_name}')
                        dir_ = fs_handle.readdir(d_handle)
                to_prune = get_prune_set(prune_candidates, ret)
                for k in to_prune:
                    dirname = k[0].d_name.decode('utf-8')
                    log.debug(f'rmdir on {dirname}')
                    fs_handle.rmdir(f'{path}/.snap/{dirname}')
                if to_prune:
                    with self.get_schedule_db(sched.fs) as conn_mgr:
                        db = conn_mgr.dbinfo.db
                        sched.update_pruned(time, db, len(to_prune))
        except Exception:
            self._log_exception('prune_snapshots')

    def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
        """Return the schedules stored for exactly *path* on *fs*."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            return Schedule.get_db_schedules(path, db, fs)

    def list_snap_schedules(self,
                            fs: str,
                            path: str,
                            recursive: bool) -> List[Schedule]:
        """List schedules under *path*, optionally recursively."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            return Schedule.list_schedules(path, db, fs, recursive)

    @updates_schedule_db
    # TODO improve interface
    def store_snap_schedule(self,
                            fs: str, path_: str,
                            args: Tuple[str, str, str, str,
                                        Optional[str], Optional[str]]) -> None:
        """Store a new schedule built from *args*.

        Raises ValueError for minute ('M') repeats unless the
        allow_m_granularity module option is set.
        """
        sched = Schedule(*args)
        log.debug(f'repeat is {sched.repeat}')
        if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
            log.error('not allowed')
            raise ValueError('no minute snaps allowed')
        log.debug(f'attempting to add schedule {sched}')
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            sched.store_schedule(db)

    @updates_schedule_db
    def rm_snap_schedule(self,
                         fs: str, path: str,
                         schedule: Optional[str],
                         start: Optional[str]) -> None:
        """Delete matching schedule(s) for *path*."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.rm_schedule(db, path, schedule, start)

    @updates_schedule_db
    def add_retention_spec(self,
                           fs: str,
                           path: str,
                           retention_spec_or_period: str,
                           retention_count: Optional[str]) -> None:
        """Add a retention spec (either '<count><period>' in one argument,
        or period + count split across the two arguments) for *path*."""
        retention_spec = retention_spec_or_period
        if retention_count:
            retention_spec = retention_count + retention_spec
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.add_retention(db, path, retention_spec)

    @updates_schedule_db
    def rm_retention_spec(self,
                          fs: str,
                          path: str,
                          retention_spec_or_period: str,
                          retention_count: Optional[str]) -> None:
        """Remove a retention spec (same argument convention as
        add_retention_spec) from *path*."""
        retention_spec = retention_spec_or_period
        if retention_count:
            retention_spec = retention_count + retention_spec
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.rm_retention(db, path, retention_spec)

    @updates_schedule_db
    def activate_snap_schedule(self,
                               fs: str,
                               path: str,
                               schedule: Optional[str],
                               start: Optional[str]) -> None:
        """Mark matching schedule(s) for *path* active."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            schedules = Schedule.get_db_schedules(path, db, fs,
                                                  schedule=schedule,
                                                  start=start)
            for s in schedules:
                s.set_active(db)

    @updates_schedule_db
    def deactivate_snap_schedule(self,
                                 fs: str, path: str,
                                 schedule: Optional[str],
                                 start: Optional[str]) -> None:
        """Mark matching schedule(s) for *path* inactive."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            schedules = Schedule.get_db_schedules(path, db, fs,
                                                  schedule=schedule,
                                                  start=start)
            for s in schedules:
                s.set_inactive(db)