# ceph/qa/tasks/ceph_test_case.py — from ceph.git (v16.2.7), via git.proxmox.com mirror
1 from typing import Optional, TYPE_CHECKING
2 import unittest
3 import time
4 import logging
5
6 from teuthology.orchestra.run import CommandFailedError
7
8 if TYPE_CHECKING:
9 from tasks.mgr.mgr_test_case import MgrCluster
10
11 log = logging.getLogger(__name__)
12
class TestTimeoutError(RuntimeError):
    """Raised by the wait_until_* helpers when a condition is not met in time."""
    pass
15
class CephTestCase(unittest.TestCase):
    """
    For test tasks that want to define a structured set of
    tests implemented in python. Subclass this with appropriate
    helpers for the subsystem you're testing.
    """

    # Environment references, populated by the task that instantiates the test.
    mounts = None
    fs = None
    recovery_fs = None
    backup_fs = None
    ceph_cluster = None
    mds_cluster = None
    mgr_cluster: Optional['MgrCluster'] = None
    ctx = None

    mon_manager = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    REQUIRE_MEMSTORE = False

    def setUp(self):
        """Log the test start into the cluster log and enforce declared requirements."""
        # (section, key) pairs set via config_set; cleared again in tearDown.
        self._mon_configs_set = set()

        self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
            "Starting test {0}".format(self.id()))

        if self.REQUIRE_MEMSTORE:
            objectstore = self.ceph_cluster.get_config("osd_objectstore", "osd")
            if objectstore != "memstore":
                # You certainly *could* run this on a real OSD, but you don't want to sit
                # here for hours waiting for the test to fill up a 1TB drive!
                # NOTE: the message previously lacked its closing parenthesis.
                self.skipTest("Require `memstore` OSD backend (test "
                              "would take too long on full sized OSDs)")

    def tearDown(self):
        """Undo config changes made by the test, then log the test end."""
        self.config_clear()

        self.ceph_cluster.mon_manager.raw_cluster_cmd("log",
            "Ended test {0}".format(self.id()))

    def config_clear(self):
        """Remove every mon config option this test set via config_set."""
        # Iterate a snapshot: config_rm discards entries from the live set.
        for section, key in list(self._mon_configs_set):
            self.config_rm(section, key)
        self._mon_configs_set.clear()

    def _fix_key(self, key):
        """Normalize a config key: the mon accepts underscores, not spaces."""
        return str(key).replace(' ', '_')

    def config_get(self, section, key):
        """Return the centralized config value of `key` for `section`, stripped."""
        key = self._fix_key(key)
        return self.ceph_cluster.mon_manager.raw_cluster_cmd("config", "get", section, key).strip()

    def config_show(self, entity, key):
        """Return the running value of `key` as reported by `entity`, stripped."""
        key = self._fix_key(key)
        return self.ceph_cluster.mon_manager.raw_cluster_cmd("config", "show", entity, key).strip()

    def config_minimal(self):
        """Return a minimal ceph.conf generated by the monitors."""
        return self.ceph_cluster.mon_manager.raw_cluster_cmd("config", "generate-minimal-conf").strip()

    def config_rm(self, section, key):
        """Remove a mon config option and drop it from the tearDown bookkeeping."""
        key = self._fix_key(key)
        # Keep bookkeeping consistent with config_set: once removed here, the
        # option must not be removed a second time by config_clear/tearDown.
        self._mon_configs_set.discard((section, key))
        self.ceph_cluster.mon_manager.raw_cluster_cmd("config", "rm", section, key)

    def config_set(self, section, key, value):
        """Set a mon config option, remembering it so tearDown can clear it."""
        key = self._fix_key(key)
        self._mon_configs_set.add((section, key))
        self.ceph_cluster.mon_manager.raw_cluster_cmd("config", "set", section, key, str(value))

    def assert_cluster_log(self, expected_pattern, invert_match=False,
                           timeout=10, watch_channel=None):
        """
        Context manager.  Assert that during execution, or up to
        ``timeout`` plus a 5 second grace period afterwards (the default monc
        tick interval is 10s), the Ceph cluster log emits a message matching
        the expected pattern.

        :param expected_pattern: A string that you expect to see in the log output
        :type expected_pattern: str
        :param invert_match: if True, assert the pattern is NOT seen instead
        :param timeout: seconds to keep watching after the managed block exits
        :param watch_channel: Specifies the channel to be watched. This can be
                              'cluster', 'audit', ...
        :type watch_channel: str
        """

        ceph_manager = self.ceph_cluster.mon_manager

        class ContextManager(object):
            def match(self):
                found = expected_pattern in self.watcher_process.stdout.getvalue()
                if invert_match:
                    return not found

                return found

            def __enter__(self):
                self.watcher_process = ceph_manager.run_ceph_w(watch_channel)

            def __exit__(self, exc_type, exc_val, exc_tb):
                if not self.watcher_process.finished:
                    # Check if we got an early match, wait a bit if we didn't
                    if self.match():
                        return
                    else:
                        log.debug("No log hits yet, waiting...")
                        # Default monc tick interval is 10s, so wait that long and
                        # then some grace
                        time.sleep(5 + timeout)

                self.watcher_process.stdin.close()
                try:
                    self.watcher_process.wait()
                except CommandFailedError:
                    pass

                if not self.match():
                    log.error("Log output: \n{0}\n".format(self.watcher_process.stdout.getvalue()))
                    raise AssertionError("Expected log message not found: '{0}'".format(expected_pattern))

        return ContextManager()

    def wait_for_health(self, pattern, timeout):
        """
        Wait until 'ceph health' contains messages matching the pattern
        """
        def seen_health_warning():
            health = self.ceph_cluster.mon_manager.get_mon_health()
            # Match against either the health check codes or their summaries.
            codes = list(health['checks'])
            summary_strings = [check['summary']['message']
                               for check in health['checks'].values()]
            if len(summary_strings) == 0:
                log.debug("Not expected number of summary strings ({0})".format(summary_strings))
                return False
            else:
                for ss in summary_strings:
                    if pattern in ss:
                        return True
                if pattern in codes:
                    return True

            log.debug("Not found expected summary strings yet ({0})".format(summary_strings))
            return False

        self.wait_until_true(seen_health_warning, timeout)

    def wait_for_health_clear(self, timeout):
        """
        Wait until `ceph health` returns no messages
        """
        def is_clear():
            health = self.ceph_cluster.mon_manager.get_mon_health()
            return len(health['checks']) == 0

        self.wait_until_true(is_clear, timeout)

    def wait_until_equal(self, get_fn, expect_val, timeout, reject_fn=None, period=5):
        """
        Poll `get_fn` every `period` seconds until it returns `expect_val`.

        :param reject_fn: optional predicate; if it returns true for an
                          observed value, fail immediately with RuntimeError.
        :raises TestTimeoutError: if `expect_val` is not seen within `timeout` seconds.
        """
        elapsed = 0
        while True:
            val = get_fn()
            if val == expect_val:
                # NOTE: a trailing "success" debug line after this loop was
                # unreachable and has been removed.
                return
            elif reject_fn and reject_fn(val):
                raise RuntimeError("wait_until_equal: forbidden value {0} seen".format(val))
            else:
                if elapsed >= timeout:
                    raise TestTimeoutError("Timed out after {0} seconds waiting for {1} (currently {2})".format(
                        elapsed, expect_val, val
                    ))
                else:
                    log.debug("wait_until_equal: {0} != {1}, waiting (timeout={2})...".format(val, expect_val, timeout))
                    time.sleep(period)
                    elapsed += period

    @classmethod
    def wait_until_true(cls, condition, timeout, check_fn=None, period=5):
        """
        Poll `condition` every `period` seconds until it returns true.

        :param check_fn: optional progress check; while it returns true the
                         timeout is reset, up to 5 times, before giving up.
        :raises TestTimeoutError: if `condition` stays false past the timeout.
        """
        elapsed = 0
        retry_count = 0
        while True:
            if condition():
                log.debug("wait_until_true: success in {0}s and {1} retries".format(elapsed, retry_count))
                return
            else:
                if elapsed >= timeout:
                    if check_fn and check_fn() and retry_count < 5:
                        elapsed = 0
                        retry_count += 1
                        log.debug("wait_until_true: making progress, waiting (timeout={0} retry_count={1})...".format(timeout, retry_count))
                    else:
                        raise TestTimeoutError("Timed out after {0}s and {1} retries".format(elapsed, retry_count))
                else:
                    log.debug("wait_until_true: waiting (timeout={0} retry_count={1})...".format(timeout, retry_count))
                time.sleep(period)
                elapsed += period