]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
update sources to 12.2.8
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
7c673cae 1
3efd9988
FG
2import ceph_module # noqa
3#import ceph_osdmap #noqa
4#import ceph_osdmap_incremental #noqa
5#import ceph_crushmap #noqa
6
7c673cae
FG
7import json
8import logging
1adf2230 9import six
7c673cae 10import threading
3efd9988
FG
11from collections import defaultdict
12
13
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that forwards Python log records to the C++ side by
    calling ``_ceph_log(ceph_level, formatted_message)`` on the owning
    module instance.
    """

    # Ordered (upper bound on Python levelno, ceph log level) pairs; the
    # first bound the record's level does not exceed wins.  Records above
    # WARNING fall through to ceph level 0.
    _LEVEL_TABLE = (
        (logging.DEBUG, 20),
        (logging.INFO, 4),
        (logging.WARNING, 1),
    )

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        # Receiver of the forwarded records (must provide _ceph_log).
        self._module = module_inst

    def emit(self, record):
        """Translate the record's level to a ceph level and log it via C++."""
        levelno = record.levelno
        ceph_level = next(
            (mapped for bound, mapped in self._LEVEL_TABLE if levelno <= bound),
            0)
        self._module._ceph_log(ceph_level, self.format(record))
30
31
def configure_logger(module_inst, name):
    """
    Attach a CPlusPlusHandler for ``module_inst`` to the logger ``name``.

    The logger's level is forced to DEBUG so nothing is filtered on the
    Python side; the C++ log decides what is ultimately written.  Each call
    adds one more handler, so callers that re-configure a name should
    unconfigure it first.

    :param module_inst: object exposing ``_ceph_log(level, message)``
    :param name: name of the logger to configure
    :return: the configured ``logging.Logger``
    """
    log = logging.getLogger(name)

    # Let every record through here and leave filtering to C++.
    log.setLevel(logging.DEBUG)

    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    forwarder = CPlusPlusHandler(module_inst)
    log.addHandler(forwarder)
    return log
7c673cae
FG
45
46
3efd9988
FG
def unconfigure_logger(module_inst, name):
    """
    Detach every CPlusPlusHandler from the logger ``name``.

    ``module_inst`` is not consulted: handlers installed for *any* module
    instance under this logger name are removed.

    :param module_inst: unused (kept for call-site symmetry with
        configure_logger)
    :param name: name of the logger to clean up
    """
    log = logging.getLogger(name)
    # Snapshot the matches first: removeHandler mutates log.handlers.
    doomed = list(filter(lambda hdl: isinstance(hdl, CPlusPlusHandler),
                         log.handlers))
    for hdl in doomed:
        log.removeHandler(hdl)
52
7c673cae
FG
class CommandResult(object):
    """
    Use with MgrModule.send_command

    Holds a command's (return code, output buffer, output string).  The
    values are filled in by ``complete``; ``wait`` blocks until that has
    happened and then returns them.
    """
    def __init__(self, tag):
        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag

        # Defaults until complete() is called.
        self.r, self.outb, self.outs = 0, "", ""

        # Signalled by complete(); waited on by wait().
        self.ev = threading.Event()

    def complete(self, r, outb, outs):
        """Store the command's result and wake up any waiter."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """Block until complete() has run; return (r, outb, outs)."""
        self.ev.wait()
        return (self.r, self.outb, self.outs)
76
77
3efd9988
FG
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python wrapper over the C++-backed ``ceph_module.BasePyOSDMap``; each
    public method forwards to the corresponding ``_``-prefixed C++ method.
    """

    def get_epoch(self):
        """Return the result of the C++ ``_get_epoch()``."""
        return self._get_epoch()

    def get_crush_version(self):
        """Return the result of the C++ ``_get_crush_version()``."""
        return self._get_crush_version()

    def dump(self):
        """Return the result of the C++ ``_dump()``."""
        return self._dump()

    def new_incremental(self):
        """Return the result of the C++ ``_new_incremental()``."""
        return self._new_incremental()

    def apply_incremental(self, inc):
        """
        Forward ``inc`` to the C++ ``_apply_incremental()``.

        :param inc: presumably an OSDMapIncremental (e.g. from
            new_incremental()) -- confirm against the C++ binding
        """
        return self._apply_incremental(inc)

    def get_crush(self):
        """Return the result of the C++ ``_get_crush()``."""
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        :param take: passed through to the C++ ``_get_pools_by_take()``
        :return: the ``'pools'`` entry of the C++ result, or ``[]`` when
            that entry is absent
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Forward to the C++ ``_calc_pg_upmaps()``.

        :param inc: incremental passed through to C++
        :param max_deviation: passed through (default .01)
        :param max_iterations: passed through (default 10)
        :param pools: list passed through to C++; ``None`` (the default)
            is sent as an empty list, exactly as the old ``[]`` default was
        """
        if pools is None:
            # A fresh list per call instead of one shared mutable default
            # argument that would be reused across every invocation.
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        """Return the result of the C++ ``_map_pool_pgs_up(poolid)``."""
        return self._map_pool_pgs_up(poolid)
108
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python wrapper over the C++-backed
    ``ceph_module.BasePyOSDMapIncremental``; each public method forwards to
    the corresponding ``_``-prefixed C++ method.
    """

    def get_epoch(self):
        """Return the result of the C++ ``_get_epoch()``."""
        return self._get_epoch()

    def dump(self):
        """Return the result of the C++ ``_dump()``."""
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        Forward ``weightmap`` to the C++ ``_set_osd_reweights()``.

        :param weightmap: dict, int to float.
            e.g. { 0: .9, 1: 1.0, 3: .997 }
        :return: whatever the C++ call returns
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        Forward ``weightmap`` to the C++
        ``_set_crush_compat_weight_set_weights()``.

        :param weightmap: dict, int to float, devices only.
            e.g. { 0: 3.4, 1: 3.3, 2: 3.334 }
        :return: whatever the C++ call returns
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
128
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python wrapper over the C++-backed ``ceph_module.BasePyCRUSH``; public
    methods forward to the corresponding ``_``-prefixed C++ methods.
    """

    # Sentinel item id 0x7fffffff; presumably mirrors the C++ CRUSH
    # "no item" value -- confirm against the C++ headers.
    ITEM_NONE = 0x7fffffff

    def dump(self):
        """Return the result of the C++ ``_dump()``."""
        return self._dump()

    def get_item_weight(self, item):
        """Return the result of the C++ ``_get_item_weight(item)``."""
        return self._get_item_weight(item)

    def get_item_name(self, item):
        """Return the result of the C++ ``_get_item_name(item)``."""
        return self._get_item_name(item)

    def find_takes(self):
        """
        :return: the ``'takes'`` entry of the C++ ``_find_takes()`` result,
            or ``[]`` when that entry is absent
        """
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        :param root: passed through to the C++ ``_get_take_weight_osd_map()``
        :return: dict built from the ``'weights'`` entry of the C++ result
            (``{}`` when absent) with every key converted via ``int()``
        """
        raw = self._get_take_weight_osd_map(root)
        weights = {}
        for key, weight in raw.get('weights', {}).items():
            # int() normalises the keys; presumably they arrive as strings
            # from the C++ dump -- confirm.
            weights[int(key)] = weight
        return weights
3efd9988
FG
147
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        """
        Detach the log handler that forwards to this instance.

        Only this instance's own CPlusPlusHandler is removed.
        unconfigure_logger() strips *every* CPlusPlusHandler on the logger.
        An active MgrModule taking over this module name installs its own
        handler on the same logger, and finalizing the standby afterwards
        must not silently remove the active module's logging.
        """
        # __init__ may have failed before module_name was set.
        name = getattr(self, 'module_name', None)
        if name is None:
            return
        logger = logging.getLogger(name)
        own = [h for h in logger.handlers
               if isinstance(h, CPlusPlusHandler) and h._module is self]
        for h in own:
            logger.removeHandler(h)

    @property
    def log(self):
        """The logger configured for this module's name."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """Return the result of ``_ceph_get_mgr_id()`` (this mgr's id)."""
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_active_uri(self):
        """Return the result of ``_ceph_get_active_uri()`` (the active peer's URI)."""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Retrieve configuration for this ceph-mgr instance.

        The key "<mgr id>/<key>" is tried first, then plain ``key``, and
        ``default`` is returned when neither is set.

        :param key: str
        :param default: value returned when neither key is set
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r
206
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for active ceph-mgr Python modules.

    Subclasses override ``serve``/``shutdown``/``notify``/``handle_command``
    as needed.  The accessor methods below forward to the ``_ceph_*``
    methods provided by ``ceph_module.BaseMgrModule``.
    """

    # Commands this module declares; handle_command() is only expected to
    # be called for commands listed here.
    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Clears the two value-type bits (TIME=1, U64=2), keeping only the
    # counter-type bits.
    PERFCOUNTER_TYPE_MASK = ~3

    # units supported
    BYTES = 0
    NONE = 1

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        # Not used by the methods in this file.
        self._perf_schema_cache = None

    def __del__(self):
        """
        Detach the log handler that forwards to this instance.

        Only this instance's own CPlusPlusHandler is removed, so a dying
        instance cannot strip a handler that another instance of the same
        module name has installed on the shared logger.
        """
        # __init__ may have failed before module_name was set.
        name = getattr(self, 'module_name', None)
        if name is None:
            return
        logger = logging.getLogger(name)
        own = [h for h in logger.handlers
               if isinstance(h, CPlusPlusHandler) and h._module is self]
        for h in own:
            logger.removeHandler(h)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        NOTE: this implementation is a no-op.  get_all_perf_counters
        fetches the schema for every service on each call.

        :param daemon_type:
        :param daemon_name:
        :return: None
        """

    @property
    def log(self):
        """The logger configured for this module's name."""
        return self._logger

    @property
    def version(self):
        """The value returned by ``_ceph_get_version()`` at construction."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin.  The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function.  You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr
        """
        return self._ceph_get(data_name)

    def _stattype_to_str(self, stattype):
        """
        Map a perf counter type bitfield to a metric kind string.

        The value-type bits are masked off first.  The result is 'gauge' when
        no type bit is set; 'counter' for exactly LONGRUNAVG or COUNTER;
        'histogram' for exactly HISTOGRAM; and '' for anything else
        (including combinations of several type bits).
        """
        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
        if typeonly == 0:
            return 'gauge'
        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
            # this lie matches the DaemonState decoding: only val, no counts
            return 'counter'
        if typeonly == self.PERFCOUNTER_COUNTER:
            return 'counter'
        if typeonly == self.PERFCOUNTER_HISTOGRAM:
            return 'histogram'

        return ''

    def _perfvalue_to_value(self, stattype, value):
        """Convert TIME-typed values from ns to seconds; return others as-is."""
        if stattype & self.PERFCOUNTER_TIME:
            # Convert from ns to seconds
            return value / 1000000000.0
        else:
            return value

    def _unit_to_str(self, unit):
        """
        Map a unit constant to a rate suffix.

        :return: "/s" for NONE, "B/s" for BYTES, None for any other value
        """
        if unit == self.NONE:
            return "/s"
        elif unit == self.BYTES:
            return "B/s"
        return None

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be nullptr, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: list of dicts describing the counters requested
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: A list of two-element lists containing time and value
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.  Arguments are passed through unchanged to
        ``_ceph_send_command``.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks.  Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: value returned when the setting is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance

        The key "<mgr id>/<key>" is tried first, then plain ``key``, and
        ``default`` is returned when neither is set.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration for this ceph-mgr instance

        Stores ``val`` under "<mgr id>/<key>", the first key that
        get_localized_config() looks up.

        :param key: str
        :param val: str
        :return: the result of ``_ceph_set_config()``
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object (None when the setting is not found)
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module. Override this function and implement
        a best as possible self-test for (automated) testing of the module
        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to an OSDMap via ``_ceph_get_osdmap()``.  No epoch can
        be requested through this method (presumably the handle is for the
        latest map -- confirm against the C++ binding).
        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value.  Long-running-average counters also get a "count" member.

        Only rgw, mds, osd and mon services are considered.
        """

        result = defaultdict(dict)

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return data[-1][1]
            else:
                return 0

        def get_latest_avg(daemon_type, daemon_name, counter):
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return (data[-1][1], data[-1][2])
            else:
                return (0, 0)

        for server in self.list_servers():
            for service in server['services']:
                svc_type = service['type']
                if svc_type not in ("rgw", "mds", "osd", "mon"):
                    continue
                # Read the id only after the type filter, as before.
                svc_id = service['id']

                schema = self.get_perf_schema(svc_type, svc_id)
                if not schema:
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        svc_type, svc_id
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about
                svc_full_name = "{0}.{1}".format(svc_type, svc_id)
                schema = schema[svc_full_name]

                # Populate latest values
                for counter_path, counter_schema in schema.items():
                    if counter_schema['priority'] < prio_limit:
                        continue

                    counter_info = dict(counter_schema)

                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                        # Also populate count for the long running avgs
                        v, c = get_latest_avg(svc_type, svc_id, counter_path)
                        counter_info['value'], counter_info['count'] = v, c
                    else:
                        counter_info['value'] = get_latest(
                            svc_type, svc_id, counter_path)

                    result[svc_full_name][counter_path] = counter_info

        # NOTE: len(result) is the number of services, not of counters.
        self.log.debug("returning {0} counter".format(len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :param uri: str
        :return: the result of ``_ceph_set_uri()``
        """
        return self._ceph_set_uri(uri)

    def have_mon_connection(self):
        """
        Check whether this ceph-mgr daemon has an open connection
        to a monitor.  If it doesn't, then it's likely that the
        information we have about the cluster is out of date,
        and/or the monitor cluster is down.
        """

        return self._ceph_have_mon_connection()