]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
3efd9988 1import ceph_module # noqa
3efd9988 2
7c673cae 3import logging
11fdf7f2 4import json
1adf2230 5import six
7c673cae 6import threading
11fdf7f2
TL
7from collections import defaultdict, namedtuple
8import rados
81eedcae 9import re
11fdf7f2
TL
10import time
11
# Known PG state names, kept in the order of the original listing.
PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown",
]
3efd9988
FG
43
44
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that hands every formatted record to the
    ``_ceph_log`` method of the module instance it was created with.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst

    def emit(self, record):
        """
        Translate the Python log level into a Ceph level and forward
        the formatted message.

        :param record: the record to forward
        :type record: logging.LogRecord
        """
        # (highest Python level covered, Ceph level), checked in order;
        # anything above WARNING falls through to Ceph level 0.
        thresholds = (
            (logging.DEBUG, 20),
            (logging.INFO, 4),
            (logging.WARNING, 1),
        )
        ceph_level = 0
        for upper_bound, mapped_level in thresholds:
            if record.levelno <= upper_bound:
                ceph_level = mapped_level
                break

        self._module._ceph_log(ceph_level, self.format(record))
61
62
11fdf7f2
TL
def configure_logger(module_inst, module_name):
    """
    Create and configure the logger for the given module.

    A ``CPlusPlusHandler`` is attached to the root logger so that
    messages from all loggers (including 3rd party libraries) are
    redirected to the Ceph log.

    :param module_inst: The module instance.
    :type module_inst: instance
    :param module_name: The module name.
    :type module_name: str
    :return: The logger with the specified name.
    """
    module_logger = logging.getLogger(module_name)
    # No filtering at the Python level; that is left to C++.
    # FIXME: We should learn the log level from C++ land, and then
    #        avoid calling the C++ level log when we know a message
    #        is of an insufficient level to be ultimately output.
    module_logger.setLevel(logging.DEBUG)  # Don't use NOTSET

    # The handler lives on the root logger, so this module and all
    # 3rd party libraries log their messages to the Ceph log.
    root = logging.getLogger()
    root.addHandler(CPlusPlusHandler(module_inst))
    # Root level ``ERROR`` limits what is accepted from 3rd party
    # libraries (only effective if they keep the default ``NOTSET``).
    # See https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
    # for how the effective log level is determined.
    root.setLevel(logging.ERROR)

    return module_logger
7c673cae
FG
97
98
11fdf7f2
TL
def unconfigure_logger(module_name=None):
    """
    Remove every ``CPlusPlusHandler`` from the named logger.

    :param module_name: The module name. Defaults to ``None``, which
        makes ``logging.getLogger`` return the root logger.
    :type module_name: str or None
    """
    target = logging.getLogger(module_name)
    # Walk a snapshot: removeHandler() mutates ``target.handlers``.
    for handler in list(target.handlers):
        if isinstance(handler, CPlusPlusHandler):
            target.removeHandler(handler)
109
11fdf7f2 110
7c673cae
FG
class CommandResult(object):
    """
    Completion object for use with MgrModule.send_command.

    ``complete()`` stores the result and sets an event; ``wait()``
    blocks on that event and returns the stored result.
    """

    def __init__(self, tag=None):
        self.ev = threading.Event()
        self.r = 0
        self.outb = ""
        self.outs = ""

        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        # Any falsy tag is normalized to the empty string.
        self.tag = tag or ""

    def complete(self, r, outb, outs):
        """Record the result triple and wake up any ``wait()`` caller."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """Block until ``complete()`` was called; return ``(r, outb, outs)``."""
        self.ev.wait()
        return self.r, self.outb, self.outs
135
136
11fdf7f2
TL
class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        Tuple containing the result of `handle_command()`.

        Guidelines for filling it in:

        - Only write to stderr if there is an error, or in extraordinary
          circumstances.
        - Avoid having `ceph foo bar` commands say "did foo bar" on
          success unless there is critical information to include there.
        - Everything programmatically consumable should be put on stdout.

        :param retval: return code. E.g. 0 or -errno.EINVAL
        :type retval: int
        :param stdout: data of this result.
        :type stdout: str
        :param stderr: Typically used for error messages.
        :type stderr: str
        """
        base = super(HandleCommandResult, cls)
        return base.__new__(cls, retval, stdout, stderr)
157
158
3efd9988
FG
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Public-named wrappers around the ``_``-prefixed methods of
    ``ceph_module.BasePyOSDMap``, plus a few helpers that are derived
    from the dict returned by ``_dump()``.
    """

    def get_epoch(self):
        return self._get_epoch()

    def get_crush_version(self):
        return self._get_crush_version()

    def dump(self):
        return self._dump()

    def get_pools(self):
        """Return the dumped pools as a dict keyed by each entry's ``'pool'`` value."""
        # FIXME: efficient implementation
        pool_entries = self._dump()['pools']
        return {entry['pool']: entry for entry in pool_entries}

    def get_pools_by_name(self):
        """Return the dumped pools as a dict keyed by each entry's ``'pool_name'`` value."""
        # FIXME: efficient implementation
        pool_entries = self._dump()['pools']
        return {entry['pool_name']: entry for entry in pool_entries}

    def new_incremental(self):
        return self._new_incremental()

    def apply_incremental(self, inc):
        return self._apply_incremental(inc)

    def get_crush(self):
        return self._get_crush()

    def get_pools_by_take(self, take):
        """Return the ``'pools'`` entry of the base-class result, or ``[]`` if absent."""
        result = self._get_pools_by_take(take)
        return result.get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """Forward to ``_calc_pg_upmaps``; ``pools=None`` is passed on as an empty list."""
        pool_list = [] if pools is None else pools
        return self._calc_pg_upmaps(
            inc, max_deviation, max_iterations, pool_list)

    def map_pool_pgs_up(self, poolid):
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id, ps):
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id):
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name):
        """Return the named entry of ``'erasure_code_profiles'`` or ``None``."""
        # FIXME: efficient implementation
        profiles = self._dump()['erasure_code_profiles']
        return profiles.get(name)
212
213
3efd9988
FG
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Public-named wrappers around the ``_``-prefixed methods of
    ``ceph_module.BasePyOSDMapIncremental``.
    """

    def get_epoch(self):
        return self._get_epoch()

    def dump(self):
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        :param weightmap: dict, int to float,
            e.g. ``{ 0: .9, 1: 1.0, 3: .997 }``
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        :param weightmap: dict, int to float, devices only,
            e.g. ``{ 0: 3.4, 1: 3.3, 2: 3.334 }``
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
233
11fdf7f2 234
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Public-named wrappers around the ``_``-prefixed methods of
    ``ceph_module.BasePyCRUSH``, plus helpers that inspect the dict
    returned by ``dump()``.
    """

    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self):
        return self._dump()

    def get_item_weight(self, item):
        return self._get_item_weight(item)

    def get_item_name(self, item):
        return self._get_item_name(item)

    def find_takes(self):
        """Return the ``'takes'`` entry of the base-class result, or ``[]``."""
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """Return the ``'weights'`` mapping for ``root`` with its keys converted to int."""
        uglymap = self._get_take_weight_osd_map(root)
        return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}

    @staticmethod
    def have_default_choose_args(dump):
        """Tell whether ``dump['choose_args']`` has a ``DEFAULT_CHOOSE_ARGS`` entry."""
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})

    @staticmethod
    def get_default_choose_args(dump):
        """
        Return the ``DEFAULT_CHOOSE_ARGS`` entry of ``dump['choose_args']``.

        An empty list is returned when either the entry or the whole
        ``'choose_args'`` section is missing.
        """
        # Default to {} so a dump without 'choose_args' yields [] instead
        # of raising AttributeError on None (matches
        # have_default_choose_args).
        return dump.get('choose_args', {}).get(
            CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name):
        """Return the first dumped rule whose ``'rule_name'`` matches, else ``None``."""
        # TODO efficient implementation
        for rule in self.dump()['rules']:
            if rule_name == rule['rule_name']:
                return rule

        return None

    def get_rule_by_id(self, rule_id):
        """Return the first dumped rule whose ``'rule_id'`` matches, else ``None``."""
        for rule in self.dump()['rules']:
            if rule['rule_id'] == rule_id:
                return rule

        return None

    def get_rule_root(self, rule_name):
        """
        Return the ``'item'`` of the first ``take`` step of the named rule.

        :return: the item, or ``None`` if the rule does not exist or has
            no ``take`` step (a warning is logged in the latter case).
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        take_steps = [s for s in rule['steps'] if s['op'] == 'take']
        if not take_steps:
            # This class does not define ``log`` itself; use it if the
            # base class happens to provide one, otherwise fall back to
            # a module-level logger instead of raising AttributeError.
            logger = getattr(self, 'log', None) or logging.getLogger(__name__)
            logger.warning("CRUSH rule '{0}' has no 'take' step".format(
                rule_name))
            return None

        return take_steps[0]['item']

    def get_osds_under(self, root_id):
        """
        Collect the ids of all items with a non-negative id found below
        the bucket ``root_id``, descending into nested buckets.

        :raises KeyError: if ``root_id`` is not a dumped bucket id
        """
        # TODO don't abuse dump like this
        d = self.dump()
        buckets = {b['id']: b for b in d['buckets']}

        osd_list = []

        def accumulate(b):
            for item in b['items']:
                if item['id'] >= 0:
                    osd_list.append(item['id'])
                else:
                    # Negative ids refer to buckets; ones that cannot be
                    # resolved are deliberately skipped.
                    try:
                        accumulate(buckets[item['id']])
                    except KeyError:
                        pass

        accumulate(buckets[root_id])

        return osd_list

    def device_class_counts(self):
        """
        Count dumped devices per ``'class'`` value.

        Devices without a ``'class'`` key are counted under ``None``.
        """
        result = defaultdict(int)
        # TODO don't abuse dump like this
        d = self.dump()
        for device in d['devices']:
            result[device.get('class', None)] += 1

        return dict(result)
322
323
class CLICommand(object):
    """
    Decorator object that registers a function as the handler for a
    command prefix in the class-wide ``COMMANDS`` registry.

    ``args`` is a whitespace-separated list of argument descriptors,
    each a comma-separated list of ``key=value`` pairs; the ``name``
    pair identifies the argument.
    """

    COMMANDS = {}

    def __init__(self, prefix, args="", desc="", perm="rw"):
        self.prefix = prefix
        self.args = args
        self.args_dict = {}
        self.desc = desc
        self.perm = perm
        self.func = None
        self._parse_args()

    def _parse_args(self):
        """
        Fill ``self.args_dict`` (argument name -> dict of its other
        descriptor fields) from ``self.args``.

        :raises ValueError: if a non-empty field is not ``key=value``
        """
        if not self.args:
            return
        # split() without a separator tolerates repeated, leading and
        # trailing whitespace; split(" ") produced empty tokens there,
        # which then failed to unpack below.
        for arg in self.args.split():
            arg_d = {}
            for kv in arg.split(","):
                if not kv:
                    # stray comma, nothing to record
                    continue
                # Split on the first '=' only so values may contain '='.
                k, v = kv.split("=", 1)
                if k == "name":
                    # arg_d is stored by reference, so fields that
                    # follow 'name' still end up in the same dict.
                    self.args_dict[v] = arg_d
                else:
                    arg_d[k] = v

    def __call__(self, func):
        """Register ``func`` under ``self.prefix`` and return it unchanged."""
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def call(self, mgr, cmd_dict, inbuf):
        """
        Invoke the registered function with keyword arguments taken
        from ``cmd_dict``.

        Arguments declared with ``req=false`` are skipped when absent
        from ``cmd_dict``; dashes in argument names become underscores.
        ``inbuf`` is only passed on when it is truthy.

        :raises KeyError: if a required argument is missing
        """
        kwargs = {}
        for name, fields in self.args_dict.items():
            if fields.get('req') == "false" and name not in cmd_dict:
                continue
            kwargs[name.replace("-", "_")] = cmd_dict[name]
        if inbuf:
            kwargs['inbuf'] = inbuf
        return self.func(mgr, **kwargs)

    @classmethod
    def dump_cmd_list(cls):
        """Return one ``{'cmd', 'desc', 'perm'}`` dict per registered command."""
        return [{
            'cmd': '{} {}'.format(cmd.prefix, cmd.args),
            'desc': cmd.desc,
            'perm': cmd.perm
        } for cmd in cls.COMMANDS.values()]
372
373
def CLIReadCommand(prefix, args="", desc=""):
    """Build a ``CLICommand`` whose ``perm`` is ``"r"``."""
    read_perm = "r"
    return CLICommand(prefix, args, desc, read_perm)
376
377
def CLIWriteCommand(prefix, args="", desc=""):
    """Build a ``CLICommand`` whose ``perm`` is ``"w"``."""
    write_perm = "w"
    return CLICommand(prefix, args, desc, write_perm)
380
381
382def _get_localized_key(prefix, key):
383 return '{}/{}'.format(prefix, key)
384
385
class Option(dict):
    """
    Helper class to declare options for MODULE_OPTIONS list.

    The resulting dict holds one entry per constructor argument whose
    value is not ``None``, keyed by the argument name.

    Caveat: it uses argument names matching Python keywords (type, min, max),
    so any further processing should happen in a separate method.

    TODO: type validation.
    """

    def __init__(
            self, name,
            default=None,
            type='str',
            desc=None, longdesc=None,
            min=None, max=None,
            enum_allowed=None,
            see_also=None,
            tags=None,
            runtime=False,
    ):
        # Enumerate the fields explicitly rather than scraping vars():
        # because this method references ``super``, Python 3 adds an
        # implicit ``__class__`` cell to the local namespace, which
        # would otherwise leak into the option dict.
        fields = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('longdesc', longdesc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('see_also', see_also),
            ('tags', tags),
            ('runtime', runtime),
        )
        super(Option, self).__init__(
            (k, v) for k, v in fields if v is not None)
410
3efd9988
FG
411
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    MODULE_OPTIONS = []
    MODULE_OPTION_DEFAULTS = {}

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)
        # see also MgrModule.__init__()
        for opt in self.MODULE_OPTIONS:
            if 'default' not in opt:
                continue
            fallback = opt['default']
            if 'type' not in opt:
                # untyped option: keep its default as a string
                fallback = str(fallback)
            self.MODULE_OPTION_DEFAULTS[opt['name']] = fallback

    def __del__(self):
        unconfigure_logger()

    @property
    def log(self):
        """The logger created for this module in ``__init__``."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.

        :raises NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        return self._ceph_get_mgr_id()

    def get_module_option(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
            (used only when ``MODULE_OPTION_DEFAULTS`` has no entry for
            ``key`` either)
        :return: str
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key):
        return self._ceph_get_option(key)

    def get_store(self, key):
        """
        Retrieve the value of a persistent KV store entry

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_active_uri(self):
        return self._ceph_get_active_uri()

    def get_localized_module_option(self, key, default=None):
        """
        Like ``get_module_option`` but passes this manager's id along
        to the lookup.
        """
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988 489
3efd9988
FG
490
491class MgrModule(ceph_module.BaseMgrModule):
7c673cae 492 COMMANDS = []
11fdf7f2
TL
493 MODULE_OPTIONS = []
494 MODULE_OPTION_DEFAULTS = {}
7c673cae 495
3efd9988
FG
496 # Priority definitions for perf counters
497 PRIO_CRITICAL = 10
498 PRIO_INTERESTING = 8
499 PRIO_USEFUL = 5
500 PRIO_UNINTERESTING = 2
501 PRIO_DEBUGONLY = 0
502
503 # counter value types
504 PERFCOUNTER_TIME = 1
505 PERFCOUNTER_U64 = 2
506
507 # counter types
508 PERFCOUNTER_LONGRUNAVG = 4
509 PERFCOUNTER_COUNTER = 8
510 PERFCOUNTER_HISTOGRAM = 0x10
28e407b8 511 PERFCOUNTER_TYPE_MASK = ~3
7c673cae 512
1adf2230
AA
513 # units supported
514 BYTES = 0
515 NONE = 1
11fdf7f2
TL
516
517 # Cluster log priorities
518 CLUSTER_LOG_PRIO_DEBUG = 0
519 CLUSTER_LOG_PRIO_INFO = 1
520 CLUSTER_LOG_PRIO_SEC = 2
521 CLUSTER_LOG_PRIO_WARN = 3
522 CLUSTER_LOG_PRIO_ERROR = 4
523
3efd9988
FG
524 def __init__(self, module_name, py_modules_ptr, this_ptr):
525 self.module_name = module_name
7c673cae 526
3efd9988
FG
527 # If we're taking over from a standby module, let's make sure
528 # its logger was unconfigured before we hook ours up
11fdf7f2 529 unconfigure_logger()
3efd9988 530 self._logger = configure_logger(self, module_name)
7c673cae 531
3efd9988 532 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
7c673cae 533
3efd9988 534 self._version = self._ceph_get_version()
7c673cae 535
3efd9988 536 self._perf_schema_cache = None
7c673cae 537
11fdf7f2
TL
538 # Keep a librados instance for those that need it.
539 self._rados = None
540
541 for o in self.MODULE_OPTIONS:
542 if 'default' in o:
543 if 'type' in o:
544 # we'll assume the declared type matches the
545 # supplied default value's type.
546 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
547 else:
548 # module not declaring it's type, so normalize the
549 # default value to be a string for consistent behavior
550 # with default and user-supplied option values.
551 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
552
3efd9988 553 def __del__(self):
11fdf7f2 554 unconfigure_logger()
3efd9988 555
11fdf7f2
TL
556 @classmethod
557 def _register_commands(cls):
558 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
7c673cae
FG
559
560 @property
561 def log(self):
562 return self._logger
563
11fdf7f2
TL
564 def cluster_log(self, channel, priority, message):
565 """
566 :param channel: The log channel. This can be 'cluster', 'audit', ...
567 :type channel: str
568 :param priority: The log message priority. This can be
569 CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
570 CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
571 CLUSTER_LOG_PRIO_ERROR.
572 :type priority: int
573 :param message: The message to log.
574 :type message: str
575 """
576 self._ceph_cluster_log(channel, priority, message)
577
7c673cae
FG
578 @property
579 def version(self):
580 return self._version
581
3efd9988
FG
582 def get_context(self):
583 """
584 :return: a Python capsule containing a C++ CephContext pointer
585 """
586 return self._ceph_get_context()
587
7c673cae
FG
588 def notify(self, notify_type, notify_id):
589 """
590 Called by the ceph-mgr service to notify the Python plugin
591 that new state is available.
11fdf7f2
TL
592
593 :param notify_type: string indicating what kind of notification,
594 such as osd_map, mon_map, fs_map, mon_status,
595 health, pg_summary, command, service_map
596 :param notify_id: string (may be empty) that optionally specifies
597 which entity is being notified about. With
598 "command" notifications this is set to the tag
599 ``from send_command``.
600 """
601 pass
602
603 def config_notify(self):
604 """
605 Called by the ceph-mgr service to notify the Python plugin
606 that the configuration may have changed. Modules will want to
607 refresh any configuration values stored in config variables.
7c673cae
FG
608 """
609 pass
610
611 def serve(self):
612 """
613 Called by the ceph-mgr service to start any server that
614 is provided by this Python plugin. The implementation
615 of this function should block until ``shutdown`` is called.
616
617 You *must* implement ``shutdown`` if you implement ``serve``
618 """
619 pass
620
621 def shutdown(self):
622 """
623 Called by the ceph-mgr service to request that this
624 module drop out of its serve() function. You do not
625 need to implement this if you do not implement serve()
626
627 :return: None
628 """
11fdf7f2
TL
629 if self._rados:
630 self._rados.shutdown()
7c673cae
FG
631
632 def get(self, data_name):
633 """
11fdf7f2
TL
634 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
635
636 :param str data_name: Valid things to fetch are osd_crush_map_text,
637 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
638 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
639 health, mon_status, devices, device <devid>.
640
641 Note:
642 All these structures have their own JSON representations: experiment
643 or look at the C++ ``dump()`` methods to learn about them.
7c673cae 644 """
3efd9988 645 return self._ceph_get(data_name)
7c673cae 646
94b18763 647 def _stattype_to_str(self, stattype):
11fdf7f2 648
94b18763
FG
649 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
650 if typeonly == 0:
651 return 'gauge'
652 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
653 # this lie matches the DaemonState decoding: only val, no counts
654 return 'counter'
655 if typeonly == self.PERFCOUNTER_COUNTER:
656 return 'counter'
657 if typeonly == self.PERFCOUNTER_HISTOGRAM:
658 return 'histogram'
11fdf7f2 659
94b18763
FG
660 return ''
661
81eedcae
TL
662 def _perfpath_to_path_labels(self, daemon, path):
663 label_names = ("ceph_daemon",)
664 labels = (daemon,)
665
666 if daemon.startswith('rbd-mirror.'):
667 match = re.match(
668 r'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
669 path
670 )
671 if match:
672 path = 'rbd_mirror_' + match.group(4)
673 pool = match.group(1)
674 namespace = match.group(2) or ''
675 image = match.group(3)
676 label_names += ('pool', 'namespace', 'image')
677 labels += (pool, namespace, image)
678
679 return path, label_names, labels,
680
28e407b8
AA
681 def _perfvalue_to_value(self, stattype, value):
682 if stattype & self.PERFCOUNTER_TIME:
683 # Convert from ns to seconds
684 return value / 1000000000.0
685 else:
686 return value
687
1adf2230
AA
688 def _unit_to_str(self, unit):
689 if unit == self.NONE:
690 return "/s"
691 elif unit == self.BYTES:
11fdf7f2
TL
692 return "B/s"
693
694 @staticmethod
695 def to_pretty_iec(n):
696 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
697 (20, 'Mi'), (10, 'Ki')]:
698 if n > 10 << bits:
699 return str(n >> bits) + ' ' + suffix
700 return str(n) + ' '
701
702 @staticmethod
703 def get_pretty_row(elems, width):
704 """
705 Takes an array of elements and returns a string with those elements
706 formatted as a table row. Useful for polling modules.
707
708 :param elems: the elements to be printed
709 :param width: the width of the terminal
710 """
711 n = len(elems)
712 column_width = int(width / n)
713
714 ret = '|'
715 for elem in elems:
716 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
717
718 return ret
719
720 def get_pretty_header(self, elems, width):
721 """
722 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
723
724 :param elems: the elements to be printed
725 :param width: the width of the terminal
726 """
727 n = len(elems)
728 column_width = int(width / n)
729
730 # dash line
731 ret = '+'
732 for i in range(0, n):
733 ret += '-' * (column_width - 1) + '+'
734 ret += '\n'
735
736 # title
737 ret += self.get_pretty_row(elems, width)
738 ret += '\n'
739
740 # dash line
741 ret += '+'
742 for i in range(0, n):
743 ret += '-' * (column_width - 1) + '+'
744 ret += '\n'
745
746 return ret
747
7c673cae
FG
748 def get_server(self, hostname):
749 """
11fdf7f2
TL
750 Called by the plugin to fetch metadata about a particular hostname from
751 ceph-mgr.
752
753 This is information that ceph-mgr has gleaned from the daemon metadata
754 reported by daemons running on a particular server.
7c673cae 755
11fdf7f2 756 :param hostname: a hostname
7c673cae 757 """
3efd9988 758 return self._ceph_get_server(hostname)
7c673cae 759
c07f9fc5
FG
760 def get_perf_schema(self, svc_type, svc_name):
761 """
762 Called by the plugin to fetch perf counter schema info.
763 svc_name can be nullptr, as can svc_type, in which case
764 they are wildcards
765
11fdf7f2
TL
766 :param str svc_type:
767 :param str svc_name:
c07f9fc5
FG
768 :return: list of dicts describing the counters requested
769 """
3efd9988 770 return self._ceph_get_perf_schema(svc_type, svc_name)
c07f9fc5 771
7c673cae
FG
772 def get_counter(self, svc_type, svc_name, path):
773 """
11fdf7f2
TL
774 Called by the plugin to fetch the latest performance counter data for a
775 particular counter on a particular service.
7c673cae 776
11fdf7f2
TL
777 :param str svc_type:
778 :param str svc_name:
779 :param str path: a period-separated concatenation of the subsystem and the
780 counter name, for example "mds.inodes".
781 :return: A list of two-tuples of (timestamp, value) is returned. This may be
782 empty if no data is available.
7c673cae 783 """
3efd9988 784 return self._ceph_get_counter(svc_type, svc_name, path)
7c673cae 785
11fdf7f2
TL
786 def get_latest_counter(self, svc_type, svc_name, path):
787 """
788 Called by the plugin to fetch only the newest performance counter data
789 pointfor a particular counter on a particular service.
790
791 :param str svc_type:
792 :param str svc_name:
793 :param str path: a period-separated concatenation of the subsystem and the
794 counter name, for example "mds.inodes".
795 :return: A list of two-tuples of (timestamp, value) is returned. This may be
796 empty if no data is available.
797 """
798 return self._ceph_get_latest_counter(svc_type, svc_name, path)
799
7c673cae
FG
800 def list_servers(self):
801 """
11fdf7f2
TL
802 Like ``get_server``, but gives information about all servers (i.e. all
803 unique hostnames that have been mentioned in daemon metadata)
804
805 :return: a list of information about all servers
806 :rtype: list
7c673cae 807 """
3efd9988 808 return self._ceph_get_server(None)
7c673cae
FG
809
810 def get_metadata(self, svc_type, svc_id):
811 """
11fdf7f2 812 Fetch the daemon metadata for a particular service.
7c673cae 813
11fdf7f2
TL
814 ceph-mgr fetches metadata asynchronously, so are windows of time during
815 addition/removal of services where the metadata is not available to
816 modules. ``None`` is returned if no metadata is available.
817
818 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
819 :param str svc_id: service id. convert OSD integer IDs to strings when
820 calling this
821 :rtype: dict, or None if no metadata found
7c673cae 822 """
3efd9988 823 return self._ceph_get_metadata(svc_type, svc_id)
7c673cae 824
224ce89b
WB
825 def get_daemon_status(self, svc_type, svc_id):
826 """
827 Fetch the latest status for a particular service daemon.
828
11fdf7f2
TL
829 This method may return ``None`` if no status information is
830 available, for example because the daemon hasn't fully started yet.
831
224ce89b
WB
832 :param svc_type: string (e.g., 'rgw')
833 :param svc_id: string
11fdf7f2 834 :return: dict, or None if the service is not found
224ce89b 835 """
3efd9988 836 return self._ceph_get_daemon_status(svc_type, svc_id)
224ce89b 837
11fdf7f2
TL
838 def mon_command(self, cmd_dict):
839 """
840 Helper for modules that do simple, synchronous mon command
841 execution.
842
843 See send_command for general case.
844
845 :return: status int, out std, err str
846 """
847
848 t1 = time.time()
849 result = CommandResult()
850 self.send_command(result, "mon", "", json.dumps(cmd_dict), "")
851 r = result.wait()
852 t2 = time.time()
853
854 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
855 cmd_dict['prefix'], r[0], t2 - t1
856 ))
857
858 return r
859
7c673cae
FG
860 def send_command(self, *args, **kwargs):
861 """
862 Called by the plugin to send a command to the mon
863 cluster.
11fdf7f2
TL
864
865 :param CommandResult result: an instance of the ``CommandResult``
866 class, defined in the same module as MgrModule. This acts as a
867 completion and stores the output of the command. Use
868 ``CommandResult.wait()`` if you want to block on completion.
869 :param str svc_type:
870 :param str svc_id:
871 :param str command: a JSON-serialized command. This uses the same
872 format as the ceph command line, which is a dictionary of command
873 arguments, with the extra ``prefix`` key containing the command
874 name itself. Consult MonCommands.h for available commands and
875 their expected arguments.
876 :param str tag: used for nonblocking operation: when a command
877 completes, the ``notify()`` callback on the MgrModule instance is
878 triggered, with notify_type set to "command", and notify_id set to
879 the tag of the command.
7c673cae 880 """
3efd9988 881 self._ceph_send_command(*args, **kwargs)
7c673cae 882
c07f9fc5
FG
883 def set_health_checks(self, checks):
884 """
c07f9fc5
FG
885 Set the module's current map of health checks. Argument is a
886 dict of check names to info, in this form:
887
11fdf7f2
TL
888 ::
889
c07f9fc5
FG
890 {
891 'CHECK_FOO': {
892 'severity': 'warning', # or 'error'
893 'summary': 'summary string',
894 'detail': [ 'list', 'of', 'detail', 'strings' ],
895 },
896 'CHECK_BAR': {
897 'severity': 'error',
898 'summary': 'bars are bad',
899 'detail': [ 'too hard' ],
900 },
901 }
902
903 :param list: dict of health check dicts
904 """
3efd9988 905 self._ceph_set_health_checks(checks)
c07f9fc5 906
11fdf7f2
TL
907 def _handle_command(self, inbuf, cmd):
908 if cmd['prefix'] not in CLICommand.COMMANDS:
909 return self.handle_command(inbuf, cmd)
910 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
911
912 def handle_command(self, inbuf, cmd):
7c673cae
FG
913 """
914 Called by ceph-mgr to request the plugin to handle one
915 of the commands that it declared in self.COMMANDS
916
917 Return a status code, an output buffer, and an
918 output string. The output buffer is for data results,
919 the output string is for informative text.
920
11fdf7f2
TL
921 :param inbuf: content of any "-i <file>" supplied to ceph cli
922 :type inbuf: str
923 :param cmd: from Ceph's cmdmap_t
924 :type cmd: dict
7c673cae 925
11fdf7f2 926 :return: HandleCommandResult or a 3-tuple of (int, str, str)
7c673cae
FG
927 """
928
929 # Should never get called if they didn't declare
930 # any ``COMMANDS``
931 raise NotImplementedError()
932
31f18b77
FG
933 def get_mgr_id(self):
934 """
11fdf7f2
TL
935 Retrieve the name of the manager daemon where this plugin
936 is currently being executed (i.e. the active manager).
31f18b77
FG
937
938 :return: str
939 """
3efd9988 940 return self._ceph_get_mgr_id()
31f18b77 941
11fdf7f2
TL
942 def get_ceph_option(self, key):
943 return self._ceph_get_option(key)
7c673cae 944
11fdf7f2
TL
945 def _validate_module_option(self, key):
946 """
947 Helper: don't allow get/set config callers to
948 access config options that they didn't declare
949 in their schema.
7c673cae 950 """
11fdf7f2
TL
951 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
952 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
953 format(key, self.__class__.__name__))
954
955 def _get_module_option(self, key, default, localized_prefix=""):
956 r = self._ceph_get_module_option(self.module_name, key,
957 localized_prefix)
3efd9988 958 if r is None:
11fdf7f2 959 return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988
FG
960 else:
961 return r
7c673cae 962
11fdf7f2
TL
963 def get_module_option(self, key, default=None):
964 """
965 Retrieve the value of a persistent configuration setting
966
967 :param str key:
968 :param str default:
969 :return: str
970 """
971 self._validate_module_option(key)
972 return self._get_module_option(key, default)
973
974 def get_module_option_ex(self, module, key, default=None):
975 """
976 Retrieve the value of a persistent configuration setting
977 for the specified module.
978
979 :param str module: The name of the module, e.g. 'dashboard'
980 or 'telemetry'.
981 :param str key: The configuration key, e.g. 'server_addr'.
982 :param str,None default: The default value to use when the
983 returned value is ``None``. Defaults to ``None``.
984 :return: str,int,bool,float,None
985 """
986 if module == self.module_name:
987 self._validate_module_option(key)
988 r = self._ceph_get_module_option(module, key)
989 return default if r is None else r
990
991 def get_store_prefix(self, key_prefix):
31f18b77 992 """
11fdf7f2
TL
993 Retrieve a dict of KV store keys to values, where the keys
994 have the given prefix
31f18b77 995
11fdf7f2 996 :param str key_prefix:
31f18b77
FG
997 :return: str
998 """
11fdf7f2 999 return self._ceph_get_store_prefix(key_prefix)
31f18b77 1000
11fdf7f2
TL
1001 def _set_localized(self, key, val, setter):
1002 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1003
1004 def get_localized_module_option(self, key, default=None):
224ce89b
WB
1005 """
1006 Retrieve localized configuration for this ceph-mgr instance
11fdf7f2
TL
1007 :param str key:
1008 :param str default:
224ce89b
WB
1009 :return: str
1010 """
11fdf7f2
TL
1011 self._validate_module_option(key)
1012 return self._get_module_option(key, default, self.get_mgr_id())
224ce89b 1013
11fdf7f2
TL
1014 def _set_module_option(self, key, val):
1015 return self._ceph_set_module_option(self.module_name, key, str(val))
224ce89b 1016
11fdf7f2 1017 def set_module_option(self, key, val):
7c673cae
FG
1018 """
1019 Set the value of a persistent configuration setting
1020
11fdf7f2
TL
1021 :param str key:
1022 :type val: str | None
7c673cae 1023 """
11fdf7f2
TL
1024 self._validate_module_option(key)
1025 return self._set_module_option(key, val)
7c673cae 1026
11fdf7f2
TL
1027 def set_module_option_ex(self, module, key, val):
1028 """
1029 Set the value of a persistent configuration setting
1030 for the specified module.
1031
1032 :param str module:
1033 :param str key:
1034 :param str val:
1035 """
1036 if module == self.module_name:
1037 self._validate_module_option(key)
1038 return self._ceph_set_module_option(module, key, str(val))
1039
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    The option name is validated, then the write goes through
    ``_set_localized`` using ``_set_module_option`` as the setter.

    :param str key: option name
    :param str val: new value
    :return: whatever ``_set_localized`` returns
    """
    self._validate_module_option(key)
    option_setter = self._set_module_option
    return self._set_localized(key, val, option_setter)
224ce89b 1049
def set_store(self, key, val):
    """
    Write ``val`` under ``key`` in this module's persistent key value
    store. If ``val`` is None, remove ``key`` from the store.

    :param str key: store key
    :param str val: value to store, or ``None`` to remove the key
    """
    store_write = self._ceph_set_store
    store_write(key, val)
7c673cae 1059
def get_store(self, key, default=None):
    """
    Get a value from this module's persistent key value store.

    :param str key: store key
    :param default: returned when the store yields ``None`` for ``key``
    :return: the stored value, or ``default`` if the lookup gave ``None``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1069
def get_localized_store(self, key, default=None):
    """
    Get a value from the persistent store, preferring the localized key.

    The key localized for this daemon's mgr id is tried first, then the
    plain ``key``. The first lookup that is not ``None`` wins.

    :param str key: un-localized store key
    :param default: returned when both lookups yield ``None``
    :return: the stored value, or ``default``
    """
    candidates = (_get_localized_key(self.get_mgr_id(), key), key)
    for store_key in candidates:
        found = self._ceph_get_store(store_key)
        if found is not None:
            return found
    return default
1077
def set_localized_store(self, key, val):
    """
    Store ``val`` in the persistent store under the localized ``key``.

    Delegates to ``_set_localized`` with ``set_store`` as the setter.

    :param str key: un-localized store key
    :param val: value to store
    :return: whatever ``_set_localized`` returns
    """
    store_setter = self.set_store
    return self._set_localized(key, val, store_setter)
224ce89b
WB
1080
def self_test(self):
    """
    Run a self-test on the module.

    Subclasses override this hook with a best-effort self-test for
    (automated) testing of the module. The base implementation does
    nothing.

    Failures are signalled by raising an exception. This does not have
    to be pretty: it is mainly for picking up regressions during
    development, rather than use in the field.

    :return: None, or an advisory string for developer interest, such
        as a json dump of some state.
    """
    return None
3efd9988
FG
1094
def get_osdmap(self):
    """
    Get a handle to an OSDMap.

    No epoch can be selected through this method; it simply forwards to
    ``_ceph_get_osdmap``.

    :return: OSDMap
    """
    osdmap_handle = self._ceph_get_osdmap()
    return osdmap_handle
1102
11fdf7f2
TL
def get_latest(self, daemon_type, daemon_name, counter):
    """
    Return the latest value of one perf counter of a daemon.

    :param daemon_type: passed through to ``get_latest_counter``
    :param daemon_name: passed through to ``get_latest_counter``
    :param counter: counter path; also the key used to index the result
    :return: element ``[1]`` of the counter's latest sample, or ``0``
        when the sample is empty/falsy
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    sample = latest[counter]
    return sample[1] if sample else 0
1110
def get_latest_avg(self, daemon_type, daemon_name, counter):
    """
    Return the latest value/count pair of one perf counter of a daemon.

    :param daemon_type: passed through to ``get_latest_counter``
    :param daemon_name: passed through to ``get_latest_counter``
    :param counter: counter path; also the key used to index the result
    :return: 2-tuple of elements ``[1]`` and ``[2]`` of the counter's
        latest sample, or ``(0, 0)`` when the sample is empty/falsy
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    sample = latest[counter]
    if not sample:
        return 0, 0
    return sample[1], sample[2]
1118
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
                          services=("mds", "mon", "osd",
                                    "rbd-mirror", "rgw", "tcmu-runner")):
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters.  The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value.  Counters whose type has the ``PERFCOUNTER_LONGRUNAVG`` bit
    set additionally carry a "count" member.

    :param prio_limit: counters with a schema priority below this value
        are skipped
    :param services: service types to include; services of any other
        type are ignored
    :return: ``defaultdict(dict)`` keyed by "<type>.<id>"
    """
    result = defaultdict(dict)

    for server in self.list_servers():
        for service in server['services']:
            svc_type = service['type']
            if svc_type not in services:
                continue
            svc_id = service['id']

            schema = self.get_perf_schema(svc_type, svc_id)
            if not schema:
                # Logger.warn is a deprecated alias; use warning().
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schema = schema[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                if counter_schema['priority'] < prio_limit:
                    continue

                # Copy so the schema returned by get_perf_schema is not
                # modified when "value"/"count" are added below.
                counter_info = dict(counter_schema)

                # Also populate count for the long running avgs
                if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    v, c = self.get_latest_avg(svc_type, svc_id, counter_path)
                    counter_info['value'], counter_info['count'] = v, c
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                # Single store for both branches.
                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
1185
def set_uri(self, uri):
    """
    Publish the address of a service exposed by this module.

    Call this once the address is available.

    :param uri: address to publish
    :return: whatever ``_ceph_set_uri`` returns (documented originally
        as a string)
    """
    published = self._ceph_set_uri(uri)
    return published
94b18763
FG
1194
def have_mon_connection(self):
    """
    Report whether this ceph-mgr daemon has an open connection to a
    monitor.

    Without one, the information held about the cluster is likely to be
    out of date, and/or the monitor cluster is down.

    :return: whatever ``_ceph_have_mon_connection`` returns
    """
    connected = self._ceph_have_mon_connection()
    return connected
11fdf7f2
TL
1204
def update_progress_event(self, evid, desc, progress):
    """
    Forward a progress event update to ``_ceph_update_progress_event``.

    The arguments are coerced before being passed on: ``evid`` and
    ``desc`` with ``str()``, ``progress`` with ``float()``.

    :param evid: event identifier
    :param desc: event description
    :param progress: progress value convertible to ``float``
    :return: whatever ``_ceph_update_progress_event`` returns
    """
    event_id = str(evid)
    description = str(desc)
    fraction = float(progress)
    return self._ceph_update_progress_event(event_id, description, fraction)
1207
def complete_progress_event(self, evid):
    """
    Forward completion of a progress event to
    ``_ceph_complete_progress_event``.

    :param evid: event identifier; converted with ``str()``
    :return: whatever ``_ceph_complete_progress_event`` returns
    """
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
1210
def clear_all_progress_events(self):
    """
    Forward a request to clear all progress events to
    ``_ceph_clear_all_progress_events``.

    :return: whatever ``_ceph_clear_all_progress_events`` returns
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1213
@property
def rados(self):
    """
    A librados instance to be shared by any classes within
    this mgr module that want one.

    The instance is created and connected on first access, using the
    context from ``get_context()``, and cached in ``self._rados`` for
    later accesses.
    """
    if not self._rados:
        ctx_capsule = self.get_context()
        client = rados.Rados(context=ctx_capsule)
        # Cache before connecting, as the original code does.
        self._rados = client
        client.connect()

    return self._rados
1228
@staticmethod
def can_run():
    """
    Report whether the module's dependencies are met.

    Modules override this. For example, if the module needs to import a
    particular dependency to work, wrap that import in a try/except at
    file scope and report here if the import failed.

    This will be called in a blocking way from the C++ code, so do not
    do any I/O that could block in this function.

    :return: a 2-tuple consisting of a boolean and explanatory string;
        the base implementation returns ``(True, "")``
    """
    runnable, reason = True, ""
    return runnable, reason
1244
def remote(self, module_name, method_name, *args, **kwargs):
    """
    Call a method of another mgr module.

    All arguments, and the value the other module returns, must be
    serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module. If module isn't loaded,
        an ImportError exception is raised.
    :param method_name: Method name. If it does not exist, a NameError
        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is
        converted to a RuntimeError
    :raises ImportError: No such module
    """
    dispatch = self._ceph_dispatch_remote
    return dispatch(module_name, method_name, args, kwargs)
1268
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.

    ``query`` is a dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
1298
def remove_osd_perf_query(self, query_id):
    """
    Unregister a previously registered OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_osd_perf_query`` returns
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
1306
def get_osd_perf_counters(self, query_id):
    """
    Fetch the stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_get_osd_perf_counters`` returns
    """
    collected = self._ceph_get_osd_perf_counters(query_id)
    return collected