]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
554310d9813fc6713866de059f36e150ec3271c6
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1 import ceph_module # noqa
2
3 try:
4 from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
5 except ImportError:
6 # just for type checking
7 pass
8 import logging
9 import errno
10 import json
11 import six
12 import threading
13 from collections import defaultdict, namedtuple
14 import rados
15 import re
16 import time
17
# Placement group (PG) state names.
PG_STATES = [
    "active",
    "clean",
    "down",
    "recovery_unfound",
    "backfill_unfound",
    "scrubbing",
    "degraded",
    "inconsistent",
    "peering",
    "repair",
    "recovering",
    "forced_recovery",
    "backfill_wait",
    "incomplete",
    "stale",
    "remapped",
    "deep",
    "backfilling",
    "forced_backfill",
    "backfill_toofull",
    "recovery_wait",
    "recovery_toofull",
    "undersized",
    "activating",
    "peered",
    "snaptrim",
    "snaptrim_wait",
    "snaptrim_error",
    "creating",
    "unknown",
]
49
50
class CommandResult(object):
    """
    Completion object to use with MgrModule.send_command.

    The command's return code and outputs are stored by ``complete()``;
    ``wait()`` blocks until that has happened and hands them back.
    """

    def __init__(self, tag=None):
        # The tag is just a convenience for notifications from C++ land,
        # to avoid passing addresses around in messages. Any falsy tag is
        # normalized to the empty string.
        self.tag = tag or ""

        self.r = 0
        self.outb = ""
        self.outs = ""
        self.ev = threading.Event()

    def complete(self, r, outb, outs):
        """Store the command outcome and wake up anyone blocked in wait()."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """
        Block until complete() has been called.

        :return: tuple of (r, outb, outs)
        """
        self.ev.wait()
        return (self.r, self.outb, self.outs)
75
76
class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        Build the 3-tuple returned from `handle_command()`.

        Guidelines for filling it in:

        * Only write to stderr if there is an error, or in extraordinary
          circumstances.
        * Avoid having `ceph foo bar` commands say "did foo bar" on success
          unless there is critical information to include there.
        * Everything programmatically consumable should be put on stdout.

        :param retval: return code. E.g. 0 or -errno.EINVAL
        :type retval: int
        :param stdout: data of this result.
        :type stdout: str
        :param stderr: Typically used for error messages.
        :type stderr: str
        """
        base = super(HandleCommandResult, cls)
        return base.__new__(cls, retval=retval, stdout=stdout, stderr=stderr)
97
98
class MonCommandFailed(RuntimeError):
    """Error type for a mon command that finished with a non-zero retval."""
100
101
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python-facing wrapper over ``ceph_module.BasePyOSDMap``.

    Most methods forward to the underscore-prefixed method of the same name
    on the base class. The pool, erasure-code-profile and release lookups
    are computed from a full ``_dump()`` on every call.
    """

    def get_epoch(self):
        return self._get_epoch()

    def get_crush_version(self):
        return self._get_crush_version()

    def dump(self):
        return self._dump()

    def get_pools(self):
        """Return the dumped pools as a dict keyed by each pool's 'pool' field."""
        # FIXME: efficient implementation
        return {pool['pool']: pool for pool in self._dump()['pools']}

    def get_pools_by_name(self):
        """Return the dumped pools as a dict keyed by 'pool_name'."""
        # FIXME: efficient implementation
        return {pool['pool_name']: pool for pool in self._dump()['pools']}

    def new_incremental(self):
        return self._new_incremental()

    def apply_incremental(self, inc):
        return self._apply_incremental(inc)

    def get_crush(self):
        return self._get_crush()

    def get_pools_by_take(self, take):
        """Return the 'pools' entry of ``_get_pools_by_take(take)`` ([] if absent)."""
        by_take = self._get_pools_by_take(take)
        return by_take.get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """Forward to ``_calc_pg_upmaps``; a ``pools`` of None becomes []."""
        pool_list = [] if pools is None else pools
        return self._calc_pg_upmaps(
            inc, max_deviation, max_iterations, pool_list)

    def map_pool_pgs_up(self, poolid):
        return self._map_pool_pgs_up(poolid)

    def pg_to_up_acting_osds(self, pool_id, ps):
        return self._pg_to_up_acting_osds(pool_id, ps)

    def pool_raw_used_rate(self, pool_id):
        return self._pool_raw_used_rate(pool_id)

    def get_ec_profile(self, name):
        """Return the named entry of 'erasure_code_profiles', or None."""
        # FIXME: efficient implementation
        profiles = self._dump()['erasure_code_profiles']
        return profiles.get(name)

    def get_require_osd_release(self):
        """Return the 'require_osd_release' field of the dump."""
        return self._dump()['require_osd_release']
159
160
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python-facing wrapper over ``ceph_module.BasePyOSDMapIncremental``.

    Every method forwards to the underscore-prefixed base-class method of
    the same name.
    """

    def get_epoch(self):
        return self._get_epoch()

    def dump(self):
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        :param weightmap: a dict, int to float.
                          e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        result = self._set_osd_reweights(weightmap)
        return result

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        :param weightmap: a dict, int to float. devices only.
                          e.g., { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        result = self._set_crush_compat_weight_set_weights(weightmap)
        return result
180
181
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python-facing wrapper over ``ceph_module.BasePyCRUSH``.

    Simple accessors forward to the underscore-prefixed base-class methods;
    the rule / bucket / device helpers are derived from ``dump()``.
    """

    ITEM_NONE = 0x7fffffff
    DEFAULT_CHOOSE_ARGS = '-1'

    def dump(self):
        return self._dump()

    def get_item_weight(self, item):
        return self._get_item_weight(item)

    def get_item_name(self, item):
        return self._get_item_name(item)

    def find_takes(self):
        """Return the 'takes' entry of ``_find_takes()`` ([] if absent)."""
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """Return the 'weights' map for ``root`` with keys converted to int."""
        uglymap = self._get_take_weight_osd_map(root)
        return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}

    @staticmethod
    def have_default_choose_args(dump):
        """Tell whether ``dump['choose_args']`` has the default ('-1') entry."""
        return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})

    @staticmethod
    def get_default_choose_args(dump):
        """
        Return the default ('-1') choose_args entry of a CRUSH dump.

        :param dump: a dict as returned by ``dump()``
        :return: the entry, or [] when either 'choose_args' or its default
                 entry is missing
        """
        # Tolerate a dump without 'choose_args', exactly like
        # have_default_choose_args() does, instead of failing on None.
        choose_args = dump.get('choose_args', {})
        return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])

    def get_rule(self, rule_name):
        """Return the first dumped rule whose 'rule_name' matches, else None."""
        # TODO efficient implementation
        for rule in self.dump()['rules']:
            if rule_name == rule['rule_name']:
                return rule

        return None

    def get_rule_by_id(self, rule_id):
        """Return the first dumped rule whose 'rule_id' matches, else None."""
        for rule in self.dump()['rules']:
            if rule['rule_id'] == rule_id:
                return rule

        return None

    def get_rule_root(self, rule_name):
        """
        Return the 'item' of the first 'take' step of the named rule.

        :return: None if the rule does not exist or has no 'take' step (the
                 latter case is logged as a warning)
        """
        rule = self.get_rule(rule_name)
        if rule is None:
            return None

        take_steps = [s for s in rule['steps'] if s['op'] == 'take']
        if not take_steps:
            logging.warning("CRUSH rule '{0}' has no 'take' step".format(
                rule_name))
            return None
        return take_steps[0]['item']

    def get_osds_under(self, root_id):
        """
        Collect the ids of all items with a non-negative id found below the
        bucket ``root_id``, descending into nested buckets.

        :raises KeyError: if ``root_id`` is not a dumped bucket id
        """
        # TODO don't abuse dump like this
        d = self.dump()
        buckets = {b['id']: b for b in d['buckets']}

        osd_list = []

        def accumulate(b):
            for item in b['items']:
                if item['id'] >= 0:
                    osd_list.append(item['id'])
                else:
                    # Negative ids refer to buckets; ids that are not in
                    # the dump are silently skipped.
                    try:
                        accumulate(buckets[item['id']])
                    except KeyError:
                        pass

        accumulate(buckets[root_id])

        return osd_list

    def device_class_counts(self):
        """
        Count dumped devices per device class.

        :return: dict mapping the 'class' value (None when a device has no
                 'class' key) to the number of devices
        """
        result = defaultdict(int)  # type: Dict[str, int]
        # TODO don't abuse dump like this
        d = self.dump()
        for device in d['devices']:
            cls = device.get('class', None)
            result[cls] += 1

        return dict(result)
269
270
class CLICommand(object):
    """
    Decorator object that registers a function as the handler of a command.

    ``args`` is a whitespace-separated list of argument descriptors, each a
    comma-separated list of ``key=value`` pairs that includes ``name=<arg>``,
    e.g. ``"name=foo,type=CephString name=bar,type=CephInt,req=false"``.
    Registered commands are stored in the class-level ``COMMANDS`` dict,
    keyed by prefix.
    """

    COMMANDS = {}  # type: Dict[str, CLICommand]

    def __init__(self, prefix, args="", desc="", perm="rw"):
        self.prefix = prefix
        self.args = args
        self.args_dict = {}
        self.desc = desc
        self.perm = perm
        self.func = None  # type: Optional[Callable]
        self._parse_args()

    def _parse_args(self):
        """Fill ``args_dict`` (arg name -> remaining key/value pairs)."""
        if not self.args:
            return
        # split() without a separator collapses runs of whitespace, so
        # doubled spaces or leading/trailing blanks do not yield empty
        # descriptors that would fail to unpack below.
        for arg in self.args.split():
            arg_d = {}
            for kv in arg.split(","):
                if not kv:
                    # stray comma, nothing to record
                    continue
                # Split on the first '=' only, so values may contain '='.
                k, v = kv.split("=", 1)
                if k != "name":
                    arg_d[k] = v
                else:
                    self.args_dict[v] = arg_d

    def __call__(self, func):
        """Register ``func`` under this command's prefix and return it."""
        self.func = func
        self.COMMANDS[self.prefix] = self
        return self.func

    def call(self, mgr, cmd_dict, inbuf):
        """
        Invoke the registered function with arguments taken from ``cmd_dict``.

        Dashes in argument names become underscores in keyword names.
        Arguments declared with ``req=false`` are skipped when absent from
        ``cmd_dict``; ``inbuf`` is only passed when it is truthy.

        :raises KeyError: if a non-optional argument is missing in cmd_dict
        """
        kwargs = {}
        for a, d in self.args_dict.items():
            if 'req' in d and d['req'] == "false" and a not in cmd_dict:
                continue
            kwargs[a.replace("-", "_")] = cmd_dict[a]
        if inbuf:
            kwargs['inbuf'] = inbuf
        assert self.func
        return self.func(mgr, **kwargs)

    def dump_cmd(self):
        """Return the dict (cmd, desc, perm) describing this command."""
        return {
            'cmd': '{} {}'.format(self.prefix, self.args),
            'desc': self.desc,
            'perm': self.perm
        }

    @classmethod
    def dump_cmd_list(cls):
        """Return ``dump_cmd()`` of every registered command."""
        return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
323
324
def CLIReadCommand(prefix, args="", desc=""):
    """Shortcut for a CLICommand created with perm "r"."""
    return CLICommand(prefix, args=args, desc=desc, perm="r")
327
328
def CLIWriteCommand(prefix, args="", desc=""):
    """Shortcut for a CLICommand created with perm "w"."""
    return CLICommand(prefix, args=args, desc=desc, perm="w")
331
332
333 def _get_localized_key(prefix, key):
334 return '{}/{}'.format(prefix, key)
335
336
class Option(dict):
    """
    Helper class to declare options for MODULE_OPTIONS list.

    The resulting dict holds one entry per constructor argument whose value
    is not None.

    Caveat: it uses argument names matching Python keywords (type, min, max),
    so any further processing should happen in a separate method.

    TODO: type validation.
    """

    def __init__(
            self, name,
            default=None,
            type='str',
            desc=None, longdesc=None,
            min=None, max=None,
            enum_allowed=None,
            see_also=None,
            tags=None,
            runtime=False,
    ):
        # Listed in signature order; arguments left at (or set to) None
        # are not stored at all.
        declared = (
            ('name', name),
            ('default', default),
            ('type', type),
            ('desc', desc),
            ('longdesc', longdesc),
            ('min', min),
            ('max', max),
            ('enum_allowed', enum_allowed),
            ('see_also', see_also),
            ('tags', tags),
            ('runtime', runtime),
        )
        super(Option, self).__init__(
            [(key, value) for key, value in declared if value is not None])
361
class Command(dict):
    """
    Helper class to declare options for COMMANDS list.

    It also allows to specify prefix and args separately, as well as storing a
    handler callable.

    Usage:
    >>> Command(prefix="example",
    ...         args="name=arg,type=CephInt",
    ...         perm='w',
    ...         desc="Blah")
    {'poll': False, 'cmd': 'example name=arg,type=CephInt', 'perm': 'w', 'desc': 'Blah'}
    """

    def __init__(
            self,
            prefix,
            args=None,
            perm="rw",
            desc=None,
            poll=False,
            handler=None
    ):
        # 'cmd' is the prefix, followed by the args only when there are any.
        full_cmd = prefix
        if args:
            full_cmd = full_cmd + (' ' + args)
        super(Command, self).__init__(
            cmd=full_cmd,
            perm=perm,
            desc=desc,
            poll=poll)
        self.prefix = prefix
        self.args = args
        self.handler = handler

    def register(self, instance=False):
        """
        Register a CLICommand handler. It allows an instance to register bound
        methods. In that case, the mgr instance is not passed, and it's expected
        to be available in the class instance.
        It also uses the HandleCommandResult helper to wrap the handler's
        3-item return value.

        :param instance: object passed to the handler as first argument in
                         place of the mgr instance; when falsy, the mgr
                         instance is passed
        :return: the registered wrapper function
        """
        decorator = CLICommand(
            prefix=self.prefix,
            args=self.args,
            desc=self['desc'],
            perm=self['perm']
        )

        def _dispatch(mgr, *args, **kwargs):
            receiver = instance or mgr
            return HandleCommandResult(
                *self.handler(receiver, *args, **kwargs))

        return decorator(func=_dispatch)
412
413
class CPlusPlusHandler(logging.Handler):
    """
    Log handler that passes formatted records to the module's ``_ceph_log``.

    Each message is prefixed with the module name, level name and logger
    name.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst
        line_format = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(line_format))

    def emit(self, record):
        # Drop records below this handler's own level.
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
424
class ClusterLogHandler(logging.Handler):
    """
    Log handler that forwards records to the module's ``cluster_log``, using
    the module name as channel.
    """

    def __init__(self, module_inst):
        super().__init__()
        self._module = module_inst
        self.setFormatter(logging.Formatter("%(message)s"))

    def emit(self, record):
        # Drop records below this handler's own level before doing any work.
        if record.levelno < self.level:
            return

        # Pick the priority from the numeric level rather than the level
        # name, so custom or renamed levels cannot raise KeyError. The
        # standard levels map exactly as before: DEBUG -> DEBUG,
        # INFO -> INFO, WARNING -> WARN, ERROR and CRITICAL -> ERROR.
        if record.levelno >= logging.ERROR:
            priority = MgrModule.CLUSTER_LOG_PRIO_ERROR
        elif record.levelno >= logging.WARNING:
            priority = MgrModule.CLUSTER_LOG_PRIO_WARN
        elif record.levelno >= logging.INFO:
            priority = MgrModule.CLUSTER_LOG_PRIO_INFO
        else:
            priority = MgrModule.CLUSTER_LOG_PRIO_DEBUG

        self._module.cluster_log(self._module.module_name,
                                 priority,
                                 self.format(record))
444
class FileHandler(logging.FileHandler):
    """
    File log handler writing to a per-module file next to the ceph log file.

    The target path is derived from the ``log_file`` ceph option:
    ``<name>.log`` becomes ``<name>.<module_name>.log``; any other path gets
    ``.<module_name>`` appended. The computed path is kept in ``self.path``.
    The handler is created with ``delay=True``.
    """

    def __init__(self, module_inst):
        path = module_inst.get_ceph_option("log_file")
        extension = ".log"
        # Only a trailing ".log" is treated as the extension. Searching for
        # ".log" anywhere in the path would cut off everything after a
        # match in the middle (e.g. a directory named "x.logs").
        if path.endswith(extension):
            self.path = "{}.{}.log".format(path[:-len(extension)],
                                           module_inst.module_name)
        else:
            self.path = "{}.{}".format(path, module_inst.module_name)
        super(FileHandler, self).__init__(self.path, delay=True)
        self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
455
456
class MgrModuleLoggingMixin(object):
    """
    Mixin that wires a module into Python logging.

    It installs up to three handlers on the root logger (mgr log, per-module
    file, cluster log) and keeps their levels in sync with the ``debug_mgr``
    ceph option and the module's own log level options.

    The host class is expected to provide what the handlers use
    (``module_name``, ``_ceph_log``, ``cluster_log``, ``get_ceph_option``).
    """

    def _configure_logging(self, mgr_level, module_level, cluster_level,
                           log_to_file, log_to_cluster):
        """
        (Re)create the handlers, attach them to the root logger and apply
        the initial levels.

        :param mgr_level: value of the debug_mgr option, e.g. "1/5"
        :param module_level: module log level name, or empty/None to follow
                             debug_mgr
        :param cluster_level: level name for the cluster log handler
        :param log_to_file: attach the file handler when true
        :param log_to_cluster: attach the cluster log handler when true
        """
        self._mgr_level = None
        self._module_level = None
        self._root_logger = logging.getLogger()

        self._unconfigure_logging()

        # the ceph log handler is initialized only once
        self._mgr_log_handler = CPlusPlusHandler(self)
        self._cluster_log_handler = ClusterLogHandler(self)
        self._file_log_handler = FileHandler(self)

        self.log_to_file = log_to_file
        self.log_to_cluster = log_to_cluster

        self._root_logger.addHandler(self._mgr_log_handler)
        if log_to_file:
            self._root_logger.addHandler(self._file_log_handler)
        if log_to_cluster:
            self._root_logger.addHandler(self._cluster_log_handler)

        # Filtering is left to the handlers' own levels.
        self._root_logger.setLevel(logging.NOTSET)
        self._set_log_level(mgr_level, module_level, cluster_level)

    def _unconfigure_logging(self):
        """
        Detach every handler of the three managed types from the root logger
        and reset the ``log_to_file`` / ``log_to_cluster`` flags.

        Safe to call (e.g. from ``__del__``) even when ``_configure_logging``
        never ran: with no root logger recorded there is nothing to detach.
        """
        root_logger = getattr(self, '_root_logger', None)
        if root_logger is not None:
            managed_types = (CPlusPlusHandler, FileHandler, ClusterLogHandler)
            # Build the list first: handlers must not be removed while
            # iterating over root_logger.handlers itself.
            rm_handlers = [
                h for h in root_logger.handlers if isinstance(h, managed_types)]
            for h in rm_handlers:
                root_logger.removeHandler(h)
        self.log_to_file = False
        self.log_to_cluster = False

    def _set_log_level(self, mgr_level, module_level, cluster_level):
        """
        Apply new log levels to the handlers.

        The cluster handler always gets ``cluster_level``. The mgr and file
        handlers get ``module_level`` when it is set, otherwise the Python
        level derived from ``mgr_level``; nothing is done for them when
        neither effective value changed since the last call.
        """
        self._cluster_log_handler.setLevel(cluster_level.upper())

        module_level = module_level.upper() if module_level else ''
        if not self._module_level:
            # using debug_mgr level
            if not module_level and self._mgr_level == mgr_level:
                # no change in module level neither in debug_mgr
                return
        else:
            if self._module_level == module_level:
                # no change in module level
                return

        if not self._module_level and not module_level:
            level = self._ceph_log_level_to_python(mgr_level)
            self.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level, mgr_level)
        elif self._module_level and not module_level:
            level = self._ceph_log_level_to_python(mgr_level)
            self.getLogger().warning("unsetting module log level, falling back to "
                                     "debug_mgr level: %s (%s)", level, mgr_level)
        elif module_level:
            level = module_level
            self.getLogger().debug("setting log level: %s", level)

        self._module_level = module_level
        self._mgr_level = mgr_level

        self._mgr_log_handler.setLevel(level)
        self._file_log_handler.setLevel(level)

    def _enable_file_log(self):
        """Attach the file handler to the root logger."""
        self.getLogger().warning("enabling logging to file")
        self.log_to_file = True
        self._root_logger.addHandler(self._file_log_handler)

    def _disable_file_log(self):
        """Detach the file handler from the root logger."""
        self.getLogger().warning("disabling logging to file")
        self.log_to_file = False
        self._root_logger.removeHandler(self._file_log_handler)

    def _enable_cluster_log(self):
        """Attach the cluster log handler to the root logger."""
        self.getLogger().warning("enabling logging to cluster")
        self.log_to_cluster = True
        self._root_logger.addHandler(self._cluster_log_handler)

    def _disable_cluster_log(self):
        """Detach the cluster log handler from the root logger."""
        self.getLogger().warning("disabling logging to cluster")
        self.log_to_cluster = False
        self._root_logger.removeHandler(self._cluster_log_handler)

    def _ceph_log_level_to_python(self, ceph_log_level):
        """
        Translate a ceph debug level string into a Python level name.

        Only the part before the first '/' is used. Empty, None or
        non-numeric input counts as 0.

        :return: "CRITICAL" for <= 0, "WARNING" for 1, "INFO" for 2..4,
                 "DEBUG" for anything higher
        """
        if ceph_log_level:
            try:
                ceph_log_level = int(ceph_log_level.split("/", 1)[0])
            except ValueError:
                ceph_log_level = 0
        else:
            ceph_log_level = 0

        log_level = "DEBUG"
        if ceph_log_level <= 0:
            log_level = "CRITICAL"
        elif ceph_log_level <= 1:
            log_level = "WARNING"
        elif ceph_log_level <= 4:
            log_level = "INFO"
        return log_level

    def getLogger(self, name=None):
        """Return ``logging.getLogger(name)`` (the root logger by default)."""
        return logging.getLogger(name)
568
569
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    MODULE_OPTIONS = []  # type: List[Dict[str, Any]]
    MODULE_OPTION_DEFAULTS = {}  # type: Dict[str, Any]

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name

        # see also MgrModule.__init__()
        # Options declaring a type keep their default untouched; defaults
        # of untyped options are stored as strings.
        for opt in self.MODULE_OPTIONS:
            if 'default' not in opt:
                continue
            default = opt['default']
            self.MODULE_OPTION_DEFAULTS[opt['name']] = (
                default if 'type' in opt else str(default))

        mgr_level = self.get_ceph_option("debug_mgr")
        log_level = self.get_module_option("log_level")
        cluster_level = self.get_module_option('log_to_cluster_level')
        # File and cluster logging are always off for a standby module.
        self._configure_logging(mgr_level, log_level, cluster_level,
                                False, False)

        # for backwards compatibility
        self._logger = self.getLogger()

    def __del__(self):
        self._unconfigure_logging()

    @classmethod
    def _register_options(cls, module_name):
        """Append the logging-related options to ``cls.MODULE_OPTIONS``."""
        level_names = ('info', 'debug', 'critical', 'error', 'warning', '')

        cls.MODULE_OPTIONS.append(
            Option(name='log_level', type='str', default="", runtime=True,
                   enum_allowed=list(level_names)))
        cls.MODULE_OPTIONS.append(
            Option(name='log_to_file', type='bool', default=False, runtime=True))
        # log_to_cluster is only added when not declared already.
        if not any(x['name'] == 'log_to_cluster' for x in cls.MODULE_OPTIONS):
            cls.MODULE_OPTIONS.append(
                Option(name='log_to_cluster', type='bool', default=False,
                       runtime=True))
        cls.MODULE_OPTIONS.append(
            Option(name='log_to_cluster_level', type='str', default='info',
                   runtime=True,
                   enum_allowed=list(level_names)))

    @property
    def log(self):
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        return self._ceph_get_mgr_id()

    def get_module_option(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: the stored value; when none is stored, the default declared
                 in MODULE_OPTIONS or else ``default``
        """
        value = self._ceph_get_module_option(key)
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)

    def get_ceph_option(self, key):
        return self._ceph_get_option(key)

    def get_store(self, key):
        """
        Retrieve the value of a persistent KV store entry

        :param key: String
        :return: Byte string or None
        """
        return self._ceph_get_store(key)

    def get_active_uri(self):
        return self._ceph_get_active_uri()

    def get_localized_module_option(self, key, default=None):
        """
        Like ``get_module_option``, but the lookup is additionally given
        this daemon's mgr id.
        """
        value = self._ceph_get_module_option(key, self.get_mgr_id())
        if value is not None:
            return value
        return self.MODULE_OPTION_DEFAULTS.get(key, default)
674
675
676 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
677 COMMANDS = [] # type: List[Any]
678 MODULE_OPTIONS = [] # type: List[dict]
679 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
680
681 # Priority definitions for perf counters
682 PRIO_CRITICAL = 10
683 PRIO_INTERESTING = 8
684 PRIO_USEFUL = 5
685 PRIO_UNINTERESTING = 2
686 PRIO_DEBUGONLY = 0
687
688 # counter value types
689 PERFCOUNTER_TIME = 1
690 PERFCOUNTER_U64 = 2
691
692 # counter types
693 PERFCOUNTER_LONGRUNAVG = 4
694 PERFCOUNTER_COUNTER = 8
695 PERFCOUNTER_HISTOGRAM = 0x10
696 PERFCOUNTER_TYPE_MASK = ~3
697
698 # units supported
699 BYTES = 0
700 NONE = 1
701
702 # Cluster log priorities
703 CLUSTER_LOG_PRIO_DEBUG = 0
704 CLUSTER_LOG_PRIO_INFO = 1
705 CLUSTER_LOG_PRIO_SEC = 2
706 CLUSTER_LOG_PRIO_WARN = 3
707 CLUSTER_LOG_PRIO_ERROR = 4
708
def __init__(self, module_name, py_modules_ptr, this_ptr):
    """
    Set up option defaults, logging and cached state for the module.

    :param module_name: stored as ``self.module_name``
    :param py_modules_ptr: passed through to the base class constructor
    :param this_ptr: passed through to the base class constructor
    """
    self.module_name = module_name
    super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

    for opt in self.MODULE_OPTIONS:
        if 'default' not in opt:
            continue
        default = opt['default']
        if 'type' in opt:
            # we'll assume the declared type matches the
            # supplied default value's type.
            self.MODULE_OPTION_DEFAULTS[opt['name']] = default
        else:
            # module not declaring its type, so normalize the
            # default value to be a string for consistent behavior
            # with default and user-supplied option values.
            self.MODULE_OPTION_DEFAULTS[opt['name']] = str(default)

    mgr_level = self.get_ceph_option("debug_mgr")
    log_level = self.get_module_option("log_level")
    cluster_level = self.get_module_option('log_to_cluster_level')
    log_to_file = self.get_module_option("log_to_file")
    log_to_cluster = self.get_module_option("log_to_cluster")
    self._configure_logging(mgr_level, log_level, cluster_level,
                            log_to_file, log_to_cluster)

    # for backwards compatibility
    self._logger = self.getLogger()

    self._version = self._ceph_get_version()

    self._perf_schema_cache = None

    # Keep a librados instance for those that need it.
    self._rados = None
742
743
744 def __del__(self):
745 self._unconfigure_logging()
746
@classmethod
def _register_options(cls, module_name):
    """
    Append the logging-related options to ``cls.MODULE_OPTIONS``.

    ``log_to_cluster`` is only added when no option of that name has been
    declared already.
    """
    level_names = ('info', 'debug', 'critical', 'error', 'warning', '')

    cls.MODULE_OPTIONS.append(
        Option(name='log_level', type='str', default="", runtime=True,
               enum_allowed=list(level_names)))
    cls.MODULE_OPTIONS.append(
        Option(name='log_to_file', type='bool', default=False, runtime=True))
    if not any(x['name'] == 'log_to_cluster' for x in cls.MODULE_OPTIONS):
        cls.MODULE_OPTIONS.append(
            Option(name='log_to_cluster', type='bool', default=False,
                   runtime=True))
    cls.MODULE_OPTIONS.append(
        Option(name='log_to_cluster_level', type='str', default='info',
               runtime=True,
               enum_allowed=list(level_names)))
764
@classmethod
def _register_commands(cls, module_name):
    """Add the dump of every registered CLICommand to ``cls.COMMANDS``."""
    registered = CLICommand.dump_cmd_list()
    cls.COMMANDS.extend(registered)
768
@property
def log(self):
    """The logger object stored in ``self._logger``."""
    logger = self._logger
    return logger
772
def cluster_log(self, channel, priority, message):
    """
    Send a message to the cluster log.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :type channel: str
    :param priority: The log message priority. This can be
                     CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
                     CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
                     CLUSTER_LOG_PRIO_ERROR.
    :type priority: int
    :param message: The message to log.
    :type message: str
    """
    forward = self._ceph_cluster_log
    forward(channel, priority, message)
786
@property
def version(self):
    """The value stored in ``self._version``."""
    cached_version = self._version
    return cached_version
790
@property
def release_name(self):
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: Returns the release name of the Ceph version in lower case.
    :rtype: str
    """
    name = self._ceph_get_release_name()
    return name
799
def get_context(self):
    """
    Fetch the context object of this daemon.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    context = self._ceph_get_context()
    return context
805
def notify(self, notify_type, notify_id):
    """
    Called by the ceph-mgr service to notify the Python plugin
    that new state is available.

    This base implementation does nothing.

    :param notify_type: string indicating what kind of notification,
                        such as osd_map, mon_map, fs_map, mon_status,
                        health, pg_summary, command, service_map
    :param notify_id:  string (may be empty) that optionally specifies
                        which entity is being notified about.  With
                        "command" notifications this is set to the tag
                        ``from send_command``.
    """
    return None
820
821 def _config_notify(self):
822 # check logging options for changes
823 mgr_level = self.get_ceph_option("debug_mgr")
824 module_level = self.get_module_option("log_level")
825 cluster_level = self.get_module_option("log_to_cluster_level")
826 log_to_file = self.get_module_option("log_to_file", False)
827 log_to_cluster = self.get_module_option("log_to_cluster", False)
828
829 self._set_log_level(mgr_level, module_level, cluster_level)
830
831 if log_to_file != self.log_to_file:
832 if log_to_file:
833 self._enable_file_log()
834 else:
835 self._disable_file_log()
836 if log_to_cluster != self.log_to_cluster:
837 if log_to_cluster:
838 self._enable_cluster_log()
839 else:
840 self._disable_cluster_log()
841
842 # call module subclass implementations
843 self.config_notify()
844
def config_notify(self):
    """
    Called by the ceph-mgr service to notify the Python plugin
    that the configuration may have changed.  Modules will want to
    refresh any configuration values stored in config variables.

    This base implementation does nothing.
    """
    return None
852
def serve(self):
    """
    Called by the ceph-mgr service to start any server that
    is provided by this Python plugin.  The implementation
    of this function should block until ``shutdown`` is called.

    You *must* implement ``shutdown`` if you implement ``serve``

    This base implementation returns immediately.
    """
    return None
862
def shutdown(self):
    """
    Called by the ceph-mgr service to request that this
    module drop out of its serve() function.  You do not
    need to implement this if you do not implement serve()

    If a librados instance is held in ``self._rados``, it is shut down
    and its addresses are unregistered.

    :return: None
    """
    if not self._rados:
        return
    # The addresses are fetched before the instance is shut down.
    addrs = self._rados.get_addrs()
    self._rados.shutdown()
    self._ceph_unregister_client(addrs)
875
def get(self, data_name):
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>, pg_stats,
            pool_stats, pg_ready, osd_ping_times.

    Note:
        All these structures have their own JSON representations: experiment
        or look at the C++ ``dump()`` methods to learn about them.
    """
    fetched = self._ceph_get(data_name)
    return fetched
891
892 def _stattype_to_str(self, stattype):
893
894 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
895 if typeonly == 0:
896 return 'gauge'
897 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
898 # this lie matches the DaemonState decoding: only val, no counts
899 return 'counter'
900 if typeonly == self.PERFCOUNTER_COUNTER:
901 return 'counter'
902 if typeonly == self.PERFCOUNTER_HISTOGRAM:
903 return 'histogram'
904
905 return ''
906
907 def _perfpath_to_path_labels(self, daemon, path):
908 # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
909 label_names = ("ceph_daemon",) # type: Tuple[str, ...]
910 labels = (daemon,) # type: Tuple[str, ...]
911
912 if daemon.startswith('rbd-mirror.'):
913 match = re.match(
914 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
915 path
916 )
917 if match:
918 path = 'rbd_mirror_image_' + match.group(4)
919 pool = match.group(1)
920 namespace = match.group(2) or ''
921 image = match.group(3)
922 label_names += ('pool', 'namespace', 'image')
923 labels += (pool, namespace, image)
924
925 return path, label_names, labels,
926
927 def _perfvalue_to_value(self, stattype, value):
928 if stattype & self.PERFCOUNTER_TIME:
929 # Convert from ns to seconds
930 return value / 1000000000.0
931 else:
932 return value
933
934 def _unit_to_str(self, unit):
935 if unit == self.NONE:
936 return "/s"
937 elif unit == self.BYTES:
938 return "B/s"
939
940 @staticmethod
941 def to_pretty_iec(n):
942 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
943 (20, 'Mi'), (10, 'Ki')]:
944 if n > 10 << bits:
945 return str(n >> bits) + ' ' + suffix
946 return str(n) + ' '
947
948 @staticmethod
949 def get_pretty_row(elems, width):
950 """
951 Takes an array of elements and returns a string with those elements
952 formatted as a table row. Useful for polling modules.
953
954 :param elems: the elements to be printed
955 :param width: the width of the terminal
956 """
957 n = len(elems)
958 column_width = int(width / n)
959
960 ret = '|'
961 for elem in elems:
962 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
963
964 return ret
965
def get_pretty_header(self, elems, width):
    """
    Like ``get_pretty_row`` but adds dashes, to be used as a table title.

    The result consists of three newline-terminated lines: a dash line,
    the title row and the same dash line again.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    """
    n = len(elems)
    column_width = int(width / n)

    # dash line, used both above and below the title
    dash_line = '+' + ('-' * (column_width - 1) + '+') * n

    title = self.get_pretty_row(elems, width)

    return '\n'.join((dash_line, title, dash_line)) + '\n'
993
def get_server(self, hostname):
    """
    Called by the plugin to fetch metadata about a particular hostname from
    ceph-mgr.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    """
    server_info = self._ceph_get_server(hostname)
    return server_info
1005
def get_perf_schema(self, svc_type, svc_name):
    """
    Called by the plugin to fetch perf counter schema info.
    svc_name can be nullptr, as can svc_type, in which case
    they are wildcards

    :param str svc_type:
    :param str svc_name:
    :return: list of dicts describing the counters requested
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
1017
def get_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch the latest performance counter data for a
    particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This may be
        empty if no data is available.
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
1031
def get_latest_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This may be
        empty if no data is available.
    """
    newest = self._ceph_get_latest_counter(svc_type, svc_name, path)
    return newest
1045
def list_servers(self):
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata)

    :return: a list of information about all servers
    :rtype: list
    """
    # Same underlying call as get_server(), with None as the hostname.
    all_servers = self._ceph_get_server(None)
    return all_servers
1055
def get_metadata(self, svc_type, svc_id):
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    during addition/removal of services where the metadata is not
    available to modules.  ``None`` is returned if no metadata is available.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings when
        calling this
    :rtype: dict, or None if no metadata found
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata
1070
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
1083
def check_mon_command(self, cmd_dict: dict) -> HandleCommandResult:
    """
    Wrapper around :func:`~mgr_module.MgrModule.mon_command` that raises
    instead of returning a failure.

    :param dict cmd_dict: the command to run; its ``prefix`` entry is
        used in the error message on failure.
    :return: the command outcome wrapped in a ``HandleCommandResult``
    :raises MonCommandFailed: if ``retval != 0``
    """
    outcome = HandleCommandResult(*self.mon_command(cmd_dict))
    if not outcome.retval:
        return outcome
    raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {outcome.stderr} retval: {outcome.retval}')
1094
def mon_command(self, cmd_dict):
    """
    Helper for modules that do simple, synchronous mon command
    execution: the command is sent and this call blocks until the
    completion fires.

    See send_command for the general case.

    :param dict cmd_dict: the command; serialized to JSON before being
        sent. Its ``prefix`` entry is used in the debug log line.
    :return: status int, out std, err str
    """
    started = time.time()
    completion = CommandResult()
    self.send_command(completion, "mon", "", json.dumps(cmd_dict), "")
    outcome = completion.wait()
    finished = time.time()

    # Record how long the round trip took together with the status code.
    self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], outcome[0], finished - started
    ))

    return outcome
1116
def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon
    cluster. All arguments are forwarded unchanged to ceph-mgr.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule. This acts as a
        completion and stores the output of the command. Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type:
    :param str svc_id:
    :param str command: a JSON-serialized command. This uses the same
        format as the ceph command line, which is a dictionary of
        command arguments, with the extra ``prefix`` key containing the
        command name itself. Consult MonCommands.h for available
        commands and their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance
        is triggered, with notify_type set to "command", and notify_id
        set to the tag of the command.
    """
    dispatch = self._ceph_send_command
    dispatch(*args, **kwargs)
1139
def set_health_checks(self, checks):
    """
    Replace the module's current map of health checks. The argument is
    a dict of check names to info, in this form:

    ::

       {
         'CHECK_FOO': {
           'severity': 'warning',           # or 'error'
           'summary': 'summary string',
           'count': 4,                      # quantify badness
           'detail': [ 'list', 'of', 'detail', 'strings' ],
          },
         'CHECK_BAR': {
           'severity': 'error',
           'summary': 'bars are bad',
           'detail': [ 'too hard' ],
         },
       }

    :param dict checks: dict of health check dicts
    """
    publish = self._ceph_set_health_checks
    publish(checks)
1164
def _handle_command(self, inbuf, cmd):
    """
    Route an incoming command: commands whose prefix is registered in
    ``CLICommand.COMMANDS`` are invoked through that registry, all
    others fall back to the module's ``handle_command``.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :param dict cmd: command map; must contain a ``prefix`` entry
    :return: whatever the selected handler returns
    """
    registry = CLICommand.COMMANDS
    if cmd['prefix'] in registry:
        return registry[cmd['prefix']].call(self, cmd, inbuf)

    return self.handle_command(inbuf, cmd)
1170
def handle_command(self, inbuf, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS.

    Implementations return a status code, an output buffer, and an
    output string. The output buffer is for data results,
    the output string is for informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :type inbuf: str
    :param cmd: from Ceph's cmdmap_t
    :type cmd: dict

    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """

    # This base version is only reached when a module declared
    # ``COMMANDS`` without overriding this method.
    raise NotImplementedError
1191
def get_mgr_id(self):
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: str
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
1200
def get_ceph_option(self, key):
    """
    Look up the option named ``key`` through ceph-mgr and return
    whatever value it reports.

    :param key: name of the option to read
    """
    value = self._ceph_get_option(key)
    return value
1203
def _validate_module_option(self, key):
    """
    Helper: don't allow get/set config callers to
    access config options that they didn't declare
    in their schema.

    :param key: option name to look for among ``self.MODULE_OPTIONS``
    :raises RuntimeError: if no declared option has that name
    """
    declared_names = tuple(opt['name'] for opt in self.MODULE_OPTIONS)
    if key in declared_names:
        return

    raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
                       format(key, self.__class__.__name__))
1213
def _get_module_option(self, key, default, localized_prefix=""):
    """
    Read one of this module's options without validating the key.

    :param key: option name
    :param default: value used when nothing is stored and
        ``MODULE_OPTION_DEFAULTS`` has no entry for ``key``
    :param str localized_prefix: forwarded to ceph-mgr as the
        localization prefix; empty by default
    :return: the stored value, else the declared default, else
        ``default``
    """
    stored = self._ceph_get_module_option(self.module_name, key,
                                          localized_prefix)
    if stored is not None:
        return stored
    return self.MODULE_OPTION_DEFAULTS.get(key, default)
1221
def get_module_option(self, key, default=None):
    """
    Retrieve the value of a persistent configuration setting.

    The key is checked against the module's declared options first.

    :param str key:
    :param str default:
    :return: str
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
1232
def get_module_option_ex(self, module, key, default=None):
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    The key is only validated when ``module`` is this module itself.

    :param str module: The name of the module, e.g. 'dashboard'
        or 'telemetry'.
    :param str key: The configuration key, e.g. 'server_addr'.
    :param str,None default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    :return: str,int,bool,float,None
    """
    if module == self.module_name:
        self._validate_module_option(key)
    value = self._ceph_get_module_option(module, key)
    if value is None:
        return default
    return value
1249
def get_store_prefix(self, key_prefix):
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix.

    :param str key_prefix:
    :return: dict
    """
    matching = self._ceph_get_store_prefix(key_prefix)
    return matching
1259
def _set_localized(self, key, val, setter):
    """
    Store ``val`` under the variant of ``key`` that is localized to
    this manager daemon.

    :param key: the non-localized key
    :param val: value handed to ``setter``
    :param setter: callable taking ``(localized_key, val)``
    :return: whatever ``setter`` returns
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
1262
def get_localized_module_option(self, key, default=None):
    """
    Retrieve localized configuration for this ceph-mgr instance.

    The key is validated against the declared options, then looked up
    using this manager's id as the localization prefix.

    :param str key:
    :param str default:
    :return: str
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
1272
def _set_module_option(self, key, val):
    """
    Write one of this module's options without validating the key.

    :param key: option name
    :param val: new value; ``None`` is forwarded unchanged, anything
        else is converted with ``str()`` first
    :return: whatever ceph-mgr returns for the write
    """
    if val is None:
        serialized = None
    else:
        serialized = str(val)
    return self._ceph_set_module_option(self.module_name, key, serialized)
1276
def set_module_option(self, key, val):
    """
    Set the value of a persistent configuration setting.

    The key is checked against the module's declared options first.

    :param str key:
    :type val: str | None
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
1286
def set_module_option_ex(self, module, key, val):
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is only validated when ``module`` is this module itself.

    :param str module: name of the module that owns the option
    :param str key: the configuration key
    :param val: new value. ``None`` is forwarded unchanged, matching
        ``set_module_option``; any other value is converted with
        ``str()``.
    :type val: str | None
    :return: whatever ceph-mgr returns for the write
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Do not stringify None: str(None) would store the literal text
    # "None" rather than passing "no value" through.
    serialized = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, serialized)
1299
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    The key is validated against the declared options, then written
    through ``_set_localized`` using ``_set_module_option`` as setter.

    :param str key:
    :param str val:
    :return: whatever the setter returns
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
1309
def set_store(self, key, val):
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    :param str key:
    :param str val:
    """
    write = self._ceph_set_store
    write(key, val)
1319
def get_store(self, key, default=None):
    """
    Get a value from this module's persistent key value store.

    :param key: key to read
    :param default: returned when the store has no value for ``key``
    :return: the stored value, or ``default``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
1329
def get_localized_store(self, key, default=None):
    """
    Read a value from the key value store, preferring the entry that
    is localized to this manager daemon.

    The localized key is tried first, then the plain ``key``.

    :param key: the non-localized key
    :param default: returned when neither entry exists
    :return: the first value found, or ``default``
    """
    candidates = (_get_localized_key(self.get_mgr_id(), key), key)
    for candidate in candidates:
        found = self._ceph_get_store(candidate)
        if found is not None:
            return found
    return default
1337
def set_localized_store(self, key, val):
    """
    Write ``val`` to the key value store under the variant of ``key``
    localized to this manager daemon, using ``set_store`` as setter.

    :param key: the non-localized key
    :param val: value to store
    :return: whatever ``_set_localized`` returns
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
1340
def self_test(self):
    """
    Run a self-test on the module. Override this function and implement
    the best possible self-test for (automated) testing of the module.

    Indicate any failures by raising an exception. This does not have
    to be pretty, it's mainly for picking up regressions during
    development, rather than use in the field.

    :return: None, or an advisory string for developer interest, such
        as a json dump of some state.
    """
    # The base implementation has nothing to check.
    return None
1354
def get_osdmap(self):
    """
    Get a handle to an OSDMap. This method takes no epoch argument and
    simply returns the handle ceph-mgr provides.

    :return: OSDMap
    """
    osdmap = self._ceph_get_osdmap()
    return osdmap
1362
def get_latest(self, daemon_type, daemon_name, counter):
    """
    Return the most recent value of one performance counter.

    :param daemon_type: service type passed to ``get_latest_counter``
    :param daemon_name: service name passed to ``get_latest_counter``
    :param counter: counter path; also the key used to pick the entry
        out of the ``get_latest_counter`` result
    :return: element 1 of the counter's entry, or 0 when the entry is
        empty
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    entry = latest[counter]
    return entry[1] if entry else 0
1370
def get_latest_avg(self, daemon_type, daemon_name, counter):
    """
    Return the most recent value/count pair of one performance counter.

    :param daemon_type: service type passed to ``get_latest_counter``
    :param daemon_name: service name passed to ``get_latest_counter``
    :param counter: counter path; also the key used to pick the entry
        out of the ``get_latest_counter`` result
    :return: 2-tuple of elements 1 and 2 of the counter's entry, or
        ``(0, 0)`` when the entry is empty
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    entry = latest[counter]
    if not entry:
        return 0, 0
    return entry[1], entry[2]
1378
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
                          services=("mds", "mon", "osd",
                                    "rbd-mirror", "rgw", "tcmu-runner")):
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters.  The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value. Long-running-average counters additionally get a "count"
    member.

    :param prio_limit: counters whose schema priority is below this
        are left out
    :param services: service types to include; all others are skipped
    :return: defaultdict mapping "<type>.<id>" to ``{path: info}``
    """

    result = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in server['services']:
            svc_type = service['type']
            if svc_type not in services:
                continue
            svc_id = service['id']

            all_schemas = self.get_perf_schema(svc_type, svc_id)
            if not all_schemas:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            svc_schema = all_schemas[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in svc_schema.items():
                if counter_schema['priority'] < prio_limit:
                    continue

                # Copy so the schema entry itself is not modified.
                counter_info = dict(counter_schema)

                if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    # Also populate count for the long running avgs
                    counter_info['value'], counter_info['count'] = \
                        self.get_latest_avg(svc_type, svc_id, counter_path)
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
1445
def set_uri(self, uri):
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: the address to publish
    :return: whatever ceph-mgr returns for the call
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
1454
def have_mon_connection(self):
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.  If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: the connection state as reported by ceph-mgr
    """
    connected = self._ceph_have_mon_connection()
    return connected
1464
def update_progress_event(self, evid, desc, progress):
    """
    Report the current state of a progress event to ceph-mgr.

    :param evid: event id; converted with ``str()``
    :param desc: event description; converted with ``str()``
    :param progress: progress value; converted with ``float()``
    :return: whatever ceph-mgr returns for the call
    """
    update = self._ceph_update_progress_event
    event_id = str(evid)
    description = str(desc)
    fraction = float(progress)
    return update(event_id, description, fraction)
1469
def complete_progress_event(self, evid):
    """
    Tell ceph-mgr that a progress event has completed.

    :param evid: event id; converted with ``str()``
    :return: whatever ceph-mgr returns for the call
    """
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
1472
def clear_all_progress_events(self):
    """
    Ask ceph-mgr to clear all progress events.

    :return: whatever ceph-mgr returns for the call
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1475
@property
def rados(self):
    """
    A librados instance to be shared by any classes within
    this mgr module that want one.

    The instance is created and connected on first access, its
    addresses are registered with ceph-mgr, and it is kept in
    ``self._rados`` for later accesses.
    """
    if not self._rados:
        context_capsule = self.get_context()
        self._rados = rados.Rados(context=context_capsule)
        self._rados.connect()
        self._ceph_register_client(self._rados.get_addrs())

    return self._rados
1490
@staticmethod
def can_run():
    """
    Implement this function to report whether the module's dependencies
    are met.  For example, if the module needs to import a particular
    dependency to work, then use a try/except around the import at
    file scope, and then report here if the import failed.

    This will be called in a blocking way from the C++ code, so do not
    do any I/O that could block in this function.

    :return: a 2-tuple consisting of a boolean and explanatory string
    """
    # The base implementation has no dependencies to check.
    dependencies_met = True
    explanation = ""
    return dependencies_met, explanation
1506
def remote(self, module_name, method_name, *args, **kwargs):
    """
    Invoke a method on another module.  All arguments, and the return
    value from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module.  If module isn't loaded,
                        an ImportError exception is raised.
    :param method_name: Method name.  If it does not exist, a NameError
                        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    # The positional and keyword arguments travel as one tuple and one
    # dict rather than being expanded again.
    dispatch = self._ceph_dispatch_remote
    return dispatch(module_name, method_name, args, kwargs)
1530
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.  Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
1560
def remove_osd_perf_query(self, query_id):
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: whatever ceph-mgr returns for the call
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
1568
def get_osd_perf_counters(self, query_id):
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: the stats as reported by ceph-mgr
    """
    stats = self._ceph_get_osd_perf_counters(query_id)
    return stats
1576
def is_authorized(self, arguments):
    """
    Verifies that the current session caps permit executing the py service
    or current module with the provided arguments. This provides a generic
    way to allow modules to restrict by more fine-grained controls (e.g.
    pools).

    :param arguments: dict of key/value arguments to test
    :return: the authorization verdict as reported by ceph-mgr
    """
    verdict = self._ceph_is_authorized(arguments)
    return verdict