import errno
import time
from teuthology.exceptions import CommandFailedError
+from teuthology.contextutil import safe_while
import os
from tasks.cephfs.cephfs_test_case import CephFSTestCase
self.assertEqual(res['return_code'], expected)
def _get_scrub_status(self):
return self.fs.rank_tell(["scrub", "status"])
- def _check_task_status(self, expected_status):
- task_status = self.fs.get_task_status("scrub status")
- active = self.fs.get_active_names()
- log.debug("current active={0}".format(active))
- self.assertTrue(task_status[active[0]].startswith(expected_status))
+ def _check_task_status(self, expected_status, timo=120):
+ """ check scrub status for current active mds in ceph status """
+ with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
+ while proceed():
+ active = self.fs.get_active_names()
+ log.debug("current active={0}".format(active))
+ task_status = self.fs.get_task_status("scrub status")
+ try:
+ if task_status[active[0]].startswith(expected_status):
+ return True
+ except KeyError:
+ pass
+
+ def _check_task_status_na(self, timo=120):
+ """ check absence of scrub status in ceph status """
+ with safe_while(sleep=1, tries=120, action='wait for task status') as proceed:
+ while proceed():
+ active = self.fs.get_active_names()
+ log.debug("current active={0}".format(active))
+ task_status = self.fs.get_task_status("scrub status")
+ if not active[0] in task_status:
+ return True
+
+ def create_scrub_data(self, test_dir):
+ for i in range(32):
+ dirname = "dir.{0}".format(i)
+ dirpath = os.path.join(test_dir, dirname)
+ self.mount_a.run_shell_payload(f"""
+set -e
+mkdir -p {dirpath}
+for ((i = 0; i < 32; i++)); do
+ dd if=/dev/urandom of={dirpath}/filename.$i bs=1M conv=fdatasync count=1
+done
+""")
def test_scrub_abort(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)
- log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
- client_path = os.path.join(self.mount_a.mountpoint, test_dir)
- log.info("client_path: {0}".format(client_path))
-
- log.info("Cloning repo into place")
- TestScrubChecks.clone_repo(self.mount_a, client_path)
+ self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
self.assertTrue("no active" in out_json['status'])
        # wait until the updated task status is reflected in ceph status
- time.sleep(10)
- self._check_task_status("idle")
+ checked = self._check_task_status_na()
+ self.assertTrue(checked)
def test_scrub_pause_and_resume(self):
test_dir = "scrub_control_test_path"
client_path = os.path.join(self.mount_a.mountpoint, test_dir)
log.info("client_path: {0}".format(client_path))
- log.info("Cloning repo into place")
- _ = TestScrubChecks.clone_repo(self.mount_a, client_path)
+ self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
- # sleep enough to fetch updated task status
- time.sleep(10)
- self._check_task_status("paused")
+ checked = self._check_task_status("paused")
+ self.assertTrue(checked)
# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertFalse("PAUSED" in out_json['status'])
+ checked = self._check_task_status_na()
+ self.assertTrue(checked)
+
def test_scrub_pause_and_resume_with_abort(self):
test_dir = "scrub_control_test_path"
abs_test_path = "/{0}".format(test_dir)
- log.info("mountpoint: {0}".format(self.mount_a.mountpoint))
- client_path = os.path.join(self.mount_a.mountpoint, test_dir)
- log.info("client_path: {0}".format(client_path))
-
- log.info("Cloning repo into place")
- _ = TestScrubChecks.clone_repo(self.mount_a, client_path)
+ self.create_scrub_data(test_dir)
out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
self.assertNotEqual(out_json, None)
out_json = self._get_scrub_status()
self.assertTrue("PAUSED" in out_json['status'])
- # sleep enough to fetch updated task status
- time.sleep(10)
- self._check_task_status("paused")
+ checked = self._check_task_status("paused")
+ self.assertTrue(checked)
# abort and verify
self._abort_scrub(0)
self.assertTrue("PAUSED" in out_json['status'])
self.assertTrue("0 inodes" in out_json['status'])
- # sleep enough to fetch updated task status
- time.sleep(10)
- self._check_task_status("paused")
+ # scrub status should still be paused...
+ checked = self._check_task_status("paused")
+ self.assertTrue(checked)
# resume and verify
self._resume_scrub(0)
out_json = self._get_scrub_status()
self.assertTrue("no active" in out_json['status'])
- # sleep enough to fetch updated task status
- time.sleep(10)
- self._check_task_status("idle")
+ checked = self._check_task_status_na()
+ self.assertTrue(checked)
def test_scrub_task_status_on_mds_failover(self):
- # sleep enough to fetch updated task status
- time.sleep(10)
-
(original_active, ) = self.fs.get_active_names()
original_standbys = self.mds_cluster.get_standby_daemons()
- self._check_task_status("idle")
+
+ test_dir = "scrub_control_test_path"
+ abs_test_path = "/{0}".format(test_dir)
+
+ self.create_scrub_data(test_dir)
+
+ out_json = self.fs.rank_tell(["scrub", "start", abs_test_path, "recursive"])
+ self.assertNotEqual(out_json, None)
+
+ # pause and verify
+ self._pause_scrub(0)
+ out_json = self._get_scrub_status()
+ self.assertTrue("PAUSED" in out_json['status'])
+
+ checked = self._check_task_status("paused")
+ self.assertTrue(checked)
# Kill the rank 0
self.fs.mds_stop(original_active)
original_standbys))
self.wait_until_true(promoted, timeout=grace*2)
- mgr_beacon_grace = float(self.fs.get_config("mgr_service_beacon_grace", service_type="mon"))
-
- def status_check():
- task_status = self.fs.get_task_status("scrub status")
- return original_active not in task_status
- self.wait_until_true(status_check, timeout=mgr_beacon_grace*2)
+ self._check_task_status_na()
class TestScrubChecks(CephFSTestCase):
"""