]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
3efd9988 1import ceph_module # noqa
3efd9988 2
494da23a
TL
3try:
4 from typing import Set, Tuple, Iterator, Any
5except ImportError:
6 # just for type checking
7 pass
7c673cae 8import logging
11fdf7f2 9import json
1adf2230 10import six
7c673cae 11import threading
11fdf7f2
TL
12from collections import defaultdict, namedtuple
13import rados
81eedcae 14import re
11fdf7f2
TL
15import time
16
# Names of the placement-group state flags this module knows about.
PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown",
]
3efd9988
FG
48
49
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that forwards every formatted record to the
    ``_ceph_log`` method of the module instance it was created with.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst

    def emit(self, record):
        """
        Translate the record's Python level into the numeric level handed
        to ``_ceph_log`` and pass the formatted message along.
        """
        # (upper bound of the Python level, level passed to _ceph_log),
        # checked in ascending order; anything above WARNING maps to 0.
        thresholds = (
            (logging.DEBUG, 20),
            (logging.INFO, 4),
            (logging.WARNING, 1),
        )
        ceph_level = 0
        for upper_bound, mapped in thresholds:
            if record.levelno <= upper_bound:
                ceph_level = mapped
                break

        self._module._ceph_log(ceph_level, self.format(record))
66
67
11fdf7f2
TL
def configure_logger(module_inst, module_name):
    """
    Create and configure the logger for the given module.

    A ``CPlusPlusHandler`` is attached to the root logger so that messages
    from every logger (including 3rd party libraries) are redirected to
    the Ceph log.

    :param module_inst: The module instance.
    :type module_inst: instance
    :param module_name: The module name.
    :type module_name: str
    :return: Return the logger with the specified name.
    """
    module_logger = logging.getLogger(module_name)
    # Don't filter any logs at the python level, leave it to C++.
    # FIXME: We should learn the log level from C++ land, and then
    #        avoid calling the C++ level log when we know a message
    #        is of an insufficient level to be ultimately output.
    module_logger.setLevel(logging.DEBUG)  # Don't use NOTSET

    # The handler lives on the root logger, thus this module and all
    # 3rd party libraries will log their messages to the Ceph log.
    root = logging.getLogger()
    root.addHandler(CPlusPlusHandler(module_inst))

    # Restrict the root logger to ``ERROR`` so that only such messages
    # from 3rd party libraries get through (only effective if they use
    # the default log level ``NOTSET``). See
    # https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
    # for how the effective log level is determined.
    root.setLevel(logging.ERROR)

    return module_logger
7c673cae
FG
102
103
11fdf7f2
TL
def unconfigure_logger(module_name=None):
    """
    Detach every ``CPlusPlusHandler`` from the named logger (the root
    logger when ``module_name`` is ``None``).

    :param module_name: The module name. Defaults to ``None``.
    :type module_name: str or None
    """
    target = logging.getLogger(module_name)
    # Iterate over a snapshot: removeHandler() mutates target.handlers.
    for handler in list(target.handlers):
        if isinstance(handler, CPlusPlusHandler):
            target.removeHandler(handler)
114
11fdf7f2 115
7c673cae
FG
class CommandResult(object):
    """
    Completion object to use with MgrModule.send_command.

    ``complete()`` stores the outcome and sets an event; ``wait()`` blocks
    on that event and returns the stored outcome.
    """

    def __init__(self, tag=None):
        self.ev = threading.Event()
        self.r = 0
        self.outb = ""
        self.outs = ""

        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag or ""

    def complete(self, r, outb, outs):
        """Record the result triple and wake up any waiter."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """Block until ``complete()`` was called; return (r, outb, outs)."""
        self.ev.wait()
        return self.r, self.outb, self.outs
140
141
11fdf7f2
TL
class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
    """
    Tuple containing the result of `handle_command()`

    Only write to stderr if there is an error, or in extraordinary circumstances

    Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
    is critical information to include there.

    Everything programmatically consumable should be put on stdout
    """

    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        :param retval: return code. E.g. 0 or -errno.EINVAL
        :type retval: int
        :param stdout: data of this result.
        :type stdout: str
        :param stderr: Typically used for error messages.
        :type stderr: str
        """
        # Same construction the generated namedtuple __new__ performs.
        fields = (retval, stdout, stderr)
        return tuple.__new__(cls, fields)
162
163
3efd9988
FG
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python-side convenience wrapper: every method delegates to the
    underscore-prefixed method of the same name inherited from the base
    class, a few of them post-processing the ``_dump()`` output.
    """

    def get_epoch(self):
        return self._get_epoch()

    def get_crush_version(self):
        return self._get_crush_version()

    def dump(self):
        return self._dump()

    def get_pools(self):
        """Return the dumped pools keyed by their ``pool`` field."""
        # FIXME: efficient implementation
        return {pool['pool']: pool for pool in self._dump()['pools']}

    def get_pools_by_name(self):
        """Return the dumped pools keyed by their ``pool_name`` field."""
        # FIXME: efficient implementation
        return {pool['pool_name']: pool for pool in self._dump()['pools']}

    def new_incremental(self):
        return self._new_incremental()

    def apply_incremental(self, inc):
        return self._apply_incremental(inc)

    def get_crush(self):
        return self._get_crush()

    def get_pools_by_take(self, take):
        result = self._get_pools_by_take(take)
        return result.get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """Delegate to ``_calc_pg_upmaps``; ``pools=None`` is passed on as ``[]``."""
        pool_list = [] if pools is None else pools
        return self._calc_pg_upmaps(
            inc, max_deviation, max_iterations, pool_list)

    def map_pool_pgs_up(self, poolid):
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id, ps):
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id):
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name):
        """Return the named erasure code profile from the dump, or ``None``."""
        # FIXME: efficient implementation
        profiles = self._dump()['erasure_code_profiles']
        return profiles.get(name)
217
218
3efd9988
FG
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Thin wrapper whose methods delegate to the underscore-prefixed
    methods inherited from the base class.
    """

    def get_epoch(self):
        epoch = self._get_epoch()
        return epoch

    def dump(self):
        dumped = self._dump()
        return dumped

    def set_osd_reweights(self, weightmap):
        """
        weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        result = self._set_osd_reweights(weightmap)
        return result

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        weightmap is a dict, int to float.  devices only.  e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        result = self._set_crush_compat_weight_set_weights(weightmap)
        return result
238
11fdf7f2 239
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Convenience wrapper around the CRUSH map object provided by the base
    class; several helpers work on the structure returned by ``dump()``.
    """
    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self):
        return self._dump()

    def get_item_weight(self, item):
        return self._get_item_weight(item)

    def get_item_name(self, item):
        return self._get_item_name(item)

    def find_takes(self):
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """Return the ``weights`` mapping for ``root`` with keys cast to int."""
        uglymap = self._get_take_weight_osd_map(root)
        return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}

    @staticmethod
    def have_default_choose_args(dump):
        """Tell whether ``dump`` has a ``DEFAULT_CHOOSE_ARGS`` choose_args entry."""
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})

    @staticmethod
    def get_default_choose_args(dump):
        """
        Return the ``DEFAULT_CHOOSE_ARGS`` choose_args entry of ``dump``,
        or ``[]`` when there is none.
        """
        # Default to {} (like have_default_choose_args does) so a dump
        # without any 'choose_args' key yields [] instead of raising
        # AttributeError on None.
        return dump.get('choose_args', {}).get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name):
        """Return the dumped rule named ``rule_name``, or ``None``."""
        # TODO efficient implementation
        for rule in self.dump()['rules']:
            if rule_name == rule['rule_name']:
                return rule

        return None

    def get_rule_by_id(self, rule_id):
        """Return the dumped rule whose ``rule_id`` matches, or ``None``."""
        for rule in self.dump()['rules']:
            if rule['rule_id'] == rule_id:
                return rule

        return None

    def get_rule_root(self, rule_name):
        """
        Return the ``item`` of the first ``take`` step of the named rule.

        :return: ``None`` if the rule does not exist or has no ``take`` step
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        for step in rule['steps']:
            if step['op'] == 'take':
                return step['item']

        # Nothing visible here gives this class a ``log`` attribute, so
        # fall back to a module-level logger rather than failing with
        # AttributeError while trying to report the problem.
        log = getattr(self, 'log', None) or logging.getLogger(__name__)
        log.warning("CRUSH rule '{0}' has no 'take' step".format(
            rule_name))
        return None

    def get_osds_under(self, root_id):
        """
        Collect the ids of all items with a non-negative id found below
        the bucket ``root_id``, descending into nested buckets.

        :raises KeyError: if ``root_id`` is not a bucket id in the dump
        """
        # TODO don't abuse dump like this
        d = self.dump()
        buckets = {b['id']: b for b in d['buckets']}

        osd_list = []

        def accumulate(b):
            for item in b['items']:
                if item['id'] >= 0:
                    osd_list.append(item['id'])
                else:
                    try:
                        accumulate(buckets[item['id']])
                    except KeyError:
                        # Negative id without a matching bucket: skipped
                        # on purpose (best effort).
                        pass

        accumulate(buckets[root_id])

        return osd_list

    def device_class_counts(self):
        """
        Count the dumped devices per ``class`` value; devices without a
        class are counted under the key ``None``.
        """
        result = defaultdict(int)
        # TODO don't abuse dump like this
        d = self.dump()
        for device in d['devices']:
            cls = device.get('class', None)
            result[cls] += 1

        return dict(result)
327
328
class CLICommand(object):
    """
    Decorator object that registers a function as the handler of a CLI
    command, keyed by its prefix, in the class-level ``COMMANDS`` registry.
    """
    COMMANDS = {}

    def __init__(self, prefix, args="", desc="", perm="rw"):
        self.prefix = prefix
        self.args = args
        self.args_dict = {}
        self.desc = desc
        self.perm = perm
        self.func = None
        self._parse_args()

    def _parse_args(self):
        """
        Fill ``args_dict`` from the ``args`` descriptor string.

        Each whitespace-separated token is a comma-separated list of
        ``key=value`` pairs; the ``name`` value becomes the key in
        ``args_dict`` and the remaining pairs its attribute dict.
        """
        if not self.args:
            return
        # split() without a separator tolerates repeated, leading and
        # trailing whitespace, which previously produced empty tokens
        # that failed to unpack below.
        for arg in self.args.split():
            arg_d = {}
            for kv in arg.split(","):
                # Split on the first '=' only so values may contain '='.
                k, v = kv.split("=", 1)
                if k == "name":
                    self.args_dict[v] = arg_d
                else:
                    arg_d[k] = v

    def __call__(self, func):
        """Register ``func`` under this command's prefix and return it unchanged."""
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def call(self, mgr, cmd_dict, inbuf):
        """
        Invoke the registered function with keyword arguments taken from
        ``cmd_dict`` ('-' in argument names is replaced by '_').

        Arguments declared with ``req=false`` are skipped when absent;
        ``inbuf`` is only passed on when it is truthy.

        :raises KeyError: if a required argument is missing in ``cmd_dict``
        """
        kwargs = {}
        for a, d in self.args_dict.items():
            if d.get('req') == "false" and a not in cmd_dict:
                continue
            kwargs[a.replace("-", "_")] = cmd_dict[a]
        if inbuf:
            kwargs['inbuf'] = inbuf
        return self.func(mgr, **kwargs)

    @classmethod
    def dump_cmd_list(cls):
        """Return one ``{'cmd', 'desc', 'perm'}`` dict per registered command."""
        return [{
            'cmd': '{} {}'.format(cmd.prefix, cmd.args),
            'desc': cmd.desc,
            'perm': cmd.perm
        } for cmd in cls.COMMANDS.values()]
377
378
def CLIReadCommand(prefix, args="", desc=""):
    """Build a ``CLICommand`` whose permission is fixed to ``"r"``."""
    return CLICommand(prefix, args=args, desc=desc, perm="r")
381
382
def CLIWriteCommand(prefix, args="", desc=""):
    """Build a ``CLICommand`` whose permission is fixed to ``"w"``."""
    return CLICommand(prefix, args=args, desc=desc, perm="w")
385
386
387def _get_localized_key(prefix, key):
388 return '{}/{}'.format(prefix, key)
389
390
class Option(dict):
    """
    Helper class to declare options for MODULE_OPTIONS list.

    The resulting dict holds every constructor argument whose value is
    not ``None``, keyed by the argument name.

    Caveat: it uses argument names matching Python keywords (type, min, max),
    so any further processing should happen in a separate method.

    TODO: type validation.
    """

    def __init__(
            self, name,
            default=None,
            type='str',
            desc=None, longdesc=None,
            min=None, max=None,
            enum_allowed=None,
            see_also=None,
            tags=None,
            runtime=False,
    ):
        # Spelled out in signature order, which is the order the
        # arguments end up in the dict.
        declared = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('longdesc', longdesc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('see_also', see_also),
            ('tags', tags),
            ('runtime', runtime),
        )
        super(Option, self).__init__(
            [(key, value) for key, value in declared if value is not None])
415
3efd9988
FG
416
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    MODULE_OPTIONS = []
    MODULE_OPTION_DEFAULTS = {}

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)
        # see also MgrModule.__init__()
        for opt in self.MODULE_OPTIONS:
            if 'default' not in opt:
                continue
            # Options without a declared type get their default
            # normalized to a string.
            fallback = opt['default'] if 'type' in opt else str(opt['default'])
            self.MODULE_OPTION_DEFAULTS[opt['name']] = fallback

    def __del__(self):
        unconfigure_logger(module_name=None)

    @property
    def log(self):
        """The logger configured for this module in ``__init__``."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        return self._ceph_get_mgr_id()

    def get_module_option(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: str
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key):
        return self._ceph_get_option(key)

    def get_store(self, key):
        """
        Retrieve the value of a persistent KV store entry

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_active_uri(self):
        return self._ceph_get_active_uri()

    def get_localized_module_option(self, key, default=None):
        """
        Like ``get_module_option``, but additionally passes this
        manager's id to the underlying lookup.
        """
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988 494
3efd9988
FG
495
496class MgrModule(ceph_module.BaseMgrModule):
7c673cae 497 COMMANDS = []
11fdf7f2
TL
498 MODULE_OPTIONS = []
499 MODULE_OPTION_DEFAULTS = {}
7c673cae 500
3efd9988
FG
501 # Priority definitions for perf counters
502 PRIO_CRITICAL = 10
503 PRIO_INTERESTING = 8
504 PRIO_USEFUL = 5
505 PRIO_UNINTERESTING = 2
506 PRIO_DEBUGONLY = 0
507
508 # counter value types
509 PERFCOUNTER_TIME = 1
510 PERFCOUNTER_U64 = 2
511
512 # counter types
513 PERFCOUNTER_LONGRUNAVG = 4
514 PERFCOUNTER_COUNTER = 8
515 PERFCOUNTER_HISTOGRAM = 0x10
28e407b8 516 PERFCOUNTER_TYPE_MASK = ~3
7c673cae 517
1adf2230
AA
518 # units supported
519 BYTES = 0
520 NONE = 1
11fdf7f2
TL
521
522 # Cluster log priorities
523 CLUSTER_LOG_PRIO_DEBUG = 0
524 CLUSTER_LOG_PRIO_INFO = 1
525 CLUSTER_LOG_PRIO_SEC = 2
526 CLUSTER_LOG_PRIO_WARN = 3
527 CLUSTER_LOG_PRIO_ERROR = 4
528
3efd9988
FG
def __init__(self, module_name, py_modules_ptr, this_ptr):
    """
    Hook up logging, initialize the base class and record the option
    defaults declared in ``MODULE_OPTIONS``.
    """
    self.module_name = module_name

    # If we're taking over from a standby module, let's make sure
    # its logger was unconfigured before we hook ours up
    unconfigure_logger()
    self._logger = configure_logger(self, module_name)

    super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

    self._version = self._ceph_get_version()

    self._perf_schema_cache = None

    # Keep a librados instance for those that need it.
    self._rados = None

    for opt in self.MODULE_OPTIONS:
        if 'default' not in opt:
            continue
        if 'type' in opt:
            # we'll assume the declared type matches the
            # supplied default value's type.
            fallback = opt['default']
        else:
            # module not declaring its type, so normalize the
            # default value to be a string for consistent behavior
            # with default and user-supplied option values.
            fallback = str(opt['default'])
        self.MODULE_OPTION_DEFAULTS[opt['name']] = fallback
557
def __del__(self):
    """Remove the ``CPlusPlusHandler`` instances from the root logger."""
    unconfigure_logger(module_name=None)
3efd9988 560
11fdf7f2
TL
@classmethod
def _register_commands(cls):
    """Append the commands registered through ``CLICommand`` to ``COMMANDS``."""
    decorated = CLICommand.dump_cmd_list()
    cls.COMMANDS.extend(decorated)
7c673cae
FG
564
@property
def log(self):
    """The logger configured for this module in ``__init__``."""
    module_logger = self._logger
    return module_logger
568
11fdf7f2
TL
def cluster_log(self, channel, priority, message):
    """
    Forward a message to ``_ceph_cluster_log``.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :type channel: str
    :param priority: The log message priority. This can be
                     CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
                     CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
                     CLUSTER_LOG_PRIO_ERROR.
    :type priority: int
    :param message: The message to log.
    :type message: str
    """
    emit = self._ceph_cluster_log
    emit(channel, priority, message)
582
7c673cae
FG
@property
def version(self):
    """The version value cached in ``_version`` by ``__init__``."""
    cached = self._version
    return cached
586
3efd9988
FG
def get_context(self):
    """
    :return: a Python capsule containing a C++ CephContext pointer
    """
    capsule = self._ceph_get_context()
    return capsule
592
7c673cae
FG
def notify(self, notify_type, notify_id):
    """
    Called by the ceph-mgr service to notify the Python plugin
    that new state is available. The default implementation does
    nothing.

    :param notify_type: string indicating what kind of notification,
                        such as osd_map, mon_map, fs_map, mon_status,
                        health, pg_summary, command, service_map
    :param notify_id:  string (may be empty) that optionally specifies
                        which entity is being notified about.  With
                        "command" notifications this is set to the tag
                        ``from send_command``.
    """
    return None
607
def config_notify(self):
    """
    Called by the ceph-mgr service to notify the Python plugin
    that the configuration may have changed.  Modules will want to
    refresh any configuration values stored in config variables.
    The default implementation does nothing.
    """
    return None
615
def serve(self):
    """
    Called by the ceph-mgr service to start any server that
    is provided by this Python plugin.  The implementation
    of this function should block until ``shutdown`` is called.
    The default implementation returns immediately.

    You *must* implement ``shutdown`` if you implement ``serve``
    """
    return None
625
def shutdown(self):
    """
    Called by the ceph-mgr service to request that this
    module drop out of its serve() function.  You do not
    need to implement this if you do not implement serve()

    The default implementation shuts down the object held in
    ``_rados``, if there is one.

    :return: None
    """
    if not self._rados:
        return
    self._rados.shutdown()
7c673cae
FG
636
def get(self, data_name):
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>.

    Note:
        All these structures have their own JSON representations: experiment
        or look at the C++ ``dump()`` methods to learn about them.
    """
    fetched = self._ceph_get(data_name)
    return fetched
7c673cae 651
94b18763 652 def _stattype_to_str(self, stattype):
11fdf7f2 653
94b18763
FG
654 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
655 if typeonly == 0:
656 return 'gauge'
657 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
658 # this lie matches the DaemonState decoding: only val, no counts
659 return 'counter'
660 if typeonly == self.PERFCOUNTER_COUNTER:
661 return 'counter'
662 if typeonly == self.PERFCOUNTER_HISTOGRAM:
663 return 'histogram'
11fdf7f2 664
94b18763
FG
665 return ''
666
81eedcae
TL
667 def _perfpath_to_path_labels(self, daemon, path):
668 label_names = ("ceph_daemon",)
669 labels = (daemon,)
670
671 if daemon.startswith('rbd-mirror.'):
672 match = re.match(
673 r'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
674 path
675 )
676 if match:
677 path = 'rbd_mirror_' + match.group(4)
678 pool = match.group(1)
679 namespace = match.group(2) or ''
680 image = match.group(3)
681 label_names += ('pool', 'namespace', 'image')
682 labels += (pool, namespace, image)
683
684 return path, label_names, labels,
685
28e407b8
AA
686 def _perfvalue_to_value(self, stattype, value):
687 if stattype & self.PERFCOUNTER_TIME:
688 # Convert from ns to seconds
689 return value / 1000000000.0
690 else:
691 return value
692
1adf2230
AA
693 def _unit_to_str(self, unit):
694 if unit == self.NONE:
695 return "/s"
696 elif unit == self.BYTES:
11fdf7f2
TL
697 return "B/s"
698
699 @staticmethod
700 def to_pretty_iec(n):
701 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
702 (20, 'Mi'), (10, 'Ki')]:
703 if n > 10 << bits:
704 return str(n >> bits) + ' ' + suffix
705 return str(n) + ' '
706
707 @staticmethod
708 def get_pretty_row(elems, width):
709 """
710 Takes an array of elements and returns a string with those elements
711 formatted as a table row. Useful for polling modules.
712
713 :param elems: the elements to be printed
714 :param width: the width of the terminal
715 """
716 n = len(elems)
717 column_width = int(width / n)
718
719 ret = '|'
720 for elem in elems:
721 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
722
723 return ret
724
def get_pretty_header(self, elems, width):
    """
    Like ``get_pretty_row`` but adds dashes, to be used as a table title.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    """
    columns = len(elems)
    column_width = int(width / columns)

    # dash line, used both above and below the title
    dashes = '+' + ('-' * (column_width - 1) + '+') * columns + '\n'

    # title
    title = self.get_pretty_row(elems, width) + '\n'

    return dashes + title + dashes
752
7c673cae
FG
def get_server(self, hostname):
    """
    Called by the plugin to fetch metadata about a particular hostname from
    ceph-mgr.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    """
    server_info = self._ceph_get_server(hostname)
    return server_info
7c673cae 764
c07f9fc5
FG
def get_perf_schema(self, svc_type, svc_name):
    """
    Called by the plugin to fetch perf counter schema info.
    svc_name can be nullptr, as can svc_type, in which case
    they are wildcards

    :param str svc_type:
    :param str svc_name:
    :return: list of dicts describing the counters requested
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
c07f9fc5 776
7c673cae
FG
def get_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch the latest performance counter data for a
    particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This may be
        empty if no data is available.
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
7c673cae 790
11fdf7f2
TL
def get_latest_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This may be
        empty if no data is available.
    """
    newest = self._ceph_get_latest_counter(svc_type, svc_name, path)
    return newest
804
7c673cae
FG
def list_servers(self):
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata)

    :return: a list of information about all servers
    :rtype: list
    """
    # A ``None`` hostname is what requests the full listing.
    every_host = None
    return self._ceph_get_server(every_host)
7c673cae
FG
814
def get_metadata(self, svc_type, svc_id):
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of
    time during addition/removal of services where the metadata is not
    available to modules.  ``None`` is returned if no metadata is available.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings when
        calling this
    :rtype: dict, or None if no metadata found
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata
7c673cae 829
224ce89b
WB
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
224ce89b 842
11fdf7f2
TL
def mon_command(self, cmd_dict):
    """
    Helper for modules that do simple, synchronous mon command
    execution.

    The command is sent through ``send_command`` and this call blocks
    until its ``CommandResult`` completes; the elapsed time is logged
    at debug level.

    See send_command for general case.

    :return: status int, out std, err str
    """
    started = time.time()
    completion = CommandResult()
    self.send_command(completion, "mon", "", json.dumps(cmd_dict), "")
    outcome = completion.wait()
    elapsed = time.time() - started

    self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], outcome[0], elapsed
    ))

    return outcome
864
7c673cae
FG
def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon
    cluster. All arguments are handed to ``_ceph_send_command`` as-is.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule.  This acts as a
        completion and stores the output of the command.  Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type:
    :param str svc_id:
    :param str command: a JSON-serialized command.  This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself.  Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    """
    dispatch = self._ceph_send_command
    dispatch(*args, **kwargs)
7c673cae 887
c07f9fc5
FG
def set_health_checks(self, checks):
    """
    Set the module's current map of health checks.  Argument is a
    dict of check names to info, in this form:

    ::

       {
         'CHECK_FOO': {
           'severity': 'warning',           # or 'error'
           'summary': 'summary string',
           'detail': [ 'list', 'of', 'detail', 'strings' ],
          },
         'CHECK_BAR': {
           'severity': 'error',
           'summary': 'bars are bad',
           'detail': [ 'too hard' ],
         },
       }

    :param checks: dict of health check dicts
    """
    publish = self._ceph_set_health_checks
    publish(checks)
c07f9fc5 911
11fdf7f2
TL
def _handle_command(self, inbuf, cmd):
    """
    Dispatch a command: prefixes registered through ``CLICommand`` go to
    their registered handler, everything else to ``handle_command``.
    """
    registered = CLICommand.COMMANDS
    prefix = cmd['prefix']
    if prefix in registered:
        return registered[prefix].call(self, cmd, inbuf)
    return self.handle_command(inbuf, cmd)
916
def handle_command(self, inbuf, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS

    Return a status code, an output buffer, and an
    output string.  The output buffer is for data results,
    the output string is for informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :type inbuf: str
    :param cmd: from Ceph's cmdmap_t
    :type cmd: dict

    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this default implementation
    """
    # Should never get called if they didn't declare
    # any ``COMMANDS``
    raise NotImplementedError()
937
31f18b77
FG
def get_mgr_id(self):
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: str
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
31f18b77 946
11fdf7f2
TL
def get_ceph_option(self, key):
    """Return whatever ``_ceph_get_option`` yields for ``key``."""
    value = self._ceph_get_option(key)
    return value
7c673cae 949
11fdf7f2
TL
950 def _validate_module_option(self, key):
951 """
952 Helper: don't allow get/set config callers to
953 access config options that they didn't declare
954 in their schema.
7c673cae 955 """
11fdf7f2
TL
956 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
957 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
958 format(key, self.__class__.__name__))
959
960 def _get_module_option(self, key, default, localized_prefix=""):
961 r = self._ceph_get_module_option(self.module_name, key,
962 localized_prefix)
3efd9988 963 if r is None:
11fdf7f2 964 return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988
FG
965 else:
966 return r
7c673cae 967
11fdf7f2
TL
def get_module_option(self, key, default=None):
    """
    Retrieve the value of a persistent configuration setting

    The key must be declared in ``MODULE_OPTIONS``.

    :param str key:
    :param str default:
    :return: str
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
978
def get_module_option_ex(self, module, key, default=None):
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    The key is only validated against ``MODULE_OPTIONS`` when
    ``module`` is this module's own name.

    :param str module: The name of the module, e.g. 'dashboard'
        or 'telemetry'.
    :param str key: The configuration key, e.g. 'server_addr'.
    :param str,None default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    :return: str,int,bool,float,None
    """
    if module == self.module_name:
        self._validate_module_option(key)
    stored = self._ceph_get_module_option(module, key)
    if stored is None:
        return default
    return stored
995
def get_store_prefix(self, key_prefix):
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix

    :param str key_prefix:
    :return: dict
    """
    matching = self._ceph_get_store_prefix(key_prefix)
    return matching
31f18b77 1005
11fdf7f2
TL
def _set_localized(self, key, val, setter):
    """
    Call ``setter`` with ``key`` prefixed by this manager's id (as built
    by ``_get_localized_key``) and ``val``; return the setter's result.
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
1008
def get_localized_module_option(self, key, default=None):
    """
    Retrieve localized configuration for this ceph-mgr instance

    The key must be declared in ``MODULE_OPTIONS``.

    :param str key:
    :param str default:
    :return: str
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
224ce89b 1018
11fdf7f2
TL
1019 def _set_module_option(self, key, val):
1020 return self._ceph_set_module_option(self.module_name, key, str(val))
224ce89b 1021
def set_module_option(self, key, val):
    """
    Set the value of a persistent configuration setting

    The key must be declared in ``MODULE_OPTIONS``.

    :param str key:
    :type val: str | None
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
7c673cae 1031
11fdf7f2
TL
def set_module_option_ex(self, module, key, val):
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is only validated against ``MODULE_OPTIONS`` when
    ``module`` is this module's own name. ``None`` is passed through
    unchanged instead of being stored as the literal string "None";
    every other value is converted with ``str``.

    :param str module:
    :param str key:
    :param val: the value to store
    :type val: str | None
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # NOTE(review): assumes _ceph_set_module_option accepts None for the
    # value -- confirm against the binding.
    return self._ceph_set_module_option(module, key,
                                        None if val is None else str(val))
1044
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    Validates ``key``, then stores ``val`` through ``_set_localized``
    using ``_set_module_option`` as the setter.

    :param str key: option name
    :param str val: new value
    :return: whatever ``_set_localized`` returns
    """
    self._validate_module_option(key)
    option_setter = self._set_module_option
    return self._set_localized(key, val, option_setter)
224ce89b 1054
def set_store(self, key, val):
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    :param str key: store key
    :param str val: value to store, or ``None`` to remove the key
    """
    store_writer = self._ceph_set_store
    store_writer(key, val)
7c673cae 1064
def get_store(self, key, default=None):
    """
    Get a value from this module's persistent key value store.

    :param str key: store key
    :param default: returned when the stored value is ``None``
    :return: the stored value, or ``default`` if it is ``None``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1074
def get_localized_store(self, key, default=None):
    """
    Get a value from the persistent key value store, preferring the
    entry localized to this ceph-mgr instance.

    Lookup order: the localized key, then the plain ``key``, then
    ``default``.  A stored value of ``None`` counts as absent.

    :param str key: un-localized store key
    :param default: returned when neither entry holds a value
    :return: the first non-``None`` value found, else ``default``
    """
    localized = self._ceph_get_store(
        _get_localized_key(self.get_mgr_id(), key))
    if localized is not None:
        return localized

    shared = self._ceph_get_store(key)
    return default if shared is None else shared
1082
def set_localized_store(self, key, val):
    """
    Store ``val`` in the persistent key value store under the key
    localized to this ceph-mgr instance.

    :param str key: un-localized store key
    :param val: value handed on to ``set_store``
    :return: whatever ``_set_localized`` returns
    """
    store_setter = self.set_store
    return self._set_localized(key, val, store_setter)
224ce89b
WB
1085
def self_test(self):
    """
    Run a self-test on the module.

    This default implementation does nothing.  Override it with a
    best-effort self-test for (automated) testing of the module.

    Indicate any failures by raising an exception.  This does not have
    to be pretty, it's mainly for picking up regressions during
    development, rather than use in the field.

    :return: None, or an advisory string for developer interest, such
        as a json dump of some state.
    """
    return None
3efd9988
FG
1099
def get_osdmap(self):
    """
    Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap``.

    :return: OSDMap
    """
    osdmap_handle = self._ceph_get_osdmap()
    return osdmap_handle
1107
11fdf7f2
TL
def get_latest(self, daemon_type, daemon_name, counter):
    """
    Return the latest value of one perf counter of a daemon.

    Looks up ``counter`` in the result of ``get_latest_counter`` and
    returns element 1 of that entry (presumably the value of a
    ``(timestamp, value, ...)`` datapoint -- confirm against
    ``get_latest_counter``).

    :param daemon_type: passed through to ``get_latest_counter``
    :param daemon_name: passed through to ``get_latest_counter``
    :param counter: counter path, also the key of the looked-up entry
    :return: element 1 of the entry, or 0 when the entry is empty/None
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    datapoint = latest[counter]
    return datapoint[1] if datapoint else 0
1115
def get_latest_avg(self, daemon_type, daemon_name, counter):
    """
    Return the latest datapoint of a long-running-average perf counter.

    Looks up ``counter`` in the result of ``get_latest_counter`` and
    returns elements 1 and 2 of that entry (presumably value and
    sample count -- confirm against ``get_latest_counter``).

    :param daemon_type: passed through to ``get_latest_counter``
    :param daemon_name: passed through to ``get_latest_counter``
    :param counter: counter path, also the key of the looked-up entry
    :return: 2-tuple of elements 1 and 2 of the entry, or ``(0, 0)``
        when the entry is empty/None
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    datapoint = latest[counter]
    if not datapoint:
        return 0, 0
    return datapoint[1], datapoint[2]
1123
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
                          services=("mds", "mon", "osd",
                                    "rbd-mirror", "rgw", "tcmu-runner")):
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters.  The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value.  Long-running-average counters additionally get a "count"
    member.

    :param prio_limit: counters whose schema priority is lower than
        this are left out
    :param services: service types to report on; other types found in
        ``list_servers()`` are skipped
    :return: defaultdict mapping "<type>.<id>" to
        {counter_path: counter_info}
    """
    result = defaultdict(dict)

    for server in self.list_servers():
        for service in server['services']:
            svc_type = service['type']
            if svc_type not in services:
                continue
            svc_id = service['id']

            schema = self.get_perf_schema(svc_type, svc_id)
            if not schema:
                # Logger.warn is a deprecated alias; use warning().
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schema = schema[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                if counter_schema['priority'] < prio_limit:
                    continue

                # Copy so the schema entry itself is not modified.
                counter_info = dict(counter_schema)

                # Also populate count for the long running avgs
                if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    v, c = self.get_latest_avg(svc_type, svc_id, counter_path)
                    counter_info['value'], counter_info['count'] = v, c
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
1190
def set_uri(self, uri):
    """
    Publish the address of a service exposed by this module.

    If the module exposes a service, call this once the address is
    available.

    :param uri: the address to publish
    :return: whatever ``_ceph_set_uri`` returns
    """
    published = self._ceph_set_uri(uri)
    return published
94b18763
FG
1199
def have_mon_connection(self):
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.

    Without one, the information we have about the cluster is likely
    out of date, and/or the monitor cluster is down.

    :return: whatever ``_ceph_have_mon_connection`` returns
    """
    connected = self._ceph_have_mon_connection()
    return connected
11fdf7f2
TL
1209
def update_progress_event(self, evid, desc, progress):
    """
    Report the current state of a progress event.

    :param evid: event id, converted with ``str()``
    :param desc: event description, converted with ``str()``
    :param progress: progress value, converted with ``float()``
    :return: whatever ``_ceph_update_progress_event`` returns
    """
    event_id = str(evid)
    description = str(desc)
    fraction = float(progress)
    return self._ceph_update_progress_event(event_id, description, fraction)
1212
def complete_progress_event(self, evid):
    """
    Mark a progress event as complete.

    :param evid: event id, converted with ``str()``
    :return: whatever ``_ceph_complete_progress_event`` returns
    """
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
1215
def clear_all_progress_events(self):
    """
    Clear all progress events.

    :return: whatever ``_ceph_clear_all_progress_events`` returns
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1218
@property
def rados(self):
    """
    A librados instance to be shared by any classes within
    this mgr module that want one.

    Created and connected on first access, then cached in
    ``self._rados`` and reused.
    """
    if not self._rados:
        # First access: build the handle from this module's context.
        context = self.get_context()
        self._rados = rados.Rados(context=context)
        self._rados.connect()

    return self._rados
1233
@staticmethod
def can_run():
    """
    Report whether the module's dependencies are met.

    Override this if, for example, the module needs to import a
    particular dependency to work: use a try/except around the import
    at file scope, and report here if the import failed.

    This will be called in a blocking way from the C++ code, so do not
    do any I/O that could block in this function.

    :return: a 2-tuple consisting of a boolean and explanatory string;
        this default implementation always reports ``(True, "")``
    """
    runnable, reason = True, ""
    return runnable, reason
1249
def remote(self, module_name, method_name, *args, **kwargs):
    """
    Invoke a method on another module.  All arguments, and the return
    value from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module.  If module isn't loaded,
                        an ImportError exception is raised.
    :param method_name: Method name.  If it does not exist, a NameError
                        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    dispatch = self._ceph_dispatch_remote
    return dispatch(module_name, method_name, args, kwargs)
1273
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.  Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
1303
def remove_osd_perf_query(self, query_id):
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_osd_perf_query`` returns
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
1311
def get_osd_perf_counters(self, query_id):
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_get_osd_perf_counters`` returns
    """
    collected = self._ceph_get_osd_perf_counters(query_id)
    return collected
494da23a
TL
1319
1320
class PersistentStoreDict(object):
    """
    Dict-like view on a slice of a mgr module's persistent key value store.

    Entries live in the store under ``<prefix>.<key>`` and their values
    are JSON-encoded.  All access goes through the ``get_store``,
    ``set_store`` and ``get_store_prefix`` methods of ``mgr``.
    """

    def __init__(self, mgr, prefix):
        # type: (MgrModule, str) -> None
        self.mgr = mgr
        self.prefix = prefix + '.'

    def _mk_store_key(self, key):
        """Return the full store key for the dict key ``key``."""
        return self.prefix + key

    def __missing__(self, key):
        """Raise the "not found" error for ``key`` (the full store key)."""
        # KeyError won't work for the `in` operator.
        # https://docs.python.org/3/reference/expressions.html#membership-test-details
        raise IndexError('PersistentStoreDict: "{}" not found'.format(key))

    def clear(self):
        """Remove every entry under this dict's prefix from the store."""
        # Don't make any assumptions about the content of the values.
        # Snapshot the keys so removals cannot disturb the iteration.
        for k in list(self.mgr.get_store_prefix(self.prefix)):
            self.mgr.set_store(k, None)

    def __getitem__(self, item):
        # type: (str) -> Any
        """
        Return the decoded value stored for ``item``.

        :raises IndexError: no value is stored for ``item``
        :raises ValueError: the stored value is not valid JSON; the
            broken entry is removed from the store before re-raising
        """
        key = self._mk_store_key(item)
        val = self.mgr.get_store(key)
        if val is None:
            # A plain miss is not a deserialization failure: raise
            # without logging a traceback or touching the store.
            self.__missing__(key)
        try:
            return json.loads(val)
        except (ValueError, TypeError):
            logging.getLogger(__name__).exception('failed to deserialize')
            # Drop the unreadable entry so it cannot fail again.
            self.mgr.set_store(key, None)
            raise

    def __setitem__(self, item, value):
        # type: (str, Any) -> None
        """
        value=None is not allowed, as it will remove the key.
        """
        key = self._mk_store_key(item)
        self.mgr.set_store(key, json.dumps(value) if value is not None else None)

    def __delitem__(self, item):
        """Remove ``item``; removing an absent item is not an error."""
        self[item] = None

    def __len__(self):
        return len(self.keys())

    def items(self):
        # type: () -> Iterator[Tuple[str, Any]]
        """
        Yield ``(key, decoded value)`` pairs, keys without the prefix.

        If an entry cannot be decoded, the error is logged, every entry
        under the prefix is cleared and the iteration ends.
        """
        prefix_len = len(self.prefix)
        try:
            for k, v in self.mgr.get_store_prefix(self.prefix).items():
                yield k[prefix_len:], json.loads(v)
        except (KeyError, AttributeError, IndexError, ValueError, TypeError):
            logging.getLogger(__name__).exception('failed to deserialize')
            self.clear()

    def keys(self):
        # type: () -> Set[str]
        """Return the set of keys (without prefix) yielded by ``items``."""
        return {item[0] for item in self.items()}

    def __iter__(self):
        return iter(self.keys())