]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
d6ba31f72086247711e4b14b375b53a66856047d
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1 import ceph_module # noqa
2
3 try:
4 from typing import Set, Tuple, Iterator, Any, Dict, Optional, Callable, List
5 except ImportError:
6 # just for type checking
7 pass
8 import logging
9 import errno
10 import json
11 import six
12 import threading
13 from collections import defaultdict, namedtuple
14 import rados
15 import re
16 import time
17
18 PG_STATES = [
19 "active",
20 "clean",
21 "down",
22 "recovery_unfound",
23 "backfill_unfound",
24 "scrubbing",
25 "degraded",
26 "inconsistent",
27 "peering",
28 "repair",
29 "recovering",
30 "forced_recovery",
31 "backfill_wait",
32 "incomplete",
33 "stale",
34 "remapped",
35 "deep",
36 "backfilling",
37 "forced_backfill",
38 "backfill_toofull",
39 "recovery_wait",
40 "recovery_toofull",
41 "undersized",
42 "activating",
43 "peered",
44 "snaptrim",
45 "snaptrim_wait",
46 "snaptrim_error",
47 "creating",
48 "unknown"]
49
50
51 class CommandResult(object):
52 """
53 Use with MgrModule.send_command
54 """
55
56 def __init__(self, tag=None):
57 self.ev = threading.Event()
58 self.outs = ""
59 self.outb = ""
60 self.r = 0
61
62 # This is just a convenience for notifications from
63 # C++ land, to avoid passing addresses around in messages.
64 self.tag = tag if tag else ""
65
66 def complete(self, r, outb, outs):
67 self.r = r
68 self.outb = outb
69 self.outs = outs
70 self.ev.set()
71
72 def wait(self):
73 self.ev.wait()
74 return self.r, self.outb, self.outs
75
76
77 class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
78 def __new__(cls, retval=0, stdout="", stderr=""):
79 """
80 Tuple containing the result of `handle_command()`
81
82 Only write to stderr if there is an error, or in extraordinary circumstances
83
84 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
85 is critical information to include there.
86
87 Everything programmatically consumable should be put on stdout
88
89 :param retval: return code. E.g. 0 or -errno.EINVAL
90 :type retval: int
91 :param stdout: data of this result.
92 :type stdout: str
93 :param stderr: Typically used for error messages.
94 :type stderr: str
95 """
96 return super(HandleCommandResult, cls).__new__(cls, retval, stdout, stderr)
97
98
99 class OSDMap(ceph_module.BasePyOSDMap):
100 def get_epoch(self):
101 return self._get_epoch()
102
103 def get_crush_version(self):
104 return self._get_crush_version()
105
106 def dump(self):
107 return self._dump()
108
109 def get_pools(self):
110 # FIXME: efficient implementation
111 d = self._dump()
112 return dict([(p['pool'], p) for p in d['pools']])
113
114 def get_pools_by_name(self):
115 # FIXME: efficient implementation
116 d = self._dump()
117 return dict([(p['pool_name'], p) for p in d['pools']])
118
119 def new_incremental(self):
120 return self._new_incremental()
121
122 def apply_incremental(self, inc):
123 return self._apply_incremental(inc)
124
125 def get_crush(self):
126 return self._get_crush()
127
128 def get_pools_by_take(self, take):
129 return self._get_pools_by_take(take).get('pools', [])
130
131 def calc_pg_upmaps(self, inc,
132 max_deviation=.01, max_iterations=10, pools=None):
133 if pools is None:
134 pools = []
135 return self._calc_pg_upmaps(
136 inc,
137 max_deviation, max_iterations, pools)
138
139 def map_pool_pgs_up(self, poolid):
140 return self._map_pool_pgs_up(poolid)
141
142 def pg_to_up_acting_osds(self, pool_id, ps):
143 return self._pg_to_up_acting_osds(pool_id, ps)
144
145 def pool_raw_used_rate(self, pool_id):
146 return self._pool_raw_used_rate(pool_id)
147
148 def get_ec_profile(self, name):
149 # FIXME: efficient implementation
150 d = self._dump()
151 return d['erasure_code_profiles'].get(name, None)
152
153 def get_require_osd_release(self):
154 d = self._dump()
155 return d['require_osd_release']
156
157
158 class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
159 def get_epoch(self):
160 return self._get_epoch()
161
162 def dump(self):
163 return self._dump()
164
165 def set_osd_reweights(self, weightmap):
166 """
167 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
168 """
169 return self._set_osd_reweights(weightmap)
170
171 def set_crush_compat_weight_set_weights(self, weightmap):
172 """
173 weightmap is a dict, int to float. devices only. e.g.,
174 { 0: 3.4, 1: 3.3, 2: 3.334 }
175 """
176 return self._set_crush_compat_weight_set_weights(weightmap)
177
178
179 class CRUSHMap(ceph_module.BasePyCRUSH):
180 ITEM_NONE = 0x7fffffff
181 DEFAULT_CHOOSE_ARGS = '-1'
182
183 def dump(self):
184 return self._dump()
185
186 def get_item_weight(self, item):
187 return self._get_item_weight(item)
188
189 def get_item_name(self, item):
190 return self._get_item_name(item)
191
192 def find_takes(self):
193 return self._find_takes().get('takes', [])
194
195 def get_take_weight_osd_map(self, root):
196 uglymap = self._get_take_weight_osd_map(root)
197 return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))}
198
199 @staticmethod
200 def have_default_choose_args(dump):
201 return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
202
203 @staticmethod
204 def get_default_choose_args(dump):
205 return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
206
207 def get_rule(self, rule_name):
208 # TODO efficient implementation
209 for rule in self.dump()['rules']:
210 if rule_name == rule['rule_name']:
211 return rule
212
213 return None
214
215 def get_rule_by_id(self, rule_id):
216 for rule in self.dump()['rules']:
217 if rule['rule_id'] == rule_id:
218 return rule
219
220 return None
221
222 def get_rule_root(self, rule_name):
223 rule = self.get_rule(rule_name)
224 if rule is None:
225 return None
226
227 try:
228 first_take = [s for s in rule['steps'] if s['op'] == 'take'][0]
229 except IndexError:
230 logging.warning("CRUSH rule '{0}' has no 'take' step".format(
231 rule_name))
232 return None
233 else:
234 return first_take['item']
235
236 def get_osds_under(self, root_id):
237 # TODO don't abuse dump like this
238 d = self.dump()
239 buckets = dict([(b['id'], b) for b in d['buckets']])
240
241 osd_list = []
242
243 def accumulate(b):
244 for item in b['items']:
245 if item['id'] >= 0:
246 osd_list.append(item['id'])
247 else:
248 try:
249 accumulate(buckets[item['id']])
250 except KeyError:
251 pass
252
253 accumulate(buckets[root_id])
254
255 return osd_list
256
257 def device_class_counts(self):
258 result = defaultdict(int) # type: Dict[str, int]
259 # TODO don't abuse dump like this
260 d = self.dump()
261 for device in d['devices']:
262 cls = device.get('class', None)
263 result[cls] += 1
264
265 return dict(result)
266
267
268 class CLICommand(object):
269 COMMANDS = {} # type: Dict[str, CLICommand]
270
271 def __init__(self, prefix, args="", desc="", perm="rw"):
272 self.prefix = prefix
273 self.args = args
274 self.args_dict = {}
275 self.desc = desc
276 self.perm = perm
277 self.func = None # type: Optional[Callable]
278 self._parse_args()
279
280 def _parse_args(self):
281 if not self.args:
282 return
283 args = self.args.split(" ")
284 for arg in args:
285 arg_desc = arg.strip().split(",")
286 arg_d = {}
287 for kv in arg_desc:
288 k, v = kv.split("=")
289 if k != "name":
290 arg_d[k] = v
291 else:
292 self.args_dict[v] = arg_d
293
294 def __call__(self, func):
295 self.func = func
296 self.COMMANDS[self.prefix] = self
297 return self.func
298
299 def call(self, mgr, cmd_dict, inbuf):
300 kwargs = {}
301 for a, d in self.args_dict.items():
302 if 'req' in d and d['req'] == "false" and a not in cmd_dict:
303 continue
304 kwargs[a.replace("-", "_")] = cmd_dict[a]
305 if inbuf:
306 kwargs['inbuf'] = inbuf
307 assert self.func
308 return self.func(mgr, **kwargs)
309
310 def dump_cmd(self):
311 return {
312 'cmd': '{} {}'.format(self.prefix, self.args),
313 'desc': self.desc,
314 'perm': self.perm
315 }
316
317 @classmethod
318 def dump_cmd_list(cls):
319 return [cmd.dump_cmd() for cmd in cls.COMMANDS.values()]
320
321
322 def CLIReadCommand(prefix, args="", desc=""):
323 return CLICommand(prefix, args, desc, "r")
324
325
326 def CLIWriteCommand(prefix, args="", desc=""):
327 return CLICommand(prefix, args, desc, "w")
328
329
330 def _get_localized_key(prefix, key):
331 return '{}/{}'.format(prefix, key)
332
333
334 class Option(dict):
335 """
336 Helper class to declare options for MODULE_OPTIONS list.
337
338 Caveat: it uses argument names matching Python keywords (type, min, max),
339 so any further processing should happen in a separate method.
340
341 TODO: type validation.
342 """
343
344 def __init__(
345 self, name,
346 default=None,
347 type='str',
348 desc=None, longdesc=None,
349 min=None, max=None,
350 enum_allowed=None,
351 see_also=None,
352 tags=None,
353 runtime=False,
354 ):
355 super(Option, self).__init__(
356 (k, v) for k, v in vars().items()
357 if k != 'self' and v is not None)
358
359 class Command(dict):
360 """
361 Helper class to declare options for COMMANDS list.
362
363 It also allows to specify prefix and args separately, as well as storing a
364 handler callable.
365
366 Usage:
367 >>> Command(prefix="example",
368 ... args="name=arg,type=CephInt",
369 ... perm='w',
370 ... desc="Blah")
371 {'poll': False, 'cmd': 'example name=arg,type=CephInt', 'perm': 'w', 'desc': 'Blah'}
372 """
373
374 def __init__(
375 self,
376 prefix,
377 args=None,
378 perm="rw",
379 desc=None,
380 poll=False,
381 handler=None
382 ):
383 super(Command, self).__init__(
384 cmd=prefix + (' ' + args if args else ''),
385 perm=perm,
386 desc=desc,
387 poll=poll)
388 self.prefix = prefix
389 self.args = args
390 self.handler = handler
391
392 def register(self, instance=False):
393 """
394 Register a CLICommand handler. It allows an instance to register bound
395 methods. In that case, the mgr instance is not passed, and it's expected
396 to be available in the class instance.
397 It also uses HandleCommandResult helper to return a wrapped a tuple of 3
398 items.
399 """
400 return CLICommand(
401 prefix=self.prefix,
402 args=self.args,
403 desc=self['desc'],
404 perm=self['perm']
405 )(
406 func=lambda mgr, *args, **kwargs: HandleCommandResult(*self.handler(
407 *((instance or mgr,) + args), **kwargs))
408 )
409
410
411 class CPlusPlusHandler(logging.Handler):
412 def __init__(self, module_inst):
413 super(CPlusPlusHandler, self).__init__()
414 self._module = module_inst
415 self.setFormatter(logging.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
416 .format(module_inst.module_name)))
417
418 def emit(self, record):
419 if record.levelno >= self.level:
420 self._module._ceph_log(self.format(record))
421
422 class ClusterLogHandler(logging.Handler):
423 def __init__(self, module_inst):
424 super().__init__()
425 self._module = module_inst
426 self.setFormatter(logging.Formatter("%(message)s"))
427
428 def emit(self, record):
429 levelmap = {
430 'DEBUG': MgrModule.CLUSTER_LOG_PRIO_DEBUG,
431 'INFO': MgrModule.CLUSTER_LOG_PRIO_INFO,
432 'WARNING': MgrModule.CLUSTER_LOG_PRIO_WARN,
433 'ERROR': MgrModule.CLUSTER_LOG_PRIO_ERROR,
434 'CRITICAL': MgrModule.CLUSTER_LOG_PRIO_ERROR,
435 }
436 level = levelmap[record.levelname]
437 if record.levelno >= self.level:
438 self._module.cluster_log(self._module.module_name,
439 level,
440 self.format(record))
441
442 class FileHandler(logging.FileHandler):
443 def __init__(self, module_inst):
444 path = module_inst.get_ceph_option("log_file")
445 idx = path.rfind(".log")
446 if idx != -1:
447 self.path = "{}.{}.log".format(path[:idx], module_inst.module_name)
448 else:
449 self.path = "{}.{}".format(path, module_inst.module_name)
450 super(FileHandler, self).__init__(self.path, delay=True)
451 self.setFormatter(logging.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
452
453
454 class MgrModuleLoggingMixin(object):
455 def _configure_logging(self, mgr_level, module_level, cluster_level,
456 log_to_file, log_to_cluster):
457 self._mgr_level = None
458 self._module_level = None
459 self._root_logger = logging.getLogger()
460
461 self._unconfigure_logging()
462
463 # the ceph log handler is initialized only once
464 self._mgr_log_handler = CPlusPlusHandler(self)
465 self._cluster_log_handler = ClusterLogHandler(self)
466 self._file_log_handler = FileHandler(self)
467
468 self.log_to_file = log_to_file
469 self.log_to_cluster = log_to_cluster
470
471 self._root_logger.addHandler(self._mgr_log_handler)
472 if log_to_file:
473 self._root_logger.addHandler(self._file_log_handler)
474 if log_to_cluster:
475 self._root_logger.addHandler(self._cluster_log_handler)
476
477 self._root_logger.setLevel(logging.NOTSET)
478 self._set_log_level(mgr_level, module_level, cluster_level)
479
480
481 def _unconfigure_logging(self):
482 # remove existing handlers:
483 rm_handlers = [
484 h for h in self._root_logger.handlers if isinstance(h, CPlusPlusHandler) or isinstance(h, FileHandler) or isinstance(h, ClusterLogHandler)]
485 for h in rm_handlers:
486 self._root_logger.removeHandler(h)
487 self.log_to_file = False
488 self.log_to_cluster = False
489
490 def _set_log_level(self, mgr_level, module_level, cluster_level):
491 self._cluster_log_handler.setLevel(cluster_level.upper())
492
493 module_level = module_level.upper() if module_level else ''
494 if not self._module_level:
495 # using debug_mgr level
496 if not module_level and self._mgr_level == mgr_level:
497 # no change in module level neither in debug_mgr
498 return
499 else:
500 if self._module_level == module_level:
501 # no change in module level
502 return
503
504 if not self._module_level and not module_level:
505 level = self._ceph_log_level_to_python(mgr_level)
506 self.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level, mgr_level)
507 elif self._module_level and not module_level:
508 level = self._ceph_log_level_to_python(mgr_level)
509 self.getLogger().warning("unsetting module log level, falling back to "
510 "debug_mgr level: %s (%s)", level, mgr_level)
511 elif module_level:
512 level = module_level
513 self.getLogger().debug("setting log level: %s", level)
514
515 self._module_level = module_level
516 self._mgr_level = mgr_level
517
518 self._mgr_log_handler.setLevel(level)
519 self._file_log_handler.setLevel(level)
520
521 def _enable_file_log(self):
522 # enable file log
523 self.getLogger().warning("enabling logging to file")
524 self.log_to_file = True
525 self._root_logger.addHandler(self._file_log_handler)
526
527 def _disable_file_log(self):
528 # disable file log
529 self.getLogger().warning("disabling logging to file")
530 self.log_to_file = False
531 self._root_logger.removeHandler(self._file_log_handler)
532
533 def _enable_cluster_log(self):
534 # enable cluster log
535 self.getLogger().warning("enabling logging to cluster")
536 self.log_to_cluster = True
537 self._root_logger.addHandler(self._cluster_log_handler)
538
539 def _disable_cluster_log(self):
540 # disable cluster log
541 self.getLogger().warning("disabling logging to cluster")
542 self.log_to_cluster = False
543 self._root_logger.removeHandler(self._cluster_log_handler)
544
545 def _ceph_log_level_to_python(self, ceph_log_level):
546 if ceph_log_level:
547 try:
548 ceph_log_level = int(ceph_log_level.split("/", 1)[0])
549 except ValueError:
550 ceph_log_level = 0
551 else:
552 ceph_log_level = 0
553
554 log_level = "DEBUG"
555 if ceph_log_level <= 0:
556 log_level = "CRITICAL"
557 elif ceph_log_level <= 1:
558 log_level = "WARNING"
559 elif ceph_log_level <= 4:
560 log_level = "INFO"
561 return log_level
562
563 def getLogger(self, name=None):
564 return logging.getLogger(name)
565
566
567 class MgrStandbyModule(ceph_module.BaseMgrStandbyModule, MgrModuleLoggingMixin):
568 """
569 Standby modules only implement a serve and shutdown method, they
570 are not permitted to implement commands and they do not receive
571 any notifications.
572
573 They only have access to the mgrmap (for accessing service URI info
574 from their active peer), and to configuration settings (read only).
575 """
576
577 MODULE_OPTIONS = [] # type: List[Dict[str, Any]]
578 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
579
580 def __init__(self, module_name, capsule):
581 super(MgrStandbyModule, self).__init__(capsule)
582 self.module_name = module_name
583
584 # see also MgrModule.__init__()
585 for o in self.MODULE_OPTIONS:
586 if 'default' in o:
587 if 'type' in o:
588 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
589 else:
590 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
591
592 mgr_level = self.get_ceph_option("debug_mgr")
593 log_level = self.get_module_option("log_level")
594 cluster_level = self.get_module_option('log_to_cluster_level')
595 self._configure_logging(mgr_level, log_level, cluster_level,
596 False, False)
597
598 # for backwards compatibility
599 self._logger = self.getLogger()
600
601 def __del__(self):
602 self._unconfigure_logging()
603
604 @classmethod
605 def _register_options(cls, module_name):
606 cls.MODULE_OPTIONS.append(
607 Option(name='log_level', type='str', default="", runtime=True,
608 enum_allowed=['info', 'debug', 'critical', 'error',
609 'warning', '']))
610 cls.MODULE_OPTIONS.append(
611 Option(name='log_to_file', type='bool', default=False, runtime=True))
612 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
613 cls.MODULE_OPTIONS.append(
614 Option(name='log_to_cluster', type='bool', default=False,
615 runtime=True))
616 cls.MODULE_OPTIONS.append(
617 Option(name='log_to_cluster_level', type='str', default='info',
618 runtime=True,
619 enum_allowed=['info', 'debug', 'critical', 'error',
620 'warning', '']))
621
622 @property
623 def log(self):
624 return self._logger
625
626 def serve(self):
627 """
628 The serve method is mandatory for standby modules.
629 :return:
630 """
631 raise NotImplementedError()
632
633 def get_mgr_id(self):
634 return self._ceph_get_mgr_id()
635
636 def get_module_option(self, key, default=None):
637 """
638 Retrieve the value of a persistent configuration setting
639
640 :param str key:
641 :param default: the default value of the config if it is not found
642 :return: str
643 """
644 r = self._ceph_get_module_option(key)
645 if r is None:
646 return self.MODULE_OPTION_DEFAULTS.get(key, default)
647 else:
648 return r
649
650 def get_ceph_option(self, key):
651 return self._ceph_get_option(key)
652
653 def get_store(self, key):
654 """
655 Retrieve the value of a persistent KV store entry
656
657 :param key: String
658 :return: Byte string or None
659 """
660 return self._ceph_get_store(key)
661
662 def get_active_uri(self):
663 return self._ceph_get_active_uri()
664
665 def get_localized_module_option(self, key, default=None):
666 r = self._ceph_get_module_option(key, self.get_mgr_id())
667 if r is None:
668 return self.MODULE_OPTION_DEFAULTS.get(key, default)
669 else:
670 return r
671
672
673 class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
674 COMMANDS = [] # type: List[Any]
675 MODULE_OPTIONS = [] # type: List[dict]
676 MODULE_OPTION_DEFAULTS = {} # type: Dict[str, Any]
677
678 # Priority definitions for perf counters
679 PRIO_CRITICAL = 10
680 PRIO_INTERESTING = 8
681 PRIO_USEFUL = 5
682 PRIO_UNINTERESTING = 2
683 PRIO_DEBUGONLY = 0
684
685 # counter value types
686 PERFCOUNTER_TIME = 1
687 PERFCOUNTER_U64 = 2
688
689 # counter types
690 PERFCOUNTER_LONGRUNAVG = 4
691 PERFCOUNTER_COUNTER = 8
692 PERFCOUNTER_HISTOGRAM = 0x10
693 PERFCOUNTER_TYPE_MASK = ~3
694
695 # units supported
696 BYTES = 0
697 NONE = 1
698
699 # Cluster log priorities
700 CLUSTER_LOG_PRIO_DEBUG = 0
701 CLUSTER_LOG_PRIO_INFO = 1
702 CLUSTER_LOG_PRIO_SEC = 2
703 CLUSTER_LOG_PRIO_WARN = 3
704 CLUSTER_LOG_PRIO_ERROR = 4
705
706 def __init__(self, module_name, py_modules_ptr, this_ptr):
707 self.module_name = module_name
708 super(MgrModule, self).__init__(py_modules_ptr, this_ptr)
709
710 for o in self.MODULE_OPTIONS:
711 if 'default' in o:
712 if 'type' in o:
713 # we'll assume the declared type matches the
714 # supplied default value's type.
715 self.MODULE_OPTION_DEFAULTS[o['name']] = o['default']
716 else:
717 # module not declaring it's type, so normalize the
718 # default value to be a string for consistent behavior
719 # with default and user-supplied option values.
720 self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default'])
721
722 mgr_level = self.get_ceph_option("debug_mgr")
723 log_level = self.get_module_option("log_level")
724 cluster_level = self.get_module_option('log_to_cluster_level')
725 log_to_file = self.get_module_option("log_to_file")
726 log_to_cluster = self.get_module_option("log_to_cluster")
727 self._configure_logging(mgr_level, log_level, cluster_level,
728 log_to_file, log_to_cluster)
729
730 # for backwards compatibility
731 self._logger = self.getLogger()
732
733 self._version = self._ceph_get_version()
734
735 self._perf_schema_cache = None
736
737 # Keep a librados instance for those that need it.
738 self._rados = None
739
740
741 def __del__(self):
742 self._unconfigure_logging()
743
744 @classmethod
745 def _register_options(cls, module_name):
746 cls.MODULE_OPTIONS.append(
747 Option(name='log_level', type='str', default="", runtime=True,
748 enum_allowed=['info', 'debug', 'critical', 'error',
749 'warning', '']))
750 cls.MODULE_OPTIONS.append(
751 Option(name='log_to_file', type='bool', default=False, runtime=True))
752 if not [x for x in cls.MODULE_OPTIONS if x['name'] == 'log_to_cluster']:
753 cls.MODULE_OPTIONS.append(
754 Option(name='log_to_cluster', type='bool', default=False,
755 runtime=True))
756 cls.MODULE_OPTIONS.append(
757 Option(name='log_to_cluster_level', type='str', default='info',
758 runtime=True,
759 enum_allowed=['info', 'debug', 'critical', 'error',
760 'warning', '']))
761
762 @classmethod
763 def _register_commands(cls, module_name):
764 cls.COMMANDS.extend(CLICommand.dump_cmd_list())
765
766 @property
767 def log(self):
768 return self._logger
769
770 def cluster_log(self, channel, priority, message):
771 """
772 :param channel: The log channel. This can be 'cluster', 'audit', ...
773 :type channel: str
774 :param priority: The log message priority. This can be
775 CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
776 CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
777 CLUSTER_LOG_PRIO_ERROR.
778 :type priority: int
779 :param message: The message to log.
780 :type message: str
781 """
782 self._ceph_cluster_log(channel, priority, message)
783
784 @property
785 def version(self):
786 return self._version
787
788 @property
789 def release_name(self):
790 """
791 Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
792 :return: Returns the release name of the Ceph version in lower case.
793 :rtype: str
794 """
795 return self._ceph_get_release_name()
796
797 def get_context(self):
798 """
799 :return: a Python capsule containing a C++ CephContext pointer
800 """
801 return self._ceph_get_context()
802
803 def notify(self, notify_type, notify_id):
804 """
805 Called by the ceph-mgr service to notify the Python plugin
806 that new state is available.
807
808 :param notify_type: string indicating what kind of notification,
809 such as osd_map, mon_map, fs_map, mon_status,
810 health, pg_summary, command, service_map
811 :param notify_id: string (may be empty) that optionally specifies
812 which entity is being notified about. With
813 "command" notifications this is set to the tag
814 ``from send_command``.
815 """
816 pass
817
818 def _config_notify(self):
819 # check logging options for changes
820 mgr_level = self.get_ceph_option("debug_mgr")
821 module_level = self.get_module_option("log_level")
822 cluster_level = self.get_module_option("log_to_cluster_level")
823 log_to_file = self.get_module_option("log_to_file", False)
824 log_to_cluster = self.get_module_option("log_to_cluster", False)
825
826 self._set_log_level(mgr_level, module_level, cluster_level)
827
828 if log_to_file != self.log_to_file:
829 if log_to_file:
830 self._enable_file_log()
831 else:
832 self._disable_file_log()
833 if log_to_cluster != self.log_to_cluster:
834 if log_to_cluster:
835 self._enable_cluster_log()
836 else:
837 self._disable_cluster_log()
838
839 # call module subclass implementations
840 self.config_notify()
841
842 def config_notify(self):
843 """
844 Called by the ceph-mgr service to notify the Python plugin
845 that the configuration may have changed. Modules will want to
846 refresh any configuration values stored in config variables.
847 """
848 pass
849
850 def serve(self):
851 """
852 Called by the ceph-mgr service to start any server that
853 is provided by this Python plugin. The implementation
854 of this function should block until ``shutdown`` is called.
855
856 You *must* implement ``shutdown`` if you implement ``serve``
857 """
858 pass
859
860 def shutdown(self):
861 """
862 Called by the ceph-mgr service to request that this
863 module drop out of its serve() function. You do not
864 need to implement this if you do not implement serve()
865
866 :return: None
867 """
868 if self._rados:
869 addrs = self._rados.get_addrs()
870 self._rados.shutdown()
871 self._ceph_unregister_client(addrs)
872
873 def get(self, data_name):
874 """
875 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
876
877 :param str data_name: Valid things to fetch are osd_crush_map_text,
878 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
879 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
880 health, mon_status, devices, device <devid>, pg_stats,
881 pool_stats, pg_ready, osd_ping_times.
882
883 Note:
884 All these structures have their own JSON representations: experiment
885 or look at the C++ ``dump()`` methods to learn about them.
886 """
887 return self._ceph_get(data_name)
888
889 def _stattype_to_str(self, stattype):
890
891 typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
892 if typeonly == 0:
893 return 'gauge'
894 if typeonly == self.PERFCOUNTER_LONGRUNAVG:
895 # this lie matches the DaemonState decoding: only val, no counts
896 return 'counter'
897 if typeonly == self.PERFCOUNTER_COUNTER:
898 return 'counter'
899 if typeonly == self.PERFCOUNTER_HISTOGRAM:
900 return 'histogram'
901
902 return ''
903
904 def _perfpath_to_path_labels(self, daemon, path):
905 # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
906 label_names = ("ceph_daemon",) # type: Tuple[str, ...]
907 labels = (daemon,) # type: Tuple[str, ...]
908
909 if daemon.startswith('rbd-mirror.'):
910 match = re.match(
911 r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
912 path
913 )
914 if match:
915 path = 'rbd_mirror_image_' + match.group(4)
916 pool = match.group(1)
917 namespace = match.group(2) or ''
918 image = match.group(3)
919 label_names += ('pool', 'namespace', 'image')
920 labels += (pool, namespace, image)
921
922 return path, label_names, labels,
923
924 def _perfvalue_to_value(self, stattype, value):
925 if stattype & self.PERFCOUNTER_TIME:
926 # Convert from ns to seconds
927 return value / 1000000000.0
928 else:
929 return value
930
931 def _unit_to_str(self, unit):
932 if unit == self.NONE:
933 return "/s"
934 elif unit == self.BYTES:
935 return "B/s"
936
937 @staticmethod
938 def to_pretty_iec(n):
939 for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
940 (20, 'Mi'), (10, 'Ki')]:
941 if n > 10 << bits:
942 return str(n >> bits) + ' ' + suffix
943 return str(n) + ' '
944
945 @staticmethod
946 def get_pretty_row(elems, width):
947 """
948 Takes an array of elements and returns a string with those elements
949 formatted as a table row. Useful for polling modules.
950
951 :param elems: the elements to be printed
952 :param width: the width of the terminal
953 """
954 n = len(elems)
955 column_width = int(width / n)
956
957 ret = '|'
958 for elem in elems:
959 ret += '{0:>{w}} |'.format(elem, w=column_width - 2)
960
961 return ret
962
963 def get_pretty_header(self, elems, width):
964 """
965 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
966
967 :param elems: the elements to be printed
968 :param width: the width of the terminal
969 """
970 n = len(elems)
971 column_width = int(width / n)
972
973 # dash line
974 ret = '+'
975 for i in range(0, n):
976 ret += '-' * (column_width - 1) + '+'
977 ret += '\n'
978
979 # title
980 ret += self.get_pretty_row(elems, width)
981 ret += '\n'
982
983 # dash line
984 ret += '+'
985 for i in range(0, n):
986 ret += '-' * (column_width - 1) + '+'
987 ret += '\n'
988
989 return ret
990
991 def get_server(self, hostname):
992 """
993 Called by the plugin to fetch metadata about a particular hostname from
994 ceph-mgr.
995
996 This is information that ceph-mgr has gleaned from the daemon metadata
997 reported by daemons running on a particular server.
998
999 :param hostname: a hostname
1000 """
1001 return self._ceph_get_server(hostname)
1002
1003 def get_perf_schema(self, svc_type, svc_name):
1004 """
1005 Called by the plugin to fetch perf counter schema info.
1006 svc_name can be nullptr, as can svc_type, in which case
1007 they are wildcards
1008
1009 :param str svc_type:
1010 :param str svc_name:
1011 :return: list of dicts describing the counters requested
1012 """
1013 return self._ceph_get_perf_schema(svc_type, svc_name)
1014
1015 def get_counter(self, svc_type, svc_name, path):
1016 """
1017 Called by the plugin to fetch the latest performance counter data for a
1018 particular counter on a particular service.
1019
1020 :param str svc_type:
1021 :param str svc_name:
1022 :param str path: a period-separated concatenation of the subsystem and the
1023 counter name, for example "mds.inodes".
1024 :return: A list of two-tuples of (timestamp, value) is returned. This may be
1025 empty if no data is available.
1026 """
1027 return self._ceph_get_counter(svc_type, svc_name, path)
1028
1029 def get_latest_counter(self, svc_type, svc_name, path):
1030 """
1031 Called by the plugin to fetch only the newest performance counter data
1032 pointfor a particular counter on a particular service.
1033
1034 :param str svc_type:
1035 :param str svc_name:
1036 :param str path: a period-separated concatenation of the subsystem and the
1037 counter name, for example "mds.inodes".
1038 :return: A list of two-tuples of (timestamp, value) is returned. This may be
1039 empty if no data is available.
1040 """
1041 return self._ceph_get_latest_counter(svc_type, svc_name, path)
1042
1043 def list_servers(self):
1044 """
1045 Like ``get_server``, but gives information about all servers (i.e. all
1046 unique hostnames that have been mentioned in daemon metadata)
1047
1048 :return: a list of information about all servers
1049 :rtype: list
1050 """
1051 return self._ceph_get_server(None)
1052
1053 def get_metadata(self, svc_type, svc_id):
1054 """
1055 Fetch the daemon metadata for a particular service.
1056
1057 ceph-mgr fetches metadata asynchronously, so are windows of time during
1058 addition/removal of services where the metadata is not available to
1059 modules. ``None`` is returned if no metadata is available.
1060
1061 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1062 :param str svc_id: service id. convert OSD integer IDs to strings when
1063 calling this
1064 :rtype: dict, or None if no metadata found
1065 """
1066 return self._ceph_get_metadata(svc_type, svc_id)
1067
1068 def get_daemon_status(self, svc_type, svc_id):
1069 """
1070 Fetch the latest status for a particular service daemon.
1071
1072 This method may return ``None`` if no status information is
1073 available, for example because the daemon hasn't fully started yet.
1074
1075 :param svc_type: string (e.g., 'rgw')
1076 :param svc_id: string
1077 :return: dict, or None if the service is not found
1078 """
1079 return self._ceph_get_daemon_status(svc_type, svc_id)
1080
1081 def mon_command(self, cmd_dict):
1082 """
1083 Helper for modules that do simple, synchronous mon command
1084 execution.
1085
1086 See send_command for general case.
1087
1088 :return: status int, out std, err str
1089 """
1090
1091 t1 = time.time()
1092 result = CommandResult()
1093 self.send_command(result, "mon", "", json.dumps(cmd_dict), "")
1094 r = result.wait()
1095 t2 = time.time()
1096
1097 self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1098 cmd_dict['prefix'], r[0], t2 - t1
1099 ))
1100
1101 return r
1102
1103 def send_command(self, *args, **kwargs):
1104 """
1105 Called by the plugin to send a command to the mon
1106 cluster.
1107
1108 :param CommandResult result: an instance of the ``CommandResult``
1109 class, defined in the same module as MgrModule. This acts as a
1110 completion and stores the output of the command. Use
1111 ``CommandResult.wait()`` if you want to block on completion.
1112 :param str svc_type:
1113 :param str svc_id:
1114 :param str command: a JSON-serialized command. This uses the same
1115 format as the ceph command line, which is a dictionary of command
1116 arguments, with the extra ``prefix`` key containing the command
1117 name itself. Consult MonCommands.h for available commands and
1118 their expected arguments.
1119 :param str tag: used for nonblocking operation: when a command
1120 completes, the ``notify()`` callback on the MgrModule instance is
1121 triggered, with notify_type set to "command", and notify_id set to
1122 the tag of the command.
1123 """
1124 self._ceph_send_command(*args, **kwargs)
1125
1126 def set_health_checks(self, checks):
1127 """
1128 Set the module's current map of health checks. Argument is a
1129 dict of check names to info, in this form:
1130
1131 ::
1132
1133 {
1134 'CHECK_FOO': {
1135 'severity': 'warning', # or 'error'
1136 'summary': 'summary string',
1137 'count': 4, # quantify badness
1138 'detail': [ 'list', 'of', 'detail', 'strings' ],
1139 },
1140 'CHECK_BAR': {
1141 'severity': 'error',
1142 'summary': 'bars are bad',
1143 'detail': [ 'too hard' ],
1144 },
1145 }
1146
1147 :param list: dict of health check dicts
1148 """
1149 self._ceph_set_health_checks(checks)
1150
1151 def _handle_command(self, inbuf, cmd):
1152 if cmd['prefix'] not in CLICommand.COMMANDS:
1153 return self.handle_command(inbuf, cmd)
1154
1155 return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
1156
1157 def handle_command(self, inbuf, cmd):
1158 """
1159 Called by ceph-mgr to request the plugin to handle one
1160 of the commands that it declared in self.COMMANDS
1161
1162 Return a status code, an output buffer, and an
1163 output string. The output buffer is for data results,
1164 the output string is for informative text.
1165
1166 :param inbuf: content of any "-i <file>" supplied to ceph cli
1167 :type inbuf: str
1168 :param cmd: from Ceph's cmdmap_t
1169 :type cmd: dict
1170
1171 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1172 """
1173
1174 # Should never get called if they didn't declare
1175 # any ``COMMANDS``
1176 raise NotImplementedError()
1177
1178 def get_mgr_id(self):
1179 """
1180 Retrieve the name of the manager daemon where this plugin
1181 is currently being executed (i.e. the active manager).
1182
1183 :return: str
1184 """
1185 return self._ceph_get_mgr_id()
1186
1187 def get_ceph_option(self, key):
1188 return self._ceph_get_option(key)
1189
1190 def _validate_module_option(self, key):
1191 """
1192 Helper: don't allow get/set config callers to
1193 access config options that they didn't declare
1194 in their schema.
1195 """
1196 if key not in [o['name'] for o in self.MODULE_OPTIONS]:
1197 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1198 format(key, self.__class__.__name__))
1199
1200 def _get_module_option(self, key, default, localized_prefix=""):
1201 r = self._ceph_get_module_option(self.module_name, key,
1202 localized_prefix)
1203 if r is None:
1204 return self.MODULE_OPTION_DEFAULTS.get(key, default)
1205 else:
1206 return r
1207
1208 def get_module_option(self, key, default=None):
1209 """
1210 Retrieve the value of a persistent configuration setting
1211
1212 :param str key:
1213 :param str default:
1214 :return: str
1215 """
1216 self._validate_module_option(key)
1217 return self._get_module_option(key, default)
1218
1219 def get_module_option_ex(self, module, key, default=None):
1220 """
1221 Retrieve the value of a persistent configuration setting
1222 for the specified module.
1223
1224 :param str module: The name of the module, e.g. 'dashboard'
1225 or 'telemetry'.
1226 :param str key: The configuration key, e.g. 'server_addr'.
1227 :param str,None default: The default value to use when the
1228 returned value is ``None``. Defaults to ``None``.
1229 :return: str,int,bool,float,None
1230 """
1231 if module == self.module_name:
1232 self._validate_module_option(key)
1233 r = self._ceph_get_module_option(module, key)
1234 return default if r is None else r
1235
1236 def get_store_prefix(self, key_prefix):
1237 """
1238 Retrieve a dict of KV store keys to values, where the keys
1239 have the given prefix
1240
1241 :param str key_prefix:
1242 :return: str
1243 """
1244 return self._ceph_get_store_prefix(key_prefix)
1245
1246 def _set_localized(self, key, val, setter):
1247 return setter(_get_localized_key(self.get_mgr_id(), key), val)
1248
1249 def get_localized_module_option(self, key, default=None):
1250 """
1251 Retrieve localized configuration for this ceph-mgr instance
1252 :param str key:
1253 :param str default:
1254 :return: str
1255 """
1256 self._validate_module_option(key)
1257 return self._get_module_option(key, default, self.get_mgr_id())
1258
1259 def _set_module_option(self, key, val):
1260 return self._ceph_set_module_option(self.module_name, key,
1261 None if val is None else str(val))
1262
1263 def set_module_option(self, key, val):
1264 """
1265 Set the value of a persistent configuration setting
1266
1267 :param str key:
1268 :type val: str | None
1269 """
1270 self._validate_module_option(key)
1271 return self._set_module_option(key, val)
1272
1273 def set_module_option_ex(self, module, key, val):
1274 """
1275 Set the value of a persistent configuration setting
1276 for the specified module.
1277
1278 :param str module:
1279 :param str key:
1280 :param str val:
1281 """
1282 if module == self.module_name:
1283 self._validate_module_option(key)
1284 return self._ceph_set_module_option(module, key, str(val))
1285
1286 def set_localized_module_option(self, key, val):
1287 """
1288 Set localized configuration for this ceph-mgr instance
1289 :param str key:
1290 :param str val:
1291 :return: str
1292 """
1293 self._validate_module_option(key)
1294 return self._set_localized(key, val, self._set_module_option)
1295
1296 def set_store(self, key, val):
1297 """
1298 Set a value in this module's persistent key value store.
1299 If val is None, remove key from store
1300
1301 :param str key:
1302 :param str val:
1303 """
1304 self._ceph_set_store(key, val)
1305
1306 def get_store(self, key, default=None):
1307 """
1308 Get a value from this module's persistent key value store
1309 """
1310 r = self._ceph_get_store(key)
1311 if r is None:
1312 return default
1313 else:
1314 return r
1315
1316 def get_localized_store(self, key, default=None):
1317 r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
1318 if r is None:
1319 r = self._ceph_get_store(key)
1320 if r is None:
1321 r = default
1322 return r
1323
1324 def set_localized_store(self, key, val):
1325 return self._set_localized(key, val, self.set_store)
1326
1327 def self_test(self):
1328 """
1329 Run a self-test on the module. Override this function and implement
1330 a best as possible self-test for (automated) testing of the module
1331
1332 Indicate any failures by raising an exception. This does not have
1333 to be pretty, it's mainly for picking up regressions during
1334 development, rather than use in the field.
1335
1336 :return: None, or an advisory string for developer interest, such
1337 as a json dump of some state.
1338 """
1339 pass
1340
1341 def get_osdmap(self):
1342 """
1343 Get a handle to an OSDMap. If epoch==0, get a handle for the latest
1344 OSDMap.
1345 :return: OSDMap
1346 """
1347 return self._ceph_get_osdmap()
1348
1349 def get_latest(self, daemon_type, daemon_name, counter):
1350 data = self.get_latest_counter(
1351 daemon_type, daemon_name, counter)[counter]
1352 if data:
1353 return data[1]
1354 else:
1355 return 0
1356
1357 def get_latest_avg(self, daemon_type, daemon_name, counter):
1358 data = self.get_latest_counter(
1359 daemon_type, daemon_name, counter)[counter]
1360 if data:
1361 return data[1], data[2]
1362 else:
1363 return 0, 0
1364
1365 def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
1366 services=("mds", "mon", "osd",
1367 "rbd-mirror", "rgw", "tcmu-runner")):
1368 """
1369 Return the perf counters currently known to this ceph-mgr
1370 instance, filtered by priority equal to or greater than `prio_limit`.
1371
1372 The result is a map of string to dict, associating services
1373 (like "osd.123") with their counters. The counter
1374 dict for each service maps counter paths to a counter
1375 info structure, which is the information from
1376 the schema, plus an additional "value" member with the latest
1377 value.
1378 """
1379
1380 result = defaultdict(dict) # type: Dict[str, dict]
1381
1382 for server in self.list_servers():
1383 for service in server['services']:
1384 if service['type'] not in services:
1385 continue
1386
1387 schema = self.get_perf_schema(service['type'], service['id'])
1388 if not schema:
1389 self.log.warn("No perf counter schema for {0}.{1}".format(
1390 service['type'], service['id']
1391 ))
1392 continue
1393
1394 # Value is returned in a potentially-multi-service format,
1395 # get just the service we're asking about
1396 svc_full_name = "{0}.{1}".format(
1397 service['type'], service['id'])
1398 schema = schema[svc_full_name]
1399
1400 # Populate latest values
1401 for counter_path, counter_schema in schema.items():
1402 # self.log.debug("{0}: {1}".format(
1403 # counter_path, json.dumps(counter_schema)
1404 # ))
1405 if counter_schema['priority'] < prio_limit:
1406 continue
1407
1408 counter_info = dict(counter_schema)
1409
1410 # Also populate count for the long running avgs
1411 if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
1412 v, c = self.get_latest_avg(
1413 service['type'],
1414 service['id'],
1415 counter_path
1416 )
1417 counter_info['value'], counter_info['count'] = v, c
1418 result[svc_full_name][counter_path] = counter_info
1419 else:
1420 counter_info['value'] = self.get_latest(
1421 service['type'],
1422 service['id'],
1423 counter_path
1424 )
1425
1426 result[svc_full_name][counter_path] = counter_info
1427
1428 self.log.debug("returning {0} counter".format(len(result)))
1429
1430 return result
1431
1432 def set_uri(self, uri):
1433 """
1434 If the module exposes a service, then call this to publish the
1435 address once it is available.
1436
1437 :return: a string
1438 """
1439 return self._ceph_set_uri(uri)
1440
1441 def have_mon_connection(self):
1442 """
1443 Check whether this ceph-mgr daemon has an open connection
1444 to a monitor. If it doesn't, then it's likely that the
1445 information we have about the cluster is out of date,
1446 and/or the monitor cluster is down.
1447 """
1448
1449 return self._ceph_have_mon_connection()
1450
1451 def update_progress_event(self, evid, desc, progress):
1452 return self._ceph_update_progress_event(str(evid),
1453 str(desc),
1454 float(progress))
1455
1456 def complete_progress_event(self, evid):
1457 return self._ceph_complete_progress_event(str(evid))
1458
1459 def clear_all_progress_events(self):
1460 return self._ceph_clear_all_progress_events()
1461
1462 @property
1463 def rados(self):
1464 """
1465 A librados instance to be shared by any classes within
1466 this mgr module that want one.
1467 """
1468 if self._rados:
1469 return self._rados
1470
1471 ctx_capsule = self.get_context()
1472 self._rados = rados.Rados(context=ctx_capsule)
1473 self._rados.connect()
1474 self._ceph_register_client(self._rados.get_addrs())
1475 return self._rados
1476
1477 @staticmethod
1478 def can_run():
1479 """
1480 Implement this function to report whether the module's dependencies
1481 are met. For example, if the module needs to import a particular
1482 dependency to work, then use a try/except around the import at
1483 file scope, and then report here if the import failed.
1484
1485 This will be called in a blocking way from the C++ code, so do not
1486 do any I/O that could block in this function.
1487
1488 :return a 2-tuple consisting of a boolean and explanatory string
1489 """
1490
1491 return True, ""
1492
1493 def remote(self, module_name, method_name, *args, **kwargs):
1494 """
1495 Invoke a method on another module. All arguments, and the return
1496 value from the other module must be serializable.
1497
1498 Limitation: Do not import any modules within the called method.
1499 Otherwise you will get an error in Python 2::
1500
1501 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
1502
1503
1504
1505 :param module_name: Name of other module. If module isn't loaded,
1506 an ImportError exception is raised.
1507 :param method_name: Method name. If it does not exist, a NameError
1508 exception is raised.
1509 :param args: Argument tuple
1510 :param kwargs: Keyword argument dict
1511 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
1512 :raises ImportError: No such module
1513 """
1514 return self._ceph_dispatch_remote(module_name, method_name,
1515 args, kwargs)
1516
1517 def add_osd_perf_query(self, query):
1518 """
1519 Register an OSD perf query. Argument is a
1520 dict of the query parameters, in this form:
1521
1522 ::
1523
1524 {
1525 'key_descriptor': [
1526 {'type': subkey_type, 'regex': regex_pattern},
1527 ...
1528 ],
1529 'performance_counter_descriptors': [
1530 list, of, descriptor, types
1531 ],
1532 'limit': {'order_by': performance_counter_type, 'max_count': n},
1533 }
1534
1535 Valid subkey types:
1536 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
1537 'pg_id', 'object_name', 'snap_id'
1538 Valid performance counter types:
1539 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
1540 'latency', 'write_latency', 'read_latency'
1541
1542 :param object query: query
1543 :rtype: int (query id)
1544 """
1545 return self._ceph_add_osd_perf_query(query)
1546
1547 def remove_osd_perf_query(self, query_id):
1548 """
1549 Unregister an OSD perf query.
1550
1551 :param int query_id: query ID
1552 """
1553 return self._ceph_remove_osd_perf_query(query_id)
1554
1555 def get_osd_perf_counters(self, query_id):
1556 """
1557 Get stats collected for an OSD perf query.
1558
1559 :param int query_id: query ID
1560 """
1561 return self._ceph_get_osd_perf_counters(query_id)
1562
1563 def is_authorized(self, arguments):
1564 """
1565 Verifies that the current session caps permit executing the py service
1566 or current module with the provided arguments. This provides a generic
1567 way to allow modules to restrict by more fine-grained controls (e.g.
1568 pools).
1569
1570 :param arguments: dict of key/value arguments to test
1571 """
1572 return self._ceph_is_authorized(arguments)