2 Copyright (C) 2020 SUSE
4 LGPL2.1. See file COPYING.
8 from contextlib
import contextmanager
9 from mgr_util
import CephfsClient
, open_filesystem
10 from collections
import OrderedDict
11 from datetime
import datetime
, timezone
13 from threading
import Timer
, Lock
14 from typing
import cast
, Any
, Callable
, Dict
, Iterator
, List
, Set
, Optional
, \
15 Tuple
, TypeVar
, Union
, Type
16 from types
import TracebackType
18 from .schedule
import Schedule
# Hard upper bound on snapshots kept for a single path, no matter what the
# retention spec asks for.
MAX_SNAPS_PER_PATH = 50
# RADOS object namespace holding all snap-schedule metadata.
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
# (SNAP_DB_VERSION is defined alongside these constants)
SNAP_DB_OBJECT_NAME = f'{SNAP_DB_PREFIX}_v{SNAP_DB_VERSION}'
# Timestamp format embedded in scheduled snapshot names.
SNAPSHOT_TS_FORMAT = '%Y-%m-%d-%H_%M_%S'
SNAPSHOT_PREFIX = 'scheduled'

log = logging.getLogger(__name__)


# Type variable so the free functions below accept any CephfsClient subclass.
CephfsClientT = TypeVar('CephfsClientT', bound=CephfsClient)
@contextmanager
def open_ioctx(self: CephfsClientT,
               pool: Union[int, str]) -> Iterator[rados.Ioctx]:
    """Yield a rados ioctx for *pool*, namespaced for snap-schedule metadata.

    Tries open_ioctx2 (pool id) first and falls back to open_ioctx (pool
    name) when the binding rejects the argument type.
    """
    try:
        with self.mgr.rados.open_ioctx2(pool) as ioctx:
            ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
            yield ioctx
    except TypeError:
        # older interface: pool addressed by name rather than id
        with self.mgr.rados.open_ioctx(pool) as ioctx:
            ioctx.set_namespace(SNAP_SCHEDULE_NAMESPACE)
            yield ioctx
    except rados.ObjectNotFound:
        log.error("Failed to locate pool {}".format(pool))
        raise
FuncT = TypeVar('FuncT', bound=Callable[..., None])


def updates_schedule_db(func: FuncT) -> FuncT:
    """Decorator: after a DB-mutating call, refresh the snapshot timers.

    The wrapped method's second positional argument may be either a path
    string or a Schedule, from which the path is extracted.
    """
    def f(self: 'SnapSchedClient', fs: str, schedule_or_path: str, *args: Any) -> None:
        ret = func(self, fs, schedule_or_path, *args)
        if isinstance(schedule_or_path, Schedule):
            path = schedule_or_path.path
        else:
            path = schedule_or_path
        self.refresh_snap_timers(fs, path)
        return ret
    return cast(FuncT, f)
def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
                  retention: Dict[str, int]) -> Set:
    """Return the subset of *candidates* to prune.

    candidates: (dir entry, snapshot timestamp) pairs.
    retention: maps a period code to the number of snapshots to keep at
    that granularity.  Everything not selected for keeping is returned.
    """
    PRUNING_PATTERNS = OrderedDict([
        # n is for keep last n snapshots, uses the snapshot name timestamp
        # format for lowest granularity
        ("n", SNAPSHOT_TS_FORMAT),
        # TODO remove M for release
        ("M", '%Y-%m-%d-%H_%M'),
        ("h", '%Y-%m-%d-%H'),
        ("d", '%Y-%m-%d'),
        ("w", '%G-%V'),
        ("m", '%Y-%m'),
        ("y", '%Y'),
    ])
    keep = []
    if not retention:
        log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
        retention = {'n': MAX_SNAPS_PER_PATH}
    for period, date_pattern in PRUNING_PATTERNS.items():
        log.debug(f'compiling keep set for period {period}')
        period_count = retention.get(period, 0)
        if not period_count:
            continue
        last = None
        kept_for_this_period = 0
        # newest first: snapshot names embed their creation timestamp
        for snap in sorted(candidates, key=lambda x: x[0].d_name,
                           reverse=True):
            snap_ts = snap[1].strftime(date_pattern)
            if snap_ts == last:
                continue
            last = snap_ts
            if snap not in keep:
                log.debug((f'keeping {snap[0].d_name} due to '
                           f'{period_count}{period}'))
                keep.append(snap)
                kept_for_this_period += 1
                if kept_for_this_period == period_count:
                    log.debug(('found enough snapshots for '
                               f'{period_count}{period}'))
                    break
    # enforce the global cap even if retention would keep more
    if len(keep) > MAX_SNAPS_PER_PATH:
        log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
                  'pruning keep set'))
        keep = keep[:MAX_SNAPS_PER_PATH]
    return candidates - set(keep)
115 def __init__(self
, fs
: str, db
: sqlite3
.Connection
):
117 self
.lock
: Lock
= Lock()
118 self
.db
: sqlite3
.Connection
= db
# context manager for serializing db connection usage
class DBConnectionManager():
    """Hold a DBInfo's lock for the duration of a with-block."""

    def __init__(self, info: DBInfo):
        self.dbinfo: DBInfo = info

    # using string as return type hint since __future__.annotations is not
    # available with Python 3.6; it's available starting from Python 3.7
    def __enter__(self) -> 'DBConnectionManager':
        log.debug(f'locking db connection for {self.dbinfo.fs}')
        self.dbinfo.lock.acquire()
        log.debug(f'locked db connection for {self.dbinfo.fs}')
        return self

    def __exit__(self,
                 exception_type: Optional[Type[BaseException]],
                 exception_value: Optional[BaseException],
                 traceback: Optional[TracebackType]) -> None:
        log.debug(f'unlocking db connection for {self.dbinfo.fs}')
        self.dbinfo.lock.release()
        log.debug(f'unlocked db connection for {self.dbinfo.fs}')
143 class SnapSchedClient(CephfsClient
):
145 def __init__(self
, mgr
: Any
) -> None:
146 super(SnapSchedClient
, self
).__init
__(mgr
)
147 # Each db connection is now guarded by a Lock; this is required to
148 # avoid concurrent DB transactions when more than one paths in a
149 # file-system are scheduled at the same interval eg. 1h; without the
150 # lock, there are races to use the same connection, causing nested
151 # transactions to be aborted
152 self
.sqlite_connections
: Dict
[str, DBInfo
] = {}
153 self
.active_timers
: Dict
[Tuple
[str, str], List
[Timer
]] = {}
154 self
.conn_lock
: Lock
= Lock() # lock to protect add/lookup db connections
156 # restart old schedules
157 for fs_name
in self
.get_all_filesystems():
158 with self
.get_schedule_db(fs_name
) as conn_mgr
:
159 db
= conn_mgr
.dbinfo
.db
160 sched_list
= Schedule
.list_all_schedules(db
, fs_name
)
161 for sched
in sched_list
:
162 self
.refresh_snap_timers(fs_name
, sched
.path
, db
)
165 def allow_minute_snaps(self
) -> None:
166 return self
.mgr
.get_module_option('allow_m_granularity')
169 def dump_on_update(self
) -> None:
170 return self
.mgr
.get_module_option('dump_on_update')
172 def get_schedule_db(self
, fs
: str) -> DBConnectionManager
:
174 self
.conn_lock
.acquire()
175 if fs
not in self
.sqlite_connections
:
176 poolid
= self
.get_metadata_pool(fs
)
177 assert poolid
, f
'fs "{fs}" not found'
178 uri
= f
"file:///*{poolid}:/{SNAP_DB_OBJECT_NAME}.db?vfs=ceph"
179 log
.debug(f
"using uri {uri}")
180 db
= sqlite3
.connect(uri
, check_same_thread
=False, uri
=True)
181 db
.execute('PRAGMA FOREIGN_KEYS = 1')
182 db
.execute('PRAGMA JOURNAL_MODE = PERSIST')
183 db
.execute('PRAGMA PAGE_SIZE = 65536')
184 db
.execute('PRAGMA CACHE_SIZE = 256')
185 db
.row_factory
= sqlite3
.Row
186 # check for legacy dump store
187 pool_param
= cast(Union
[int, str], poolid
)
188 with
open_ioctx(self
, pool_param
) as ioctx
:
190 size
, _mtime
= ioctx
.stat(SNAP_DB_OBJECT_NAME
)
191 dump
= ioctx
.read(SNAP_DB_OBJECT_NAME
, size
).decode('utf-8')
192 db
.executescript(dump
)
193 ioctx
.remove(SNAP_DB_OBJECT_NAME
)
194 except rados
.ObjectNotFound
:
195 log
.debug(f
'No legacy schedule DB found in {fs}')
196 db
.executescript(Schedule
.CREATE_TABLES
)
197 self
.sqlite_connections
[fs
] = DBInfo(fs
, db
)
198 dbinfo
= self
.sqlite_connections
[fs
]
199 self
.conn_lock
.release()
200 return DBConnectionManager(dbinfo
)
202 def _is_allowed_repeat(self
, exec_row
: Dict
[str, str], path
: str) -> bool:
203 if Schedule
.parse_schedule(exec_row
['schedule'])[1] == 'M':
204 if self
.allow_minute_snaps
:
205 log
.debug(('Minute repeats allowed, '
206 f
'scheduling snapshot on path {path}'))
209 log
.info(('Minute repeats disabled, '
210 f
'skipping snapshot on path {path}'))
215 def fetch_schedules(self
, db
: sqlite3
.Connection
, path
: str) -> List
[sqlite3
.Row
]:
217 if self
.dump_on_update
:
218 dump
= [line
for line
in db
.iterdump()]
219 dump
= "\n".join(dump
)
220 log
.debug(f
"db dump:\n{dump}")
221 cur
= db
.execute(Schedule
.EXEC_QUERY
, (path
,))
222 all_rows
= cur
.fetchall()
223 rows
= [r
for r
in all_rows
224 if self
._is
_allowed
_repeat
(r
, path
)][0:1]
227 def refresh_snap_timers(self
, fs
: str, path
: str, olddb
: Optional
[sqlite3
.Connection
] = None) -> None:
229 log
.debug((f
'SnapDB on {fs} changed for {path}, '
230 'updating next Timer'))
232 # olddb is passed in the case where we land here without a timer
233 # the lock on the db connection has already been taken
235 rows
= self
.fetch_schedules(olddb
, path
)
237 with self
.get_schedule_db(fs
) as conn_mgr
:
238 db
= conn_mgr
.dbinfo
.db
239 rows
= self
.fetch_schedules(db
, path
)
240 timers
= self
.active_timers
.get((fs
, path
), [])
245 log
.debug(f
'Creating new snapshot timer for {path}')
247 self
.create_scheduled_snapshot
,
248 args
=[fs
, path
, row
[0], row
[2], row
[3]])
251 log
.debug(f
'Will snapshot {path} in fs {fs} in {row[1]}s')
252 self
.active_timers
[(fs
, path
)] = timers
254 self
._log
_exception
('refresh_snap_timers')
    def _log_exception(self, fct: str) -> None:
        """Log the currently-handled exception, prefixed with caller name *fct*."""
        log.error(f'{fct} raised an exception:')
        log.error(traceback.format_exc())
260 def create_scheduled_snapshot(self
,
265 repeat
: str) -> None:
266 log
.debug(f
'Scheduled snapshot of {path} triggered')
268 with self
.get_schedule_db(fs_name
) as conn_mgr
:
269 db
= conn_mgr
.dbinfo
.db
271 sched
= Schedule
.get_db_schedules(path
,
276 time
= datetime
.now(timezone
.utc
)
277 with
open_filesystem(self
, fs_name
) as fs_handle
:
278 snap_ts
= time
.strftime(SNAPSHOT_TS_FORMAT
)
279 snap_name
= f
'{path}/.snap/{SNAPSHOT_PREFIX}-{snap_ts}'
280 fs_handle
.mkdir(snap_name
, 0o755)
281 log
.info(f
'created scheduled snapshot of {path}')
282 log
.debug(f
'created scheduled snapshot {snap_name}')
283 sched
.update_last(time
, db
)
285 self
._log
_exception
('create_scheduled_snapshot')
286 sched
.set_inactive(db
)
288 # catch all exceptions cause otherwise we'll never know since this
289 # is running in a thread
290 self
._log
_exception
('create_scheduled_snapshot')
292 with self
.get_schedule_db(fs_name
) as conn_mgr
:
293 db
= conn_mgr
.dbinfo
.db
294 self
.refresh_snap_timers(fs_name
, path
, db
)
295 self
.prune_snapshots(sched
)
297 def prune_snapshots(self
, sched
: Schedule
) -> None:
299 log
.debug('Pruning snapshots')
300 ret
= sched
.retention
302 prune_candidates
= set()
303 time
= datetime
.now(timezone
.utc
)
304 with
open_filesystem(self
, sched
.fs
) as fs_handle
:
305 with fs_handle
.opendir(f
'{path}/.snap') as d_handle
:
306 dir_
= fs_handle
.readdir(d_handle
)
308 if dir_
.d_name
.decode('utf-8').startswith(f
'{SNAPSHOT_PREFIX}-'):
309 log
.debug(f
'add {dir_.d_name} to pruning')
310 ts
= datetime
.strptime(
311 dir_
.d_name
.decode('utf-8').lstrip(f
'{SNAPSHOT_PREFIX}-'),
313 prune_candidates
.add((dir_
, ts
))
315 log
.debug(f
'skipping dir entry {dir_.d_name}')
316 dir_
= fs_handle
.readdir(d_handle
)
317 to_prune
= get_prune_set(prune_candidates
, ret
)
319 dirname
= k
[0].d_name
.decode('utf-8')
320 log
.debug(f
'rmdir on {dirname}')
321 fs_handle
.rmdir(f
'{path}/.snap/{dirname}')
323 with self
.get_schedule_db(sched
.fs
) as conn_mgr
:
324 db
= conn_mgr
.dbinfo
.db
325 sched
.update_pruned(time
, db
, len(to_prune
))
327 self
._log
_exception
('prune_snapshots')
329 def get_snap_schedules(self
, fs
: str, path
: str) -> List
[Schedule
]:
330 with self
.get_schedule_db(fs
) as conn_mgr
:
331 db
= conn_mgr
.dbinfo
.db
332 return Schedule
.get_db_schedules(path
, db
, fs
)
334 def list_snap_schedules(self
,
337 recursive
: bool) -> List
[Schedule
]:
338 with self
.get_schedule_db(fs
) as conn_mgr
:
339 db
= conn_mgr
.dbinfo
.db
340 return Schedule
.list_schedules(path
, db
, fs
, recursive
)
343 # TODO improve interface
344 def store_snap_schedule(self
,
346 args
: Tuple
[str, str, str, str,
347 Optional
[str], Optional
[str]]) -> None:
348 sched
= Schedule(*args
)
349 log
.debug(f
'repeat is {sched.repeat}')
350 if sched
.parse_schedule(sched
.schedule
)[1] == 'M' and not self
.allow_minute_snaps
:
351 log
.error('not allowed')
352 raise ValueError('no minute snaps allowed')
353 log
.debug(f
'attempting to add schedule {sched}')
354 with self
.get_schedule_db(fs
) as conn_mgr
:
355 db
= conn_mgr
.dbinfo
.db
356 sched
.store_schedule(db
)
359 def rm_snap_schedule(self
,
361 schedule
: Optional
[str],
362 start
: Optional
[str]) -> None:
363 with self
.get_schedule_db(fs
) as conn_mgr
:
364 db
= conn_mgr
.dbinfo
.db
365 Schedule
.rm_schedule(db
, path
, schedule
, start
)
368 def add_retention_spec(self
,
371 retention_spec_or_period
: str,
372 retention_count
: Optional
[str]) -> None:
373 retention_spec
= retention_spec_or_period
375 retention_spec
= retention_count
+ retention_spec
376 with self
.get_schedule_db(fs
) as conn_mgr
:
377 db
= conn_mgr
.dbinfo
.db
378 Schedule
.add_retention(db
, path
, retention_spec
)
381 def rm_retention_spec(self
,
384 retention_spec_or_period
: str,
385 retention_count
: Optional
[str]) -> None:
386 retention_spec
= retention_spec_or_period
388 retention_spec
= retention_count
+ retention_spec
389 with self
.get_schedule_db(fs
) as conn_mgr
:
390 db
= conn_mgr
.dbinfo
.db
391 Schedule
.rm_retention(db
, path
, retention_spec
)
394 def activate_snap_schedule(self
,
397 schedule
: Optional
[str],
398 start
: Optional
[str]) -> None:
399 with self
.get_schedule_db(fs
) as conn_mgr
:
400 db
= conn_mgr
.dbinfo
.db
401 schedules
= Schedule
.get_db_schedules(path
, db
, fs
,
408 def deactivate_snap_schedule(self
,
410 schedule
: Optional
[str],
411 start
: Optional
[str]) -> None:
412 with self
.get_schedule_db(fs
) as conn_mgr
:
413 db
= conn_mgr
.dbinfo
.db
414 schedules
= Schedule
.get_db_schedules(path
, db
, fs
,