]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/snap_schedule/fs/schedule.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / snap_schedule / fs / schedule.py
1 """
2 Copyright (C) 2020 SUSE
3
4 LGPL2.1. See file COPYING.
5 """
6 from datetime import datetime, timezone
7 import json
8 import logging
9 import re
10 import sqlite3
11 from typing import cast, Any, Dict, List, Tuple, Optional, Union
12
log = logging.getLogger(__name__)

# Work around missing datetime.fromisoformat for < python3.7
SNAP_DB_TS_FORMAT = '%Y-%m-%dT%H:%M:%S'
try:
    # Optional backport; when present it adds fromisoformat to datetime.
    from backports.datetime_fromisoformat import MonkeyPatch
    MonkeyPatch.patch_fromisoformat()
except ImportError:
    log.debug('backports.datetime_fromisoformat not found')

try:
    # have mypy ignore this line. We use the attribute error to detect if we
    # have fromisoformat or not
    ts_parser = datetime.fromisoformat  # type: ignore
    log.debug('found datetime.fromisoformat')
except AttributeError:
    log.info(('Couldn\'t find datetime.fromisoformat, falling back to '
              f'static timestamp parsing ({SNAP_DB_TS_FORMAT})'))

    def ts_parser(data_string: str) -> datetime:  # type: ignore
        '''
        Fallback parser that only accepts SNAP_DB_TS_FORMAT.

        Raises ValueError (after logging the problem) when data_string does
        not match that format.
        '''
        try:
            return datetime.strptime(data_string, SNAP_DB_TS_FORMAT)
        except ValueError as err:
            # Build the message from adjacent literals so that it contains
            # no embedded newlines or source indentation.
            msg = (f'The date string {data_string} does not match the '
                   f'required format {SNAP_DB_TS_FORMAT}. For more flexible '
                   'date parsing upgrade to python3.7 or install '
                   'https://github.com/movermeyer/'
                   'backports.datetime_fromisoformat')
            log.error(msg)
            raise ValueError(msg) from err
43
44
def parse_timestamp(ts: str) -> datetime:
    '''
    Parse ts with ts_parser and normalize the result to UTC.

    A timestamp without tzinfo is assumed to be UTC already and is returned
    as parsed; one that already carries timezone.utc is returned as is; any
    other timezone is converted to UTC.
    '''
    parsed = ts_parser(ts)
    zone = parsed.tzinfo
    if zone is None or zone is timezone.utc:
        return parsed
    return parsed.astimezone(timezone.utc)
53
54
def parse_retention(retention: str) -> Dict[str, int]:
    '''
    Turn a retention spec such as "10h2d" into {'h': 10, 'd': 2}.

    Every "<digits><letter>" token found in the spec contributes one entry,
    keyed by the letter. Anything that does not match is ignored, so an
    unusable spec yields an empty dict.
    '''
    log.debug(f'parse_retention({retention})')
    parsed: Dict[str, int] = {}
    # Lower-case multipliers are collected first, then upper-case ones.
    for pattern in (r'\d+[a-z]', r'\d+[A-Z]'):
        for token in re.findall(pattern, retention):
            count, multiplier = token[:-1], token[-1]
            parsed[multiplier] = int(count)
    log.debug(f'parse_retention({retention}) -> {parsed}')
    return parsed
66
67
# Retention multipliers in the order dump_retention() emits them.
RETENTION_MULTIPLIERS = ['n', 'M', 'h', 'd', 'w', 'm', 'y']

# Column name -> value mapping, used to annotate rows read from the database.
TableRowT = Dict[str, Union[int, str]]


def dump_retention(retention: Dict[str, str]) -> str:
    '''
    Render a retention dict as a compact spec string, e.g. "1n2M5h".

    Only keys listed in RETENTION_MULTIPLIERS are emitted, in that list's
    order; any other key is silently skipped.
    '''
    return ''.join(f'{retention[unit]}{unit}'
                   for unit in RETENTION_MULTIPLIERS
                   if unit in retention)
79
80
class Schedule(object):
    '''
    Wrapper to work with schedules stored in sqlite

    Each path has one row in the ``schedules`` table (path is UNIQUE) and
    one row in ``schedules_meta`` per (start, repeat) pair registered for
    it. An instance of this class represents one such pair.

    Several methods access result rows by column name, so the connection
    passed in is expected to use ``sqlite3.Row`` as its row factory.
    '''
    def __init__(self,
                 path: str,
                 schedule: str,
                 fs_name: str,
                 rel_path: str,
                 start: Optional[str] = None,
                 subvol: Optional[str] = None,
                 retention_policy: str = '{}',
                 created: Optional[str] = None,
                 first: Optional[str] = None,
                 last: Optional[str] = None,
                 last_pruned: Optional[str] = None,
                 created_count: int = 0,
                 pruned_count: int = 0,
                 active: bool = True,
                 ) -> None:
        '''
        Build a schedule from plain, database friendly values.

        All timestamps are passed as strings and parsed with
        parse_timestamp(). ``start`` defaults to midnight (UTC) of the
        current day and ``created`` to the current UTC time.
        ``retention_policy`` is a JSON document.
        '''
        # NOTE: report_json() dumps self.__dict__, so the order of these
        # assignments is the order of the keys in the report.
        self.fs = fs_name
        self.subvol = subvol
        self.path = path
        self.rel_path = rel_path
        self.schedule = schedule
        self.retention = json.loads(retention_policy)
        if start is None:
            now = datetime.now(timezone.utc)
            self.start = datetime(now.year,
                                  now.month,
                                  now.day,
                                  tzinfo=now.tzinfo)
        else:
            self.start = parse_timestamp(start)
        if created is None:
            self.created: Optional[datetime] = datetime.now(timezone.utc)
        else:
            self.created = parse_timestamp(created)
        if first:
            self.first: Optional[datetime] = parse_timestamp(first)
        else:
            self.first = None
        if last:
            self.last: Optional[datetime] = parse_timestamp(last)
        else:
            self.last = None
        if last_pruned:
            self.last_pruned: Optional[datetime] = parse_timestamp(last_pruned)
        else:
            self.last_pruned = None
        self.created_count = created_count
        self.pruned_count = pruned_count
        self.active = bool(active)

    @classmethod
    def _from_db_row(cls, table_row: TableRowT, fs: str) -> 'Schedule':
        '''
        Build a Schedule from one row of the PROTO_GET_SCHEDULES query.
        '''
        return cls(path=cast(str, table_row['path']),
                   schedule=cast(str, table_row['schedule']),
                   fs_name=fs,
                   rel_path=cast(str, table_row['rel_path']),
                   start=cast(str, table_row['start']),
                   subvol=cast(str, table_row['subvol']),
                   retention_policy=cast(str, table_row['retention']),
                   created=cast(str, table_row['created']),
                   first=cast(str, table_row['first']),
                   last=cast(str, table_row['last']),
                   last_pruned=cast(str, table_row['last_pruned']),
                   created_count=cast(int, table_row['created_count']),
                   pruned_count=cast(int, table_row['pruned_count']),
                   active=cast(bool, table_row['active']),
                   )

    def __str__(self) -> str:
        return f'{self.path} {self.schedule} {dump_retention(self.retention)}'

    def json_list(self) -> str:
        '''
        Return path, schedule and retention spec as a JSON object string.
        '''
        return json.dumps({'path': self.path, 'schedule': self.schedule,
                           'retention': dump_retention(self.retention)})

    CREATE_TABLES = '''CREATE TABLE IF NOT EXISTS schedules(
        id INTEGER PRIMARY KEY ASC,
        path TEXT NOT NULL UNIQUE,
        subvol TEXT,
        retention TEXT DEFAULT '{}',
        rel_path TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS schedules_meta(
        id INTEGER PRIMARY KEY ASC,
        schedule_id INT,
        start TEXT NOT NULL,
        first TEXT,
        last TEXT,
        last_pruned TEXT,
        created TEXT NOT NULL,
        repeat INT NOT NULL,
        schedule TEXT NOT NULL,
        created_count INT DEFAULT 0,
        pruned_count INT DEFAULT 0,
        active INT NOT NULL,
        FOREIGN KEY(schedule_id) REFERENCES schedules(id) ON DELETE CASCADE,
        UNIQUE (schedule_id, start, repeat)
    );'''

    # "until" is the number of seconds left before the next multiple of
    # repeat since start; only active schedules whose start lies in the
    # past are returned, soonest first.
    EXEC_QUERY = '''SELECT
        s.retention,
        sm.repeat - (strftime("%s", "now") - strftime("%s", sm.start)) %
        sm.repeat "until",
        sm.start, sm.repeat, sm.schedule
        FROM schedules s
            INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
        WHERE
            s.path = ? AND
            strftime("%s", "now") - strftime("%s", sm.start) > 0 AND
            sm.active = 1
        ORDER BY until;'''

    # Ends in WHERE on purpose: callers append the actual condition.
    PROTO_GET_SCHEDULES = '''SELECT
          s.path, s.subvol, s.rel_path, sm.active,
          sm.schedule, s.retention, sm.start, sm.first, sm.last,
          sm.last_pruned, sm.created, sm.created_count, sm.pruned_count
          FROM schedules s
              INNER JOIN schedules_meta sm ON sm.schedule_id = s.id
          WHERE'''

    GET_SCHEDULES = PROTO_GET_SCHEDULES + ' s.path = ?'

    @classmethod
    def get_db_schedules(cls,
                         path: str,
                         db: sqlite3.Connection,
                         fs: str,
                         schedule: Optional[str] = None,
                         start: Optional[str] = None,
                         repeat: Optional[str] = None) -> List['Schedule']:
        '''
        Return the schedules stored for exactly ``path``.

        ``repeat``, ``schedule`` and ``start`` narrow the result further
        when they are given (and truthy).
        '''
        query = cls.GET_SCHEDULES
        data: Tuple[Any, ...] = (path,)
        if repeat:
            query += ' AND sm.repeat = ?'
            data += (repeat,)
        if schedule:
            query += ' AND sm.schedule = ?'
            data += (schedule,)
        if start:
            query += ' AND sm.start = ?'
            data += (start,)
        with db:
            c = db.execute(query, data)
        return [cls._from_db_row(row, fs) for row in c.fetchall()]

    @classmethod
    def list_schedules(cls,
                       path: str,
                       db: sqlite3.Connection,
                       fs: str, recursive: bool) -> List['Schedule']:
        '''
        Return the schedules for ``path``; with ``recursive`` set, return
        the schedules of every stored path that starts with ``path``.
        '''
        with db:
            if recursive:
                # prefix match: path itself and everything that starts
                # with it
                c = db.execute(cls.PROTO_GET_SCHEDULES + ' path LIKE ?',
                               (f'{path}%',))
            else:
                c = db.execute(cls.PROTO_GET_SCHEDULES + ' path = ?',
                               (f'{path}',))
        return [cls._from_db_row(row, fs) for row in c.fetchall()]

    INSERT_SCHEDULE = '''INSERT INTO
        schedules(path, subvol, retention, rel_path)
        Values(?, ?, ?, ?);'''
    INSERT_SCHEDULE_META = '''INSERT INTO
        schedules_meta(schedule_id, start, created, repeat, schedule,
        active)
        SELECT ?, ?, ?, ?, ?, ?'''

    def store_schedule(self, db: sqlite3.Connection) -> None:
        '''
        Persist this schedule.

        The ``schedules`` row for the path is created if it does not exist
        yet, otherwise the existing one is reused; in both cases a new
        ``schedules_meta`` row (marked active) is added for it.
        '''
        with db:
            try:
                log.debug(f'schedule with retention {self.retention}')
                c = db.execute(self.INSERT_SCHEDULE,
                               (self.path,
                                self.subvol,
                                json.dumps(self.retention),
                                self.rel_path,))
                sched_id = c.lastrowid
            except sqlite3.IntegrityError:
                # might be adding another schedule, retrieve sched id
                log.debug((f'found schedule entry for {self.path}, '
                           'trying to add meta'))
                c = db.execute('SELECT id FROM schedules where path = ?',
                               (self.path,))
                sched_id = c.fetchone()[0]
            # __init__ always sets created; the assert narrows the Optional
            # for the type checker.
            assert self.created, "self.created should be set"
            db.execute(self.INSERT_SCHEDULE_META,
                       (sched_id,
                        self.start.strftime(SNAP_DB_TS_FORMAT),
                        self.created.strftime(SNAP_DB_TS_FORMAT),
                        self.repeat,
                        self.schedule,
                        1))

    @classmethod
    def rm_schedule(cls,
                    db: sqlite3.Connection,
                    path: str,
                    repeat: Optional[str],
                    start: Optional[str]) -> None:
        '''
        Remove schedules for ``path``.

        Without ``repeat`` and ``start`` the whole schedule is removed.
        Otherwise only the matching ``schedules_meta`` rows are deleted and
        the ``schedules`` row is removed once no meta row is left for it.

        Raises ValueError if there is no schedule for ``path`` or if no
        meta row matches ``repeat``/``start``.
        '''
        with db:
            cur = db.execute('SELECT id FROM schedules WHERE path = ?',
                             (path,))
            row = cur.fetchone()

            # fetchone() returns None (not an empty row) if nothing matched
            if row is None:
                log.info(f'no schedule for {path} found')
                raise ValueError('SnapSchedule for {} not found'.format(path))

            id_ = tuple(row)

            if repeat or start:
                meta_delete = ('DELETE FROM schedules_meta '
                               'WHERE schedule_id = ?')
                delete_param = id_
                if repeat:
                    meta_delete += ' AND schedule = ?'
                    delete_param += (repeat,)
                if start:
                    meta_delete += ' AND start = ?'
                    delete_param += (start,)
                # maybe only delete meta entry
                log.debug(f'executing {meta_delete}, {delete_param}')
                res = db.execute(meta_delete + ';', delete_param).rowcount
                if res < 1:
                    raise ValueError(f'No schedule found for {repeat} {start}')
                db.execute('COMMIT;')
                # now check if we have schedules in meta left, if not delete
                # the schedule as well
                meta_count = db.execute(
                    'SELECT COUNT() FROM schedules_meta WHERE schedule_id = ?',
                    id_)
                # Compare the value, not the row object: a sqlite3.Row never
                # compares equal to a plain tuple.
                if meta_count.fetchone()[0] == 0:
                    log.debug(
                        'no more schedules left, cleaning up schedules table')
                    db.execute('DELETE FROM schedules WHERE id = ?;', id_)
            else:
                # just delete the schedule CASCADE DELETE takes care of the
                # rest
                db.execute('DELETE FROM schedules WHERE id = ?;', id_)

    GET_RETENTION = '''SELECT retention FROM schedules
        WHERE path = ?'''
    UPDATE_RETENTION = '''UPDATE schedules
        SET retention = ?
        WHERE path = ?'''

    @classmethod
    def add_retention(cls,
                      db: sqlite3.Connection,
                      path: str,
                      retention_spec: str) -> None:
        '''
        Merge ``retention_spec`` into the retention policy stored for
        ``path``.

        Raises ValueError if there is no schedule for ``path``, if the spec
        contains no usable entry, or if one of its multipliers is already
        part of the stored policy.
        '''
        with db:
            row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
            if not row:
                raise ValueError(f'No schedule found for {path}')
            retention = parse_retention(retention_spec)
            if not retention:
                raise ValueError(f'Retention spec {retention_spec} is invalid')
            log.debug(f'db result is {tuple(row)}')
            current = row['retention']
            current_retention = json.loads(current)
            for r in retention:
                if r in current_retention:
                    msg = (f'Retention for {r} is already present with value '
                           f'{current_retention[r]}. Please remove first')
                    raise ValueError(msg)
            current_retention.update(retention)
            db.execute(cls.UPDATE_RETENTION,
                       (json.dumps(current_retention), path))

    @classmethod
    def rm_retention(cls,
                     db: sqlite3.Connection,
                     path: str,
                     retention_spec: str) -> None:
        '''
        Remove the entries of ``retention_spec`` from the retention policy
        stored for ``path``.

        Raises ValueError if there is no schedule for ``path`` or if an
        entry of the spec is not stored with exactly that value.
        '''
        with db:
            row = db.execute(cls.GET_RETENTION, (path,)).fetchone()
            if not row:
                raise ValueError(f'No schedule found for {path}')
            retention = parse_retention(retention_spec)
            current = row['retention']
            current_retention = json.loads(current)
            for r, v in retention.items():
                if r not in current_retention or current_retention[r] != v:
                    msg = (f'Retention for {r}: {v} was not set for {path} '
                           'can\'t remove')
                    raise ValueError(msg)
                current_retention.pop(r)
            db.execute(cls.UPDATE_RETENTION,
                       (json.dumps(current_retention), path))

    def report(self) -> str:
        '''Return the report for this schedule (currently the JSON one).'''
        return self.report_json()

    def report_json(self) -> str:
        '''
        Return all instance attributes as a JSON object string.
        '''
        # Attributes json cannot encode natively (the datetime ones) are
        # rendered in the database timestamp format.
        return json.dumps(dict(self.__dict__),
                          default=lambda o: o.strftime(SNAP_DB_TS_FORMAT))

    @classmethod
    def parse_schedule(cls, schedule: str) -> Tuple[int, str]:
        '''
        Split a schedule spec like "12h" into (12, 'h').
        '''
        return int(schedule[0:-1]), schedule[-1]

    # seconds per schedule multiplier
    _MULTIPLIER_SECONDS: Dict[str, int] = {
        'M': 60,
        'h': 60 * 60,
        'd': 60 * 60 * 24,
        'w': 60 * 60 * 24 * 7,
    }

    @property
    def repeat(self) -> int:
        '''
        The schedule period in seconds.

        Raises ValueError if the schedule's multiplier is not one of
        M, h, d or w.
        '''
        period, mult = self.parse_schedule(self.schedule)
        try:
            return period * self._MULTIPLIER_SECONDS[mult]
        except KeyError:
            raise ValueError(
                f'schedule multiplier "{mult}" not recognized') from None

    def _meta_key(self) -> Tuple[str, str, int]:
        '''
        Return (path, start, repeat), the trailing parameters of the
        UPDATE_* statements that select this schedule's meta row.
        '''
        return (self.path,
                self.start.strftime(SNAP_DB_TS_FORMAT),
                self.repeat)

    UPDATE_LAST = '''UPDATE schedules_meta
        SET
          last = ?,
          created_count = created_count + 1,
          first = CASE WHEN first IS NULL THEN ? ELSE first END
        WHERE EXISTS(
          SELECT id
          FROM schedules s
          WHERE s.id = schedules_meta.schedule_id
          AND s.path = ?
          AND schedules_meta.start = ?
          AND schedules_meta.repeat = ?);'''

    def update_last(self, time: datetime, db: sqlite3.Connection) -> None:
        '''
        Record ``time`` as the time of the latest snapshot, in the database
        and on this instance; also sets ``first`` if it was not set yet and
        bumps the created counter.
        '''
        stamp = time.strftime(SNAP_DB_TS_FORMAT)
        with db:
            db.execute(self.UPDATE_LAST, (stamp, stamp) + self._meta_key())
        self.created_count += 1
        self.last = time
        if not self.first:
            self.first = time

    UPDATE_INACTIVE = '''UPDATE schedules_meta
        SET
          active = 0
        WHERE EXISTS(
          SELECT id
          FROM schedules s
          WHERE s.id = schedules_meta.schedule_id
          AND s.path = ?
          AND schedules_meta.start = ?
          AND schedules_meta.repeat = ?);'''

    def set_inactive(self, db: sqlite3.Connection) -> None:
        '''
        Mark this schedule inactive in the database and on this instance.
        '''
        with db:
            log.debug((f'Deactivating schedule ({self.repeat}, '
                       f'{self.start}) on path {self.path}'))
            db.execute(self.UPDATE_INACTIVE, self._meta_key())
        self.active = False

    UPDATE_ACTIVE = '''UPDATE schedules_meta
        SET
          active = 1
        WHERE EXISTS(
          SELECT id
          FROM schedules s
          WHERE s.id = schedules_meta.schedule_id
          AND s.path = ?
          AND schedules_meta.start = ?
          AND schedules_meta.repeat = ?);'''

    def set_active(self, db: sqlite3.Connection) -> None:
        '''
        Mark this schedule active in the database and on this instance.
        '''
        with db:
            log.debug(f'Activating schedule ({self.repeat}, {self.start}) '
                      f'on path {self.path}')
            db.execute(self.UPDATE_ACTIVE, self._meta_key())
        self.active = True

    UPDATE_PRUNED = '''UPDATE schedules_meta
        SET
          last_pruned = ?,
          pruned_count = pruned_count + ?
        WHERE EXISTS(
          SELECT id
          FROM schedules s
          WHERE s.id = schedules_meta.schedule_id
          AND s.path = ?
          AND schedules_meta.start = ?
          AND schedules_meta.repeat = ?);'''

    def update_pruned(self,
                      time: datetime,
                      db: sqlite3.Connection,
                      pruned: int) -> None:
        '''
        Record a prune run at ``time`` that removed ``pruned`` snapshots,
        in the database and on this instance.
        '''
        with db:
            db.execute(self.UPDATE_PRUNED,
                       (time.strftime(SNAP_DB_TS_FORMAT), pruned)
                       + self._meta_key())
        self.pruned_count += pruned
        self.last_pruned = time