def _fs_cmd(self, *args):
return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
- def fs_snap_schedule_cmd(self, *args):
- args = list(args)
- args.append(f'fs={self.volname}')
+ def fs_snap_schedule_cmd(self, *args, **kwargs):
+ fs = kwargs.pop('fs', self.volname)
+ args += ('--fs', fs)
+ for name, val in kwargs.items():
+ args += (f'--{name}', str(val))
res = self._fs_cmd('snap-schedule', *args)
log.debug(f'res={res}')
return res
def verify_schedule(self, dir_path, schedules, retentions=[]):
log.debug(f'expected_schedule: {schedules}, expected_retention: {retentions}')
- result = self.fs_snap_schedule_cmd('list', f'path={dir_path}', 'format=json')
+ result = self.fs_snap_schedule_cmd('list', path=dir_path, format='json')
json_res = json.loads(result)
log.debug(f'json_res: {json_res}')
def test_non_existent_snap_schedule_list(self):
"""Test listing snap schedules on a non-existing filesystem path failure"""
try:
- self.fs_snap_schedule_cmd('list', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'format=json')
+ self.fs_snap_schedule_cmd('list', path=TestSnapSchedules.TEST_DIRECTORY)
except CommandFailedError as ce:
if ce.exitstatus != errno.ENOENT:
raise RuntimeError('incorrect errno when listing a non-existing snap schedule')
self.mount_a.run_shell(['mkdir', '-p', TestSnapSchedules.TEST_DIRECTORY])
try:
- self.fs_snap_schedule_cmd('list', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'format=json')
+ self.fs_snap_schedule_cmd('list', path=TestSnapSchedules.TEST_DIRECTORY)
except CommandFailedError as ce:
if ce.exitstatus != errno.ENOENT:
raise RuntimeError('incorrect errno when listing a non-existing snap schedule')
"""Test listing snap schedules post removal of a schedule"""
self.mount_a.run_shell(['mkdir', '-p', TestSnapSchedules.TEST_DIRECTORY])
- self.fs_snap_schedule_cmd('add', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'snap-schedule=1h')
+ self.fs_snap_schedule_cmd('add', path=TestSnapSchedules.TEST_DIRECTORY, snap_schedule='1h')
- self.fs_snap_schedule_cmd('remove', f'path={TestSnapSchedules.TEST_DIRECTORY}')
+ self.fs_snap_schedule_cmd('remove', path=TestSnapSchedules.TEST_DIRECTORY)
try:
- self.fs_snap_schedule_cmd('list', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'format=json')
+ self.fs_snap_schedule_cmd('list', path=TestSnapSchedules.TEST_DIRECTORY)
except CommandFailedError as ce:
if ce.exitstatus != errno.ENOENT:
raise RuntimeError('incorrect errno when listing a non-existing snap schedule')
self.mount_a.run_shell(['mkdir', '-p', TestSnapSchedules.TEST_DIRECTORY])
# set a schedule on the dir
- self.fs_snap_schedule_cmd('add', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'snap-schedule=1M')
+ self.fs_snap_schedule_cmd('add', path=TestSnapSchedules.TEST_DIRECTORY, snap_schedule='1M')
exec_time = time.time()
timo, snap_sfx = self.calc_wait_time_and_snap_name(exec_time, '1M')
self.assert_if_not_verified()
# remove snapshot schedule
- self.fs_snap_schedule_cmd('remove', f'path={TestSnapSchedules.TEST_DIRECTORY}')
+ self.fs_snap_schedule_cmd('remove', path=TestSnapSchedules.TEST_DIRECTORY)
# remove all scheduled snapshots
self.remove_snapshots(TestSnapSchedules.TEST_DIRECTORY)
self.mount_a.run_shell(['mkdir', '-p', TestSnapSchedules.TEST_DIRECTORY])
# set schedules on the dir
- self.fs_snap_schedule_cmd('add', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'snap-schedule=1M')
- self.fs_snap_schedule_cmd('add', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'snap-schedule=2M')
+ self.fs_snap_schedule_cmd('add', path=TestSnapSchedules.TEST_DIRECTORY, snap_schedule='1M')
+ self.fs_snap_schedule_cmd('add', path=TestSnapSchedules.TEST_DIRECTORY, snap_schedule='2M')
exec_time = time.time()
timo_1, snap_sfx_1 = self.calc_wait_time_and_snap_name(exec_time, '1M')
self.assert_if_not_verified()
# remove snapshot schedule
- self.fs_snap_schedule_cmd('remove', f'path={TestSnapSchedules.TEST_DIRECTORY}')
+ self.fs_snap_schedule_cmd('remove', path=TestSnapSchedules.TEST_DIRECTORY)
# remove all scheduled snapshots
self.remove_snapshots(TestSnapSchedules.TEST_DIRECTORY)
self.mount_a.run_shell(['mkdir', '-p', TestSnapSchedules.TEST_DIRECTORY])
# set a schedule on the dir
- self.fs_snap_schedule_cmd('add', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'snap-schedule=1M')
- self.fs_snap_schedule_cmd('retention', 'add', f'path={TestSnapSchedules.TEST_DIRECTORY}', 'retention-spec-or-period=1M')
+ self.fs_snap_schedule_cmd('add', path=TestSnapSchedules.TEST_DIRECTORY, snap_schedule='1M')
+ self.fs_snap_schedule_cmd('retention', 'add', path=TestSnapSchedules.TEST_DIRECTORY, retention_spec_or_period='1M')
exec_time = time.time()
timo_1, snap_sfx = self.calc_wait_time_and_snap_name(exec_time, '1M')
self.assert_if_not_verified()
# remove snapshot schedule
- self.fs_snap_schedule_cmd('remove', f'path={TestSnapSchedules.TEST_DIRECTORY}')
+ self.fs_snap_schedule_cmd('remove', path=TestSnapSchedules.TEST_DIRECTORY)
# remove all scheduled snapshots
self.remove_snapshots(TestSnapSchedules.TEST_DIRECTORY)