git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/test_full.py
import 15.2.4
[ceph.git] / ceph / qa / tasks / cephfs / test_full.py
index 9395612e48f44cdf94da8f2680bcf2ac987a3336..eaa36c7c9d62fbc9384c357a850e7587e254cdb3 100644 (file)
@@ -1,10 +1,13 @@
-
-
 import json
 import logging
 import os
 from textwrap import dedent
 import time
+try:
+    from typing import Optional
+except:
+    # make it work for python2
+    pass
 from teuthology.orchestra.run import CommandFailedError
 from tasks.cephfs.fuse_mount import FuseMount
 from tasks.cephfs.cephfs_test_case import CephFSTestCase
@@ -20,7 +23,7 @@ class FullnessTestCase(CephFSTestCase):
     data_only = False
 
     # Subclasses define how many bytes should be written to achieve fullness
-    pool_capacity = None
+    pool_capacity = None  # type: Optional[int]
     fill_mb = None
 
     # Subclasses define what fullness means to them
@@ -30,21 +33,10 @@ class FullnessTestCase(CephFSTestCase):
     def setUp(self):
         CephFSTestCase.setUp(self)
 
-        # These tests just use a single active MDS throughout, so remember its ID
-        # for use in mds_asok calls
-        self.active_mds_id = self.fs.get_active_names()[0]
+        mds_status = self.fs.rank_asok(["status"])
 
         # Capture the initial OSD map epoch for later use
-        self.initial_osd_epoch = json.loads(
-            self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json").strip()
-        )['epoch']
-
-        # Check the initial barrier epoch on the MDS: this should be
-        # set to the latest map at MDS startup.  We do this check in
-        # setUp to get in there before subclasses might touch things
-        # in their own setUp functions.
-        self.assertGreaterEqual(self.fs.mds_asok(["status"], mds_id=self.active_mds_id)['osdmap_epoch_barrier'],
-                                self.initial_osd_epoch)
+        self.initial_osd_epoch = mds_status['osdmap_epoch_barrier']
 
     def test_barrier(self):
         """
@@ -53,18 +45,26 @@ class FullnessTestCase(CephFSTestCase):
         epoch.
         """
 
-        # Sync up clients with initial MDS OSD map barrier
-        self.mount_a.open_no_data("foo")
-        self.mount_b.open_no_data("bar")
+        # script that sync up client with MDS OSD map barrier. The barrier should
+        # be updated by cap flush ack message.
+        pyscript = dedent("""
+            import os
+            fd = os.open("{path}", os.O_CREAT | os.O_RDWR, 0O600)
+            os.fchmod(fd, 0O666)
+            os.fsync(fd)
+            os.close(fd)
+            """)
+
+        # Sync up client with initial MDS OSD map barrier.
+        path = os.path.join(self.mount_a.mountpoint, "foo")
+        self.mount_a.run_python(pyscript.format(path=path))
 
         # Grab mounts' initial OSD epochs: later we will check that
         # it hasn't advanced beyond this point.
-        mount_a_initial_epoch = self.mount_a.get_osd_epoch()[0]
-        mount_b_initial_epoch = self.mount_b.get_osd_epoch()[0]
+        mount_a_initial_epoch, mount_a_initial_barrier = self.mount_a.get_osd_epoch()
 
         # Freshly mounted at start of test, should be up to date with OSD map
         self.assertGreaterEqual(mount_a_initial_epoch, self.initial_osd_epoch)
-        self.assertGreaterEqual(mount_b_initial_epoch, self.initial_osd_epoch)
 
         # Set and unset a flag to cause OSD epoch to increment
         self.fs.mon_manager.raw_cluster_cmd("osd", "set", "pause")
@@ -77,43 +77,28 @@ class FullnessTestCase(CephFSTestCase):
         # Do a metadata operation on clients, witness that they end up with
         # the old OSD map from startup time (nothing has prompted client
         # to update its map)
-        self.mount_a.open_no_data("alpha")
-        self.mount_b.open_no_data("bravo1")
-
-        # Sleep long enough that if the OSD map was propagating it would
-        # have done so (this is arbitrary because we are 'waiting' for something
-        # to *not* happen).
-        time.sleep(30)
-
+        path = os.path.join(self.mount_a.mountpoint, "foo")
+        self.mount_a.run_python(pyscript.format(path=path))
         mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
         self.assertEqual(mount_a_epoch, mount_a_initial_epoch)
-        mount_b_epoch, mount_b_barrier = self.mount_b.get_osd_epoch()
-        self.assertEqual(mount_b_epoch, mount_b_initial_epoch)
+        self.assertEqual(mount_a_barrier, mount_a_initial_barrier)
 
         # Set a barrier on the MDS
-        self.fs.mds_asok(["osdmap", "barrier", new_epoch.__str__()], mds_id=self.active_mds_id)
+        self.fs.rank_asok(["osdmap", "barrier", new_epoch.__str__()])
 
-        # Do an operation on client B, witness that it ends up with
-        # the latest OSD map from the barrier.  This shouldn't generate any
-        # cap revokes to A because B was already the last one to touch
-        # a file in root.
-        self.mount_b.run_shell(["touch", "bravo2"])
-        self.mount_b.open_no_data("bravo2")
+        # Sync up client with new MDS OSD map barrier
+        path = os.path.join(self.mount_a.mountpoint, "baz")
+        self.mount_a.run_python(pyscript.format(path=path))
+        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
+        self.assertEqual(mount_a_barrier, new_epoch)
 
         # Some time passes here because the metadata part of the operation
         # completes immediately, while the resulting OSD map update happens
         # asynchronously (it's an Objecter::_maybe_request_map) as a result
         # of seeing the new epoch barrier.
-        self.wait_until_equal(
-            lambda: self.mount_b.get_osd_epoch(),
-            (new_epoch, new_epoch),
-            30,
-            lambda x: x[0] > new_epoch or x[1] > new_epoch)
-
-        # ...and none of this should have affected the oblivious mount a,
-        # because it wasn't doing any data or metadata IO
-        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
-        self.assertEqual(mount_a_epoch, mount_a_initial_epoch)
+        self.wait_until_true(
+            lambda: self.mount_a.get_osd_epoch()[0] >= new_epoch,
+            timeout=30)
 
     def _data_pool_name(self):
         data_pool_names = self.fs.get_data_pool_names()
@@ -134,26 +119,26 @@ class FullnessTestCase(CephFSTestCase):
                           the failed write.
         """
 
-        osd_mon_report_interval_max = int(self.fs.get_config("osd_mon_report_interval_max", service_type='osd'))
+        osd_mon_report_interval = int(self.fs.get_config("osd_mon_report_interval", service_type='osd'))
 
         log.info("Writing {0}MB should fill this cluster".format(self.fill_mb))
 
         # Fill up the cluster.  This dd may or may not fail, as it depends on
         # how soon the cluster recognises its own fullness
-        self.mount_a.write_n_mb("large_file_a", self.fill_mb / 2)
+        self.mount_a.write_n_mb("large_file_a", self.fill_mb // 2)
         try:
-            self.mount_a.write_n_mb("large_file_b", self.fill_mb / 2)
+            self.mount_a.write_n_mb("large_file_b", self.fill_mb // 2)
         except CommandFailedError:
             log.info("Writing file B failed (full status happened already)")
             assert self.is_full()
         else:
             log.info("Writing file B succeeded (full status will happen soon)")
             self.wait_until_true(lambda: self.is_full(),
-                                 timeout=osd_mon_report_interval_max * 5)
+                                 timeout=osd_mon_report_interval * 5)
 
         # Attempting to write more data should give me ENOSPC
         with self.assertRaises(CommandFailedError) as ar:
-            self.mount_a.write_n_mb("large_file_b", 50, seek=self.fill_mb / 2)
+            self.mount_a.write_n_mb("large_file_b", 50, seek=self.fill_mb // 2)
         self.assertEqual(ar.exception.exitstatus, 1)  # dd returns 1 on "No space"
 
         # Wait for the MDS to see the latest OSD map so that it will reliably
@@ -161,7 +146,7 @@ class FullnessTestCase(CephFSTestCase):
         # while in the full state.
         osd_epoch = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
         self.wait_until_true(
-            lambda: self.fs.mds_asok(['status'], mds_id=self.active_mds_id)['osdmap_epoch'] >= osd_epoch,
+            lambda: self.fs.rank_asok(['status'])['osdmap_epoch'] >= osd_epoch,
             timeout=10)
 
         if not self.data_only:
@@ -184,13 +169,13 @@ class FullnessTestCase(CephFSTestCase):
         # * The MDS to purge the stray folder and execute object deletions
         #  * The OSDs to inform the mon that they are no longer full
         self.wait_until_true(lambda: not self.is_full(),
-                             timeout=osd_mon_report_interval_max * 5)
+                             timeout=osd_mon_report_interval * 5)
 
         # Wait for the MDS to see the latest OSD map so that it will reliably
         # be applying the free space policy
         osd_epoch = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
         self.wait_until_true(
-            lambda: self.fs.mds_asok(['status'], mds_id=self.active_mds_id)['osdmap_epoch'] >= osd_epoch,
+            lambda: self.fs.rank_asok(['status'])['osdmap_epoch'] >= osd_epoch,
             timeout=10)
 
         # Now I should be able to write again
@@ -212,7 +197,7 @@ class FullnessTestCase(CephFSTestCase):
         file_path = os.path.join(self.mount_a.mountpoint, "full_test_file")
 
         # Enough to trip the full flag
-        osd_mon_report_interval_max = int(self.fs.get_config("osd_mon_report_interval_max", service_type='osd'))
+        osd_mon_report_interval = int(self.fs.get_config("osd_mon_report_interval", service_type='osd'))
         mon_tick_interval = int(self.fs.get_config("mon_tick_interval", service_type="mon"))
 
         # Sufficient data to cause RADOS cluster to go 'full'
@@ -222,13 +207,13 @@ class FullnessTestCase(CephFSTestCase):
         # (report_interval for mon to learn PG stats, tick interval for it to update OSD map,
         #  factor of 1.5 for I/O + network latency in committing OSD map and distributing it
         #  to the OSDs)
-        full_wait = (osd_mon_report_interval_max + mon_tick_interval) * 1.5
+        full_wait = (osd_mon_report_interval + mon_tick_interval) * 1.5
 
         # Configs for this test should bring this setting down in order to
         # run reasonably quickly
-        if osd_mon_report_interval_max > 10:
-            log.warn("This test may run rather slowly unless you decrease"
-                     "osd_mon_report_interval_max (5 is a good setting)!")
+        if osd_mon_report_interval > 10:
+            log.warning("This test may run rather slowly unless you decrease"
+                     "osd_mon_report_interval (5 is a good setting)!")
 
         self.mount_a.run_python(template.format(
             fill_mb=self.fill_mb,
@@ -247,12 +232,12 @@ class FullnessTestCase(CephFSTestCase):
             import os
 
             # Write some buffered data through before going full, all should be well
-            print "writing some data through which we expect to succeed"
+            print("writing some data through which we expect to succeed")
             bytes = 0
             f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
-            bytes += os.write(f, 'a' * 512 * 1024)
+            bytes += os.write(f, b'a' * 512 * 1024)
             os.fsync(f)
-            print "fsync'ed data successfully, will now attempt to fill fs"
+            print("fsync'ed data successfully, will now attempt to fill fs")
 
             # Okay, now we're going to fill up the filesystem, and then keep
             # writing until we see an error from fsync.  As long as we're doing
@@ -261,27 +246,27 @@ class FullnessTestCase(CephFSTestCase):
             full = False
 
             for n in range(0, int({fill_mb} * 0.9)):
-                bytes += os.write(f, 'x' * 1024 * 1024)
-                print "wrote {{0}} bytes via buffered write, may repeat".format(bytes)
-            print "done writing {{0}} bytes".format(bytes)
+                bytes += os.write(f, b'x' * 1024 * 1024)
+                print("wrote {{0}} bytes via buffered write, may repeat".format(bytes))
+            print("done writing {{0}} bytes".format(bytes))
 
             # OK, now we should sneak in under the full condition
             # due to the time it takes the OSDs to report to the
             # mons, and get a successful fsync on our full-making data
             os.fsync(f)
-            print "successfully fsync'ed prior to getting full state reported"
+            print("successfully fsync'ed prior to getting full state reported")
 
             # buffered write, add more dirty data to the buffer
-            print "starting buffered write"
+            print("starting buffered write")
             try:
                 for n in range(0, int({fill_mb} * 0.2)):
-                    bytes += os.write(f, 'x' * 1024 * 1024)
-                    print "sleeping a bit as we've exceeded 90% of our expected full ratio"
+                    bytes += os.write(f, b'x' * 1024 * 1024)
+                    print("sleeping a bit as we've exceeded 90% of our expected full ratio")
                     time.sleep({full_wait})
             except OSError:
                 pass;
 
-            print "wrote, now waiting 30s and then doing a close we expect to fail"
+            print("wrote, now waiting 30s and then doing a close we expect to fail")
 
             # Wait long enough for a background flush that should fail
             time.sleep(30)
@@ -291,7 +276,7 @@ class FullnessTestCase(CephFSTestCase):
                 try:
                     os.close(f)
                 except OSError:
-                    print "close() returned an error as expected"
+                    print("close() returned an error as expected")
                 else:
                     raise RuntimeError("close() failed to raise error")
             else:
@@ -318,12 +303,12 @@ class FullnessTestCase(CephFSTestCase):
             import os
 
             # Write some buffered data through before going full, all should be well
-            print "writing some data through which we expect to succeed"
+            print("writing some data through which we expect to succeed")
             bytes = 0
             f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
-            bytes += os.write(f, 'a' * 4096)
+            bytes += os.write(f, b'a' * 4096)
             os.fsync(f)
-            print "fsync'ed data successfully, will now attempt to fill fs"
+            print("fsync'ed data successfully, will now attempt to fill fs")
 
             # Okay, now we're going to fill up the filesystem, and then keep
             # writing until we see an error from fsync.  As long as we're doing
@@ -333,26 +318,26 @@ class FullnessTestCase(CephFSTestCase):
 
             for n in range(0, int({fill_mb} * 1.1)):
                 try:
-                    bytes += os.write(f, 'x' * 1024 * 1024)
-                    print "wrote bytes via buffered write, moving on to fsync"
+                    bytes += os.write(f, b'x' * 1024 * 1024)
+                    print("wrote bytes via buffered write, moving on to fsync")
                 except OSError as e:
-                    print "Unexpected error %s from write() instead of fsync()" % e
+                    print("Unexpected error %s from write() instead of fsync()" % e)
                     raise
 
                 try:
                     os.fsync(f)
-                    print "fsync'ed successfully"
+                    print("fsync'ed successfully")
                 except OSError as e:
-                    print "Reached fullness after %.2f MB" % (bytes / (1024.0 * 1024.0))
+                    print("Reached fullness after %.2f MB" % (bytes / (1024.0 * 1024.0)))
                     full = True
                     break
                 else:
-                    print "Not full yet after %.2f MB" % (bytes / (1024.0 * 1024.0))
+                    print("Not full yet after %.2f MB" % (bytes / (1024.0 * 1024.0)))
 
                 if n > {fill_mb} * 0.9:
                     # Be cautious in the last region where we expect to hit
                     # the full condition, so that we don't overshoot too dramatically
-                    print "sleeping a bit as we've exceeded 90% of our expected full ratio"
+                    print("sleeping a bit as we've exceeded 90% of our expected full ratio")
                     time.sleep({full_wait})
 
             if not full:
@@ -361,9 +346,9 @@ class FullnessTestCase(CephFSTestCase):
             # close() should not raise an error because we already caught it in
             # fsync.  There shouldn't have been any more writeback errors
             # since then because all IOs got cancelled on the full flag.
-            print "calling close"
+            print("calling close")
             os.close(f)
-            print "close() did not raise error"
+            print("close() did not raise error")
 
             os.unlink("{file_path}")
             """)
@@ -375,8 +360,8 @@ class TestQuotaFull(FullnessTestCase):
     """
     Test per-pool fullness, which indicates quota limits exceeded
     """
-    pool_capacity = 1024 * 1024 * 32   # arbitrary low-ish limit
-    fill_mb = pool_capacity / (1024 * 1024)
+    pool_capacity = 1024 * 1024 * 32  # arbitrary low-ish limit
+    fill_mb = pool_capacity // (1024 * 1024)  # type: ignore
 
     # We are only testing quota handling on the data pool, not the metadata
     # pool.
@@ -407,7 +392,7 @@ class TestClusterFull(FullnessTestCase):
             max_avail = self.fs.get_pool_df(self._data_pool_name())['max_avail']
             full_ratio = float(self.fs.get_config("mon_osd_full_ratio", service_type="mon"))
             TestClusterFull.pool_capacity = int(max_avail * full_ratio)
-            TestClusterFull.fill_mb = (self.pool_capacity / (1024 * 1024))
+            TestClusterFull.fill_mb = (self.pool_capacity // (1024 * 1024))
 
     def is_full(self):
         return self.fs.is_full()