import traceback
-MAX_SNAPS_PER_PATH = 50
SNAP_SCHEDULE_NAMESPACE = 'cephfs-snap-schedule'
SNAP_DB_PREFIX = 'snap_db'
# increment this every time the db schema changes and provide upgrade code
def get_prune_set(candidates: Set[Tuple[cephfs.DirEntry, datetime]],
- retention: Dict[str, int]) -> Set:
+ retention: Dict[str, int],
+ max_snaps_to_retain: int) -> Set:
PRUNING_PATTERNS = OrderedDict([
# n is for keep last n snapshots, uses the snapshot name timestamp
# format for lowest granularity
])
keep = []
if not retention:
- log.info(f'no retention set, assuming n: {MAX_SNAPS_PER_PATH}')
- retention = {'n': MAX_SNAPS_PER_PATH}
+ log.info(f'no retention set, assuming n: {max_snaps_to_retain}')
+ retention = {'n': max_snaps_to_retain}
for period, date_pattern in PRUNING_PATTERNS.items():
log.debug(f'compiling keep set for period {period}')
period_count = retention.get(period, 0)
log.debug(('found enough snapshots for '
f'{period_count}{period}'))
break
- if len(keep) > MAX_SNAPS_PER_PATH:
- log.info((f'Would keep more then {MAX_SNAPS_PER_PATH}, '
- 'pruning keep set'))
- keep = keep[:MAX_SNAPS_PER_PATH]
+ if len(keep) > max_snaps_to_retain:
+ log.info(f'Pruning keep set; would retain first {max_snaps_to_retain}'
+ f' out of {len(keep)} snaps')
+ keep = keep[:max_snaps_to_retain]
return candidates - set(keep)
def snap_name_to_timestamp(scheduled_snap_name: str) -> str:
path = sched.path
prune_candidates = set()
time = datetime.now(timezone.utc)
+ mds_max_snaps_per_dir = self.mgr.get_ceph_option('mds_max_snaps_per_dir')
with open_filesystem(self, sched.fs) as fs_handle:
snap_dir = self.mgr.rados.conf_get('client_snapdir')
with fs_handle.opendir(f'{path}/{snap_dir}') as d_handle:
else:
log.debug(f'skipping dir entry {dir_.d_name}')
dir_ = fs_handle.readdir(d_handle)
- to_prune = get_prune_set(prune_candidates, ret)
+ # Limit ourselves to one snapshot less than allowed by config to allow for
+ # snapshot creation before pruning
+ to_prune = get_prune_set(prune_candidates, ret, mds_max_snaps_per_dir - 1)
for k in to_prune:
dirname = k[0].d_name.decode('utf-8')
log.debug(f'rmdir on {dirname}')