]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
update sources to 12.2.2
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1
2 import ceph_module # noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
6
7 import json
8 import logging
9 import threading
10 from collections import defaultdict
11
12
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that hands every formatted record to the owning
    module's ``_ceph_log`` hook, together with a ceph log level derived
    from the record's Python level.
    """

    # (python level ceiling, ceph level) pairs, checked in order: a record
    # gets the ceph level of the first ceiling that it does not exceed.
    # Records above WARNING fall through to ceph level 0.
    _CEPH_LEVELS = (
        (logging.DEBUG, 20),
        (logging.INFO, 4),
        (logging.WARNING, 1),
    )

    def __init__(self, module_inst):
        """
        :param module_inst: object exposing ``_ceph_log(level, message)``
        """
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst

    def emit(self, record):
        """
        Map the record's level to a ceph level and forward the formatted
        message.

        :param record: logging.LogRecord
        """
        ceph_level = 0
        for ceiling, mapped in self._CEPH_LEVELS:
            if record.levelno <= ceiling:
                ceph_level = mapped
                break

        self._module._ceph_log(ceph_level, self.format(record))
29
30
def configure_logger(module_inst, name):
    """
    Fetch the logger called ``name``, open it up to DEBUG and attach a
    CPlusPlusHandler bound to ``module_inst``.

    :param module_inst: object the handler forwards log messages to
    :param name: str, name of the logger to configure
    :return: the configured logging.Logger
    """
    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    cpp_handler = CPlusPlusHandler(module_inst)

    log = logging.getLogger(name)
    # Don't filter any logs at the python level, leave it to C++
    log.setLevel(logging.DEBUG)
    log.addHandler(cpp_handler)
    return log
44
45
def unconfigure_logger(module_inst, name):
    """
    Detach every CPlusPlusHandler from the logger called ``name``.

    :param module_inst: not used by this function
    :param name: str, name of the logger to clean up
    """
    log = logging.getLogger(name)
    # Walk a snapshot, since removeHandler() mutates log.handlers
    for handler in list(log.handlers):
        if isinstance(handler, CPlusPlusHandler):
            log.removeHandler(handler)
51
class CommandResult(object):
    """
    Completion handle to use with MgrModule.send_command: holds the
    result of a command and an event that is set once it has arrived.
    """
    def __init__(self, tag):
        """
        :param tag: stored as-is on the instance
        """
        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag

        # Placeholder result until complete() is called
        self.r = 0
        self.outb = ""
        self.outs = ""

        self.ev = threading.Event()

    def complete(self, r, outb, outs):
        """
        Record the result and wake up anyone blocked in wait().

        :param r: return code
        :param outb: output buffer
        :param outs: output string
        """
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """
        Block until complete() has been called.

        :return: 3-tuple of (r, outb, outs)
        """
        self.ev.wait()
        return (self.r, self.outb, self.outs)
75
76
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python-facing wrapper around ceph_module.BasePyOSDMap.  Each public
    method delegates to the underscore-prefixed method of the same name
    provided by the base class.
    """

    def get_epoch(self):
        """Return the result of the base class ``_get_epoch()``."""
        return self._get_epoch()

    def get_crush_version(self):
        """Return the result of the base class ``_get_crush_version()``."""
        return self._get_crush_version()

    def dump(self):
        """Return the result of the base class ``_dump()``."""
        return self._dump()

    def new_incremental(self):
        """Return the result of the base class ``_new_incremental()``."""
        return self._new_incremental()

    def apply_incremental(self, inc):
        """
        Delegate to the base class ``_apply_incremental()``.

        :param inc: incremental to apply (presumably an
                    OSDMapIncremental -- confirm against the C++ binding)
        """
        return self._apply_incremental(inc)

    def get_crush(self):
        """Return the result of the base class ``_get_crush()``."""
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        :param take: passed through to ``_get_pools_by_take()``
        :return: the 'pools' entry of the binding's result, or an empty
                 list when that key is absent
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Delegate to the base class ``_calc_pg_upmaps()``.

        :param inc: passed through unchanged
        :param max_deviation: passed through unchanged
        :param max_iterations: passed through unchanged
        :param pools: list passed through to the binding; when omitted
                      (or None) a fresh empty list is passed
        """
        # A None sentinel avoids sharing one mutable default list object
        # between every call that omits ``pools``.
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        """
        :param poolid: passed through to ``_map_pool_pgs_up()``
        """
        return self._map_pool_pgs_up(poolid)
107
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python-facing wrapper around ceph_module.BasePyOSDMapIncremental.
    Every public method hands its arguments to the underscore-prefixed
    method of the same name on the base class.
    """

    def get_epoch(self):
        """Return whatever the base class ``_get_epoch()`` returns."""
        epoch = self._get_epoch()
        return epoch

    def dump(self):
        """Return whatever the base class ``_dump()`` returns."""
        dumped = self._dump()
        return dumped

    def set_osd_reweights(self, weightmap):
        """
        weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        result = self._set_osd_reweights(weightmap)
        return result

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        weightmap is a dict, int to float.  devices only.  e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        result = self._set_crush_compat_weight_set_weights(weightmap)
        return result
127
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python-facing wrapper around ceph_module.BasePyCRUSH.  Each public
    method delegates to the underscore-prefixed method of the same name
    provided by the base class.
    """

    def dump(self):
        """Return the result of the base class ``_dump()``."""
        return self._dump()

    def get_item_weight(self, item):
        """
        :param item: passed through to ``_get_item_weight()``
        """
        return self._get_item_weight(item)

    def get_item_name(self, item):
        """
        :param item: passed through to ``_get_item_name()``
        """
        return self._get_item_name(item)

    def find_takes(self):
        """
        :return: the 'takes' entry of the binding's result, or an empty
                 list when that key is absent
        """
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        :param root: passed through to ``_get_take_weight_osd_map()``
        :return: dict built from the 'weights' entry of the binding's
                 result, with every key converted to int; empty when the
                 'weights' key is absent
        """
        uglymap = self._get_take_weight_osd_map(root)
        # items() exists on both Python 2 and 3 dicts, whereas iteritems()
        # is Python 2 only.
        weights = uglymap.get('weights', {})
        return {int(k): v for k, v in weights.items()}
144
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    def __init__(self, module_name, capsule):
        """
        :param module_name: str, also used as the name of the logger
        :param capsule: handed to the base class constructor
        """
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        # Detach the log handler(s) attached for this module's logger
        unconfigure_logger(self, self.module_name)

    @property
    def log(self):
        """The logger configured for this module."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """Return the result of the base class ``_ceph_get_mgr_id()``."""
        return self._ceph_get_mgr_id()

    def get_config(self, key):
        """
        :param key: str
        :return: the result of the base class ``_ceph_get_config(key)``
        """
        return self._ceph_get_config(key)

    def get_active_uri(self):
        """Return the result of the base class ``_ceph_get_active_uri()``."""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Look ``key`` up under the "<mgr id>/" prefix first, then without
        the prefix.

        :param key: str
        :param default: returned when both lookups yield None
        :return: the first non-None value found, otherwise ``default``
        """
        localized_key = self.get_mgr_id() + '/' + key
        for candidate in (localized_key, key):
            value = self.get_config(candidate)
            if value is not None:
                return value
        return default
191
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for ceph-mgr python modules.  It wraps the ``_ceph_*``
    methods provided by ceph_module.BaseMgrModule and defines the hooks
    (``notify``, ``serve``, ``shutdown``, ``handle_command``) that
    subclasses override.
    """
    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Clears *both* value-type bits (TIME and U64) so that only the
    # counter-type bits survive the mask.  The previous value (~2) left
    # the TIME bit set.
    PERFCOUNTER_TYPE_MASK = ~(PERFCOUNTER_TIME | PERFCOUNTER_U64)

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        """
        :param module_name: str, also used as the name of the logger
        :param py_modules_ptr: handed to the base class constructor
        :param this_ptr: handed to the base class constructor
        """
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        self._perf_schema_cache = None

    def __del__(self):
        unconfigure_logger(self, self.module_name)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        Note: in this version the method has no body, so calling it
        does nothing.

        :param daemon_type:
        :param daemon_name:
        :return:
        """

    @property
    def log(self):
        """The logger configured for this module."""
        return self._logger

    @property
    def version(self):
        """The value ``_ceph_get_version()`` returned at construction."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin.  The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function.  You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr

        :param data_name: passed through to ``_ceph_get()``
        """
        return self._ceph_get(data_name)

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be nullptr, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: list of dicts describing the counters requested
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: A list of two-element lists containing time and value
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.  All arguments are forwarded unchanged to
        ``_ceph_send_command``; nothing is returned.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks.  Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: returned when the stored value is None
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance.
        The key prefixed with "<mgr id>/" is tried first, then the
        plain key.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration for this ceph-mgr instance; the
        value is stored under the key prefixed with "<mgr id>/".

        :param key: str
        :param val: str
        :return: the result of ``_ceph_set_config``
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object, or None when no value is stored
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module.  Override this function and implement
        a best as possible self-test for (automated) testing of the module
        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to an OSDMap.

        NOTE(review): earlier documentation mentioned an ``epoch``
        argument, which this signature does not accept; presumably the
        latest OSDMap is returned -- confirm against the C++ binding.

        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value.

        Only services of type "mds", "osd" and "mon" are considered.
        """

        result = defaultdict(dict)

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                # Last sample, value element of the (time, value) pair
                return data[-1][1]
            else:
                return 0

        for server in self.list_servers():
            for service in server['services']:
                if service['type'] not in ("mds", "osd", "mon"):
                    continue

                schema = self.get_perf_schema(service['type'], service['id'])
                if not schema:
                    # Logger.warn is a deprecated alias of warning
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about
                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
                schema = schema[svc_full_name]

                # Populate latest values
                for counter_path, counter_schema in schema.items():
                    # self.log.debug("{0}: {1}".format(
                    #     counter_path, json.dumps(counter_schema)
                    # ))
                    if counter_schema['priority'] < prio_limit:
                        continue

                    # The schema dict itself is annotated with the value
                    counter_info = counter_schema
                    counter_info['value'] = get_latest(service['type'], service['id'], counter_path)
                    result[svc_full_name][counter_path] = counter_info

        self.log.debug("returning {0} counter".format(len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :param uri: passed through to ``_ceph_set_uri()``
        :return: the result of ``_ceph_set_uri``
        """
        return self._ceph_set_uri(uri)