return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", *args)
def fs_snap_schedule_cmd(self, *args, **kwargs):
- fs = kwargs.pop('fs', self.volname)
- args += ('--fs', fs)
+ if 'fs' in kwargs:
+ fs = kwargs.pop('fs')
+ args += ('--fs', fs)
if 'format' in kwargs:
fmt = kwargs.pop('format')
args += ('--format', fmt)
snap_stats['fs_count'] = fs_count
snap_stats['db_count'] = db_count
+ log.debug(f'fs_count: {fs_count}')
+ log.debug(f'db_count: {db_count}')
+
return snap_stats
def verify_snap_stats(self, dir_path):
# remove snapshot schedule
self.fs_snap_schedule_cmd('remove', path="/bad-path")
+ def test_snap_schedule_for_number_of_snaps_retention(self):
+ """
+ Test that number of snaps retained are as per user spec.
+ """
+ total_snaps = 55
+ test_dir = '/' + TestSnapSchedules.TEST_DIRECTORY
+
+ self.mount_a.run_shell(['mkdir', '-p', test_dir[1:]])
+
+ # set a schedule on the dir
+ self.fs_snap_schedule_cmd('add', path=test_dir, snap_schedule='1M')
+ self.fs_snap_schedule_cmd('retention', 'add', path=test_dir,
+ retention_spec_or_period=f'{total_snaps}n')
+ exec_time = time.time()
+
+ timo_1, snap_sfx = self.calc_wait_time_and_snap_name(exec_time, '1M')
+
+ # verify snapshot schedule
+ self.verify_schedule(test_dir, ['1M'])
+
+ # we wait for total_snaps snaps to be taken
+ wait_time = timo_1 + total_snaps * 60 + 15
+ time.sleep(wait_time)
+
+ snap_stats = self.get_snap_stats(test_dir)
+ self.assertTrue(snap_stats['fs_count'] == total_snaps)
+ self.assertTrue(snap_stats['db_count'] >= total_snaps)
+
+ # remove snapshot schedule
+ self.fs_snap_schedule_cmd('remove', path=test_dir)
+
+ # remove all scheduled snapshots
+ self.remove_snapshots(test_dir[1:])
+
+ self.mount_a.run_shell(['rmdir', test_dir[1:]])
+
class TestSnapSchedulesSnapdir(TestSnapSchedulesHelper):
def remove_snapshots(self, dir_path, sdn):
self.remove_snapshots(TestSnapSchedulesSnapdir.TEST_DIRECTORY, sdn)
self.mount_a.run_shell(['rmdir', TestSnapSchedulesSnapdir.TEST_DIRECTORY])
+
+
+"""
+Note that the class TestSnapSchedulesMandatoryFSArgument tests snap-schedule
+commands only for multi-fs scenario. Commands for a single default fs should
+pass for tests defined above or elsewhere.
+"""
+
+
+class TestSnapSchedulesMandatoryFSArgument(TestSnapSchedulesHelper):
+ REQUIRE_BACKUP_FILESYSTEM = True
+ TEST_DIRECTORY = 'mandatory_fs_argument_test_dir'
+
+ def test_snap_schedule_without_fs_argument(self):
+ """Test command fails without --fs argument in presence of multiple fs"""
+ test_path = TestSnapSchedulesMandatoryFSArgument.TEST_DIRECTORY
+ self.mount_a.run_shell(['mkdir', '-p', test_path])
+
+ # try setting a schedule on the dir; this should fail now that we are
+ # working with mutliple fs; we need the --fs argument if there are more
+ # than one fs hosted by the same cluster
+ with self.assertRaises(CommandFailedError):
+ self.fs_snap_schedule_cmd('add', test_path, snap_schedule='1M')
+
+ self.mount_a.run_shell(['rmdir', test_path])
+
+ def test_snap_schedule_for_non_default_fs(self):
+ """Test command succes with --fs argument for non-default fs"""
+ test_path = TestSnapSchedulesMandatoryFSArgument.TEST_DIRECTORY
+ self.mount_a.run_shell(['mkdir', '-p', test_path])
+
+ # use the backup fs as the second fs; all these commands must pass
+ self.fs_snap_schedule_cmd('add', test_path, snap_schedule='1M', fs='backup_fs')
+ self.fs_snap_schedule_cmd('activate', test_path, snap_schedule='1M', fs='backup_fs')
+ self.fs_snap_schedule_cmd('retention', 'add', test_path, retention_spec_or_period='1M', fs='backup_fs')
+ self.fs_snap_schedule_cmd('list', test_path, fs='backup_fs', format='json')
+ self.fs_snap_schedule_cmd('status', test_path, fs='backup_fs', format='json')
+ self.fs_snap_schedule_cmd('retention', 'remove', test_path, retention_spec_or_period='1M', fs='backup_fs')
+ self.fs_snap_schedule_cmd('deactivate', test_path, snap_schedule='1M', fs='backup_fs')
+ self.fs_snap_schedule_cmd('remove', test_path, snap_schedule='1M', fs='backup_fs')
+
+ self.mount_a.run_shell(['rmdir', test_path])