git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
update sources to 12.2.8
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1
2 import ceph_module # noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
6
7 import json
8 import logging
9 import six
10 import threading
11 from collections import defaultdict
12
13
class CPlusPlusHandler(logging.Handler):
    """
    logging.Handler that forwards each formatted record to the owning
    module's ``_ceph_log(level, message)`` method.

    Python levels are translated to Ceph debug levels, where a smaller
    number means a more important message.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        # Object providing ``_ceph_log``; every emitted record goes there.
        self._module = module_inst

    def emit(self, record):
        level = record.levelno

        # Walk the thresholds from most to least severe:
        #   above WARNING -> 0, WARNING band -> 1, INFO band -> 4,
        #   DEBUG and anything lower -> 20.
        if level > logging.WARNING:
            ceph_level = 0
        elif level > logging.INFO:
            ceph_level = 1
        elif level > logging.DEBUG:
            ceph_level = 4
        else:
            ceph_level = 20

        self._module._ceph_log(ceph_level, self.format(record))
30
31
def configure_logger(module_inst, name):
    """
    Hook the logger called ``name`` up to the C++ log via ``module_inst``.

    A new CPlusPlusHandler bound to ``module_inst`` is attached on every
    call, so callers should run ``unconfigure_logger`` first if a handler
    may already be installed (as MgrModule.__init__ does).

    :param module_inst: object exposing ``_ceph_log(level, message)``
    :param name: logger name, usually the module name
    :return: the configured ``logging.Logger``
    """
    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    handler = CPlusPlusHandler(module_inst)

    log = logging.getLogger(name)
    # Don't filter any logs at the python level, leave it to C++
    log.setLevel(logging.DEBUG)
    log.addHandler(handler)
    return log
45
46
def unconfigure_logger(module_inst, name):
    """
    Remove every CPlusPlusHandler attached to the logger called ``name``.

    NOTE(review): ``module_inst`` is not consulted. Handlers installed for
    *any* module instance on this logger are removed, not only those that
    belong to ``module_inst``.

    :param module_inst: unused
    :param name: logger name
    """
    log = logging.getLogger(name)
    # Take a snapshot first: removing from ``log.handlers`` while iterating
    # over it directly would skip entries.
    stale = list(filter(lambda hdl: isinstance(hdl, CPlusPlusHandler),
                        log.handlers))
    for hdl in stale:
        log.removeHandler(hdl)
52
class CommandResult(object):
    """
    Use with MgrModule.send_command

    Holds the outcome of an asynchronous command. ``complete`` stores the
    result and wakes any thread blocked in ``wait``.
    """

    def __init__(self, tag):
        # Result slots: return code, output buffer, output string.
        self.r, self.outb, self.outs = 0, "", ""
        self.ev = threading.Event()

        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag

    def complete(self, r, outb, outs):
        """Store the command result, then release any waiters."""
        self.r, self.outb, self.outs = r, outb, outs
        # Signal last so that woken waiters see the stored values.
        self.ev.set()

    def wait(self):
        """
        Block until ``complete`` has been called.

        :return: tuple of (r, outb, outs)
        """
        self.ev.wait()
        return (self.r, self.outb, self.outs)
76
77
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python wrapper around the native OSDMap binding. Each method forwards
    to the matching underscore-prefixed method of the base class.
    """

    def get_epoch(self):
        return self._get_epoch()

    def get_crush_version(self):
        return self._get_crush_version()

    def dump(self):
        return self._dump()

    def new_incremental(self):
        return self._new_incremental()

    def apply_incremental(self, inc):
        return self._apply_incremental(inc)

    def get_crush(self):
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        :return: the ``'pools'`` list from the native result, or ``[]`` if
                 the key is absent
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Forward to the native ``_calc_pg_upmaps``.

        :param inc: incremental passed through to the native call
        :param max_deviation: passed through (default .01)
        :param max_iterations: passed through (default 10)
        :param pools: list passed through; ``None`` (the default) is sent
                      as an empty list. The meaning of an empty list is
                      decided by the native implementation.
        """
        if pools is None:
            # Build a fresh list per call instead of sharing a mutable
            # default object across calls.
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        return self._map_pool_pgs_up(poolid)
108
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python wrapper around the native OSDMap incremental binding; every
    method delegates to an underscore-prefixed method of the base class.
    """

    def get_epoch(self):
        """Return the epoch reported by the native ``_get_epoch``."""
        return self._get_epoch()

    def dump(self):
        """Return the native ``_dump`` output for this incremental."""
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }

        The dict is handed unchanged to the native ``_set_osd_reweights``.
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        weightmap is a dict, int to float. devices only. e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }

        The dict is handed unchanged to the native
        ``_set_crush_compat_weight_set_weights``.
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
128
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python wrapper around the native CRUSH map binding.
    """

    # Sentinel item id. NOTE(review): presumably mirrors the C++
    # "no item" constant; confirm against the CRUSH headers.
    ITEM_NONE = 0x7fffffff

    def dump(self):
        return self._dump()

    def get_item_weight(self, item):
        return self._get_item_weight(item)

    def get_item_name(self, item):
        return self._get_item_name(item)

    def find_takes(self):
        """:return: the ``'takes'`` list from the native result, or ``[]``"""
        found = self._find_takes()
        return found.get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        :return: dict mapping int OSD id to weight, taken from the
                 ``'weights'`` entry of the native result (``{}`` if absent)
        """
        raw = self._get_take_weight_osd_map(root)
        weights = raw.get('weights', {})
        # Keys are normalised to int (they presumably arrive as strings
        # from the native formatter).
        return dict((int(osd), wt) for osd, wt in six.iteritems(weights))
147
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Base class for a module running on a standby ceph-mgr.

    Standby modules implement ``serve`` (mandatory) and may implement
    shutdown handling; they cannot implement commands and receive no
    notifications. Their view of the cluster is limited to the mgrmap
    (e.g. the active peer's service URI) and read-only configuration.
    """

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        unconfigure_logger(self, self.module_name)

    @property
    def log(self):
        """Logger named after this module, forwarded to the C++ log."""
        return self._logger

    def serve(self):
        """
        Entry point for the standby module; subclasses must override it.

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """:return: the id of this ceph-mgr, from the native binding"""
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Look up a persistent configuration setting.

        :param str key: setting name
        :param default: returned when the setting is not found
        :return: str
        """
        value = self._ceph_get_config(key)
        return default if value is None else value

    def get_active_uri(self):
        """:return: the active mgr's service URI, from the native binding"""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Look up ``key`` for this mgr instance first ("<mgr_id>/<key>"),
        then fall back to the plain ``key``, then to ``default``.
        """
        for candidate in (self.get_mgr_id() + '/' + key, key):
            value = self.get_config(candidate)
            if value is not None:
                return value
        return default
206
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for active ceph-mgr Python modules.

    Subclasses override the hook methods (``notify``, ``serve``,
    ``shutdown``, ``handle_command``, ``self_test``) and call the helper
    methods below, most of which forward to the native ``_ceph_*`` binding.
    """

    # Command descriptions this module declares; see ``handle_command``.
    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Clears the value-type bits (TIME=1, U64=2), leaving the counter type.
    PERFCOUNTER_TYPE_MASK = ~3

    # units supported
    BYTES = 0
    NONE = 1

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        # Set before anything that can fail so that __del__ can rely on it.
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        self._perf_schema_cache = None

    def __del__(self):
        unconfigure_logger(self, self.module_name)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        NOTE(review): this base implementation currently does nothing.

        :param daemon_type:
        :param daemon_name:
        :return: None
        """

    @property
    def log(self):
        """Logger named after this module, forwarded to the C++ log."""
        return self._logger

    @property
    def version(self):
        """Version value read from the native binding at construction."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin. The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function. You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr
        """
        return self._ceph_get(data_name)

    def _stattype_to_str(self, stattype):
        """
        Map a perf counter type bitfield to 'gauge', 'counter' or
        'histogram'; returns '' for unrecognised types.
        """
        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
        if typeonly == 0:
            return 'gauge'
        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
            # this lie matches the DaemonState decoding: only val, no counts
            return 'counter'
        if typeonly == self.PERFCOUNTER_COUNTER:
            return 'counter'
        if typeonly == self.PERFCOUNTER_HISTOGRAM:
            return 'histogram'

        return ''

    def _perfvalue_to_value(self, stattype, value):
        """Convert time counters from nanoseconds to seconds."""
        if stattype & self.PERFCOUNTER_TIME:
            # Convert from ns to seconds
            return value / 1000000000.0
        else:
            return value

    def _unit_to_str(self, unit):
        """
        Map a unit constant to its rate suffix: NONE -> "/s",
        BYTES -> "B/s". Unknown units give '' (consistent with
        ``_stattype_to_str``) rather than an implicit None.
        """
        if unit == self.NONE:
            return "/s"
        elif unit == self.BYTES:
            return "B/s"
        return ''

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be None, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: dict keyed by "<type>.<id>", each value mapping a counter
                 path to that counter's schema dict (the shape relied on
                 by ``get_all_perf_counters``)
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: dict mapping ``path`` to a list of samples; each sample
                 starts with [time, value] (long-running averages carry a
                 third element, read as the count)
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks. Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: returned when the setting is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance.

        Looks up "<mgr_id>/<key>" first, then the plain ``key``, then
        falls back to ``default``.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration ("<mgr_id>/<key>") for this ceph-mgr
        instance.

        :param key: str
        :param val: str
        :return: whatever the native ``_ceph_set_config`` returns
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object, or None if the setting is not found
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module. Override this function and implement
        a best as possible self-test for (automated) testing of the module.
        The base implementation does nothing and returns None.

        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to the latest OSDMap.

        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value (and a "count" member for long-running averages).

        Only rgw, mds, osd and mon services are considered; services
        without a schema are skipped with a warning.
        """

        result = defaultdict(dict)
        n_counters = 0

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            # Value of the most recent sample, or 0 if there are none.
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return data[-1][1]
            else:
                return 0

        def get_latest_avg(daemon_type, daemon_name, counter):
            # (value, count) of the most recent sample, or (0, 0).
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return (data[-1][1], data[-1][2])
            else:
                return (0, 0)

        for server in self.list_servers():
            for service in server['services']:
                if service['type'] not in ("rgw", "mds", "osd", "mon"):
                    continue

                svc_full_name = "{0}.{1}".format(service['type'], service['id'])

                schema = self.get_perf_schema(service['type'], service['id'])
                if not schema:
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about. A missing entry
                # is skipped rather than aborting the whole scan.
                svc_schema = schema.get(svc_full_name)
                if svc_schema is None:
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Populate latest values
                for counter_path, counter_schema in svc_schema.items():
                    if counter_schema['priority'] < prio_limit:
                        continue

                    counter_info = dict(counter_schema)

                    # Also populate count for the long running avgs
                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                        v, c = get_latest_avg(
                            service['type'],
                            service['id'],
                            counter_path
                        )
                        counter_info['value'], counter_info['count'] = v, c
                    else:
                        counter_info['value'] = get_latest(
                            service['type'],
                            service['id'],
                            counter_path
                        )

                    result[svc_full_name][counter_path] = counter_info
                    n_counters += 1

        self.log.debug("returning {0} counters for {1} services".format(
            n_counters, len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :param uri: str, the address to publish
        :return: whatever the native ``_ceph_set_uri`` returns
        """
        return self._ceph_set_uri(uri)

    def have_mon_connection(self):
        """
        Check whether this ceph-mgr daemon has an open connection
        to a monitor.  If it doesn't, then it's likely that the
        information we have about the cluster is out of date,
        and/or the monitor cluster is down.
        """

        return self._ceph_have_mon_connection()
639 """
640
641 return self._ceph_have_mon_connection()