]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1 import ceph_module # noqa
2
3 try:
4 from typing import Set, Tuple, Iterator, Any
5 except ImportError:
6 # just for type checking
7 pass
8 import logging
9 import json
10 import six
11 import threading
12 from collections import defaultdict, namedtuple
13 import rados
14 import re
15 import time
16
# Names of placement-group states, in a fixed order.
# NOTE(review): presumably these are the tokens that can appear in a PG
# state string such as "active+clean" -- confirm against the callers.
PG_STATES = """
    active clean down recovery_unfound backfill_unfound scrubbing
    degraded inconsistent peering repair recovering forced_recovery
    backfill_wait incomplete stale remapped deep backfilling
    forced_backfill backfill_toofull recovery_wait recovery_toofull
    undersized activating peered snaptrim snaptrim_wait snaptrim_error
    creating unknown
""".split()
48
49
class CPlusPlusHandler(logging.Handler):
    """
    logging.Handler that forwards each formatted record to the Ceph log
    through the owning module's ``_ceph_log(level, message)``.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst

    def emit(self, record):
        # Less severe Python levels map to larger Ceph log levels:
        # above WARNING -> 0, (INFO, WARNING] -> 1, (DEBUG, INFO] -> 4,
        # DEBUG and below -> 20.
        levelno = record.levelno
        if levelno > logging.WARNING:
            ceph_level = 0
        elif levelno > logging.INFO:
            ceph_level = 1
        elif levelno > logging.DEBUG:
            ceph_level = 4
        else:
            ceph_level = 20

        self._module._ceph_log(ceph_level, self.format(record))
66
67
def configure_logger(module_inst, module_name):
    """
    Create the logger for a module and route Python logging into the
    Ceph log.

    A CPlusPlusHandler bound to *module_inst* is attached to the root
    logger. Records from this module's logger, and from any third-party
    logger that propagates to root, therefore end up in the Ceph log.

    :param module_inst: The module instance (used by the handler to call
        ``_ceph_log``).
    :type module_inst: instance
    :param module_name: The module name.
    :type module_name: str
    :return: The logger named *module_name*.
    """
    module_logger = logging.getLogger(module_name)
    # No filtering on the Python side: everything is handed to C++, which
    # decides what gets written. NOTSET is avoided on purpose, because a
    # NOTSET logger inherits the root level set below.
    # FIXME: learn the level from C++ so that calls C++ would drop anyway
    # can be skipped here.
    module_logger.setLevel(logging.DEBUG)

    root = logging.getLogger()
    root.addHandler(CPlusPlusHandler(module_inst))
    # Third-party loggers left at NOTSET inherit this level, so only their
    # errors get through. See the "effective level" rules in
    # https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
    root.setLevel(logging.ERROR)

    return module_logger
102
103
def unconfigure_logger(module_name=None):
    """
    Detach every CPlusPlusHandler from a logger.

    :param module_name: Name of the logger to clean up. Defaults to
        ``None``, which selects the root logger.
    :type module_name: str or None
    """
    target = logging.getLogger(module_name)
    # Iterate over a copy, because removeHandler mutates target.handlers.
    for handler in list(target.handlers):
        if isinstance(handler, CPlusPlusHandler):
            target.removeHandler(handler)
114
115
class CommandResult(object):
    """
    Completion object for MgrModule.send_command.

    complete() stores the outcome and releases any thread blocked in
    wait(), which returns ``(r, outb, outs)``.
    """

    def __init__(self, tag=None):
        self.ev = threading.Event()
        self.r, self.outb, self.outs = 0, "", ""
        # Convenience for notifications from C++ land: the command is
        # identified by this string instead of passing addresses around.
        self.tag = tag or ""

    def complete(self, r, outb, outs):
        """Record the command outcome and wake up waiters."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """Block until complete() has been called; return (r, outb, outs)."""
        self.ev.wait()
        return self.r, self.outb, self.outs
140
141
class HandleCommandResult(namedtuple('HandleCommandResult', 'retval stdout stderr')):
    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        Result of `handle_command()`: a ``(retval, stdout, stderr)`` tuple.

        Keep stderr for errors or genuinely exceptional notes. A successful
        `ceph foo bar` should not print "did foo bar" unless there is
        critical information to convey. Anything meant for programs to
        consume goes on stdout.

        :param retval: return code. E.g. 0 or -errno.EINVAL
        :type retval: int
        :param stdout: data of this result.
        :type stdout: str
        :param stderr: Typically used for error messages.
        :type stderr: str
        """
        base = super(HandleCommandResult, cls)
        return base.__new__(cls, retval=retval, stdout=stdout, stderr=stderr)
162
163
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python view of an OSDMap. Each method delegates to the corresponding
    ``_``-prefixed method of the C++ base class. Several helpers work from
    the full ``_dump()`` output.
    """

    def get_epoch(self):
        return self._get_epoch()

    def get_crush_version(self):
        return self._get_crush_version()

    def dump(self):
        return self._dump()

    def get_pools(self):
        """Return the dumped pools as a dict keyed by each pool's 'pool' field."""
        # FIXME: efficient implementation (dumps the whole map)
        return {p['pool']: p for p in self._dump()['pools']}

    def get_pools_by_name(self):
        """Return the dumped pools as a dict keyed by each pool's 'pool_name'."""
        # FIXME: efficient implementation (dumps the whole map)
        return {p['pool_name']: p for p in self._dump()['pools']}

    def new_incremental(self):
        return self._new_incremental()

    def apply_incremental(self, inc):
        return self._apply_incremental(inc)

    def get_crush(self):
        return self._get_crush()

    def get_pools_by_take(self, take):
        """Return the 'pools' list from _get_pools_by_take(take), or []."""
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Delegate to the C++ upmap calculation.

        :param pools: pools to consider; ``None`` is passed on as an empty
            list (a fresh list per call, never a shared default).
        """
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id, ps):
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id):
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name):
        """Return the erasure-code profile called *name*, or None if absent."""
        # FIXME: efficient implementation (dumps the whole map)
        return self._dump()['erasure_code_profiles'].get(name)
217
218
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python wrapper around an OSDMap incremental. All methods delegate to
    the C++ base class.
    """

    def get_epoch(self):
        """Epoch of this incremental, as reported by the C++ side."""
        return self._get_epoch()

    def dump(self):
        """Structured dump of the incremental."""
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        Set OSD reweight values.

        :param weightmap: dict of int to float, e.g.
            { 0: .9, 1: 1.0, 3: .997 }
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        Set compat weight-set weights.

        :param weightmap: dict of int to float, devices only, e.g.
            { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
238
239
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python wrapper around a CRUSH map. Most queries are answered from
    the structured ``dump()`` output.
    """
    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self):
        return self._dump()

    def get_item_weight(self, item):
        return self._get_item_weight(item)

    def get_item_name(self, item):
        return self._get_item_name(item)

    def find_takes(self):
        """Return the 'takes' list from _find_takes(), or []."""
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        Return the 'weights' mapping for *root*, with keys converted via
        int() (the C++ result's keys are not used as-is).
        """
        uglymap = self._get_take_weight_osd_map(root)
        return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}

    @staticmethod
    def have_default_choose_args(dump):
        """True if *dump* has a choose_args entry under DEFAULT_CHOOSE_ARGS."""
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})

    @staticmethod
    def get_default_choose_args(dump):
        """
        Return the DEFAULT_CHOOSE_ARGS entry of *dump*'s choose_args, or [].

        A dump without a 'choose_args' key yields [] rather than raising
        AttributeError on None. This matches have_default_choose_args().
        """
        return dump.get('choose_args', {}).get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name):
        """Return the dumped rule whose 'rule_name' equals *rule_name*, or None."""
        # TODO efficient implementation
        for rule in self.dump()['rules']:
            if rule_name == rule['rule_name']:
                return rule

        return None

    def get_rule_by_id(self, rule_id):
        """Return the dumped rule whose 'rule_id' equals *rule_id*, or None."""
        for rule in self.dump()['rules']:
            if rule['rule_id'] == rule_id:
                return rule

        return None

    def get_rule_root(self, rule_name):
        """
        Return the 'item' of the first 'take' step of rule *rule_name*.

        :return: the item, or None when the rule does not exist or has no
            'take' step.
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        first_take = next(
            (s for s in rule['steps'] if s['op'] == 'take'), None)
        if first_take is None:
            # Nothing in this class defines ``self.log`` (the old
            # self.log.warn call raised AttributeError here), so use the
            # logging module directly. Note: configure_logger() sets the
            # root logger to ERROR, so this warning may be filtered out.
            # The point is not to crash.
            logging.warning("CRUSH rule '{0}' has no 'take' step".format(
                rule_name))
            return None
        return first_take['item']

    def get_osds_under(self, root_id):
        """
        Return the ids of all OSDs (items with id >= 0) below bucket
        *root_id*, depth-first in bucket item order.

        Child buckets missing from the dump are skipped. An unknown
        *root_id* raises KeyError.
        """
        # TODO don't abuse dump like this
        buckets = {b['id']: b for b in self.dump()['buckets']}

        osd_list = []

        def accumulate(b):
            for item in b['items']:
                if item['id'] >= 0:
                    osd_list.append(item['id'])
                else:
                    # Best-effort: dangling bucket references are ignored.
                    try:
                        accumulate(buckets[item['id']])
                    except KeyError:
                        pass

        accumulate(buckets[root_id])

        return osd_list

    def device_class_counts(self):
        """Return {device class (None if unset): number of devices}."""
        result = defaultdict(int)
        # TODO don't abuse dump like this
        for device in self.dump()['devices']:
            result[device.get('class', None)] += 1

        return dict(result)
327
328
class CLICommand(object):
    """
    Decorator that registers a function as a ``ceph`` CLI command.

    ``args`` uses the Ceph command-descriptor syntax: whitespace-separated
    argument specs, each a comma-separated list of ``key=value`` pairs.
    One of the pairs must be ``name=<argname>``, e.g.
    ``"name=pool,type=CephString name=size,type=CephInt,req=false"``.
    """
    # Registry shared by all instances: prefix -> CLICommand
    COMMANDS = {}

    def __init__(self, prefix, args="", desc="", perm="rw"):
        self.prefix = prefix
        self.args = args
        # argname -> {other spec key: value}, filled by _parse_args()
        self.args_dict = {}
        self.desc = desc
        self.perm = perm
        self.func = None
        self._parse_args()

    def _parse_args(self):
        """Populate ``args_dict`` from the ``args`` descriptor string."""
        if not self.args:
            return
        # split() with no separator ignores repeated, leading and trailing
        # whitespace. split(" ") produced empty specs, which crashed when
        # unpacked below.
        for arg in self.args.split():
            arg_d = {}
            for kv in arg.split(","):
                # Split on the first '=' only, so values may contain '='.
                k, v = kv.split("=", 1)
                if k != "name":
                    arg_d[k] = v
                else:
                    self.args_dict[v] = arg_d

    def __call__(self, func):
        """Register *func* under this command's prefix and return it unchanged."""
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def call(self, mgr, cmd_dict, inbuf):
        """
        Invoke the registered function with arguments taken from *cmd_dict*.

        Optional arguments (``req=false``) missing from *cmd_dict* are
        left out. Dashes in argument names become underscores. A non-empty
        *inbuf* is passed as the ``inbuf`` keyword.
        """
        kwargs = {}
        for a, d in self.args_dict.items():
            if d.get('req') == "false" and a not in cmd_dict:
                continue
            kwargs[a.replace("-", "_")] = cmd_dict[a]
        if inbuf:
            kwargs['inbuf'] = inbuf
        return self.func(mgr, **kwargs)

    @classmethod
    def dump_cmd_list(cls):
        """Return a cmd/desc/perm descriptor dict for every registered command."""
        return [{
            'cmd': '{} {}'.format(cmd.prefix, cmd.args),
            'desc': cmd.desc,
            'perm': cmd.perm
        } for cmd in cls.COMMANDS.values()]
377
378
def CLIReadCommand(prefix, args="", desc=""):
    """Shorthand for a CLICommand that requires read ("r") permission."""
    return CLICommand(prefix, args=args, desc=desc, perm="r")
381
382
def CLIWriteCommand(prefix, args="", desc=""):
    """Shorthand for a CLICommand that requires write ("w") permission."""
    return CLICommand(prefix, args=args, desc=desc, perm="w")
385
386
387 def _get_localized_key(prefix, key):
388 return '{}/{}'.format(prefix, key)
389
390
class Option(dict):
    """
    Helper class to declare options for a MODULE_OPTIONS list.

    The resulting dict contains only the arguments that are not None
    (``type`` defaults to 'str' and ``runtime`` to False, so those keys
    are always present).

    Caveat: the parameter names shadow Python builtins (type, min, max),
    so any further processing belongs in a separate method.

    TODO: type validation.
    """

    def __init__(
            self, name,
            default=None,
            type='str',
            desc=None, longdesc=None,
            min=None, max=None,
            enum_allowed=None,
            see_also=None,
            tags=None,
            runtime=False,
    ):
        declared = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('longdesc', longdesc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('see_also', see_also),
            ('tags', tags),
            ('runtime', runtime),
        )
        super(Option, self).__init__(
            (field, value) for field, value in declared if value is not None)
415
416
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Base class for the standby flavour of a mgr module.

    Standby modules only implement a serve and shutdown method. They may
    not implement commands and receive no notifications. They can only
    see the mgrmap (e.g. to find service URI info published by their
    active peer) and configuration settings, read-only.
    """

    MODULE_OPTIONS = []
    MODULE_OPTION_DEFAULTS = {}

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)
        # Same default handling as MgrModule.__init__(): typed options keep
        # their default as-is, untyped options get it as a string.
        for option in self.MODULE_OPTIONS:
            if 'default' not in option:
                continue
            value = option['default']
            if 'type' not in option:
                value = str(value)
            self.MODULE_OPTION_DEFAULTS[option['name']] = value

    def __del__(self):
        # Detach the root-logger handler installed by configure_logger().
        unconfigure_logger()

    @property
    def log(self):
        """The logger configure_logger() created for this module."""
        return self._logger

    def serve(self):
        """
        Main entry point of the standby module; subclasses must override it.
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """:return: the id of the mgr daemon hosting this module"""
        return self._ceph_get_mgr_id()

    def get_module_option(self, key, default=None):
        """
        Look up a persistent module configuration setting.

        :param str key: option name
        :param default: returned when the option is unset and has no
            declared default
        :return: the stored value, else the declared default, else *default*
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key):
        """Return the Ceph configuration option *key*."""
        return self._ceph_get_option(key)

    def get_store(self, key):
        """
        Read an entry from the persistent KV store.

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_active_uri(self):
        """Return the active peer's service URI, as reported by the C++ side."""
        return self._ceph_get_active_uri()

    def get_localized_module_option(self, key, default=None):
        """
        Like get_module_option(), but for the setting scoped to this mgr id.
        """
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
495
496 class MgrModule(ceph_module.BaseMgrModule):
497 COMMANDS = []
498 MODULE_OPTIONS = []
499 MODULE_OPTION_DEFAULTS = {}
500
501 # Priority definitions for perf counters
502 PRIO_CRITICAL = 10
503 PRIO_INTERESTING = 8
504 PRIO_USEFUL = 5
505 PRIO_UNINTERESTING = 2
506 PRIO_DEBUGONLY = 0
507
508 # counter value types
509 PERFCOUNTER_TIME = 1
510 PERFCOUNTER_U64 = 2
511
512 # counter types
513 PERFCOUNTER_LONGRUNAVG = 4
514 PERFCOUNTER_COUNTER = 8
515 PERFCOUNTER_HISTOGRAM = 0x10
516 PERFCOUNTER_TYPE_MASK = ~3
517
518 # units supported
519 BYTES = 0
520 NONE = 1
521
522 # Cluster log priorities
523 CLUSTER_LOG_PRIO_DEBUG = 0
524 CLUSTER_LOG_PRIO_INFO = 1
525 CLUSTER_LOG_PRIO_SEC = 2
526 CLUSTER_LOG_PRIO_WARN = 3
527 CLUSTER_LOG_PRIO_ERROR = 4
528
529 def __init__(self, module_name, py_modules_ptr, this_ptr):
530 self.module_name = module_name
531
532 # If we're taking over from a standby module, let's make sure
533 # its logger was unconfigured before we hook ours up
534 unconfigure_logger()
535 self._logger = configure_logger(self, module_name)
536
537 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
538
539 self._version = self._ceph_get_version()
540
541 self._perf_schema_cache = None
542
543 # Keep a librados instance for those that need it.
544 self._rados = None
545
546 for o in self.MODULE_OPTIONS:
547 if 'default' in o:
548 if 'type' in o:
549 # we'll assume the declared type matches the
550 # supplied default value's type.
551 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
552 else:
553 # module not declaring it's type, so normalize the
554 # default value to be a string for consistent behavior
555 # with default and user-supplied option values.
556 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
557
558 def __del__(self):
559 unconfigure_logger()
560
561 @classmethod
562 def _register_commands(cls):
563 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
564
565 @property
566 def log(self):
567 return self._logger
568
569 def cluster_log(self, channel, priority, message):
570 """
571 :param channel: The log channel. This can be 'cluster', 'audit', ...
572 :type channel: str
573 :param priority: The log message priority. This can be
574 CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
575 CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
576 CLUSTER_LOG_PRIO_ERROR.
577 :type priority: int
578 :param message: The message to log.
579 :type message: str
580 """
581 self._ceph_cluster_log(channel, priority, message)
582
583 @property
584 def version(self):
585 return self._version
586
587 def get_context(self):
588 """
589 :return: a Python capsule containing a C++ CephContext pointer
590 """
591 return self._ceph_get_context()
592
593 def notify(self, notify_type, notify_id):
594 """
595 Called by the ceph-mgr service to notify the Python plugin
596 that new state is available.
597
598 :param notify_type: string indicating what kind of notification,
599 such as osd_map, mon_map, fs_map, mon_status,
600 health, pg_summary, command, service_map
601 :param notify_id: string (may be empty) that optionally specifies
602 which entity is being notified about. With
603 "command" notifications this is set to the tag
604 ``from send_command``.
605 """
606 pass
607
608 def config_notify(self):
609 """
610 Called by the ceph-mgr service to notify the Python plugin
611 that the configuration may have changed. Modules will want to
612 refresh any configuration values stored in config variables.
613 """
614 pass
615
616 def serve(self):
617 """
618 Called by the ceph-mgr service to start any server that
619 is provided by this Python plugin. The implementation
620 of this function should block until ``shutdown`` is called.
621
622 You *must* implement ``shutdown`` if you implement ``serve``
623 """
624 pass
625
626 def shutdown(self):
627 """
628 Called by the ceph-mgr service to request that this
629 module drop out of its serve() function. You do not
630 need to implement this if you do not implement serve()
631
632 :return: None
633 """
634 if self._rados:
635 self._rados.shutdown()
636
637 def get(self, data_name):
638 """
639 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
640
641 :param str data_name: Valid things to fetch are osd_crush_map_text,
642 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
643 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
644 health, mon_status, devices, device <devid>.
645
646 Note:
647 All these structures have their own JSON representations: experiment
648 or look at the C++ ``dump()`` methods to learn about them.
649 """
650 return self._ceph_get(data_name)
651
652 def _stattype_to_str(self, stattype):
653
654 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
655 if typeonly == 0:
656 return 'gauge'
657 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
658 # this lie matches the DaemonState decoding: only val, no counts
659 return 'counter'
660 if typeonly == self.PERFCOUNTER_COUNTER:
661 return 'counter'
662 if typeonly == self.PERFCOUNTER_HISTOGRAM:
663 return 'histogram'
664
665 return ''
666
667 def _perfpath_to_path_labels(self, daemon, path):
668 label_names = ("ceph_daemon",)
669 labels = (daemon,)
670
671 if daemon.startswith('rbd-mirror.'):
672 match = re.match(
673 r'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
674 path
675 )
676 if match:
677 path = 'rbd_mirror_' + match.group(4)
678 pool = match.group(1)
679 namespace = match.group(2) or ''
680 image = match.group(3)
681 label_names += ('pool', 'namespace', 'image')
682 labels += (pool, namespace, image)
683
684 return path, label_names, labels,
685
686 def _perfvalue_to_value(self, stattype, value):
687 if stattype & self.PERFCOUNTER_TIME:
688 # Convert from ns to seconds
689 return value / 1000000000.0
690 else:
691 return value
692
693 def _unit_to_str(self, unit):
694 if unit == self.NONE:
695 return "/s"
696 elif unit == self.BYTES:
697 return "B/s"
698
699 @staticmethod
700 def to_pretty_iec(n):
701 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
702 (20, 'Mi'), (10, 'Ki')]:
703 if n > 10 << bits:
704 return str(n >> bits) + ' ' + suffix
705 return str(n) + ' '
706
707 @staticmethod
708 def get_pretty_row(elems, width):
709 """
710 Takes an array of elements and returns a string with those elements
711 formatted as a table row. Useful for polling modules.
712
713 :param elems: the elements to be printed
714 :param width: the width of the terminal
715 """
716 n = len(elems)
717 column_width = int(width / n)
718
719 ret = '|'
720 for elem in elems:
721 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
722
723 return ret
724
725 def get_pretty_header(self, elems, width):
726 """
727 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
728
729 :param elems: the elements to be printed
730 :param width: the width of the terminal
731 """
732 n = len(elems)
733 column_width = int(width / n)
734
735 # dash line
736 ret = '+'
737 for i in range(0, n):
738 ret += '-' * (column_width - 1) + '+'
739 ret += '\n'
740
741 # title
742 ret += self.get_pretty_row(elems, width)
743 ret += '\n'
744
745 # dash line
746 ret += '+'
747 for i in range(0, n):
748 ret += '-' * (column_width - 1) + '+'
749 ret += '\n'
750
751 return ret
752
753 def get_server(self, hostname):
754 """
755 Called by the plugin to fetch metadata about a particular hostname from
756 ceph-mgr.
757
758 This is information that ceph-mgr has gleaned from the daemon metadata
759 reported by daemons running on a particular server.
760
761 :param hostname: a hostname
762 """
763 return self._ceph_get_server(hostname)
764
765 def get_perf_schema(self, svc_type, svc_name):
766 """
767 Called by the plugin to fetch perf counter schema info.
768 svc_name can be nullptr, as can svc_type, in which case
769 they are wildcards
770
771 :param str svc_type:
772 :param str svc_name:
773 :return: list of dicts describing the counters requested
774 """
775 return self._ceph_get_perf_schema(svc_type, svc_name)
776
777 def get_counter(self, svc_type, svc_name, path):
778 """
779 Called by the plugin to fetch the latest performance counter data for a
780 particular counter on a particular service.
781
782 :param str svc_type:
783 :param str svc_name:
784 :param str path: a period-separated concatenation of the subsystem and the
785 counter name, for example "mds.inodes".
786 :return: A list of two-tuples of (timestamp, value) is returned. This may be
787 empty if no data is available.
788 """
789 return self._ceph_get_counter(svc_type, svc_name, path)
790
791 def get_latest_counter(self, svc_type, svc_name, path):
792 """
793 Called by the plugin to fetch only the newest performance counter data
794 pointfor a particular counter on a particular service.
795
796 :param str svc_type:
797 :param str svc_name:
798 :param str path: a period-separated concatenation of the subsystem and the
799 counter name, for example "mds.inodes".
800 :return: A list of two-tuples of (timestamp, value) is returned. This may be
801 empty if no data is available.
802 """
803 return self._ceph_get_latest_counter(svc_type, svc_name, path)
804
805 def list_servers(self):
806 """
807 Like ``get_server``, but gives information about all servers (i.e. all
808 unique hostnames that have been mentioned in daemon metadata)
809
810 :return: a list of information about all servers
811 :rtype: list
812 """
813 return self._ceph_get_server(None)
814
815 def get_metadata(self, svc_type, svc_id):
816 """
817 Fetch the daemon metadata for a particular service.
818
819 ceph-mgr fetches metadata asynchronously, so are windows of time during
820 addition/removal of services where the metadata is not available to
821 modules. ``None`` is returned if no metadata is available.
822
823 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
824 :param str svc_id: service id. convert OSD integer IDs to strings when
825 calling this
826 :rtype: dict, or None if no metadata found
827 """
828 return self._ceph_get_metadata(svc_type, svc_id)
829
830 def get_daemon_status(self, svc_type, svc_id):
831 """
832 Fetch the latest status for a particular service daemon.
833
834 This method may return ``None`` if no status information is
835 available, for example because the daemon hasn't fully started yet.
836
837 :param svc_type: string (e.g., 'rgw')
838 :param svc_id: string
839 :return: dict, or None if the service is not found
840 """
841 return self._ceph_get_daemon_status(svc_type, svc_id)
842
843 def mon_command(self, cmd_dict):
844 """
845 Helper for modules that do simple, synchronous mon command
846 execution.
847
848 See send_command for general case.
849
850 :return: status int, out std, err str
851 """
852
853 t1 = time.time()
854 result = CommandResult()
855 self.send_command(result, "mon", "", json.dumps(cmd_dict), "")
856 r = result.wait()
857 t2 = time.time()
858
859 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
860 cmd_dict['prefix'], r[0], t2 - t1
861 ))
862
863 return r
864
865 def send_command(self, *args, **kwargs):
866 """
867 Called by the plugin to send a command to the mon
868 cluster.
869
870 :param CommandResult result: an instance of the ``CommandResult``
871 class, defined in the same module as MgrModule. This acts as a
872 completion and stores the output of the command. Use
873 ``CommandResult.wait()`` if you want to block on completion.
874 :param str svc_type:
875 :param str svc_id:
876 :param str command: a JSON-serialized command. This uses the same
877 format as the ceph command line, which is a dictionary of command
878 arguments, with the extra ``prefix`` key containing the command
879 name itself. Consult MonCommands.h for available commands and
880 their expected arguments.
881 :param str tag: used for nonblocking operation: when a command
882 completes, the ``notify()`` callback on the MgrModule instance is
883 triggered, with notify_type set to "command", and notify_id set to
884 the tag of the command.
885 """
886 self._ceph_send_command(*args, **kwargs)
887
888 def set_health_checks(self, checks):
889 """
890 Set the module's current map of health checks. Argument is a
891 dict of check names to info, in this form:
892
893 ::
894
895 {
896 'CHECK_FOO': {
897 'severity': 'warning', # or 'error'
898 'summary': 'summary string',
899 'detail': [ 'list', 'of', 'detail', 'strings' ],
900 },
901 'CHECK_BAR': {
902 'severity': 'error',
903 'summary': 'bars are bad',
904 'detail': [ 'too hard' ],
905 },
906 }
907
908 :param list: dict of health check dicts
909 """
910 self._ceph_set_health_checks(checks)
911
912 def _handle_command(self, inbuf, cmd):
913 if cmd['prefix'] not in CLICommand.COMMANDS:
914 return self.handle_command(inbuf, cmd)
915 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
916
917 def handle_command(self, inbuf, cmd):
918 """
919 Called by ceph-mgr to request the plugin to handle one
920 of the commands that it declared in self.COMMANDS
921
922 Return a status code, an output buffer, and an
923 output string. The output buffer is for data results,
924 the output string is for informative text.
925
926 :param inbuf: content of any "-i <file>" supplied to ceph cli
927 :type inbuf: str
928 :param cmd: from Ceph's cmdmap_t
929 :type cmd: dict
930
931 :return: HandleCommandResult or a 3-tuple of (int, str, str)
932 """
933
934 # Should never get called if they didn't declare
935 # any ``COMMANDS``
936 raise NotImplementedError()
937
938 def get_mgr_id(self):
939 """
940 Retrieve the name of the manager daemon where this plugin
941 is currently being executed (i.e. the active manager).
942
943 :return: str
944 """
945 return self._ceph_get_mgr_id()
946
947 def get_ceph_option(self, key):
948 return self._ceph_get_option(key)
949
950 def _validate_module_option(self, key):
951 """
952 Helper: don't allow get/set config callers to
953 access config options that they didn't declare
954 in their schema.
955 """
956 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
957 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
958 format(key, self.__class__.__name__))
959
960 def _get_module_option(self, key, default, localized_prefix=""):
961 r = self._ceph_get_module_option(self.module_name, key,
962 localized_prefix)
963 if r is None:
964 return self.MODULE_OPTION_DEFAULTS.get(key, default)
965 else:
966 return r
967
968 def get_module_option(self, key, default=None):
969 """
970 Retrieve the value of a persistent configuration setting
971
972 :param str key:
973 :param str default:
974 :return: str
975 """
976 self._validate_module_option(key)
977 return self._get_module_option(key, default)
978
979 def get_module_option_ex(self, module, key, default=None):
980 """
981 Retrieve the value of a persistent configuration setting
982 for the specified module.
983
984 :param str module: The name of the module, e.g. 'dashboard'
985 or 'telemetry'.
986 :param str key: The configuration key, e.g. 'server_addr'.
987 :param str,None default: The default value to use when the
988 returned value is ``None``. Defaults to ``None``.
989 :return: str,int,bool,float,None
990 """
991 if module == self.module_name:
992 self._validate_module_option(key)
993 r = self._ceph_get_module_option(module, key)
994 return default if r is None else r
995
996 def get_store_prefix(self, key_prefix):
997 """
998 Retrieve a dict of KV store keys to values, where the keys
999 have the given prefix
1000
1001 :param str key_prefix:
1002 :return: str
1003 """
1004 return self._ceph_get_store_prefix(key_prefix)
1005
1006 def _set_localized(self, key, val, setter):
1007 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1008
1009 def get_localized_module_option(self, key, default=None):
1010 """
1011 Retrieve localized configuration for this ceph-mgr instance
1012 :param str key:
1013 :param str default:
1014 :return: str
1015 """
1016 self._validate_module_option(key)
1017 return self._get_module_option(key, default, self.get_mgr_id())
1018
1019 def _set_module_option(self, key, val):
1020 return self._ceph_set_module_option(self.module_name, key, str(val))
1021
1022 def set_module_option(self, key, val):
1023 """
1024 Set the value of a persistent configuration setting
1025
1026 :param str key:
1027 :type val: str | None
1028 """
1029 self._validate_module_option(key)
1030 return self._set_module_option(key, val)
1031
1032 def set_module_option_ex(self, module, key, val):
1033 """
1034 Set the value of a persistent configuration setting
1035 for the specified module.
1036
1037 :param str module:
1038 :param str key:
1039 :param str val:
1040 """
1041 if module == self.module_name:
1042 self._validate_module_option(key)
1043 return self._ceph_set_module_option(module, key, str(val))
1044
1045 def set_localized_module_option(self, key, val):
1046 """
1047 Set localized configuration for this ceph-mgr instance
1048 :param str key:
1049 :param str val:
1050 :return: str
1051 """
1052 self._validate_module_option(key)
1053 return self._set_localized(key, val, self._set_module_option)
1054
1055 def set_store(self, key, val):
1056 """
1057 Set a value in this module's persistent key value store.
1058 If val is None, remove key from store
1059
1060 :param str key:
1061 :param str val:
1062 """
1063 self._ceph_set_store(key, val)
1064
1065 def get_store(self, key, default=None):
1066 """
1067 Get a value from this module's persistent key value store
1068 """
1069 r = self._ceph_get_store(key)
1070 if r is None:
1071 return default
1072 else:
1073 return r
1074
1075 def get_localized_store(self, key, default=None):
1076 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
1077 if r is None:
1078 r = self._ceph_get_store(key)
1079 if r is None:
1080 r = default
1081 return r
1082
1083 def set_localized_store(self, key, val):
1084 return self._set_localized(key, val, self.set_store)
1085
1086 def self_test(self):
1087 """
1088 Run a self-test on the module. Override this function and implement
1089 a best as possible self-test for (automated) testing of the module
1090
1091 Indicate any failures by raising an exception. This does not have
1092 to be pretty, it's mainly for picking up regressions during
1093 development, rather than use in the field.
1094
1095 :return: None, or an advisory string for developer interest, such
1096 as a json dump of some state.
1097 """
1098 pass
1099
1100 def get_osdmap(self):
1101 """
1102 Get a handle to an OSDMap. If epoch==0, get a handle for the latest
1103 OSDMap.
1104 :return: OSDMap
1105 """
1106 return self._ceph_get_osdmap()
1107
def get_latest(self, daemon_type, daemon_name, counter):
    """
    Return the most recent value of ``counter`` for one daemon.

    Takes element ``[1]`` of the entry that ``get_latest_counter``
    reports for ``counter``. Returns 0 when that entry is empty/falsy.
    """
    samples = self.get_latest_counter(daemon_type, daemon_name, counter)
    latest = samples[counter]
    if not latest:
        return 0
    return latest[1]
1115
def get_latest_avg(self, daemon_type, daemon_name, counter):
    """
    Return the latest ``(value, count)`` pair of an averaged counter.

    Uses elements ``[1]`` and ``[2]`` of the entry that
    ``get_latest_counter`` reports for ``counter``. Returns ``(0, 0)``
    when that entry is empty/falsy.
    """
    samples = self.get_latest_counter(daemon_type, daemon_name, counter)
    latest = samples[counter]
    if not latest:
        return 0, 0
    return latest[1], latest[2]
1123
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
                          services=("mds", "mon", "osd",
                                    "rbd-mirror", "rgw", "tcmu-runner")):
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters. The counter dict for each
    service maps counter paths to a counter info structure: the
    information from the schema plus a "value" member holding the
    latest value. Counters whose schema ``type`` has the
    ``PERFCOUNTER_LONGRUNAVG`` bit set also get a "count" member.

    Services whose type is not in ``services`` are skipped. Services
    without a perf schema are skipped with a warning.

    :param int prio_limit: minimum counter priority to include
    :param services: service types to collect counters from
    :return: defaultdict mapping "<type>.<id>" to {counter_path: info}
    """
    result = defaultdict(dict)

    for server in self.list_servers():
        for service in server['services']:
            svc_type = service['type']
            svc_id = service['id']
            if svc_type not in services:
                continue

            schema = self.get_perf_schema(svc_type, svc_id)
            if not schema:
                # Logger.warn is a deprecated alias of Logger.warning.
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schema = schema[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                if counter_schema['priority'] < prio_limit:
                    continue

                counter_info = dict(counter_schema)

                if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    # Long-running averages also report their sample count.
                    value, count = self.get_latest_avg(
                        svc_type, svc_id, counter_path)
                    counter_info['value'] = value
                    counter_info['count'] = count
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
1190
def set_uri(self, uri):
    """
    Publish the address of a service exposed by this module.

    Call this once the service's address is known.

    :param uri: the address to publish
    :return: the result of ``_ceph_set_uri`` (documented previously as
             a string)
    """
    publish = self._ceph_set_uri
    return publish(uri)
1199
def have_mon_connection(self):
    """
    Report whether this ceph-mgr daemon has an open monitor connection.

    Without one, the cluster information this daemon holds is likely
    stale, and/or the monitor cluster is down.

    :return: the result of ``_ceph_have_mon_connection`` (presumably a
             bool)
    """
    probe = self._ceph_have_mon_connection
    return probe()
1209
def update_progress_event(self, evid, desc, progress):
    """
    Create or update a progress event.

    ``evid`` and ``desc`` are coerced to str and ``progress`` to float
    before being handed to ``_ceph_update_progress_event``.
    """
    event_id = str(evid)
    description = str(desc)
    fraction = float(progress)
    return self._ceph_update_progress_event(event_id, description, fraction)
1212
def complete_progress_event(self, evid):
    """Mark the progress event ``evid`` (coerced to str) as complete."""
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
1215
def clear_all_progress_events(self):
    """Clear every progress event via ``_ceph_clear_all_progress_events``."""
    clear_all = self._ceph_clear_all_progress_events
    return clear_all()
1218
@property
def rados(self):
    """
    A librados instance to be shared by any classes within
    this mgr module that want one.

    On first access, a ``rados.Rados`` handle is created from
    ``get_context()``, connected, and cached in ``self._rados``. Later
    accesses return the cached handle.

    The handle is cached only after ``connect()`` succeeds. If connecting
    raises, the exception propagates and the next access tries again,
    rather than returning an unconnected handle.
    """
    if self._rados:
        return self._rados

    ctx_capsule = self.get_context()
    handle = rados.Rados(context=ctx_capsule)
    handle.connect()
    # Publish the handle only once it is connected.
    self._rados = handle

    return self._rados
1233
@staticmethod
def can_run():
    """
    Report whether this module's dependencies are satisfied.

    Override this in modules that need something optional, such as a
    third-party import. Wrap that import in try/except at file scope and
    report the outcome from here.

    The C++ side calls this in a blocking way, so do not perform any I/O
    here that could block.

    :return: a 2-tuple ``(ok, explanation)``; the default is ``(True, "")``
    """
    ok, explanation = True, ""
    return ok, explanation
1249
def remote(self, module_name, method_name, *args, **kwargs):
    """
    Call ``method_name`` on the loaded mgr module ``module_name``.

    Every argument, and the value the other module returns, must be
    serializable.

    Limitation: the called method must not import any modules. Under
    Python 2 that fails with::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: name of the other module; if it is not loaded,
        ImportError is raised
    :param method_name: name of the method; if it does not exist,
        NameError is raised
    :param args: positional arguments, forwarded as a tuple
    :param kwargs: keyword arguments, forwarded as a dict
    :raises RuntimeError: **any** error raised inside the called method
        is converted to RuntimeError
    :raises ImportError: no such module
    """
    dispatch = self._ceph_dispatch_remote
    return dispatch(module_name, method_name, args, kwargs)
1273
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.

    ``query`` is a dict describing the query, shaped like this:

    ::

        {
          'key_descriptor': [
            {'type': subkey_type, 'regex': regex_pattern},
            ...
          ],
          'performance_counter_descriptors': [
            list, of, descriptor, types
          ],
          'limit': {'order_by': performance_counter_type, 'max_count': n},
        }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    register = self._ceph_add_osd_perf_query
    return register(query)
1303
def remove_osd_perf_query(self, query_id):
    """
    Unregister the OSD perf query identified by ``query_id``.

    :param int query_id: id previously returned by ``add_osd_perf_query``
    """
    unregister = self._ceph_remove_osd_perf_query
    return unregister(query_id)
1311
def get_osd_perf_counters(self, query_id):
    """
    Fetch the stats collected so far for one OSD perf query.

    :param int query_id: id previously returned by ``add_osd_perf_query``
    """
    fetch = self._ceph_get_osd_perf_counters
    return fetch(query_id)
1319
1320
class PersistentStoreDict(object):
    """
    Dict-like view of the part of a mgr module's persistent key/value
    store whose keys start with ``prefix + '.'``.

    Values are stored JSON-encoded via ``mgr.set_store`` and decoded on
    read. Assigning ``None``, or deleting an item, removes the key from
    the store. A missing item raises ``IndexError`` (see ``__missing__``).
    """

    def __init__(self, mgr, prefix):
        # type: (MgrModule, str) -> None
        self.mgr = mgr
        self.prefix = prefix + '.'

    def _mk_store_key(self, key):
        # type: (str) -> str
        """Return the full store key for the short key ``key``."""
        return self.prefix + key

    def __missing__(self, key):
        # KeyError won't work for the `in` operator.
        # https://docs.python.org/3/reference/expressions.html#membership-test-details
        raise IndexError('PersistentStoreDict: "{}" not found'.format(key))

    def clear(self):
        # type: () -> None
        """Remove every key under this prefix from the store."""
        # Don't make any assumptions about the content of the values.
        # Snapshot the keys first so deleting cannot disturb the iteration.
        for k in list(self.mgr.get_store_prefix(self.prefix).keys()):
            self.mgr.set_store(k, None)

    def __getitem__(self, item):
        # type: (str) -> Any
        """
        Return the decoded value stored for ``item``.

        :raises IndexError: nothing is stored for ``item``
        :raises ValueError/TypeError: the stored value is not valid JSON;
            the corrupt entry is removed from the store before re-raising
        """
        key = self._mk_store_key(item)
        val = self.mgr.get_store(key)
        if val is None:
            # A plain miss: nothing to log and nothing to clean up.
            # __missing__ always raises.
            self.__missing__(key)
        try:
            return json.loads(val)
        except (ValueError, TypeError):
            logging.getLogger(__name__).exception('failed to deserialize')
            # Drop the corrupt entry so later reads miss cleanly.
            self.mgr.set_store(key, None)
            raise

    def __setitem__(self, item, value):
        # type: (str, Any) -> None
        """
        value=None is not allowed, as it will remove the key.
        """
        key = self._mk_store_key(item)
        self.mgr.set_store(key, json.dumps(value) if value is not None else None)

    def __delitem__(self, item):
        # type: (str) -> None
        """Remove ``item`` from the store."""
        self[item] = None

    def __len__(self):
        # type: () -> int
        """Number of keys under this prefix (decodes every value)."""
        return len(self.keys())

    def items(self):
        # type: () -> Iterator[Tuple[str, Any]]
        """
        Yield ``(short_key, decoded_value)`` for every key under the prefix.

        If any value fails to decode, the error is logged, every key under
        this prefix is removed via ``clear()``, and iteration stops early.
        NOTE(review): this also discards valid entries; confirm that is
        intended.
        """
        prefix_len = len(self.prefix)
        try:
            for k, v in self.mgr.get_store_prefix(self.prefix).items():
                yield k[prefix_len:], json.loads(v)
        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
            logging.getLogger(__name__).exception('failed to deserialize')
            self.clear()

    def keys(self):
        # type: () -> Set[str]
        """Return the set of short keys under this prefix."""
        return {k for k, _ in self.items()}

    def __iter__(self):
        return iter(self.keys())
1382
1383 def __iter__(self):
1384 return iter(self.keys())