update sources to 12.2.7
diff --git a/ceph/qa/tasks/ceph_manager.py b/ceph/qa/tasks/ceph_manager.py
index 465da73d2358c3df21d8916b3cfe6b312426ec72..969cd6e23c5d1c7242a0f405df9a99672b982f06 100644
--- a/ceph/qa/tasks/ceph_manager.py
+++ b/ceph/qa/tasks/ceph_manager.py
@@ -111,12 +111,12 @@ class Thrasher:
         self.stopping = False
         self.logger = logger
         self.config = config
-        self.revive_timeout = self.config.get("revive_timeout", 150)
+        self.revive_timeout = self.config.get("revive_timeout", 360)
         self.pools_to_fix_pgp_num = set()
         if self.config.get('powercycle'):
             self.revive_timeout += 120
         self.clean_wait = self.config.get('clean_wait', 0)
-        self.minin = self.config.get("min_in", 3)
+        self.minin = self.config.get("min_in", 4)
         self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
         self.sighup_delay = self.config.get('sighup_delay')
         self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
@@ -125,6 +125,8 @@ class Thrasher:
         self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
         self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
         self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
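+        # New thrasher knobs: 'random_eio' is the probability of injected
+        # random read errors on one designated OSD, and
+        # 'chance_force_recovery' weights the force/cancel-force recovery
+        # action below.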
+        self.random_eio = self.config.get('random_eio', 0.0)
+        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
 
         num_osds = self.in_osds + self.out_osds
         self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * num_osds
@@ -284,6 +286,7 @@ class Thrasher:
                                         pg=pg,
                                         id=exp_osd))
             # export
+            # Can't use the new export-remove op since this is part of upgrade testing
             cmd = prefix + "--op export --pgid {pg} --file {file}"
             cmd = cmd.format(id=exp_osd, pg=pg, file=exp_path)
             proc = exp_remote.run(args=cmd)
@@ -292,7 +295,7 @@ class Thrasher:
                                 "export failure with status {ret}".
                                 format(ret=proc.exitstatus))
             # remove
-            cmd = prefix + "--op remove --pgid {pg}"
+            cmd = prefix + "--force --op remove --pgid {pg}"
             cmd = cmd.format(id=exp_osd, pg=pg)
             proc = exp_remote.run(args=cmd)
             if proc.exitstatus:
@@ -435,6 +438,12 @@ class Thrasher:
             skip_admin_check=skip_admin_check)
         self.dead_osds.remove(osd)
         self.live_osds.append(osd)
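+        # If read-error injection is enabled, re-arm it on the designated OSD
+        # after reviving it, since settings applied via injectargs do not
+        # survive an OSD restart.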
+        if self.random_eio > 0 and osd == self.rerrosd:
+            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+                          'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
+            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+                          'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
+
 
     def out_osd(self, osd=None):
         """
@@ -596,6 +605,46 @@ class Thrasher:
         except CommandFailedError:
             self.log('Failed to rm-pg-upmap-items, ignoring')
 
+    def force_recovery(self):
+        """
+        Force recovery on some PGs
+        """
+        backfill = random.random() >= 0.5
+        j = self.ceph_manager.get_pgids_to_force(backfill)
+        if j:
+            try:
+                if backfill:
+                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
+                else:
+                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
+            except CommandFailedError:
+                self.log('Failed to force backfill|recovery, ignoring')
+
+    def cancel_force_recovery(self):
+        """
+        Cancel forced recovery on some PGs
+        """
+        backfill = random.random() >= 0.5
+        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
+        if j:
+            try:
+                if backfill:
+                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
+                else:
+                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
+            except CommandFailedError:
+                self.log('Failed to cancel force backfill|recovery, ignoring')
+
+    def force_cancel_recovery(self):
+        """
+        Randomly force recovery or cancel a previously forced recovery
+        """
+        if random.random() >= 0.4:
+            self.force_recovery()
+        else:
+            self.cancel_force_recovery()
+
     def all_up(self):
         """
         Make sure all osds are up and not out.
@@ -726,7 +775,7 @@ class Thrasher:
         osd_debug_skip_full_check_in_backfill_reservation to force
         the more complicated check in do_scan to be exercised.
 
-        Then, verify that all backfills stop.
+        Then, verify that all backfilling stops.
         """
         self.log("injecting backfill full")
         for i in self.live_osds:
@@ -738,13 +787,13 @@ class Thrasher:
                                      check_status=True, timeout=30, stdout=DEVNULL)
         for i in range(30):
             status = self.ceph_manager.compile_pg_status()
-            if 'backfill' not in status.keys():
+            if 'backfilling' not in status.keys():
                 break
             self.log(
-                "waiting for {still_going} backfills".format(
-                    still_going=status.get('backfill')))
+                "waiting for {still_going} backfillings".format(
+                    still_going=status.get('backfilling')))
             time.sleep(1)
-        assert('backfill' not in self.ceph_manager.compile_pg_status().keys())
+        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
         for i in self.live_osds:
             self.ceph_manager.set_config(
                 i,
@@ -765,7 +814,7 @@ class Thrasher:
         while len(self.in_osds) < (self.minin + 1):
             self.in_osd()
         self.log("Waiting for recovery")
-        self.ceph_manager.wait_for_all_up(
+        self.ceph_manager.wait_for_all_osds_up(
             timeout=self.config.get('timeout')
             )
         # now we wait 20s for the pg status to change, if it takes longer,
@@ -834,6 +883,8 @@ class Thrasher:
             actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
         if self.chance_thrash_pg_upmap_items > 0:
             actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
+        if self.chance_force_recovery > 0:
+            actions.append((self.force_cancel_recovery, self.chance_force_recovery))
 
         for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
             for scenario in [
@@ -955,6 +1006,12 @@ class Thrasher:
         scrubint = self.config.get("scrub_interval", -1)
         maxdead = self.config.get("max_dead", 0)
         delay = self.config.get("op_delay", 5)
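+        # Designate one live OSD to receive random read-error injection for
+        # the whole thrashing run; both the filestore and bluestore debug
+        # options are set so whichever objectstore backend is in use applies it.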
+        self.rerrosd = self.live_osds[0]
+        if self.random_eio > 0:
+            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+                          'injectargs', '--', '--filestore_debug_random_read_err='+str(self.random_eio))
+            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+                          'injectargs', '--', '--bluestore_debug_random_read_err='+str(self.random_eio))
         self.log("starting do_thrash")
         while not self.stopping:
             to_log = [str(x) for x in ["in_osds: ", self.in_osds,
@@ -982,6 +1039,12 @@ class Thrasher:
                         Scrubber(self.ceph_manager, self.config)
             self.choose_action()()
             time.sleep(delay)
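+        # Thrashing is finished: bring every OSD up and switch the read-error
+        # injection back off before the final cleanup below.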
+        self.all_up()
+        if self.random_eio > 0:
+            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+                          'injectargs', '--', '--filestore_debug_random_read_err=0.0')
+            self.ceph_manager.raw_cluster_cmd('tell', 'osd.'+str(self.rerrosd),
+                          'injectargs', '--', '--bluestore_debug_random_read_err=0.0')
         for pool in list(self.pools_to_fix_pgp_num):
             if self.ceph_manager.get_pool_pg_num(pool) > 0:
                 self.fix_pgp_num(pool)
@@ -1056,6 +1119,7 @@ class ObjectStoreTool:
         finally:
             if self.do_revive:
                 self.manager.revive_osd(self.osd)
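+                # also wait (up to 5 minutes) for the revived OSD to be marked
+                # up, so callers do not race with its startup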
+                self.manager.wait_till_osd_is_up(self.osd, 300)
 
 
 class CephManager:
@@ -1159,7 +1223,7 @@ class CephManager:
                   "-w"],
             wait=False, stdout=StringIO(), stdin=run.PIPE)
 
-    def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=3*5):
+    def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
         """
         Flush pg stats from a list of OSD ids, ensuring they are reflected
         all the way to the monitor.  Luminous and later only.
@@ -1171,7 +1235,7 @@ class CephManager:
                         stat seq from monitor anymore. in that case, you need
                         to pass a blacklist.
         :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
-                             it. (3 * mon_mgr_digest_period, by default)
+                             it. (5 min by default)
         """
         seq = {osd: self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats')
                for osd in osds}
@@ -1373,7 +1437,7 @@ class CephManager:
         # both osd_mon_report_interval_min and mgr_stats_period are 5 seconds
         # by default, and take the faulty injection in ms into consideration,
         # 12 seconds are more than enough
-        delays = [1, 1, 2, 3, 5, 8, 13]
+        delays = [1, 1, 2, 3, 5, 8, 13, 0]
         @wraps(func)
         def wrapper(self, *args, **kwargs):
             exc = None
@@ -1602,6 +1666,10 @@ class CephManager:
                     'osd', 'pool', 'set', pool_name,
                     'allow_ec_overwrites',
                     'true')
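+            # Tag the pool with the 'rados' application (Luminous warns about
+            # pools with no application enabled); the trailing '|| true' keeps
+            # this harmless on clusters that do not support pool applications.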
+            self.raw_cluster_cmd(
+                'osd', 'pool', 'application', 'enable',
+                pool_name, 'rados', '--yes-i-really-mean-it',
+                run.Raw('||'), 'true')
             self.pools[pool_name] = pg_num
         time.sleep(1)
 
@@ -1764,6 +1832,40 @@ class CephManager:
         j = json.loads('\n'.join(out.split('\n')[1:]))
         return j['pg_stats']
 
+    def get_pgids_to_force(self, backfill):
+        """
+        Return the randomized list of PGs that can have their recovery/backfill forced
+        """
+        j = self.get_pg_stats()
+        pgids = []
+        if backfill:
+            wanted = ['degraded', 'backfilling', 'backfill_wait']
+        else:
+            wanted = ['recovering', 'degraded', 'recovery_wait']
+        for pg in j:
+            status = pg['state'].split('+')
+            for t in wanted:
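+                # randomly sample roughly half of the eligible PGs, skipping
+                # any whose priority is already forced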
+                if (random.random() > 0.5 and t in status and
+                        'forced_backfill' not in status and
+                        'forced_recovery' not in status):
+                    pgids.append(pg['pgid'])
+                    break
+        return pgids
+
+    def get_pgids_to_cancel_force(self, backfill):
+        """
+        Return the randomized list of PGs whose recovery/backfill priority is forced
+        """
+        j = self.get_pg_stats()
+        pgids = []
+        if backfill:
+            wanted = 'forced_backfill'
+        else:
+            wanted = 'forced_recovery'
+        for pg in j:
+            status = pg['state'].split('+')
+            if wanted in status and random.random() > 0.5:
+                pgids.append(pg['pgid'])
+        return pgids
+
     def compile_pg_status(self):
         """
         Return a histogram of pg state values
@@ -1896,6 +1998,10 @@ class CephManager:
         """
         return self.get_osd_dump_json()['osds']
 
+    def get_mgr_dump(self):
+        out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
+        return json.loads(out)
+
     def get_stuck_pgs(self, type_, threshold):
         """
         :returns: stuck pg information from the cluster
@@ -1945,7 +2051,7 @@ class CephManager:
         for pg in pgs:
             if (pg['state'].count('active') and
                     not pg['state'].count('recover') and
-                    not pg['state'].count('backfill') and
+                    not pg['state'].count('backfilling') and
                     not pg['state'].count('stale')):
                 num += 1
         return num
@@ -2054,7 +2160,7 @@ class CephManager:
         x = self.get_osd_dump()
         return (len(x) == sum([(y['up'] > 0) for y in x]))
 
-    def wait_for_all_up(self, timeout=None):
+    def wait_for_all_osds_up(self, timeout=None):
         """
         When this exits, either the timeout has expired, or all
         osds are up.
@@ -2064,10 +2170,45 @@ class CephManager:
         while not self.are_all_osds_up():
             if timeout is not None:
                 assert time.time() - start < timeout, \
-                    'timeout expired in wait_for_all_up'
+                    'timeout expired in wait_for_all_osds_up'
             time.sleep(3)
         self.log("all up!")
 
+    def pool_exists(self, pool):
+        return pool in self.list_pools()
+
+    def wait_for_pool(self, pool, timeout=300):
+        """
+        Wait for a pool to exist
+        """
+        self.log('waiting for pool %s to exist' % pool)
+        start = time.time()
+        while not self.pool_exists(pool):
+            if timeout is not None:
+                assert time.time() - start < timeout, \
+                    'timeout expired in wait_for_pool'
+            time.sleep(3)
+
+    def wait_for_pools(self, pools):
+        for pool in pools:
+            self.wait_for_pool(pool)
+
+    def is_mgr_available(self):
+        x = self.get_mgr_dump()
+        return x.get('available', False)
+
+    def wait_for_mgr_available(self, timeout=None):
+        self.log("waiting for mgr available")
+        start = time.time()
+        while not self.is_mgr_available():
+            if timeout is not None:
+                assert time.time() - start < timeout, \
+                    'timeout expired in wait_for_mgr_available'
+            time.sleep(3)
+        self.log("mgr available!")
+
     def wait_for_recovery(self, timeout=None):
         """
         Check peering. When this exits, we have recovered.
@@ -2084,6 +2225,8 @@ class CephManager:
                 else:
                     self.log("no progress seen, keeping timeout for now")
                     if now - start >= timeout:
+                        if self.is_recovered():
+                            break
                         self.log('dumping pgs')
                         out = self.raw_cluster_cmd('pg', 'dump')
                         self.log(out)
@@ -2184,6 +2327,30 @@ class CephManager:
             time.sleep(3)
         self.log("active!")
 
+    def wait_till_pg_convergence(self, timeout=None):
+        start = time.time()
+        old_stats = None
+        active_osds = [osd['osd'] for osd in self.get_osd_dump()
+                       if osd['in'] and osd['up']]
+        while True:
+            # strictly speaking, there is no need to wait for the mon. but due
+            # to the "ms inject socket failures" setting, the osdmap could be
+            # delayed, so the mgr is likely to ignore pg-stat messages for pgs
+            # serving newly created pools that it does not yet know about. so,
+            # to make sure the mgr has the latest pg-stats, waiting for the
+            # mon/mgr is necessary.
+            self.flush_pg_stats(active_osds)
+            new_stats = dict((stat['pgid'], stat['state'])
+                             for stat in self.get_pg_stats())
+            if old_stats == new_stats:
+                return old_stats
+            if timeout is not None:
+                assert time.time() - start < timeout, \
+                    'failed to reach convergence before %d secs' % timeout
+            old_stats = new_stats
+            # sleep a bit longer than mgr_stats_period (5 seconds by default)
+            time.sleep(5 + 1)
+
     def mark_out_osd(self, osd):
         """
         Wrapper to mark osd out.
@@ -2235,7 +2402,7 @@ class CephManager:
         time.sleep(2)
         self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
 
-    def revive_osd(self, osd, timeout=150, skip_admin_check=False):
+    def revive_osd(self, osd, timeout=360, skip_admin_check=False):
         """
         Revive osds by either power cycling (if indicated by the config)
         or by restarting.
@@ -2425,5 +2592,8 @@ kill_mon = utility_task("kill_mon")
 create_pool = utility_task("create_pool")
 remove_pool = utility_task("remove_pool")
 wait_for_clean = utility_task("wait_for_clean")
+flush_all_pg_stats = utility_task("flush_all_pg_stats")
 set_pool_property = utility_task("set_pool_property")
 do_pg_scrub = utility_task("do_pg_scrub")
+wait_for_pool = utility_task("wait_for_pool")
+wait_for_pools = utility_task("wait_for_pools")