]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | """ |
2 | Copyright (C) 2020 SUSE | |
3 | ||
4 | LGPL2.1. See file COPYING. | |
5 | """ | |
6 | import cephfs | |
f67539c2 TL |
7 | import rados |
8 | from contextlib import contextmanager | |
20effc67 | 9 | from mgr_util import CephfsClient, open_filesystem |
f67539c2 TL |
10 | from collections import OrderedDict |
11 | from datetime import datetime, timezone | |
12 | import logging | |
20effc67 TL |
13 | from threading import Timer, Lock |
14 | from typing import cast, Any, Callable, Dict, Iterator, List, Set, Optional, \ | |
15 | Tuple, TypeVar, Union, Type | |
16 | from types import TracebackType | |
f67539c2 | 17 | import sqlite3 |
20effc67 | 18 | from .schedule import Schedule |
f67539c2 TL |
19 | import traceback |
20 | ||
21 | ||
# upper bound on the number of snapshots kept per path; also used as the
# default "keep last n" retention when a schedule has no retention set
MAX_SNAPS_PER_PATH = 50
# rados namespace that is set on every ioctx opened by this module
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
SNAP_DB_VERSION = '0'
SNAP_DB_OBJECT_NAME = '{}_v{}'.format(SNAP_DB_PREFIX, SNAP_DB_VERSION)
# for backward compat snapshot name parsing
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
# scheduled snapshots are tz suffixed
SNAPSHOT_TS_FORMAT_TZ = SNAPSHOT_TS_FORMAT + '_%Z'
# length of timestamp format (without tz suffix)
# e.g.: scheduled-2022-04-19-05_39_00_UTC (len = "2022-04-19-05_39_00")
SNAPSHOT_TS_FORMAT_LEN = len('2022-04-19-05_39_00')
# every snapshot created by a schedule is named '<SNAPSHOT_PREFIX>-<timestamp>'
SNAPSHOT_PREFIX = 'scheduled'

log = logging.getLogger(__name__)
38 | ||
39 | ||
20effc67 TL |
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)


@contextmanager
def open_ioctx(self: CephfsClientT,
               pool: Union[int, str]) -> Iterator[rados.Ioctx]:
    """Yield an ioctx on ``pool`` with the snap-schedule namespace set.

    This is a module-level helper; ``self`` is the CephfsClient whose
    ``mgr.rados`` handle is used. An ``int`` pool is opened with
    ``open_ioctx2``, anything else with ``open_ioctx``.

    ``rados.ObjectNotFound`` is logged and re-raised. The handler also covers
    the caller's ``with`` body, so an ObjectNotFound escaping from there is
    reported the same way.
    """
    try:
        # exact type test on purpose: only a plain int counts as a pool id
        if type(pool) is int:
            opener = self.mgr.rados.open_ioctx2
        else:
            opener = self.mgr.rados.open_ioctx
        with opener(pool) as ioctx:
            ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
            yield ioctx
    except rados.ObjectNotFound:
        log.error("Failed to locate pool {}".format(pool))
        raise
58 | ||
59 | ||
20effc67 TL |
FuncT = TypeVar('FuncT', bound=Callable[..., None])


def updates_schedule_db(func: FuncT) -> FuncT:
    """Decorate a SnapSchedClient method that modifies the schedule db.

    The wrapper calls ``func`` and afterwards refreshes the snapshot timers
    of the affected path, then returns whatever ``func`` returned. The third
    positional argument may be a path or a ``Schedule``; for a ``Schedule``
    its ``path`` attribute is used. If ``func`` raises, the exception
    propagates and the timers are left untouched.
    """
    # local import so this block does not depend on a module-level import
    from functools import wraps

    # wraps() keeps __name__/__doc__ of the decorated method, so log output,
    # tracebacks and introspection show the real method instead of 'f'
    @wraps(func)
    def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
        ret = func(self, fs, schedule_or_path, *args)
        path = schedule_or_path
        if isinstance(schedule_or_path, Schedule):
            path = schedule_or_path.path
        self.refresh_snap_timers(fs, path)
        return ret
    return cast(FuncT, f)
f67539c2 TL |
72 | |
73 | ||
20effc67 TL |
def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
                  retention: Dict[str, int]) -> Set:
    """Return the subset of ``candidates`` that retention does not keep.

    ``candidates`` holds ``(dir entry, timestamp)`` tuples and ``retention``
    maps a period letter to the number of snapshots to keep for it. An empty
    ``retention`` is treated as ``{'n': MAX_SNAPS_PER_PATH}``.

    Periods are processed in the order of the table below. For each one the
    candidates are walked in descending ``d_name`` order and the first
    snapshot of every distinct formatted timestamp is kept, until the
    period's count is reached. At most MAX_SNAPS_PER_PATH snapshots are kept
    overall.
    """
    PRUNING_PATTERNS = OrderedDict([
        # n is for keep last n snapshots, uses the snapshot name timestamp
        # format for lowest granularity
        # NOTE: prune set has tz suffix stripped out.
        ("n", SNAPSHOT_TS_FORMAT),
        # TODO remove M for release
        ("M", '%Y-%m-%d-%H_%M'),
        ("h", '%Y-%m-%d-%H'),
        ("d", '%Y-%m-%d'),
        ("w", '%G-%V'),
        ("m", '%Y-%m'),
        ("y", '%Y'),
    ])
    if not retention:
        log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
        retention = {'n': MAX_SNAPS_PER_PATH}

    keep = []
    for period, date_pattern in PRUNING_PATTERNS.items():
        log.debug(f'compiling keep set for period {period}')
        period_count = retention.get(period, 0)
        if not period_count:
            continue

        previous_bucket = None
        kept_for_this_period = 0
        ordered = sorted(candidates, key=lambda x: x[0].d_name, reverse=True)
        for snap in ordered:
            bucket = snap[1].strftime(date_pattern)
            if bucket == previous_bucket:
                # a snapshot of this time bucket was already considered
                continue
            previous_bucket = bucket
            if snap in keep:
                # already kept for an earlier period, does not count here
                continue
            log.debug((f'keeping {snap[0].d_name} due to '
                       f'{period_count}{period}'))
            keep.append(snap)
            kept_for_this_period += 1
            if kept_for_this_period == period_count:
                log.debug(('found enough snapshots for '
                           f'{period_count}{period}'))
                break

    if len(keep) > MAX_SNAPS_PER_PATH:
        log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
                  'pruning keep set'))
        keep = keep[:MAX_SNAPS_PER_PATH]
    return candidates - set(keep)
119 | ||
2a845540 TL |
def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
    """Extract the timestamp from a scheduled snapshot name.

    The leading ``'<SNAPSHOT_PREFIX>-'`` is removed if present and the next
    SNAPSHOT_TS_FORMAT_LEN characters are returned, which drops any tz
    suffix. A name without the prefix is only truncated.
    """
    prefix = f'{SNAPSHOT_PREFIX}-'
    ts = scheduled_snap_name
    # str.lstrip() removes a *set of characters*, not a prefix, so it would
    # also eat leading timestamp characters that happen to be in the prefix;
    # slice the prefix off instead (str.removeprefix needs Python 3.9)
    if ts.startswith(prefix):
        ts = ts[len(prefix):]
    return ts[0:SNAPSHOT_TS_FORMAT_LEN]
f67539c2 | 124 | |
20effc67 TL |
class DBInfo:
    """Bundle a sqlite connection with the lock that serializes its use."""

    def __init__(self, fs: str, db: sqlite3.Connection):
        # name of the file system the connection belongs to
        self.fs: str = fs
        # the sqlite connection itself
        self.db: sqlite3.Connection = db
        # every DBInfo gets its own, initially unlocked, lock
        self.lock: Lock = Lock()
130 | ||
131 | ||
# context manager for serializing db connection usage
class DBConnectionManager:
    """Hold the lock of a DBInfo for the duration of a ``with`` block.

    Entering acquires ``dbinfo.lock`` (blocking) and returns the manager, so
    the connection is reachable as ``conn_mgr.dbinfo.db``. Leaving releases
    the lock; exceptions raised inside the block are not suppressed.
    """

    def __init__(self, info: DBInfo):
        self.dbinfo: DBInfo = info

    # The return type hint is a string because __future__.annotations is not
    # available with Python 3.6; it is available starting from Python 3.7.
    def __enter__(self) -> 'DBConnectionManager':
        fs = self.dbinfo.fs
        log.debug(f'locking db connection for {fs}')
        self.dbinfo.lock.acquire()
        log.debug(f'locked db connection for {fs}')
        return self

    def __exit__(self,
                 exception_type: Optional[Type[BaseException]],
                 exception_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        fs = self.dbinfo.fs
        log.debug(f'unlocking db connection for {fs}')
        self.dbinfo.lock.release()
        log.debug(f'unlocked db connection for {fs}')
152 | ||
153 | ||
f67539c2 TL |
class SnapSchedClient(CephfsClient):
    """CephFS client that stores snapshot schedules and runs them via timers.

    Schedules are kept in one sqlite database per file system. For every
    ``(fs, path)`` with a runnable schedule a ``threading.Timer`` is armed
    that creates the next snapshot, prunes old ones and re-arms itself.
    """

    def __init__(self, mgr: Any) -> None:
        super(SnapSchedClient, self).__init__(mgr)
        # Each db connection is now guarded by a Lock; this is required to
        # avoid concurrent DB transactions when more than one paths in a
        # file-system are scheduled at the same interval eg. 1h; without the
        # lock, there are races to use the same connection, causing nested
        # transactions to be aborted
        self.sqlite_connections: Dict[str, DBInfo] = {}
        self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}
        self.conn_lock: Lock = Lock()  # lock to protect add/lookup db connections

        # restart old schedules
        for fs_name in self.get_all_filesystems():
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                sched_list = Schedule.list_all_schedules(db, fs_name)
                for sched in sched_list:
                    # the db lock is held here, so hand the connection over
                    self.refresh_snap_timers(fs_name, sched.path, db)

    @property
    def allow_minute_snaps(self) -> Any:
        """Value of the ``allow_m_granularity`` module option."""
        return self.mgr.get_module_option('allow_m_granularity')

    @property
    def dump_on_update(self) -> Any:
        """Value of the ``dump_on_update`` module option."""
        return self.mgr.get_module_option('dump_on_update')

    def get_schedule_db(self, fs: str) -> DBConnectionManager:
        """Return a context manager guarding the schedule db of ``fs``.

        The connection is opened on first use and cached afterwards. The
        returned manager has to be entered with ``with`` before the
        connection is used.
        """
        # ``with`` releases conn_lock even when opening the db raises; a bare
        # acquire()/release() pair would leave it locked forever in that case
        with self.conn_lock:
            if fs not in self.sqlite_connections:
                db = self._open_schedule_db(fs)
                self.sqlite_connections[fs] = DBInfo(fs, db)
            dbinfo = self.sqlite_connections[fs]
        return DBConnectionManager(dbinfo)

    def _open_schedule_db(self, fs: str) -> sqlite3.Connection:
        """Open and initialize the sqlite schedule db of ``fs``.

        A legacy dump object found in the metadata pool is loaded into the
        db and then removed. The connection is closed again if any part of
        the setup fails.
        """
        poolid = self.get_metadata_pool(fs)
        assert poolid, f'fs "{fs}" not found'
        uri = f"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
        log.debug(f"using uri {uri}")
        db = sqlite3.connect(uri, check_same_thread=False, uri=True)
        try:
            db.execute('PRAGMA FOREIGN_KEYS = 1')
            db.execute('PRAGMA JOURNAL_MODE = PERSIST')
            db.execute('PRAGMA PAGE_SIZE = 65536')
            db.execute('PRAGMA CACHE_SIZE = 256')
            db.row_factory = sqlite3.Row
            # check for legacy dump store
            pool_param = cast(Union[int, str], poolid)
            with open_ioctx(self, pool_param) as ioctx:
                try:
                    size, _mtime = ioctx.stat(SNAP_DB_OBJECT_NAME)
                    dump = ioctx.read(SNAP_DB_OBJECT_NAME, size).decode('utf-8')
                    db.executescript(dump)
                    ioctx.remove_object(SNAP_DB_OBJECT_NAME)
                except rados.ObjectNotFound:
                    log.debug(f'No legacy schedule DB found in {fs}')
            db.executescript(Schedule.CREATE_TABLES)
        except Exception:
            # do not leak a half-initialized connection
            db.close()
            raise
        return db

    def _is_allowed_repeat(self, exec_row: Dict[str, str], path: str) -> bool:
        """Tell whether the schedule in ``exec_row`` may be executed.

        Only schedules whose parsed repeat unit is ``'M'`` are restricted;
        those require the ``allow_minute_snaps`` option to be set.
        """
        if Schedule.parse_schedule(exec_row['schedule'])[1] != 'M':
            return True
        if self.allow_minute_snaps:
            log.debug(('Minute repeats allowed, '
                       f'scheduling snapshot on path {path}'))
            return True
        log.info(('Minute repeats disabled, '
                  f'skipping snapshot on path {path}'))
        return False

    def fetch_schedules(self, db: sqlite3.Connection, path: str) -> List[sqlite3.Row]:
        """Return at most one executable schedule row for ``path``.

        Runs ``Schedule.EXEC_QUERY`` inside a transaction and returns the
        first row whose repeat is allowed, as a list of length 0 or 1. If
        ``dump_on_update`` is set the whole db is dumped to the debug log
        first.
        """
        with db:
            if self.dump_on_update:
                dump = "\n".join(db.iterdump())
                log.debug(f"db dump:\n{dump}")
            cur = db.execute(Schedule.EXEC_QUERY, (path,))
            all_rows = cur.fetchall()
            rows = [r for r in all_rows
                    if self._is_allowed_repeat(r, path)][0:1]
            return rows

    def refresh_snap_timers(self, fs: str, path: str, olddb: Optional[sqlite3.Connection] = None) -> None:
        """Cancel the timers of ``(fs, path)`` and arm new ones from the db.

        ``olddb`` has to be passed by callers that already hold the lock of
        the db connection; otherwise the lock is taken here. Exceptions are
        logged, never raised.
        """
        try:
            log.debug((f'SnapDB on {fs} changed for {path}, '
                       'updating next Timer'))
            # olddb is passed in the case where we land here without a timer
            # the lock on the db connection has already been taken
            if olddb is not None:
                rows = self.fetch_schedules(olddb, path)
            else:
                with self.get_schedule_db(fs) as conn_mgr:
                    db = conn_mgr.dbinfo.db
                    rows = self.fetch_schedules(db, path)
            for timer in self.active_timers.get((fs, path), []):
                timer.cancel()
            timers = []
            for row in rows:
                log.debug(f'Creating new snapshot timer for {path}')
                # row[1] is the delay in seconds; row[0], row[2] and row[3]
                # become the retention, start and repeat arguments
                t = Timer(row[1],
                          self.create_scheduled_snapshot,
                          args=[fs, path, row[0], row[2], row[3]])
                t.start()
                timers.append(t)
                log.debug(f'Will snapshot {path} in fs {fs} in {row[1]}s')
            self.active_timers[(fs, path)] = timers
        except Exception:
            self._log_exception('refresh_snap_timers')

    def _log_exception(self, fct: str) -> None:
        """Log the exception currently being handled, tagged with ``fct``."""
        log.error(f'{fct} raised an exception:')
        log.error(traceback.format_exc())

    def create_scheduled_snapshot(self,
                                  fs_name: str,
                                  path: str,
                                  retention: str,
                                  start: str,
                                  repeat: str) -> None:
        """Timer callback: snapshot ``path``, re-arm the timer and prune.

        The schedule matching ``start``/``repeat`` is looked up, a snapshot
        directory named ``<SNAPSHOT_PREFIX>-<UTC timestamp>`` is created and
        the schedule's last-run data is updated. On ``cephfs.Error`` the
        schedule is set inactive. All exceptions of the snapshot step are
        logged rather than raised, since this runs in a timer thread.
        Afterwards the timers of ``path`` are refreshed and, if the schedule
        was found, old snapshots are pruned. ``retention`` is not used here.
        """
        log.debug(f'Scheduled snapshot of {path} triggered')
        # stays None when the schedule lookup fails (e.g. the schedule was
        # removed before the timer fired); guards the uses further down
        sched: Optional[Schedule] = None
        try:
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                try:
                    sched = Schedule.get_db_schedules(path,
                                                      db,
                                                      fs_name,
                                                      repeat=repeat,
                                                      start=start)[0]
                    time = datetime.now(timezone.utc)
                    with open_filesystem(self, fs_name) as fs_handle:
                        snap_ts = time.strftime(SNAPSHOT_TS_FORMAT_TZ)
                        snap_dir = self.mgr.rados.conf_get('client_snapdir')
                        snap_name = f'{path}/{snap_dir}/{SNAPSHOT_PREFIX}-{snap_ts}'
                        fs_handle.mkdir(snap_name, 0o755)
                    log.info(f'created scheduled snapshot of {path}')
                    log.debug(f'created scheduled snapshot {snap_name}')
                    sched.update_last(time, db)
                except cephfs.Error:
                    self._log_exception('create_scheduled_snapshot')
                    if sched is not None:
                        sched.set_inactive(db)
                except Exception:
                    # catch all exceptions cause otherwise we'll never know since this
                    # is running in a thread
                    self._log_exception('create_scheduled_snapshot')
        finally:
            with self.get_schedule_db(fs_name) as conn_mgr:
                db = conn_mgr.dbinfo.db
                self.refresh_snap_timers(fs_name, path, db)
            if sched is not None:
                self.prune_snapshots(sched)

    def prune_snapshots(self, sched: Schedule) -> None:
        """Remove scheduled snapshots of ``sched.path`` that retention drops.

        Directory entries starting with ``<SNAPSHOT_PREFIX>-`` are the prune
        candidates; ``get_prune_set`` decides which of them to remove. The
        schedule's prune data is updated when something was removed.
        Exceptions are logged, never raised.
        """
        try:
            log.debug('Pruning snapshots')
            ret = sched.retention
            path = sched.path
            prune_candidates = set()
            time = datetime.now(timezone.utc)
            with open_filesystem(self, sched.fs) as fs_handle:
                snap_dir = self.mgr.rados.conf_get('client_snapdir')
                with fs_handle.opendir(f'{path}/{snap_dir}') as d_handle:
                    dir_ = fs_handle.readdir(d_handle)
                    while dir_:
                        name = dir_.d_name.decode('utf-8')
                        if name.startswith(f'{SNAPSHOT_PREFIX}-'):
                            try:
                                ts = datetime.strptime(
                                    snap_name_to_timestamp(name), SNAPSHOT_TS_FORMAT)
                            except ValueError:
                                # a single malformed name must not block
                                # pruning of all other snapshots
                                log.debug((f'skipping dir entry {dir_.d_name}: '
                                           'no parsable timestamp'))
                            else:
                                log.debug(f'add {dir_.d_name} to pruning')
                                prune_candidates.add((dir_, ts))
                        else:
                            log.debug(f'skipping dir entry {dir_.d_name}')
                        dir_ = fs_handle.readdir(d_handle)
                to_prune = get_prune_set(prune_candidates, ret)
                for k in to_prune:
                    dirname = k[0].d_name.decode('utf-8')
                    log.debug(f'rmdir on {dirname}')
                    fs_handle.rmdir(f'{path}/{snap_dir}/{dirname}')
                if to_prune:
                    with self.get_schedule_db(sched.fs) as conn_mgr:
                        db = conn_mgr.dbinfo.db
                        sched.update_pruned(time, db, len(to_prune))
        except Exception:
            self._log_exception('prune_snapshots')

    def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
        """Return the schedules stored for ``path`` in ``fs``."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            return Schedule.get_db_schedules(path, db, fs)

    def list_snap_schedules(self,
                            fs: str,
                            path: str,
                            recursive: bool) -> List[Schedule]:
        """Return ``Schedule.list_schedules`` for ``path``; ``recursive`` is passed through."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            return Schedule.list_schedules(path, db, fs, recursive)

    @updates_schedule_db
    # TODO improve interface
    def store_snap_schedule(self,
                            fs: str, path_: str,
                            args: Tuple[str, str, str, str,
                                        Optional[str], Optional[str]]) -> None:
        """Build a ``Schedule`` from ``args`` and store it in the db of ``fs``.

        Raises ValueError for a minute ('M') schedule while
        ``allow_minute_snaps`` is not set.
        """
        sched = Schedule(*args)
        log.debug(f'repeat is {sched.repeat}')
        if sched.parse_schedule(sched.schedule)[1] == 'M' and not self.allow_minute_snaps:
            log.error('not allowed')
            raise ValueError('no minute snaps allowed')
        log.debug(f'attempting to add schedule {sched}')
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            sched.store_schedule(db)

    @updates_schedule_db
    def rm_snap_schedule(self,
                         fs: str, path: str,
                         schedule: Optional[str],
                         start: Optional[str]) -> None:
        """Remove schedules of ``path`` via ``Schedule.rm_schedule``."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.rm_schedule(db, path, schedule, start)

    @updates_schedule_db
    def add_retention_spec(self,
                           fs: str,
                           path: str,
                           retention_spec_or_period: str,
                           retention_count: Optional[str]) -> None:
        """Add a retention spec to ``path``.

        If ``retention_count`` is given it is prepended to
        ``retention_spec_or_period`` to form the spec.
        """
        retention_spec = retention_spec_or_period
        if retention_count:
            retention_spec = retention_count + retention_spec
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.add_retention(db, path, retention_spec)

    @updates_schedule_db
    def rm_retention_spec(self,
                          fs: str,
                          path: str,
                          retention_spec_or_period: str,
                          retention_count: Optional[str]) -> None:
        """Remove a retention spec from ``path``.

        If ``retention_count`` is given it is prepended to
        ``retention_spec_or_period`` to form the spec.
        """
        retention_spec = retention_spec_or_period
        if retention_count:
            retention_spec = retention_count + retention_spec
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            Schedule.rm_retention(db, path, retention_spec)

    @updates_schedule_db
    def activate_snap_schedule(self,
                               fs: str,
                               path: str,
                               schedule: Optional[str],
                               start: Optional[str]) -> None:
        """Set every schedule of ``path`` matching ``schedule``/``start`` active."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            schedules = Schedule.get_db_schedules(path, db, fs,
                                                  schedule=schedule,
                                                  start=start)
            for s in schedules:
                s.set_active(db)

    @updates_schedule_db
    def deactivate_snap_schedule(self,
                                 fs: str, path: str,
                                 schedule: Optional[str],
                                 start: Optional[str]) -> None:
        """Set every schedule of ``path`` matching ``schedule``/``start`` inactive."""
        with self.get_schedule_db(fs) as conn_mgr:
            db = conn_mgr.dbinfo.db
            schedules = Schedule.get_db_schedules(path, db, fs,
                                                  schedule=schedule,
                                                  start=start)
            for s in schedules:
                s.set_inactive(db)