2 Copyright (C) 2020 SUSE
4 LGPL2.1. See file COPYING.
8 from contextlib
import contextmanager
9 from mgr_util
import CephfsClient
, open_filesystem
10 from collections
import OrderedDict
11 from datetime
import datetime
, timezone
13 from threading
import Timer
, Lock
14 from typing
import cast
, Any
, Callable
, Dict
, Iterator
, List
, Set
, Optional
, \
15 Tuple
, TypeVar
, Union
, Type
16 from types
import TracebackType
18 from .schedule
import Schedule
# upper bound on the number of snapshots kept per path when pruning; also
# used as the 'n' retention when no retention is set
MAX_SNAPS_PER_PATH = 50
# RADOS namespace set on the ioctx used by this module
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
# prefix of the schedule DB object name
SNAP_DB_PREFIX = 'snap_db'
25 # increment this every time the db schema changes and provide upgrade code
27 SNAP_DB_OBJECT_NAME
= f
'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
# scheduled snapshots are tz suffixed
SNAPSHOT_TS_FORMAT_TZ = '%Y-%m-%d-%H_%M_%S_%Z'
# for backward compat snapshot name parsing
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
# length of timestamp format (without tz suffix)
# e.g.: scheduled-2022-04-19-05_39_00_UTC (len = "2022-04-19-05_39_00")
SNAPSHOT_TS_FORMAT_LEN = 19
# name prefix of scheduled snapshots: <SNAPSHOT_PREFIX>-<timestamp>
SNAPSHOT_PREFIX = 'scheduled'

# module-level logger
log = logging.getLogger(__name__)


# type of ``self`` for helpers that accept CephfsClient or a subclass of it
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
44 def open_ioctx(self
: CephfsClientT
,
45 pool
: Union
[int, str]) -> Iterator
[rados
.Ioctx
]:
48 with self
.mgr
.rados
.open_ioctx2(pool
) as ioctx
:
49 ioctx
.set_namespace(SNAP_SCHEDULE_NAMESPACE
)
52 with self
.mgr
.rados
.open_ioctx(pool
) as ioctx
:
53 ioctx
.set_namespace(SNAP_SCHEDULE_NAMESPACE
)
55 except rados
.ObjectNotFound
:
56 log
.error("Failed to locate pool {}".format(pool
))
# type of the callables wrapped by updates_schedule_db; they return None
FuncT = TypeVar('FuncT', bound=Callable[..., None])
63 def updates_schedule_db(func
: FuncT
) -> FuncT
:
64 def f(self
: 'SnapSchedClient', fs
: str, schedule_or_path
: str, *args
: Any
) -> None:
65 ret
= func(self
, fs
, schedule_or_path
, *args
)
66 path
= schedule_or_path
67 if isinstance(schedule_or_path
, Schedule
):
68 path
= schedule_or_path
.path
69 self
.refresh_snap_timers(fs
, path
)
74 def get_prune_set(candidates
: Set
[Tuple
[cephfs
.DirEntry
, datetime
]],
75 retention
: Dict
[str, int]) -> Set
:
76 PRUNING_PATTERNS
= OrderedDict([
77 # n is for keep last n snapshots, uses the snapshot name timestamp
78 # format for lowest granularity
79 # NOTE: prune set has tz suffix stripped out.
80 ("n", SNAPSHOT_TS_FORMAT
),
81 # TODO remove M for release
82 ("M", '%Y-%m-%d-%H_%M'),
91 log
.info(f
'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
92 retention
= {'n': MAX_SNAPS_PER_PATH
}
93 for period
, date_pattern
in PRUNING_PATTERNS
.items():
94 log
.debug(f
'compiling keep set for period {period}')
95 period_count
= retention
.get(period
, 0)
99 kept_for_this_period
= 0
100 for snap
in sorted(candidates
, key
=lambda x
: x
[0].d_name
,
102 snap_ts
= snap
[1].strftime(date_pattern
)
106 log
.debug((f
'keeping {snap[0].d_name} due to '
107 f
'{period_count}{period}'))
109 kept_for_this_period
+= 1
110 if kept_for_this_period
== period_count
:
111 log
.debug(('found enough snapshots for '
112 f
'{period_count}{period}'))
114 if len(keep
) > MAX_SNAPS_PER_PATH
:
115 log
.info((f
'Would keep more then {MAX_SNAPS_PER_PATH}, '
117 keep
= keep
[:MAX_SNAPS_PER_PATH
]
118 return candidates
- set(keep
)
def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
    """Extract the timestamp from a scheduled snapshot name.

    The leading ``<SNAPSHOT_PREFIX>-`` is removed and the tz suffix is
    stripped out by truncating to SNAPSHOT_TS_FORMAT_LEN characters, e.g.
    ``scheduled-2022-04-19-05_39_00_UTC`` -> ``2022-04-19-05_39_00``.

    :param scheduled_snap_name: name of a scheduled snapshot
    :return: timestamp part of the name, without tz suffix
    """
    prefix = f'{SNAPSHOT_PREFIX}-'
    ts = scheduled_snap_name
    # str.lstrip() treats its argument as a set of characters rather than
    # as a prefix, so cut the prefix off explicitly
    if ts.startswith(prefix):
        ts = ts[len(prefix):]
    return ts[0:SNAPSHOT_TS_FORMAT_LEN]
126 def __init__(self
, fs
: str, db
: sqlite3
.Connection
):
128 self
.lock
: Lock
= Lock()
129 self
.db
: sqlite3
.Connection
= db
132 # context manager for serializing db connection usage
133 class DBConnectionManager():
134 def __init__(self
, info
: DBInfo
):
135 self
.dbinfo
: DBInfo
= info
# using string as return type hint since __future__.annotations is not
# available with Python 3.6; it's available starting from Python 3.7
139 def __enter__(self
) -> 'DBConnectionManager':
140 log
.debug(f
'locking db connection for {self.dbinfo.fs}')
141 self
.dbinfo
.lock
.acquire()
142 log
.debug(f
'locked db connection for {self.dbinfo.fs}')
146 exception_type
: Optional
[Type
[BaseException
]],
147 exception_value
: Optional
[BaseException
],
148 traceback
: Optional
[TracebackType
]) -> None:
149 log
.debug(f
'unlocking db connection for {self.dbinfo.fs}')
150 self
.dbinfo
.lock
.release()
151 log
.debug(f
'unlocked db connection for {self.dbinfo.fs}')
154 class SnapSchedClient(CephfsClient
):
def __init__(self, mgr: Any) -> None:
    """Initialise connection bookkeeping and refresh timers of stored schedules.

    :param mgr: object handed on to the base class constructor
    """
    super(SnapSchedClient, self).__init__(mgr)
    # lock to protect add/lookup db connections
    self.conn_lock: Lock = Lock()
    # Each db connection is guarded by a Lock; this is required to avoid
    # concurrent DB transactions when more than one path in a file-system
    # is scheduled at the same interval, e.g. 1h; without the lock, there
    # are races to use the same connection, causing nested transactions
    # to be aborted
    self.sqlite_connections: Dict[str, DBInfo] = {}
    self.active_timers: Dict[Tuple[str, str], List[Timer]] = {}

    # restart old schedules
    for fs_name in self.get_all_filesystems():
        with self.get_schedule_db(fs_name) as conn_mgr:
            conn = conn_mgr.dbinfo.db
            # the connection is passed along to refresh_snap_timers
            for sched in Schedule.list_all_schedules(conn, fs_name):
                self.refresh_snap_timers(fs_name, sched.path, conn)
def allow_minute_snaps(self) -> Any:
    """Return the value of the ``allow_m_granularity`` module option.

    The value is handed back exactly as ``mgr.get_module_option()``
    returns it; callers in this file only test it for truthiness.

    NOTE(review): this is read as an attribute (``self.allow_minute_snaps``)
    elsewhere in this file, so it is presumably decorated with
    ``@property`` - confirm against the line above this definition.
    """
    return self.mgr.get_module_option('allow_m_granularity')
def dump_on_update(self) -> Any:
    """Return the value of the ``dump_on_update`` module option.

    The value is handed back exactly as ``mgr.get_module_option()``
    returns it; fetch_schedules only tests it for truthiness.

    NOTE(review): this is read as an attribute (``self.dump_on_update``)
    elsewhere in this file, so it is presumably decorated with
    ``@property`` - confirm against the line above this definition.
    """
    return self.mgr.get_module_option('dump_on_update')
183 def get_schedule_db(self
, fs
: str) -> DBConnectionManager
:
185 self
.conn_lock
.acquire()
186 if fs
not in self
.sqlite_connections
:
187 poolid
= self
.get_metadata_pool(fs
)
188 assert poolid
, f
'fs "{fs}" not found'
189 uri
= f
"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
190 log
.debug(f
"using uri {uri}")
191 db
= sqlite3
.connect(uri
, check_same_thread
=False, uri
=True)
192 db
.execute('PRAGMA FOREIGN_KEYS = 1')
193 db
.execute('PRAGMA JOURNAL_MODE = PERSIST')
194 db
.execute('PRAGMA PAGE_SIZE = 65536')
195 db
.execute('PRAGMA CACHE_SIZE = 256')
196 db
.row_factory
= sqlite3
.Row
197 # check for legacy dump store
198 pool_param
= cast(Union
[int, str], poolid
)
199 with
open_ioctx(self
, pool_param
) as ioctx
:
201 size
, _mtime
= ioctx
.stat(SNAP_DB_OBJECT_NAME
)
202 dump
= ioctx
.read(SNAP_DB_OBJECT_NAME
, size
).decode('utf-8')
203 db
.executescript(dump
)
204 ioctx
.remove_object(SNAP_DB_OBJECT_NAME
)
205 except rados
.ObjectNotFound
:
206 log
.debug(f
'No legacy schedule DB found in {fs}')
207 db
.executescript(Schedule
.CREATE_TABLES
)
208 self
.sqlite_connections
[fs
] = DBInfo(fs
, db
)
209 dbinfo
= self
.sqlite_connections
[fs
]
210 self
.conn_lock
.release()
211 return DBConnectionManager(dbinfo
)
213 def _is_allowed_repeat(self
, exec_row
: Dict
[str, str], path
: str) -> bool:
214 if Schedule
.parse_schedule(exec_row
['schedule'])[1] == 'M':
215 if self
.allow_minute_snaps
:
216 log
.debug(('Minute repeats allowed, '
217 f
'scheduling snapshot on path {path}'))
220 log
.info(('Minute repeats disabled, '
221 f
'skipping snapshot on path {path}'))
226 def fetch_schedules(self
, db
: sqlite3
.Connection
, path
: str) -> List
[sqlite3
.Row
]:
228 if self
.dump_on_update
:
229 dump
= [line
for line
in db
.iterdump()]
230 dump
= "\n".join(dump
)
231 log
.debug(f
"db dump:\n{dump}")
232 cur
= db
.execute(Schedule
.EXEC_QUERY
, (path
,))
233 all_rows
= cur
.fetchall()
234 rows
= [r
for r
in all_rows
235 if self
._is
_allowed
_repeat
(r
, path
)][0:1]
238 def refresh_snap_timers(self
, fs
: str, path
: str, olddb
: Optional
[sqlite3
.Connection
] = None) -> None:
240 log
.debug((f
'SnapDB on {fs} changed for {path}, '
241 'updating next Timer'))
243 # olddb is passed in the case where we land here without a timer
244 # the lock on the db connection has already been taken
246 rows
= self
.fetch_schedules(olddb
, path
)
248 with self
.get_schedule_db(fs
) as conn_mgr
:
249 db
= conn_mgr
.dbinfo
.db
250 rows
= self
.fetch_schedules(db
, path
)
251 timers
= self
.active_timers
.get((fs
, path
), [])
256 log
.debug(f
'Creating new snapshot timer for {path}')
258 self
.create_scheduled_snapshot
,
259 args
=[fs
, path
, row
[0], row
[2], row
[3]])
262 log
.debug(f
'Will snapshot {path} in fs {fs} in {row[1]}s')
263 self
.active_timers
[(fs
, path
)] = timers
265 self
._log
_exception
('refresh_snap_timers')
def _log_exception(self, fct: str) -> None:
    """Log at error level that ``fct`` raised, followed by the traceback.

    :param fct: name of the failing function, used in the log message
    """
    header = f'{fct} raised an exception:'
    log.error(header)
    # formatted traceback of the exception currently being handled
    trace = traceback.format_exc()
    log.error(trace)
271 def create_scheduled_snapshot(self
,
276 repeat
: str) -> None:
277 log
.debug(f
'Scheduled snapshot of {path} triggered')
279 with self
.get_schedule_db(fs_name
) as conn_mgr
:
280 db
= conn_mgr
.dbinfo
.db
282 sched
= Schedule
.get_db_schedules(path
,
287 time
= datetime
.now(timezone
.utc
)
288 with
open_filesystem(self
, fs_name
) as fs_handle
:
289 snap_ts
= time
.strftime(SNAPSHOT_TS_FORMAT_TZ
)
290 snap_dir
= self
.mgr
.rados
.conf_get('client_snapdir')
291 snap_name
= f
'{path}/{snap_dir}/{SNAPSHOT_PREFIX}-{snap_ts}'
292 fs_handle
.mkdir(snap_name
, 0o755)
293 log
.info(f
'created scheduled snapshot of {path}')
294 log
.debug(f
'created scheduled snapshot {snap_name}')
295 sched
.update_last(time
, db
)
297 self
._log
_exception
('create_scheduled_snapshot')
298 sched
.set_inactive(db
)
300 # catch all exceptions cause otherwise we'll never know since this
301 # is running in a thread
302 self
._log
_exception
('create_scheduled_snapshot')
304 with self
.get_schedule_db(fs_name
) as conn_mgr
:
305 db
= conn_mgr
.dbinfo
.db
306 self
.refresh_snap_timers(fs_name
, path
, db
)
307 self
.prune_snapshots(sched
)
309 def prune_snapshots(self
, sched
: Schedule
) -> None:
311 log
.debug('Pruning snapshots')
312 ret
= sched
.retention
314 prune_candidates
= set()
315 time
= datetime
.now(timezone
.utc
)
316 with
open_filesystem(self
, sched
.fs
) as fs_handle
:
317 snap_dir
= self
.mgr
.rados
.conf_get('client_snapdir')
318 with fs_handle
.opendir(f
'{path}/{snap_dir}') as d_handle
:
319 dir_
= fs_handle
.readdir(d_handle
)
321 if dir_
.d_name
.decode('utf-8').startswith(f
'{SNAPSHOT_PREFIX}-'):
322 log
.debug(f
'add {dir_.d_name} to pruning')
323 ts
= datetime
.strptime(
324 snap_name_to_timestamp(dir_
.d_name
.decode('utf-8')), SNAPSHOT_TS_FORMAT
)
325 prune_candidates
.add((dir_
, ts
))
327 log
.debug(f
'skipping dir entry {dir_.d_name}')
328 dir_
= fs_handle
.readdir(d_handle
)
329 to_prune
= get_prune_set(prune_candidates
, ret
)
331 dirname
= k
[0].d_name
.decode('utf-8')
332 log
.debug(f
'rmdir on {dirname}')
333 fs_handle
.rmdir(f
'{path}/{snap_dir}/{dirname}')
335 with self
.get_schedule_db(sched
.fs
) as conn_mgr
:
336 db
= conn_mgr
.dbinfo
.db
337 sched
.update_pruned(time
, db
, len(to_prune
))
339 self
._log
_exception
('prune_snapshots')
def get_snap_schedules(self, fs: str, path: str) -> List[Schedule]:
    """Return what Schedule.get_db_schedules yields for ``path`` in ``fs``.

    The query runs inside the context manager returned by
    get_schedule_db, i.e. while the db connection of ``fs`` is locked.

    :param fs: file system name
    :param path: path whose schedules are looked up
    """
    with self.get_schedule_db(fs) as locked:
        return Schedule.get_db_schedules(path, locked.dbinfo.db, fs)
346 def list_snap_schedules(self
,
349 recursive
: bool) -> List
[Schedule
]:
350 with self
.get_schedule_db(fs
) as conn_mgr
:
351 db
= conn_mgr
.dbinfo
.db
352 return Schedule
.list_schedules(path
, db
, fs
, recursive
)
355 # TODO improve interface
356 def store_snap_schedule(self
,
358 args
: Tuple
[str, str, str, str,
359 Optional
[str], Optional
[str]]) -> None:
360 sched
= Schedule(*args
)
361 log
.debug(f
'repeat is {sched.repeat}')
362 if sched
.parse_schedule(sched
.schedule
)[1] == 'M' and not self
.allow_minute_snaps
:
363 log
.error('not allowed')
364 raise ValueError('no minute snaps allowed')
365 log
.debug(f
'attempting to add schedule {sched}')
366 with self
.get_schedule_db(fs
) as conn_mgr
:
367 db
= conn_mgr
.dbinfo
.db
368 sched
.store_schedule(db
)
371 def rm_snap_schedule(self
,
373 schedule
: Optional
[str],
374 start
: Optional
[str]) -> None:
375 with self
.get_schedule_db(fs
) as conn_mgr
:
376 db
= conn_mgr
.dbinfo
.db
377 Schedule
.rm_schedule(db
, path
, schedule
, start
)
380 def add_retention_spec(self
,
383 retention_spec_or_period
: str,
384 retention_count
: Optional
[str]) -> None:
385 retention_spec
= retention_spec_or_period
387 retention_spec
= retention_count
+ retention_spec
388 with self
.get_schedule_db(fs
) as conn_mgr
:
389 db
= conn_mgr
.dbinfo
.db
390 Schedule
.add_retention(db
, path
, retention_spec
)
393 def rm_retention_spec(self
,
396 retention_spec_or_period
: str,
397 retention_count
: Optional
[str]) -> None:
398 retention_spec
= retention_spec_or_period
400 retention_spec
= retention_count
+ retention_spec
401 with self
.get_schedule_db(fs
) as conn_mgr
:
402 db
= conn_mgr
.dbinfo
.db
403 Schedule
.rm_retention(db
, path
, retention_spec
)
406 def activate_snap_schedule(self
,
409 schedule
: Optional
[str],
410 start
: Optional
[str]) -> None:
411 with self
.get_schedule_db(fs
) as conn_mgr
:
412 db
= conn_mgr
.dbinfo
.db
413 schedules
= Schedule
.get_db_schedules(path
, db
, fs
,
420 def deactivate_snap_schedule(self
,
422 schedule
: Optional
[str],
423 start
: Optional
[str]) -> None:
424 with self
.get_schedule_db(fs
) as conn_mgr
:
425 db
= conn_mgr
.dbinfo
.db
426 schedules
= Schedule
.get_db_schedules(path
, db
, fs
,