]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/mgr_module.py
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
CommitLineData
3efd9988 1import ceph_module # noqa
3efd9988 2
7c673cae 3import logging
11fdf7f2 4import json
1adf2230 5import six
7c673cae 6import threading
11fdf7f2
TL
7from collections import defaultdict, namedtuple
8import rados
9import time
10
11PG_STATES = [
12 "active",
13 "clean",
14 "down",
15 "recovery_unfound",
16 "backfill_unfound",
17 "scrubbing",
18 "degraded",
19 "inconsistent",
20 "peering",
21 "repair",
22 "recovering",
23 "forced_recovery",
24 "backfill_wait",
25 "incomplete",
26 "stale",
27 "remapped",
28 "deep",
29 "backfilling",
30 "forced_backfill",
31 "backfill_toofull",
32 "recovery_wait",
33 "recovery_toofull",
34 "undersized",
35 "activating",
36 "peered",
37 "snaptrim",
38 "snaptrim_wait",
39 "snaptrim_error",
40 "creating",
41 "unknown"]
3efd9988
FG
42
43
44class CPlusPlusHandler(logging.Handler):
45 def __init__(self, module_inst):
46 super(CPlusPlusHandler, self).__init__()
47 self._module = module_inst
48
49 def emit(self, record):
50 if record.levelno <= logging.DEBUG:
51 ceph_level = 20
52 elif record.levelno <= logging.INFO:
53 ceph_level = 4
54 elif record.levelno <= logging.WARNING:
55 ceph_level = 1
56 else:
57 ceph_level = 0
58
59 self._module._ceph_log(ceph_level, self.format(record))
60
61
11fdf7f2
TL
62def configure_logger(module_inst, module_name):
63 """
64 Create and configure the logger with the specified module.
3efd9988 65
11fdf7f2
TL
66 A handler will be added to the root logger which will redirect
67 the messages from all loggers (incl. 3rd party libraries) to the
68 Ceph log.
3efd9988 69
11fdf7f2
TL
70 :param module_inst: The module instance.
71 :type module_inst: instance
72 :param module_name: The module name.
73 :type module_name: str
74 :return: Return the logger with the specified name.
75 """
76 logger = logging.getLogger(module_name)
77 # Don't filter any logs at the python level, leave it to C++.
78 # FIXME: We should learn the log level from C++ land, and then
79 # avoid calling the C++ level log when we know a message
80 # is of an insufficient level to be ultimately output.
81 logger.setLevel(logging.DEBUG) # Don't use NOTSET
82
83 root_logger = logging.getLogger()
84 # Add handler to the root logger, thus this module and all
85 # 3rd party libraries will log their messages to the Ceph log.
86 root_logger.addHandler(CPlusPlusHandler(module_inst))
87 # Set the log level to ``ERROR`` to ensure that we only get
88 # those message from 3rd party libraries (only effective if
89 # they use the default log level ``NOTSET``).
90 # Check https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
91 # for more information about how the effective log level is
92 # determined.
93 root_logger.setLevel(logging.ERROR)
3efd9988
FG
94
95 return logger
7c673cae
FG
96
97
11fdf7f2
TL
98def unconfigure_logger(module_name=None):
99 """
100 :param module_name: The module name. Defaults to ``None``.
101 :type module_name: str or None
102 """
103 logger = logging.getLogger(module_name)
104 rm_handlers = [
105 h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
3efd9988
FG
106 for h in rm_handlers:
107 logger.removeHandler(h)
108
11fdf7f2 109
7c673cae
FG
110class CommandResult(object):
111 """
112 Use with MgrModule.send_command
113 """
11fdf7f2
TL
114
115 def __init__(self, tag=None):
7c673cae
FG
116 self.ev = threading.Event()
117 self.outs = ""
118 self.outb = ""
119 self.r = 0
120
121 # This is just a convenience for notifications from
122 # C++ land, to avoid passing addresses around in messages.
11fdf7f2 123 self.tag = tag if tag else ""
7c673cae
FG
124
125 def complete(self, r, outb, outs):
126 self.r = r
127 self.outb = outb
128 self.outs = outs
129 self.ev.set()
130
131 def wait(self):
132 self.ev.wait()
133 return self.r, self.outb, self.outs
134
135
11fdf7f2
TL
136class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
137 def __new__(cls, retval=0, stdout="", stderr=""):
138 """
139 Tuple containing the result of `handle_command()`
140
141 Only write to stderr if there is an error, or in extraordinary circumstances
142
143 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
144 is critical information to include there.
145
146 Everything programmatically consumable should be put on stdout
147
148 :param retval: return code. E.g. 0 or -errno.EINVAL
149 :type retval: int
150 :param stdout: data of this result.
151 :type stdout: str
152 :param stderr: Typically used for error messages.
153 :type stderr: str
154 """
155 return super(HandleCommandResult, cls).__new__(cls, retval, stdout, stderr)
156
157
3efd9988
FG
158class OSDMap(ceph_module.BasePyOSDMap):
159 def get_epoch(self):
160 return self._get_epoch()
161
162 def get_crush_version(self):
163 return self._get_crush_version()
164
165 def dump(self):
166 return self._dump()
167
11fdf7f2
TL
168 def get_pools(self):
169 # FIXME: efficient implementation
170 d = self._dump()
171 return dict([(p['pool'], p) for p in d['pools']])
172
173 def get_pools_by_name(self):
174 # FIXME: efficient implementation
175 d = self._dump()
176 return dict([(p['pool_name'], p) for p in d['pools']])
177
3efd9988
FG
178 def new_incremental(self):
179 return self._new_incremental()
180
181 def apply_incremental(self, inc):
182 return self._apply_incremental(inc)
183
184 def get_crush(self):
185 return self._get_crush()
186
187 def get_pools_by_take(self, take):
188 return self._get_pools_by_take(take).get('pools', [])
189
190 def calc_pg_upmaps(self, inc,
11fdf7f2
TL
191 max_deviation=.01, max_iterations=10, pools=None):
192 if pools is None:
193 pools = []
3efd9988
FG
194 return self._calc_pg_upmaps(
195 inc,
196 max_deviation, max_iterations, pools)
197
198 def map_pool_pgs_up(self, poolid):
199 return self._map_pool_pgs_up(poolid)
200
11fdf7f2
TL
201 def pg_to_up_acting_osds(self, pool_id, ps):
202 return self._pg_to_up_acting_osds(pool_id, ps)
203
204 def pool_raw_used_rate(self, pool_id):
205 return self._pool_raw_used_rate(pool_id)
206
207 def get_ec_profile(self, name):
208 # FIXME: efficient implementation
209 d = self._dump()
210 return d['erasure_code_profiles'].get(name, None)
211
212
3efd9988
FG
213class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
214 def get_epoch(self):
215 return self._get_epoch()
216
217 def dump(self):
218 return self._dump()
219
220 def set_osd_reweights(self, weightmap):
221 """
222 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
223 """
224 return self._set_osd_reweights(weightmap)
225
226 def set_crush_compat_weight_set_weights(self, weightmap):
227 """
228 weightmap is a dict, int to float. devices only. e.g.,
229 { 0: 3.4, 1: 3.3, 2: 3.334 }
230 """
231 return self._set_crush_compat_weight_set_weights(weightmap)
232
11fdf7f2 233
3efd9988 234class CRUSHMap(ceph_module.BasePyCRUSH):
b32b8144 235 ITEM_NONE = 0x7fffffff
11fdf7f2 236 DEFAULT_CHOOSE_ARGS = '-1'
b32b8144 237
3efd9988
FG
238 def dump(self):
239 return self._dump()
240
241 def get_item_weight(self, item):
242 return self._get_item_weight(item)
243
244 def get_item_name(self, item):
245 return self._get_item_name(item)
246
247 def find_takes(self):
248 return self._find_takes().get('takes', [])
249
250 def get_take_weight_osd_map(self, root):
251 uglymap = self._get_take_weight_osd_map(root)
11fdf7f2
TL
252 return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}
253
254 @staticmethod
255 def have_default_choose_args(dump):
256 return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
257
258 @staticmethod
259 def get_default_choose_args(dump):
260 return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
261
262 def get_rule(self, rule_name):
263 # TODO efficient implementation
264 for rule in self.dump()['rules']:
265 if rule_name == rule['rule_name']:
266 return rule
267
268 return None
269
270 def get_rule_by_id(self, rule_id):
271 for rule in self.dump()['rules']:
272 if rule['rule_id'] == rule_id:
273 return rule
274
275 return None
276
277 def get_rule_root(self, rule_name):
278 rule = self.get_rule(rule_name)
279 if rule is None:
280 return None
281
282 try:
283 first_take = [s for s in rule['steps'] if s['op'] == 'take'][0]
284 except IndexError:
285 self.log.warn("CRUSH rule '{0}' has no 'take' step".format(
286 rule_name))
287 return None
288 else:
289 return first_take['item']
290
291 def get_osds_under(self, root_id):
292 # TODO don't abuse dump like this
293 d = self.dump()
294 buckets = dict([(b['id'], b) for b in d['buckets']])
295
296 osd_list = []
297
298 def accumulate(b):
299 for item in b['items']:
300 if item['id'] >= 0:
301 osd_list.append(item['id'])
302 else:
303 try:
304 accumulate(buckets[item['id']])
305 except KeyError:
306 pass
307
308 accumulate(buckets[root_id])
309
310 return osd_list
311
312 def device_class_counts(self):
313 result = defaultdict(int)
314 # TODO don't abuse dump like this
315 d = self.dump()
316 for device in d['devices']:
317 cls = device.get('class', None)
318 result[cls] += 1
319
320 return dict(result)
321
322
323class CLICommand(object):
324 COMMANDS = {}
325
326 def __init__(self, prefix, args="", desc="", perm="rw"):
327 self.prefix = prefix
328 self.args = args
329 self.args_dict = {}
330 self.desc = desc
331 self.perm = perm
332 self.func = None
333 self._parse_args()
334
335 def _parse_args(self):
336 if not self.args:
337 return
338 args = self.args.split(" ")
339 for arg in args:
340 arg_desc = arg.strip().split(",")
341 arg_d = {}
342 for kv in arg_desc:
343 k, v = kv.split("=")
344 if k != "name":
345 arg_d[k] = v
346 else:
347 self.args_dict[v] = arg_d
348
349 def __call__(self, func):
350 self.func = func
351 self.COMMANDS[self.prefix] = self
352 return self.func
353
354 def call(self, mgr, cmd_dict, inbuf):
355 kwargs = {}
356 for a, d in self.args_dict.items():
357 if 'req' in d and d['req'] == "false" and a not in cmd_dict:
358 continue
359 kwargs[a.replace("-", "_")] = cmd_dict[a]
360 if inbuf:
361 kwargs['inbuf'] = inbuf
362 return self.func(mgr, **kwargs)
363
364 @classmethod
365 def dump_cmd_list(cls):
366 return [{
367 'cmd': '{} {}'.format(cmd.prefix, cmd.args),
368 'desc': cmd.desc,
369 'perm': cmd.perm
370 } for _, cmd in cls.COMMANDS.items()]
371
372
373def CLIReadCommand(prefix, args="", desc=""):
374 return CLICommand(prefix, args, desc, "r")
375
376
377def CLIWriteCommand(prefix, args="", desc=""):
378 return CLICommand(prefix, args, desc, "w")
379
380
381def _get_localized_key(prefix, key):
382 return '{}/{}'.format(prefix, key)
383
384
385class Option(dict):
386 """
387 Helper class to declare options for MODULE_OPTIONS list.
388
389 Caveat: it uses argument names matching Python keywords (type, min, max),
390 so any further processing should happen in a separate method.
391
392 TODO: type validation.
393 """
394
395 def __init__(
396 self, name,
397 default=None,
398 type='str',
399 desc=None, longdesc=None,
400 min=None, max=None,
401 enum_allowed=None,
402 see_also=None,
403 tags=None,
404 runtime=False,
405 ):
406 super(Option, self).__init__(
407 (k, v) for k, v in vars().items()
408 if k != 'self' and v is not None)
409
3efd9988
FG
410
411class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
412 """
413 Standby modules only implement a serve and shutdown method, they
414 are not permitted to implement commands and they do not receive
415 any notifications.
416
b32b8144 417 They only have access to the mgrmap (for accessing service URI info
3efd9988
FG
418 from their active peer), and to configuration settings (read only).
419 """
420
11fdf7f2
TL
421 MODULE_OPTIONS = []
422 MODULE_OPTION_DEFAULTS = {}
423
3efd9988
FG
424 def __init__(self, module_name, capsule):
425 super(MgrStandbyModule, self).__init__(capsule)
426 self.module_name = module_name
427 self._logger = configure_logger(self, module_name)
11fdf7f2
TL
428 # see also MgrModule.__init__()
429 for o in self.MODULE_OPTIONS:
430 if 'default' in o:
431 if 'type' in o:
432 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
433 else:
434 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
3efd9988
FG
435
436 def __del__(self):
11fdf7f2 437 unconfigure_logger()
3efd9988
FG
438
439 @property
440 def log(self):
441 return self._logger
442
443 def serve(self):
444 """
445 The serve method is mandatory for standby modules.
446 :return:
447 """
448 raise NotImplementedError()
449
450 def get_mgr_id(self):
451 return self._ceph_get_mgr_id()
452
11fdf7f2 453 def get_module_option(self, key, default=None):
b32b8144
FG
454 """
455 Retrieve the value of a persistent configuration setting
456
457 :param str key:
458 :param default: the default value of the config if it is not found
459 :return: str
460 """
11fdf7f2 461 r = self._ceph_get_module_option(key)
b32b8144 462 if r is None:
11fdf7f2 463 return self.MODULE_OPTION_DEFAULTS.get(key, default)
b32b8144
FG
464 else:
465 return r
466
11fdf7f2
TL
467 def get_ceph_option(self, key):
468 return self._ceph_get_option(key)
469
470 def get_store(self, key):
471 """
472 Retrieve the value of a persistent KV store entry
473
474 :param key: String
475 :return: Byte string or None
476 """
477 return self._ceph_get_store(key)
3efd9988
FG
478
479 def get_active_uri(self):
480 return self._ceph_get_active_uri()
481
11fdf7f2
TL
482 def get_localized_module_option(self, key, default=None):
483 r = self._ceph_get_module_option(key, self.get_mgr_id())
3efd9988 484 if r is None:
11fdf7f2
TL
485 return self.MODULE_OPTION_DEFAULTS.get(key, default)
486 else:
487 return r
3efd9988 488
3efd9988
FG
489
490class MgrModule(ceph_module.BaseMgrModule):
7c673cae 491 COMMANDS = []
11fdf7f2
TL
492 MODULE_OPTIONS = []
493 MODULE_OPTION_DEFAULTS = {}
7c673cae 494
3efd9988
FG
495 # Priority definitions for perf counters
496 PRIO_CRITICAL = 10
497 PRIO_INTERESTING = 8
498 PRIO_USEFUL = 5
499 PRIO_UNINTERESTING = 2
500 PRIO_DEBUGONLY = 0
501
502 # counter value types
503 PERFCOUNTER_TIME = 1
504 PERFCOUNTER_U64 = 2
505
506 # counter types
507 PERFCOUNTER_LONGRUNAVG = 4
508 PERFCOUNTER_COUNTER = 8
509 PERFCOUNTER_HISTOGRAM = 0x10
28e407b8 510 PERFCOUNTER_TYPE_MASK = ~3
7c673cae 511
1adf2230
AA
512 # units supported
513 BYTES = 0
514 NONE = 1
11fdf7f2
TL
515
516 # Cluster log priorities
517 CLUSTER_LOG_PRIO_DEBUG = 0
518 CLUSTER_LOG_PRIO_INFO = 1
519 CLUSTER_LOG_PRIO_SEC = 2
520 CLUSTER_LOG_PRIO_WARN = 3
521 CLUSTER_LOG_PRIO_ERROR = 4
522
3efd9988
FG
523 def __init__(self, module_name, py_modules_ptr, this_ptr):
524 self.module_name = module_name
7c673cae 525
3efd9988
FG
526 # If we're taking over from a standby module, let's make sure
527 # its logger was unconfigured before we hook ours up
11fdf7f2 528 unconfigure_logger()
3efd9988 529 self._logger = configure_logger(self, module_name)
7c673cae 530
3efd9988 531 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
7c673cae 532
3efd9988 533 self._version = self._ceph_get_version()
7c673cae 534
3efd9988 535 self._perf_schema_cache = None
7c673cae 536
11fdf7f2
TL
537 # Keep a librados instance for those that need it.
538 self._rados = None
539
540 for o in self.MODULE_OPTIONS:
541 if 'default' in o:
542 if 'type' in o:
543 # we'll assume the declared type matches the
544 # supplied default value's type.
545 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
546 else:
547 # module not declaring it's type, so normalize the
548 # default value to be a string for consistent behavior
549 # with default and user-supplied option values.
550 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
551
3efd9988 552 def __del__(self):
11fdf7f2 553 unconfigure_logger()
3efd9988 554
11fdf7f2
TL
555 @classmethod
556 def _register_commands(cls):
557 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
7c673cae
FG
558
559 @property
560 def log(self):
561 return self._logger
562
11fdf7f2
TL
563 def cluster_log(self, channel, priority, message):
564 """
565 :param channel: The log channel. This can be 'cluster', 'audit', ...
566 :type channel: str
567 :param priority: The log message priority. This can be
568 CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
569 CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
570 CLUSTER_LOG_PRIO_ERROR.
571 :type priority: int
572 :param message: The message to log.
573 :type message: str
574 """
575 self._ceph_cluster_log(channel, priority, message)
576
7c673cae
FG
577 @property
578 def version(self):
579 return self._version
580
3efd9988
FG
581 def get_context(self):
582 """
583 :return: a Python capsule containing a C++ CephContext pointer
584 """
585 return self._ceph_get_context()
586
7c673cae
FG
587 def notify(self, notify_type, notify_id):
588 """
589 Called by the ceph-mgr service to notify the Python plugin
590 that new state is available.
11fdf7f2
TL
591
592 :param notify_type: string indicating what kind of notification,
593 such as osd_map, mon_map, fs_map, mon_status,
594 health, pg_summary, command, service_map
595 :param notify_id: string (may be empty) that optionally specifies
596 which entity is being notified about. With
597 "command" notifications this is set to the tag
598 ``from send_command``.
599 """
600 pass
601
602 def config_notify(self):
603 """
604 Called by the ceph-mgr service to notify the Python plugin
605 that the configuration may have changed. Modules will want to
606 refresh any configuration values stored in config variables.
7c673cae
FG
607 """
608 pass
609
610 def serve(self):
611 """
612 Called by the ceph-mgr service to start any server that
613 is provided by this Python plugin. The implementation
614 of this function should block until ``shutdown`` is called.
615
616 You *must* implement ``shutdown`` if you implement ``serve``
617 """
618 pass
619
620 def shutdown(self):
621 """
622 Called by the ceph-mgr service to request that this
623 module drop out of its serve() function. You do not
624 need to implement this if you do not implement serve()
625
626 :return: None
627 """
11fdf7f2
TL
628 if self._rados:
629 self._rados.shutdown()
7c673cae
FG
630
631 def get(self, data_name):
632 """
11fdf7f2
TL
633 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
634
635 :param str data_name: Valid things to fetch are osd_crush_map_text,
636 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
637 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
638 health, mon_status, devices, device <devid>.
639
640 Note:
641 All these structures have their own JSON representations: experiment
642 or look at the C++ ``dump()`` methods to learn about them.
7c673cae 643 """
3efd9988 644 return self._ceph_get(data_name)
7c673cae 645
94b18763 646 def _stattype_to_str(self, stattype):
11fdf7f2 647
94b18763
FG
648 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
649 if typeonly == 0:
650 return 'gauge'
651 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
652 # this lie matches the DaemonState decoding: only val, no counts
653 return 'counter'
654 if typeonly == self.PERFCOUNTER_COUNTER:
655 return 'counter'
656 if typeonly == self.PERFCOUNTER_HISTOGRAM:
657 return 'histogram'
11fdf7f2 658
94b18763
FG
659 return ''
660
28e407b8
AA
661 def _perfvalue_to_value(self, stattype, value):
662 if stattype & self.PERFCOUNTER_TIME:
663 # Convert from ns to seconds
664 return value / 1000000000.0
665 else:
666 return value
667
1adf2230
AA
668 def _unit_to_str(self, unit):
669 if unit == self.NONE:
670 return "/s"
671 elif unit == self.BYTES:
11fdf7f2
TL
672 return "B/s"
673
674 @staticmethod
675 def to_pretty_iec(n):
676 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
677 (20, 'Mi'), (10, 'Ki')]:
678 if n > 10 << bits:
679 return str(n >> bits) + ' ' + suffix
680 return str(n) + ' '
681
682 @staticmethod
683 def get_pretty_row(elems, width):
684 """
685 Takes an array of elements and returns a string with those elements
686 formatted as a table row. Useful for polling modules.
687
688 :param elems: the elements to be printed
689 :param width: the width of the terminal
690 """
691 n = len(elems)
692 column_width = int(width / n)
693
694 ret = '|'
695 for elem in elems:
696 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
697
698 return ret
699
700 def get_pretty_header(self, elems, width):
701 """
702 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
703
704 :param elems: the elements to be printed
705 :param width: the width of the terminal
706 """
707 n = len(elems)
708 column_width = int(width / n)
709
710 # dash line
711 ret = '+'
712 for i in range(0, n):
713 ret += '-' * (column_width - 1) + '+'
714 ret += '\n'
715
716 # title
717 ret += self.get_pretty_row(elems, width)
718 ret += '\n'
719
720 # dash line
721 ret += '+'
722 for i in range(0, n):
723 ret += '-' * (column_width - 1) + '+'
724 ret += '\n'
725
726 return ret
727
7c673cae
FG
728 def get_server(self, hostname):
729 """
11fdf7f2
TL
730 Called by the plugin to fetch metadata about a particular hostname from
731 ceph-mgr.
732
733 This is information that ceph-mgr has gleaned from the daemon metadata
734 reported by daemons running on a particular server.
7c673cae 735
11fdf7f2 736 :param hostname: a hostname
7c673cae 737 """
3efd9988 738 return self._ceph_get_server(hostname)
7c673cae 739
c07f9fc5
FG
740 def get_perf_schema(self, svc_type, svc_name):
741 """
742 Called by the plugin to fetch perf counter schema info.
743 svc_name can be nullptr, as can svc_type, in which case
744 they are wildcards
745
11fdf7f2
TL
746 :param str svc_type:
747 :param str svc_name:
c07f9fc5
FG
748 :return: list of dicts describing the counters requested
749 """
3efd9988 750 return self._ceph_get_perf_schema(svc_type, svc_name)
c07f9fc5 751
7c673cae
FG
752 def get_counter(self, svc_type, svc_name, path):
753 """
11fdf7f2
TL
754 Called by the plugin to fetch the latest performance counter data for a
755 particular counter on a particular service.
7c673cae 756
11fdf7f2
TL
757 :param str svc_type:
758 :param str svc_name:
759 :param str path: a period-separated concatenation of the subsystem and the
760 counter name, for example "mds.inodes".
761 :return: A list of two-tuples of (timestamp, value) is returned. This may be
762 empty if no data is available.
7c673cae 763 """
3efd9988 764 return self._ceph_get_counter(svc_type, svc_name, path)
7c673cae 765
11fdf7f2
TL
766 def get_latest_counter(self, svc_type, svc_name, path):
767 """
768 Called by the plugin to fetch only the newest performance counter data
769 pointfor a particular counter on a particular service.
770
771 :param str svc_type:
772 :param str svc_name:
773 :param str path: a period-separated concatenation of the subsystem and the
774 counter name, for example "mds.inodes".
775 :return: A list of two-tuples of (timestamp, value) is returned. This may be
776 empty if no data is available.
777 """
778 return self._ceph_get_latest_counter(svc_type, svc_name, path)
779
7c673cae
FG
780 def list_servers(self):
781 """
11fdf7f2
TL
782 Like ``get_server``, but gives information about all servers (i.e. all
783 unique hostnames that have been mentioned in daemon metadata)
784
785 :return: a list of information about all servers
786 :rtype: list
7c673cae 787 """
3efd9988 788 return self._ceph_get_server(None)
7c673cae
FG
789
790 def get_metadata(self, svc_type, svc_id):
791 """
11fdf7f2 792 Fetch the daemon metadata for a particular service.
7c673cae 793
11fdf7f2
TL
794 ceph-mgr fetches metadata asynchronously, so are windows of time during
795 addition/removal of services where the metadata is not available to
796 modules. ``None`` is returned if no metadata is available.
797
798 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
799 :param str svc_id: service id. convert OSD integer IDs to strings when
800 calling this
801 :rtype: dict, or None if no metadata found
7c673cae 802 """
3efd9988 803 return self._ceph_get_metadata(svc_type, svc_id)
7c673cae 804
224ce89b
WB
805 def get_daemon_status(self, svc_type, svc_id):
806 """
807 Fetch the latest status for a particular service daemon.
808
11fdf7f2
TL
809 This method may return ``None`` if no status information is
810 available, for example because the daemon hasn't fully started yet.
811
224ce89b
WB
812 :param svc_type: string (e.g., 'rgw')
813 :param svc_id: string
11fdf7f2 814 :return: dict, or None if the service is not found
224ce89b 815 """
3efd9988 816 return self._ceph_get_daemon_status(svc_type, svc_id)
224ce89b 817
11fdf7f2
TL
818 def mon_command(self, cmd_dict):
819 """
820 Helper for modules that do simple, synchronous mon command
821 execution.
822
823 See send_command for general case.
824
825 :return: status int, out std, err str
826 """
827
828 t1 = time.time()
829 result = CommandResult()
830 self.send_command(result, "mon", "", json.dumps(cmd_dict), "")
831 r = result.wait()
832 t2 = time.time()
833
834 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
835 cmd_dict['prefix'], r[0], t2 - t1
836 ))
837
838 return r
839
7c673cae
FG
840 def send_command(self, *args, **kwargs):
841 """
842 Called by the plugin to send a command to the mon
843 cluster.
11fdf7f2
TL
844
845 :param CommandResult result: an instance of the ``CommandResult``
846 class, defined in the same module as MgrModule. This acts as a
847 completion and stores the output of the command. Use
848 ``CommandResult.wait()`` if you want to block on completion.
849 :param str svc_type:
850 :param str svc_id:
851 :param str command: a JSON-serialized command. This uses the same
852 format as the ceph command line, which is a dictionary of command
853 arguments, with the extra ``prefix`` key containing the command
854 name itself. Consult MonCommands.h for available commands and
855 their expected arguments.
856 :param str tag: used for nonblocking operation: when a command
857 completes, the ``notify()`` callback on the MgrModule instance is
858 triggered, with notify_type set to "command", and notify_id set to
859 the tag of the command.
7c673cae 860 """
3efd9988 861 self._ceph_send_command(*args, **kwargs)
7c673cae 862
c07f9fc5
FG
863 def set_health_checks(self, checks):
864 """
c07f9fc5
FG
865 Set the module's current map of health checks. Argument is a
866 dict of check names to info, in this form:
867
11fdf7f2
TL
868 ::
869
c07f9fc5
FG
870 {
871 'CHECK_FOO': {
872 'severity': 'warning', # or 'error'
873 'summary': 'summary string',
874 'detail': [ 'list', 'of', 'detail', 'strings' ],
875 },
876 'CHECK_BAR': {
877 'severity': 'error',
878 'summary': 'bars are bad',
879 'detail': [ 'too hard' ],
880 },
881 }
882
883 :param list: dict of health check dicts
884 """
3efd9988 885 self._ceph_set_health_checks(checks)
c07f9fc5 886
11fdf7f2
TL
887 def _handle_command(self, inbuf, cmd):
888 if cmd['prefix'] not in CLICommand.COMMANDS:
889 return self.handle_command(inbuf, cmd)
890 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
891
892 def handle_command(self, inbuf, cmd):
7c673cae
FG
893 """
894 Called by ceph-mgr to request the plugin to handle one
895 of the commands that it declared in self.COMMANDS
896
897 Return a status code, an output buffer, and an
898 output string. The output buffer is for data results,
899 the output string is for informative text.
900
11fdf7f2
TL
901 :param inbuf: content of any "-i <file>" supplied to ceph cli
902 :type inbuf: str
903 :param cmd: from Ceph's cmdmap_t
904 :type cmd: dict
7c673cae 905
11fdf7f2 906 :return: HandleCommandResult or a 3-tuple of (int, str, str)
7c673cae
FG
907 """
908
909 # Should never get called if they didn't declare
910 # any ``COMMANDS``
911 raise NotImplementedError()
912
31f18b77
FG
913 def get_mgr_id(self):
914 """
11fdf7f2
TL
915 Retrieve the name of the manager daemon where this plugin
916 is currently being executed (i.e. the active manager).
31f18b77
FG
917
918 :return: str
919 """
3efd9988 920 return self._ceph_get_mgr_id()
31f18b77 921
11fdf7f2
TL
922 def get_ceph_option(self, key):
923 return self._ceph_get_option(key)
7c673cae 924
11fdf7f2
TL
925 def _validate_module_option(self, key):
926 """
927 Helper: don't allow get/set config callers to
928 access config options that they didn't declare
929 in their schema.
7c673cae 930 """
11fdf7f2
TL
931 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
932 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
933 format(key, self.__class__.__name__))
934
935 def _get_module_option(self, key, default, localized_prefix=""):
936 r = self._ceph_get_module_option(self.module_name, key,
937 localized_prefix)
3efd9988 938 if r is None:
11fdf7f2 939 return self.MODULE_OPTION_DEFAULTS.get(key, default)
3efd9988
FG
940 else:
941 return r
7c673cae 942
11fdf7f2
TL
943 def get_module_option(self, key, default=None):
944 """
945 Retrieve the value of a persistent configuration setting
946
947 :param str key:
948 :param str default:
949 :return: str
950 """
951 self._validate_module_option(key)
952 return self._get_module_option(key, default)
953
954 def get_module_option_ex(self, module, key, default=None):
955 """
956 Retrieve the value of a persistent configuration setting
957 for the specified module.
958
959 :param str module: The name of the module, e.g. 'dashboard'
960 or 'telemetry'.
961 :param str key: The configuration key, e.g. 'server_addr'.
962 :param str,None default: The default value to use when the
963 returned value is ``None``. Defaults to ``None``.
964 :return: str,int,bool,float,None
965 """
966 if module == self.module_name:
967 self._validate_module_option(key)
968 r = self._ceph_get_module_option(module, key)
969 return default if r is None else r
970
971 def get_store_prefix(self, key_prefix):
31f18b77 972 """
11fdf7f2
TL
973 Retrieve a dict of KV store keys to values, where the keys
974 have the given prefix
31f18b77 975
11fdf7f2 976 :param str key_prefix:
31f18b77
FG
977 :return: str
978 """
11fdf7f2 979 return self._ceph_get_store_prefix(key_prefix)
31f18b77 980
11fdf7f2
TL
981 def _set_localized(self, key, val, setter):
982 return setter(_get_localized_key(self.get_mgr_id(), key), val)
983
984 def get_localized_module_option(self, key, default=None):
224ce89b
WB
985 """
986 Retrieve localized configuration for this ceph-mgr instance
11fdf7f2
TL
987 :param str key:
988 :param str default:
224ce89b
WB
989 :return: str
990 """
11fdf7f2
TL
991 self._validate_module_option(key)
992 return self._get_module_option(key, default, self.get_mgr_id())
224ce89b 993
11fdf7f2
TL
994 def _set_module_option(self, key, val):
995 return self._ceph_set_module_option(self.module_name, key, str(val))
224ce89b 996
11fdf7f2 997 def set_module_option(self, key, val):
7c673cae
FG
998 """
999 Set the value of a persistent configuration setting
1000
11fdf7f2
TL
1001 :param str key:
1002 :type val: str | None
7c673cae 1003 """
11fdf7f2
TL
1004 self._validate_module_option(key)
1005 return self._set_module_option(key, val)
7c673cae 1006
11fdf7f2
TL
1007 def set_module_option_ex(self, module, key, val):
1008 """
1009 Set the value of a persistent configuration setting
1010 for the specified module.
1011
1012 :param str module:
1013 :param str key:
1014 :param str val:
1015 """
1016 if module == self.module_name:
1017 self._validate_module_option(key)
1018 return self._ceph_set_module_option(module, key, str(val))
1019
1020 def set_localized_module_option(self, key, val):
224ce89b
WB
1021 """
1022 Set localized configuration for this ceph-mgr instance
11fdf7f2
TL
1023 :param str key:
1024 :param str val:
224ce89b
WB
1025 :return: str
1026 """
11fdf7f2
TL
1027 self._validate_module_option(key)
1028 return self._set_localized(key, val, self._set_module_option)
224ce89b 1029
11fdf7f2 1030 def set_store(self, key, val):
7c673cae 1031 """
11fdf7f2
TL
1032 Set a value in this module's persistent key value store.
1033 If val is None, remove key from store
7c673cae 1034
11fdf7f2
TL
1035 :param str key:
1036 :param str val:
7c673cae 1037 """
11fdf7f2 1038 self._ceph_set_store(key, val)
7c673cae 1039
11fdf7f2 1040 def get_store(self, key, default=None):
7c673cae 1041 """
11fdf7f2 1042 Get a value from this module's persistent key value store
7c673cae 1043 """
11fdf7f2
TL
1044 r = self._ceph_get_store(key)
1045 if r is None:
1046 return default
7c673cae 1047 else:
11fdf7f2
TL
1048 return r
1049
1050 def get_localized_store(self, key, default=None):
1051 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
1052 if r is None:
1053 r = self._ceph_get_store(key)
1054 if r is None:
1055 r = default
1056 return r
1057
1058 def set_localized_store(self, key, val):
1059 return self._set_localized(key, val, self.set_store)
224ce89b
WB
1060
1061 def self_test(self):
1062 """
1063 Run a self-test on the module. Override this function and implement
1064 a best as possible self-test for (automated) testing of the module
11fdf7f2
TL
1065
1066 Indicate any failures by raising an exception. This does not have
1067 to be pretty, it's mainly for picking up regressions during
1068 development, rather than use in the field.
1069
1070 :return: None, or an advisory string for developer interest, such
1071 as a json dump of some state.
224ce89b
WB
1072 """
1073 pass
3efd9988
FG
1074
1075 def get_osdmap(self):
1076 """
1077 Get a handle to an OSDMap. If epoch==0, get a handle for the latest
1078 OSDMap.
1079 :return: OSDMap
1080 """
1081 return self._ceph_get_osdmap()
1082
11fdf7f2
TL
1083 def get_latest(self, daemon_type, daemon_name, counter):
1084 data = self.get_latest_counter(
1085 daemon_type, daemon_name, counter)[counter]
1086 if data:
1087 return data[1]
1088 else:
1089 return 0
1090
1091 def get_latest_avg(self, daemon_type, daemon_name, counter):
1092 data = self.get_latest_counter(
1093 daemon_type, daemon_name, counter)[counter]
1094 if data:
1095 return data[1], data[2]
1096 else:
1097 return 0, 0
1098
1099 def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
1100 services=("mds", "mon", "osd",
1101 "rbd-mirror", "rgw")):
3efd9988
FG
1102 """
1103 Return the perf counters currently known to this ceph-mgr
1104 instance, filtered by priority equal to or greater than `prio_limit`.
1105
11fdf7f2 1106 The result is a map of string to dict, associating services
3efd9988
FG
1107 (like "osd.123") with their counters. The counter
1108 dict for each service maps counter paths to a counter
1109 info structure, which is the information from
1110 the schema, plus an additional "value" member with the latest
1111 value.
1112 """
1113
1114 result = defaultdict(dict)
1115
3efd9988
FG
1116 for server in self.list_servers():
1117 for service in server['services']:
11fdf7f2 1118 if service['type'] not in services:
3efd9988
FG
1119 continue
1120
1121 schema = self.get_perf_schema(service['type'], service['id'])
1122 if not schema:
1123 self.log.warn("No perf counter schema for {0}.{1}".format(
1124 service['type'], service['id']
1125 ))
1126 continue
1127
1128 # Value is returned in a potentially-multi-service format,
1129 # get just the service we're asking about
11fdf7f2
TL
1130 svc_full_name = "{0}.{1}".format(
1131 service['type'], service['id'])
3efd9988
FG
1132 schema = schema[svc_full_name]
1133
1134 # Populate latest values
1135 for counter_path, counter_schema in schema.items():
1136 # self.log.debug("{0}: {1}".format(
1137 # counter_path, json.dumps(counter_schema)
1138 # ))
1139 if counter_schema['priority'] < prio_limit:
1140 continue
1141
28e407b8
AA
1142 counter_info = dict(counter_schema)
1143
1144 # Also populate count for the long running avgs
1145 if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
11fdf7f2 1146 v, c = self.get_latest_avg(
28e407b8
AA
1147 service['type'],
1148 service['id'],
1149 counter_path
1150 )
1151 counter_info['value'], counter_info['count'] = v, c
1152 result[svc_full_name][counter_path] = counter_info
1153 else:
11fdf7f2 1154 counter_info['value'] = self.get_latest(
28e407b8
AA
1155 service['type'],
1156 service['id'],
1157 counter_path
1158 )
1159
3efd9988
FG
1160 result[svc_full_name][counter_path] = counter_info
1161
1162 self.log.debug("returning {0} counter".format(len(result)))
1163
1164 return result
1165
1166 def set_uri(self, uri):
1167 """
1168 If the module exposes a service, then call this to publish the
1169 address once it is available.
1170
1171 :return: a string
1172 """
1173 return self._ceph_set_uri(uri)
94b18763
FG
1174
1175 def have_mon_connection(self):
1176 """
1177 Check whether this ceph-mgr daemon has an open connection
1178 to a monitor. If it doesn't, then it's likely that the
1179 information we have about the cluster is out of date,
1180 and/or the monitor cluster is down.
1181 """
1182
28e407b8 1183 return self._ceph_have_mon_connection()
11fdf7f2
TL
1184
1185 def update_progress_event(self, evid, desc, progress):
1186 return self._ceph_update_progress_event(str(evid), str(desc), float(progress))
1187
1188 def complete_progress_event(self, evid):
1189 return self._ceph_complete_progress_event(str(evid))
1190
1191 def clear_all_progress_events(self):
1192 return self._ceph_clear_all_progress_events()
1193
1194 @property
1195 def rados(self):
1196 """
1197 A librados instance to be shared by any classes within
1198 this mgr module that want one.
1199 """
1200 if self._rados:
1201 return self._rados
1202
1203 ctx_capsule = self.get_context()
1204 self._rados = rados.Rados(context=ctx_capsule)
1205 self._rados.connect()
1206
1207 return self._rados
1208
1209 @staticmethod
1210 def can_run():
1211 """
1212 Implement this function to report whether the module's dependencies
1213 are met. For example, if the module needs to import a particular
1214 dependency to work, then use a try/except around the import at
1215 file scope, and then report here if the import failed.
1216
1217 This will be called in a blocking way from the C++ code, so do not
1218 do any I/O that could block in this function.
1219
1220 :return a 2-tuple consisting of a boolean and explanatory string
1221 """
1222
1223 return True, ""
1224
1225 def remote(self, module_name, method_name, *args, **kwargs):
1226 """
1227 Invoke a method on another module. All arguments, and the return
1228 value from the other module must be serializable.
1229
1230 Limitation: Do not import any modules within the called method.
1231 Otherwise you will get an error in Python 2::
1232
1233 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
1234
1235
1236
1237 :param module_name: Name of other module. If module isn't loaded,
1238 an ImportError exception is raised.
1239 :param method_name: Method name. If it does not exist, a NameError
1240 exception is raised.
1241 :param args: Argument tuple
1242 :param kwargs: Keyword argument dict
1243 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
1244 :raises ImportError: No such module
1245 """
1246 return self._ceph_dispatch_remote(module_name, method_name,
1247 args, kwargs)
1248
1249 def add_osd_perf_query(self, query):
1250 """
1251 Register an OSD perf query. Argument is a
1252 dict of the query parameters, in this form:
1253
1254 ::
1255
1256 {
1257 'key_descriptor': [
1258 {'type': subkey_type, 'regex': regex_pattern},
1259 ...
1260 ],
1261 'performance_counter_descriptors': [
1262 list, of, descriptor, types
1263 ],
1264 'limit': {'order_by': performance_counter_type, 'max_count': n},
1265 }
1266
1267 Valid subkey types:
1268 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
1269 'pg_id', 'object_name', 'snap_id'
1270 Valid performance counter types:
1271 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
1272 'latency', 'write_latency', 'read_latency'
1273
1274 :param object query: query
1275 :rtype: int (query id)
1276 """
1277 return self._ceph_add_osd_perf_query(query)
1278
1279 def remove_osd_perf_query(self, query_id):
1280 """
1281 Unregister an OSD perf query.
1282
1283 :param int query_id: query ID
1284 """
1285 return self._ceph_remove_osd_perf_query(query_id)
1286
1287 def get_osd_perf_counters(self, query_id):
1288 """
1289 Get stats collected for an OSD perf query.
1290
1291 :param int query_id: query ID
1292 """
1293 return self._ceph_get_osd_perf_counters(query_id)