]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/selftest/module.py
update sources to 12.2.2
[ceph.git] / ceph / src / pybind / mgr / selftest / module.py
1
2 from mgr_module import MgrModule, CommandResult
3 import threading
4 import random
5 import json
6 import errno
7
8
9 class Module(MgrModule):
10 """
11 This module is for testing the ceph-mgr python interface from within
12 a running ceph-mgr daemon.
13
14 It implements a sychronous self-test command for calling the functions
15 in the MgrModule interface one by one, and a background "workload"
16 command for causing the module to perform some thrashing-type
17 activities in its serve() thread.
18 """
19
20 WORKLOAD_COMMAND_SPAM = "command_spam"
21 SHUTDOWN = "shutdown"
22
23 WORKLOADS = (WORKLOAD_COMMAND_SPAM, )
24
25 COMMANDS = [
26 {
27 "cmd": "mgr self-test run",
28 "desc": "Run mgr python interface tests",
29 "perm": "r"
30 },
31 {
32 "cmd": "mgr self-test background start name=workload,type=CephString",
33 "desc": "Activate a background workload (one of {0})".format(
34 ", ".join(WORKLOADS)),
35 "perm": "r"
36 },
37 {
38 "cmd": "mgr self-test background stop",
39 "desc": "Stop background workload if any is running",
40 "perm": "r"
41 },
42 ]
43
44
45
46 def __init__(self, *args, **kwargs):
47 super(Module, self).__init__(*args, **kwargs)
48 self._event = threading.Event()
49 self._workload = None
50
51 def handle_command(self, command):
52 if command['prefix'] == 'mgr self-test run':
53 self._self_test()
54 return 0, '', 'Self-test succeeded'
55
56 elif command['prefix'] == 'mgr self-test background start':
57 if command['workload'] not in self.WORKLOADS:
58 return (-errno.EINVAL, '',
59 "Workload not found '{0}'".format(command['workload']))
60 self._workload = command['workload']
61 self._event.set()
62 return 0, '', 'Running `{0}` in background'.format(self._workload)
63
64 elif command['prefix'] == 'mgr self-test background stop':
65 if self._workload:
66 was_running = self._workload
67 self._workload = None
68 self._event.set()
69 return 0, '', 'Stopping background workload `{0}`'.format(
70 was_running)
71 else:
72 return 0, '', 'No background workload was running'
73
74 else:
75 return (-errno.EINVAL, '',
76 "Command not found '{0}'".format(command['prefix']))
77
78 def _self_test(self):
79 self.log.info("Running self-test procedure...")
80
81 self._self_test_osdmap()
82 self._self_test_getters()
83 self._self_test_config()
84 self._self_test_misc()
85 self._self_test_perf_counters()
86
87 def _self_test_getters(self):
88 self.version
89 self.get_context()
90 self.get_mgr_id()
91
92 # In this function, we will assume that the system is in a steady
93 # state, i.e. if a server/service appears in one call, it will
94 # not have gone by the time we call another function referring to it
95
96 objects = [
97 "fs_map",
98 "osdmap_crush_map_text",
99 "osd_map",
100 "config",
101 "mon_map",
102 "service_map",
103 "osd_metadata",
104 "pg_summary",
105 "pg_status",
106 "pg_dump",
107 "df",
108 "osd_stats",
109 "health",
110 "mon_status",
111 "mgr_map"
112 ]
113 for obj in objects:
114 self.get(obj)
115
116 servers = self.list_servers()
117 for server in servers:
118 self.get_server(server['hostname'])
119
120 osdmap = self.get('osd_map')
121 for o in osdmap['osds']:
122 osd_id = o['osd']
123 self.get_metadata("osd", str(osd_id))
124
125 self.get_daemon_status("osd", "0")
126 #send_command
127
128 def _self_test_config(self):
129 # This is not a strong test (can't tell if values really
130 # persisted), it's just for the python interface bit.
131
132 self.set_config("testkey", "testvalue")
133 assert self.get_config("testkey") == "testvalue"
134
135 self.set_localized_config("testkey", "testvalue")
136 assert self.get_localized_config("testkey") == "testvalue"
137
138 self.set_config_json("testjsonkey", {"testblob": 2})
139 assert self.get_config_json("testjsonkey") == {"testblob": 2}
140
141 assert sorted(self.get_config_prefix("test").keys()) == sorted(
142 ["testkey", "testjsonkey"])
143
144 def _self_test_perf_counters(self):
145 self.get_perf_schema("osd", "0")
146 self.get_counter("osd", "0", "osd.op")
147 #get_counter
148 #get_all_perf_coutners
149
150 def _self_test_misc(self):
151 self.set_uri("http://this.is.a.test.com")
152 self.set_health_checks({})
153
154 def _self_test_osdmap(self):
155 osdmap = self.get_osdmap()
156 osdmap.get_epoch()
157 osdmap.get_crush_version()
158 osdmap.dump()
159
160 inc = osdmap.new_incremental()
161 osdmap.apply_incremental(inc)
162 inc.get_epoch()
163 inc.dump()
164
165 crush = osdmap.get_crush()
166 crush.dump()
167 crush.get_item_name(-1)
168 crush.get_item_weight(-1)
169 crush.find_takes()
170 crush.get_take_weight_osd_map(-1)
171
172 #osdmap.get_pools_by_take()
173 #osdmap.calc_pg_upmaps()
174 #osdmap.map_pools_pgs_up()
175
176 #inc.set_osd_reweights
177 #inc.set_crush_compat_weight_set_weights
178
179 self.log.info("Finished self-test procedure.")
180
181 def shutdown(self):
182 self._workload = self.SHUTDOWN
183 self._event.set()
184
185 def _command_spam(self):
186 self.log.info("Starting command_spam workload...")
187 while not self._event.is_set():
188 osdmap = self.get_osdmap()
189 dump = osdmap.dump()
190 count = len(dump['osds'])
191 i = int(random.random() * count)
192 w = random.random()
193
194 result = CommandResult('')
195 self.send_command(result, 'mon', '', json.dumps({
196 'prefix': 'osd reweight',
197 'id': i,
198 'weight': w
199 }), '')
200
201 crush = osdmap.get_crush().dump()
202 r, outb, outs = result.wait()
203
204 self._event.clear()
205 self.log.info("Ended command_spam workload...")
206
207 def serve(self):
208 while True:
209 if self._workload == self.WORKLOAD_COMMAND_SPAM:
210 self._command_spam()
211 elif self._workload == self.SHUTDOWN:
212 self.log.info("Shutting down...")
213 break
214 else:
215 self.log.info("Waiting for workload request...")
216 self._event.wait()
217 self._event.clear()