# ceph/qa/tasks/ceph_test_case.py
1 from typing
import Optional
, TYPE_CHECKING
6 from teuthology
.orchestra
.run
import CommandFailedError
9 from tasks
.mgr
.mgr_test_case
import MgrCluster
11 log
= logging
.getLogger(__name__
)
class TestTimeoutError(RuntimeError):
    """Raised by the wait_until_* helpers when a condition does not hold in time."""
    pass
class CephTestCase(unittest.TestCase):
    """
    For test tasks that want to define a structured set of
    tests implemented in python.  Subclass this with appropriate
    helpers for the subsystem you're testing.
    """

    # Environment references
    # NOTE(review): the extraction dropped several lines here; other cluster
    # handles (e.g. a ceph_cluster reference) are presumably declared
    # alongside mgr_cluster -- confirm against the original file.
    mgr_cluster: Optional['MgrCluster'] = None

    # Declarative test requirements: subclasses should override these to
    # indicate their special needs.  If not met, tests will be skipped.
    REQUIRE_MEMSTORE = False
40 self
._mon
_configs
_set
= set()
42 self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("log",
43 "Starting test {0}".format(self
.id()))
45 if self
.REQUIRE_MEMSTORE
:
46 objectstore
= self
.ceph_cluster
.get_config("osd_objectstore", "osd")
47 if objectstore
!= "memstore":
48 # You certainly *could* run this on a real OSD, but you don't want to sit
49 # here for hours waiting for the test to fill up a 1TB drive!
50 raise self
.skipTest("Require `memstore` OSD backend (test " \
51 "would take too long on full sized OSDs")
56 self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("log",
57 "Ended test {0}".format(self
.id()))
59 def config_clear(self
):
60 for section
, key
in self
._mon
_configs
_set
:
61 self
.config_rm(section
, key
)
62 self
._mon
_configs
_set
.clear()
64 def _fix_key(self
, key
):
65 return str(key
).replace(' ', '_')
67 def config_get(self
, section
, key
):
68 key
= self
._fix
_key
(key
)
69 return self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("config", "get", section
, key
).strip()
71 def config_show(self
, entity
, key
):
72 key
= self
._fix
_key
(key
)
73 return self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("config", "show", entity
, key
).strip()
75 def config_minimal(self
):
76 return self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("config", "generate-minimal-conf").strip()
78 def config_rm(self
, section
, key
):
79 key
= self
._fix
_key
(key
)
80 self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("config", "rm", section
, key
)
81 # simplification: skip removing from _mon_configs_set;
82 # let tearDown clear everything again
84 def config_set(self
, section
, key
, value
):
85 key
= self
._fix
_key
(key
)
86 self
._mon
_configs
_set
.add((section
, key
))
87 self
.ceph_cluster
.mon_manager
.raw_cluster_cmd("config", "set", section
, key
, str(value
))
89 def assert_cluster_log(self
, expected_pattern
, invert_match
=False,
90 timeout
=10, watch_channel
=None):
92 Context manager. Assert that during execution, or up to 5 seconds later,
93 the Ceph cluster log emits a message matching the expected pattern.
95 :param expected_pattern: A string that you expect to see in the log output
96 :type expected_pattern: str
97 :param watch_channel: Specifies the channel to be watched. This can be
98 'cluster', 'audit', ...
99 :type watch_channel: str
102 ceph_manager
= self
.ceph_cluster
.mon_manager
104 class ContextManager(object):
106 found
= expected_pattern
in self
.watcher_process
.stdout
.getvalue()
113 self
.watcher_process
= ceph_manager
.run_ceph_w(watch_channel
)
115 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
116 if not self
.watcher_process
.finished
:
117 # Check if we got an early match, wait a bit if we didn't
121 log
.debug("No log hits yet, waiting...")
122 # Default monc tick interval is 10s, so wait that long and
124 time
.sleep(5 + timeout
)
126 self
.watcher_process
.stdin
.close()
128 self
.watcher_process
.wait()
129 except CommandFailedError
:
133 log
.error("Log output: \n{0}\n".format(self
.watcher_process
.stdout
.getvalue()))
134 raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern
))
136 return ContextManager()
138 def wait_for_health(self
, pattern
, timeout
):
140 Wait until 'ceph health' contains messages matching the pattern
142 def seen_health_warning():
143 health
= self
.ceph_cluster
.mon_manager
.get_mon_health()
144 codes
= [s
for s
in health
['checks']]
145 summary_strings
= [s
[1]['summary']['message'] for s
in health
['checks'].items()]
146 if len(summary_strings
) == 0:
147 log
.debug("Not expected number of summary strings ({0})".format(summary_strings
))
150 for ss
in summary_strings
:
156 log
.debug("Not found expected summary strings yet ({0})".format(summary_strings
))
159 self
.wait_until_true(seen_health_warning
, timeout
)
161 def wait_for_health_clear(self
, timeout
):
163 Wait until `ceph health` returns no messages
166 health
= self
.ceph_cluster
.mon_manager
.get_mon_health()
167 return len(health
['checks']) == 0
169 self
.wait_until_true(is_clear
, timeout
)
171 def wait_until_equal(self
, get_fn
, expect_val
, timeout
, reject_fn
=None, period
=5):
175 if val
== expect_val
:
177 elif reject_fn
and reject_fn(val
):
178 raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val
))
180 if elapsed
>= timeout
:
181 raise TestTimeoutError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
182 elapsed
, expect_val
, val
185 log
.debug("wait_until_equal: {0} != {1}, waiting (timeout={2})...".format(val
, expect_val
, timeout
))
189 log
.debug("wait_until_equal: success")
192 def wait_until_true(cls
, condition
, timeout
, check_fn
=None, period
=5):
197 log
.debug("wait_until_true: success in {0}s and {1} retries".format(elapsed
, retry_count
))
200 if elapsed
>= timeout
:
201 if check_fn
and check_fn() and retry_count
< 5:
204 log
.debug("wait_until_true: making progress, waiting (timeout={0} retry_count={1})...".format(timeout
, retry_count
))
206 raise TestTimeoutError("Timed out after {0}s and {1} retries".format(elapsed
, retry_count
))
208 log
.debug("wait_until_true: waiting (timeout={0} retry_count={1})...".format(timeout
, retry_count
))