]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/selftest/module.py
import ceph 15.2.14
[ceph.git] / ceph / src / pybind / mgr / selftest / module.py
1
2 from mgr_module import MgrModule, CommandResult
3 import errno
4 import six
5 import json
6 import random
7 import sys
8 import threading
9
10
class Module(MgrModule):
    """
    This module is for testing the ceph-mgr python interface from within
    a running ceph-mgr daemon.

    It implements a synchronous self-test command for calling the functions
    in the MgrModule interface one by one, and a background "workload"
    command for causing the module to perform some thrashing-type
    activities in its serve() thread.
    """

    # These workloads are things that can be requested to run inside the
    # serve() function
    WORKLOAD_COMMAND_SPAM = "command_spam"
    WORKLOAD_THROW_EXCEPTION = "throw_exception"
    # Not a user-selectable workload: set by shutdown() to make serve() exit.
    SHUTDOWN = "shutdown"

    WORKLOADS = (WORKLOAD_COMMAND_SPAM, WORKLOAD_THROW_EXCEPTION)

    # The test code in qa/ relies on these options existing -- they
    # are of course not really used for anything in the module
    MODULE_OPTIONS = [
        {'name': 'testkey'},
        {'name': 'testlkey'},
        {'name': 'testnewline'},
        {'name': 'roption1'},
        {'name': 'roption2', 'type': 'str', 'default': 'xyz'},
        {'name': 'rwoption1'},
        {'name': 'rwoption2', 'type': 'int'},
        {'name': 'rwoption3', 'type': 'float'},
        {'name': 'rwoption4', 'type': 'str'},
        {'name': 'rwoption5', 'type': 'bool'},
        {'name': 'rwoption6', 'type': 'bool', 'default': True}
    ]

    COMMANDS = [
        {
            "cmd": "mgr self-test run",
            "desc": "Run mgr python interface tests",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test background start name=workload,type=CephString",
            "desc": "Activate a background workload (one of {0})".format(
                ", ".join(WORKLOADS)),
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test background stop",
            "desc": "Stop background workload if any is running",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test config get name=key,type=CephString",
            "desc": "Peek at a configuration value",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test config get_localized name=key,type=CephString",
            "desc": "Peek at a configuration value (localized variant)",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test remote",
            "desc": "Test inter-module calls",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test module name=module,type=CephString",
            "desc": "Run another module's self_test() method",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test health set name=checks,type=CephString",
            "desc": "Set a health check from a JSON-formatted description.",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test health clear name=checks,type=CephString,n=N,req=False",
            "desc": "Clear health checks by name. If no names provided, clear all.",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test insights_set_now_offset name=hours,type=CephString",
            "desc": "Set the now time for the insights module.",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test cluster-log name=channel,type=CephString "
                   "name=priority,type=CephString "
                   "name=message,type=CephString",
            "desc": "Create an audit log record.",
            "perm": "rw"
        },
        {
            "cmd": "mgr self-test python-version",
            "desc": "Query the version of the embedded Python runtime",
            "perm": "r"
        },
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        # Set to wake serve() (or interrupt _command_spam) whenever the
        # requested workload changes.
        self._event = threading.Event()
        # Requested background workload name, SHUTDOWN, or None (idle).
        self._workload = None
        # Health checks currently raised through "mgr self-test health set".
        self._health = {}

    def handle_command(self, inbuf, command):
        """Dispatch a "mgr self-test ..." command.

        :param inbuf: input buffer supplied with the command (passed through
            to the health/insights helpers).
        :param command: parsed command dict; ``command['prefix']`` selects
            the handler and the remaining keys are its arguments.
        :returns: ``(retval, stdout, stderr)`` tuple.
        """
        if command['prefix'] == 'mgr self-test python-version':
            major = sys.version_info.major
            minor = sys.version_info.minor
            micro = sys.version_info.micro
            return 0, f'{major}.{minor}.{micro}', ''

        elif command['prefix'] == 'mgr self-test run':
            self._self_test()
            return 0, '', 'Self-test succeeded'

        elif command['prefix'] == 'mgr self-test background start':
            if command['workload'] not in self.WORKLOADS:
                return (-errno.EINVAL, '',
                        "Workload not found '{0}'".format(command['workload']))
            self._workload = command['workload']
            # Wake serve() so it picks up the new workload.
            self._event.set()
            return 0, '', 'Running `{0}` in background'.format(self._workload)

        elif command['prefix'] == 'mgr self-test background stop':
            if self._workload:
                was_running = self._workload
                self._workload = None
                self._event.set()
                return 0, '', 'Stopping background workload `{0}`'.format(
                    was_running)
            else:
                return 0, '', 'No background workload was running'
        elif command['prefix'] == 'mgr self-test config get':
            return 0, str(self.get_module_option(command['key'])), ''
        elif command['prefix'] == 'mgr self-test config get_localized':
            return 0, str(self.get_localized_module_option(command['key'])), ''
        elif command['prefix'] == 'mgr self-test remote':
            self._test_remote_calls()
            return 0, '', 'Successfully called'
        elif command['prefix'] == 'mgr self-test module':
            try:
                r = self.remote(command['module'], "self_test")
            except RuntimeError as e:
                return -1, '', "Test failed: {0}".format(e)
            else:
                return 0, str(r), "Self-test OK"
        elif command['prefix'] == 'mgr self-test health set':
            return self._health_set(inbuf, command)
        elif command['prefix'] == 'mgr self-test health clear':
            return self._health_clear(inbuf, command)
        elif command['prefix'] == 'mgr self-test insights_set_now_offset':
            return self._insights_set_now_offset(inbuf, command)
        elif command['prefix'] == 'mgr self-test cluster-log':
            priority_map = {
                'info': self.CLUSTER_LOG_PRIO_INFO,
                'security': self.CLUSTER_LOG_PRIO_SEC,
                'warning': self.CLUSTER_LOG_PRIO_WARN,
                'error': self.CLUSTER_LOG_PRIO_ERROR
            }
            priority = command['priority']
            # Reject unknown priorities with EINVAL instead of letting a
            # KeyError escape from the command handler.
            if priority not in priority_map:
                return (-errno.EINVAL, '',
                        "Unknown priority '{0}' (expected one of {1})".format(
                            priority, ", ".join(sorted(priority_map))))
            self.cluster_log(command['channel'],
                             priority_map[priority],
                             command['message'])
            return 0, '', 'Successfully called'
        else:
            return (-errno.EINVAL, '',
                    "Command not found '{0}'".format(command['prefix']))

    def _health_set(self, inbuf, command):
        """Raise (or overwrite) health checks described by JSON.

        ``command["checks"]`` must be a JSON object mapping check names to
        objects with ``severity``, ``summary`` and ``detail`` (an iterable
        of messages) fields.  The checks are merged into those already
        raised by this module.  If any entry is malformed, nothing changes.

        :returns: ``(retval, stdout, stderr)`` tuple.
        """
        try:
            checks = json.loads(command["checks"])
        except (KeyError, TypeError, ValueError) as e:
            return -1, "", "Failed to decode JSON input: {}".format(e)

        # Validate every entry before touching self._health, so a bad entry
        # cannot leave earlier entries half-applied (and later published).
        new_checks = {}
        try:
            for check, info in checks.items():
                new_checks[check] = {
                    "severity": str(info["severity"]),
                    "summary": str(info["summary"]),
                    # Fixed placeholder count; the input does not supply one.
                    "count": 123,
                    "detail": [str(m) for m in info["detail"]]
                }
        except (AttributeError, KeyError, TypeError) as e:
            return -1, "", "Invalid health check format: {}".format(e)

        self._health.update(new_checks)
        self.set_health_checks(self._health)
        return 0, "", ""

    def _health_clear(self, inbuf, command):
        """Clear the named health checks, or all of them if none are named.

        Unknown names are ignored.

        :returns: ``(retval, stdout, stderr)`` tuple.
        """
        if "checks" in command:
            for check in command["checks"]:
                self._health.pop(check, None)
        else:
            self._health = dict()

        self.set_health_checks(self._health)
        return 0, "", ""

    def _insights_set_now_offset(self, inbuf, command):
        """Forward an integer hour offset to the insights module's test hook.

        :returns: ``(retval, stdout, stderr)`` tuple; -1 if ``hours`` is not
            an integer.
        """
        try:
            hours = int(command["hours"])
        except (KeyError, TypeError, ValueError) as e:
            return -1, "", "Timestamp must be numeric: {}".format(e)

        self.remote("insights", "testing_set_now_time_offset", hours)
        return 0, "", ""

    def _self_test(self):
        """Run every synchronous self-test; any failure propagates as an exception."""
        self.log.info("Running self-test procedure...")

        self._self_test_osdmap()
        self._self_test_getters()
        self._self_test_config()
        self._self_test_store()
        self._self_test_misc()
        self._self_test_perf_counters()

        # Logged only after *all* sub-tests have run.
        self.log.info("Finished self-test procedure.")

    def _self_test_getters(self):
        """Exercise the cluster-state getter methods of MgrModule."""
        # Bare attribute access: only checks that the property is readable.
        self.version
        self.get_context()
        self.get_mgr_id()

        # In this function, we will assume that the system is in a steady
        # state, i.e. if a server/service appears in one call, it will
        # not have gone by the time we call another function referring to it

        objects = [
            "fs_map",
            "osdmap_crush_map_text",
            "osd_map",
            "config",
            "mon_map",
            "service_map",
            "osd_metadata",
            "pg_summary",
            "pg_status",
            "pg_dump",
            "pg_ready",
            "df",
            "pg_stats",
            "pool_stats",
            "osd_stats",
            "osd_ping_times",
            "health",
            "mon_status",
            "mgr_map"
        ]
        for obj in objects:
            assert self.get(obj) is not None

        assert self.get("__OBJ_DNE__") is None

        servers = self.list_servers()
        for server in servers:
            self.get_server(server['hostname'])

        osdmap = self.get('osd_map')
        for o in osdmap['osds']:
            osd_id = o['osd']
            self.get_metadata("osd", str(osd_id))

        # NOTE(review): assumes osd.0 exists in the test cluster.
        self.get_daemon_status("osd", "0")
        # TODO: also exercise send_command()

    def _self_test_config(self):
        """Exercise module option get/set, including typed and default values."""
        # This is not a strong test (can't tell if values really
        # persisted), it's just for the python interface bit.

        self.set_module_option("testkey", "testvalue")
        assert self.get_module_option("testkey") == "testvalue"

        self.set_localized_module_option("testkey", "foo")
        assert self.get_localized_module_option("testkey") == "foo"

        # Must return the default value defined in MODULE_OPTIONS.
        value = self.get_localized_module_option("rwoption6")
        assert isinstance(value, bool)
        assert value is True

        # Use default value.
        assert self.get_module_option("roption1") is None
        assert self.get_module_option("roption1", "foobar") == "foobar"
        assert self.get_module_option("roption2") == "xyz"
        assert self.get_module_option("roption2", "foobar") == "xyz"

        # Option type is not defined => return as string.
        self.set_module_option("rwoption1", 8080)
        value = self.get_module_option("rwoption1")
        assert isinstance(value, str)
        assert value == "8080"

        # Option type is defined => return as integer.
        self.set_module_option("rwoption2", 10)
        value = self.get_module_option("rwoption2")
        assert isinstance(value, int)
        assert value == 10

        # Option type is defined => return as float.
        self.set_module_option("rwoption3", 1.5)
        value = self.get_module_option("rwoption3")
        assert isinstance(value, float)
        assert value == 1.5

        # Option type is defined => return as string.
        self.set_module_option("rwoption4", "foo")
        value = self.get_module_option("rwoption4")
        assert isinstance(value, str)
        assert value == "foo"

        # Option type is defined => return as bool.
        self.set_module_option("rwoption5", False)
        value = self.get_module_option("rwoption5")
        assert isinstance(value, bool)
        assert value is False

        # Specified module does not exist => return None.
        assert self.get_module_option_ex("foo", "bar") is None

        # Specified key does not exist => return None.
        assert self.get_module_option_ex("dashboard", "bar") is None

        self.set_module_option_ex("telemetry", "contact", "test@test.com")
        assert self.get_module_option_ex("telemetry", "contact") == "test@test.com"

        # No option default value, so use the specified one.
        assert self.get_module_option_ex("dashboard", "password") is None
        assert self.get_module_option_ex("dashboard", "password", "foobar") == "foobar"

        # Option type is not defined => return as string.
        self.set_module_option_ex("selftest", "rwoption1", 1234)
        value = self.get_module_option_ex("selftest", "rwoption1")
        assert isinstance(value, str)
        assert value == "1234"

        # Option type is defined => return as integer.
        self.set_module_option_ex("telemetry", "interval", 60)
        value = self.get_module_option_ex("telemetry", "interval")
        assert isinstance(value, int)
        assert value == 60

        # Option type is defined => return as bool.
        self.set_module_option_ex("telemetry", "leaderboard", True)
        value = self.get_module_option_ex("telemetry", "leaderboard")
        assert isinstance(value, bool)
        assert value is True

    def _self_test_store(self):
        """Exercise the key/value store: set, get and prefix listing."""
        existing_keys = set(self.get_store_prefix("test").keys())
        self.set_store("testkey", "testvalue")
        assert self.get_store("testkey") == "testvalue"

        # The prefix listing must contain the new key plus whatever was
        # already stored under "test".
        assert sorted(self.get_store_prefix("test").keys()) == sorted(
            {"testkey"} | existing_keys)

    def _self_test_perf_counters(self):
        """Exercise the perf-counter getters against osd.0."""
        self.get_perf_schema("osd", "0")
        self.get_counter("osd", "0", "osd.op")
        # TODO: also exercise get_all_perf_counters()

    def _self_test_misc(self):
        """Exercise set_uri() and clearing this module's health checks."""
        self.set_uri("http://this.is.a.test.com")
        self.set_health_checks({})

    def _self_test_osdmap(self):
        """Exercise the OSDMap, incremental and CRUSH wrapper objects."""
        osdmap = self.get_osdmap()
        osdmap.get_epoch()
        osdmap.get_crush_version()
        osdmap.dump()

        inc = osdmap.new_incremental()
        osdmap.apply_incremental(inc)
        inc.get_epoch()
        inc.dump()

        crush = osdmap.get_crush()
        crush.dump()
        crush.get_item_name(-1)
        crush.get_item_weight(-1)
        crush.find_takes()
        crush.get_take_weight_osd_map(-1)

        # TODO: not yet exercised:
        #   osdmap.get_pools_by_take(), osdmap.calc_pg_upmaps(),
        #   osdmap.map_pools_pgs_up(), inc.set_osd_reweights(),
        #   inc.set_crush_compat_weight_set_weights()

    def _test_remote_calls(self):
        """Check inter-module remote() calls and the errors they raise.

        :raises RuntimeError: if an expected error was not raised.
        """
        # Test making valid call
        self.remote("influx", "handle_command", "", {"prefix": "influx self-test"})

        # Test calling module that exists but isn't enabled
        # (arbitrarily pick a non-always-on module to use)
        disabled_module = "telegraf"
        mgr_map = self.get("mgr_map")
        assert disabled_module not in mgr_map['modules']

        # (This works until the Z release in about 2027)
        latest_release = sorted(mgr_map['always_on_modules'].keys())[-1]
        assert disabled_module not in mgr_map['always_on_modules'][latest_release]

        try:
            self.remote(disabled_module, "handle_command", {"prefix": "influx self-test"})
        except ImportError:
            pass
        else:
            raise RuntimeError("ImportError not raised for disabled module")

        # Test calling module that doesn't exist
        try:
            self.remote("idontexist", "handle_command", {"prefix": "influx self-test"})
        except ImportError:
            pass
        else:
            raise RuntimeError("ImportError not raised for nonexistent module")

        # Test calling method that doesn't exist
        try:
            self.remote("influx", "idontexist", {"prefix": "influx self-test"})
        except NameError:
            pass
        else:
            raise RuntimeError("NameError not raised for nonexistent method")

    def remote_from_orchestrator_cli_self_test(self, what):
        """Return a failed orchestrator completion for the orchestrator CLI tests.

        :param what: ``'OrchestratorError'`` or ``"ZeroDivisionError"``,
            selecting the exception the completion fails with.
        :raises AssertionError: for any other value.
        """
        import orchestrator
        if what == 'OrchestratorError':
            c = orchestrator.TrivialReadCompletion(result=None)
            c.fail(orchestrator.OrchestratorError('hello, world'))
            return c
        elif what == "ZeroDivisionError":
            c = orchestrator.TrivialReadCompletion(result=None)
            c.fail(ZeroDivisionError('hello, world'))
            return c
        # Explicit raise (not `assert False`) so it still fires under -O.
        raise AssertionError(repr(what))

    def shutdown(self):
        """Ask serve() to exit by requesting the SHUTDOWN pseudo-workload."""
        self._workload = self.SHUTDOWN
        self._event.set()

    def _command_spam(self):
        """Repeatedly send random "osd reweight" commands until _event is set."""
        self.log.info("Starting command_spam workload...")
        while not self._event.is_set():
            osdmap = self.get_osdmap()
            dump = osdmap.dump()
            count = len(dump['osds'])
            i = int(random.random() * count)
            w = random.random()

            result = CommandResult('')
            self.send_command(result, 'mon', '', json.dumps({
                'prefix': 'osd reweight',
                'id': i,
                'weight': w
            }), '')

            # Dump CRUSH while the command is in flight; the result is not
            # needed, only the call.
            osdmap.get_crush().dump()
            # Block until the mon replies; the outcome is deliberately ignored.
            result.wait()

        self._event.clear()
        self.log.info("Ended command_spam workload...")

    def serve(self):
        """Background thread: run the requested workload until shutdown()."""
        while True:
            if self._workload == self.WORKLOAD_COMMAND_SPAM:
                self._command_spam()
            elif self._workload == self.SHUTDOWN:
                self.log.info("Shutting down...")
                break
            elif self._workload == self.WORKLOAD_THROW_EXCEPTION:
                raise RuntimeError("Synthetic exception in serve")
            else:
                self.log.info("Waiting for workload request...")
                self._event.wait()
                self._event.clear()
491 self.log.info("Waiting for workload request...")
492 self._event.wait()
493 self._event.clear()