]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/snap_schedule/tests/fs/test_schedule_client.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / snap_schedule / tests / fs / test_schedule_client.py
1 from datetime import datetime, timedelta
2 from unittest.mock import MagicMock
3 import pytest
4 from ...fs.schedule_client import get_prune_set, SNAPSHOT_TS_FORMAT
5
6
7 class TestScheduleClient(object):
8
9 def test_get_prune_set_empty_retention_no_prune(self):
10 now = datetime.now()
11 candidates = set()
12 for i in range(10):
13 ts = now - timedelta(minutes=i*5)
14 fake_dir = MagicMock()
15 fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}'
16 candidates.add((fake_dir, ts))
17 ret = {}
18 prune_set = get_prune_set(candidates, ret, 99)
19 assert prune_set == set(), 'candidates are pruned despite empty retention'
20
21 def test_get_prune_set_two_retention_specs(self):
22 now = datetime.now()
23 candidates = set()
24 for i in range(10):
25 ts = now - timedelta(hours=i*1)
26 fake_dir = MagicMock()
27 fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}'
28 candidates.add((fake_dir, ts))
29 for i in range(10):
30 ts = now - timedelta(days=i*1)
31 fake_dir = MagicMock()
32 fake_dir.d_name = f'scheduled-{ts.strftime(SNAPSHOT_TS_FORMAT)}'
33 candidates.add((fake_dir, ts))
34 # should keep 8 snapshots
35 ret = {'h': 6, 'd': 2}
36 prune_set = get_prune_set(candidates, ret, 99)
37 assert len(prune_set) == len(candidates) - 8, 'wrong size of prune set'