]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
d6ba31f72086247711e4b14b375b53a66856047d
1 import ceph_module
# noqa
4 from typing
import Set
, Tuple
, Iterator
, Any
, Dict
, Optional
, Callable
, List
6 # just for type checking
13 from collections
import defaultdict
, namedtuple
51 class CommandResult(object):
53 Use with MgrModule.send_command
56 def __init__(self
, tag
=None):
57 self
.ev
= threading
.Event()
62 # This is just a convenience for notifications from
63 # C++ land, to avoid passing addresses around in messages.
64 self
.tag
= tag
if tag
else ""
66 def complete(self
, r
, outb
, outs
):
74 return self
.r
, self
.outb
, self
.outs
77 class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
78 def __new__(cls
, retval
=0, stdout
="", stderr
=""):
80 Tuple containing the result of `handle_command()`
82 Only write to stderr if there is an error, or in extraordinary circumstances
84 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
85 is critical information to include there.
87 Everything programmatically consumable should be put on stdout
89 :param retval: return code. E.g. 0 or -errno.EINVAL
91 :param stdout: data of this result.
93 :param stderr: Typically used for error messages.
96 return super(HandleCommandResult
, cls
).__new
__(cls
, retval
, stdout
, stderr
)
99 class OSDMap(ceph_module
.BasePyOSDMap
):
101 return self
._get
_epoch
()
103 def get_crush_version(self
):
104 return self
._get
_crush
_version
()
110 # FIXME: efficient implementation
112 return dict([(p
['pool'], p
) for p
in d
['pools']])
114 def get_pools_by_name(self
):
115 # FIXME: efficient implementation
117 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
119 def new_incremental(self
):
120 return self
._new
_incremental
()
122 def apply_incremental(self
, inc
):
123 return self
._apply
_incremental
(inc
)
126 return self
._get
_crush
()
128 def get_pools_by_take(self
, take
):
129 return self
._get
_pools
_by
_take
(take
).get('pools', [])
131 def calc_pg_upmaps(self
, inc
,
132 max_deviation
=.01, max_iterations
=10, pools
=None):
135 return self
._calc
_pg
_upmaps
(
137 max_deviation
, max_iterations
, pools
)
139 def map_pool_pgs_up(self
, poolid
):
140 return self
._map
_pool
_pgs
_up
(poolid
)
142 def pg_to_up_acting_osds(self
, pool_id
, ps
):
143 return self
._pg
_to
_up
_acting
_osds
(pool_id
, ps
)
145 def pool_raw_used_rate(self
, pool_id
):
146 return self
._pool
_raw
_used
_rate
(pool_id
)
148 def get_ec_profile(self
, name
):
149 # FIXME: efficient implementation
151 return d
['erasure_code_profiles'].get(name
, None)
153 def get_require_osd_release(self
):
155 return d
['require_osd_release']
158 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
160 return self
._get
_epoch
()
165 def set_osd_reweights(self
, weightmap
):
167 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
169 return self
._set
_osd
_reweights
(weightmap
)
171 def set_crush_compat_weight_set_weights(self
, weightmap
):
173 weightmap is a dict, int to float. devices only. e.g.,
174 { 0: 3.4, 1: 3.3, 2: 3.334 }
176 return self
._set
_crush
_compat
_weight
_set
_weights
(weightmap
)
179 class CRUSHMap(ceph_module
.BasePyCRUSH
):
180 ITEM_NONE
= 0x7fffffff
181 DEFAULT_CHOOSE_ARGS
= '-1'
186 def get_item_weight(self
, item
):
187 return self
._get
_item
_weight
(item
)
189 def get_item_name(self
, item
):
190 return self
._get
_item
_name
(item
)
192 def find_takes(self
):
193 return self
._find
_takes
().get('takes', [])
195 def get_take_weight_osd_map(self
, root
):
196 uglymap
= self
._get
_take
_weight
_osd
_map
(root
)
197 return {int(k
): v
for k
, v
in six
.iteritems(uglymap
.get('weights', {}))}
200 def have_default_choose_args(dump
):
201 return CRUSHMap
.DEFAULT_CHOOSE_ARGS
in dump
.get('choose_args', {})
204 def get_default_choose_args(dump
):
205 return dump
.get('choose_args').get(CRUSHMap
.DEFAULT_CHOOSE_ARGS
, [])
207 def get_rule(self
, rule_name
):
208 # TODO efficient implementation
209 for rule
in self
.dump()['rules']:
210 if rule_name
== rule
['rule_name']:
215 def get_rule_by_id(self
, rule_id
):
216 for rule
in self
.dump()['rules']:
217 if rule
['rule_id'] == rule_id
:
222 def get_rule_root(self
, rule_name
):
223 rule
= self
.get_rule(rule_name
)
228 first_take
= [s
for s
in rule
['steps'] if s
['op'] == 'take'][0]
230 logging
.warning("CRUSH rule '{0}' has no 'take' step".format(
234 return first_take
['item']
236 def get_osds_under(self
, root_id
):
237 # TODO don't abuse dump like this
239 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
244 for item
in b
['items']:
246 osd_list
.append(item
['id'])
249 accumulate(buckets
[item
['id']])
253 accumulate(buckets
[root_id
])
257 def device_class_counts(self
):
258 result
= defaultdict(int) # type: Dict[str, int]
259 # TODO don't abuse dump like this
261 for device
in d
['devices']:
262 cls
= device
.get('class', None)
268 class CLICommand(object):
269 COMMANDS
= {} # type: Dict[str, CLICommand]
271 def __init__(self
, prefix
, args
="", desc
="", perm
="rw"):
277 self
.func
= None # type: Optional[Callable]
280 def _parse_args(self
):
283 args
= self
.args
.split(" ")
285 arg_desc
= arg
.strip().split(",")
292 self
.args_dict
[v
] = arg_d
294 def __call__(self
, func
):
296 self
.COMMANDS
[self
.prefix
] = self
299 def call(self
, mgr
, cmd_dict
, inbuf
):
301 for a
, d
in self
.args_dict
.items():
302 if 'req' in d
and d
['req'] == "false" and a
not in cmd_dict
:
304 kwargs
[a
.replace("-", "_")] = cmd_dict
[a
]
306 kwargs
['inbuf'] = inbuf
308 return self
.func(mgr
, **kwargs
)
312 'cmd': '{} {}'.format(self
.prefix
, self
.args
),
318 def dump_cmd_list(cls
):
319 return [cmd
.dump_cmd() for cmd
in cls
.COMMANDS
.values()]
322 def CLIReadCommand(prefix
, args
="", desc
=""):
323 return CLICommand(prefix
, args
, desc
, "r")
326 def CLIWriteCommand(prefix
, args
="", desc
=""):
327 return CLICommand(prefix
, args
, desc
, "w")
330 def _get_localized_key(prefix
, key
):
331 return '{}/{}'.format(prefix
, key
)
336 Helper class to declare options for MODULE_OPTIONS list.
338 Caveat: it uses argument names matching Python keywords (type, min, max),
339 so any further processing should happen in a separate method.
341 TODO: type validation.
348 desc
=None, longdesc
=None,
355 super(Option
, self
).__init
__(
356 (k
, v
) for k
, v
in vars().items()
357 if k
!= 'self' and v
is not None)
361 Helper class to declare options for COMMANDS list.
363 It also allows to specify prefix and args separately, as well as storing a
367 >>> Command(prefix="example",
368 ... args="name=arg,type=CephInt",
371 {'poll': False, 'cmd': 'example name=arg,type=CephInt', 'perm': 'w', 'desc': 'Blah'}
383 super(Command
, self
).__init
__(
384 cmd
=prefix
+ (' ' + args
if args
else ''),
390 self
.handler
= handler
392 def register(self
, instance
=False):
394 Register a CLICommand handler. It allows an instance to register bound
395 methods. In that case, the mgr instance is not passed, and it's expected
396 to be available in the class instance.
397 It also uses HandleCommandResult helper to return a wrapped a tuple of 3
406 func
=lambda mgr
, *args
, **kwargs
: HandleCommandResult(*self
.handler(
407 *((instance
or mgr
,) + args
), **kwargs
))
411 class CPlusPlusHandler(logging
.Handler
):
412 def __init__(self
, module_inst
):
413 super(CPlusPlusHandler
, self
).__init
__()
414 self
._module
= module_inst
415 self
.setFormatter(logging
.Formatter("[{} %(levelname)-4s %(name)s] %(message)s"
416 .format(module_inst
.module_name
)))
418 def emit(self
, record
):
419 if record
.levelno
>= self
.level
:
420 self
._module
._ceph
_log
(self
.format(record
))
422 class ClusterLogHandler(logging
.Handler
):
423 def __init__(self
, module_inst
):
425 self
._module
= module_inst
426 self
.setFormatter(logging
.Formatter("%(message)s"))
428 def emit(self
, record
):
430 'DEBUG': MgrModule
.CLUSTER_LOG_PRIO_DEBUG
,
431 'INFO': MgrModule
.CLUSTER_LOG_PRIO_INFO
,
432 'WARNING': MgrModule
.CLUSTER_LOG_PRIO_WARN
,
433 'ERROR': MgrModule
.CLUSTER_LOG_PRIO_ERROR
,
434 'CRITICAL': MgrModule
.CLUSTER_LOG_PRIO_ERROR
,
436 level
= levelmap
[record
.levelname
]
437 if record
.levelno
>= self
.level
:
438 self
._module
.cluster_log(self
._module
.module_name
,
442 class FileHandler(logging
.FileHandler
):
443 def __init__(self
, module_inst
):
444 path
= module_inst
.get_ceph_option("log_file")
445 idx
= path
.rfind(".log")
447 self
.path
= "{}.{}.log".format(path
[:idx
], module_inst
.module_name
)
449 self
.path
= "{}.{}".format(path
, module_inst
.module_name
)
450 super(FileHandler
, self
).__init
__(self
.path
, delay
=True)
451 self
.setFormatter(logging
.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
454 class MgrModuleLoggingMixin(object):
455 def _configure_logging(self
, mgr_level
, module_level
, cluster_level
,
456 log_to_file
, log_to_cluster
):
457 self
._mgr
_level
= None
458 self
._module
_level
= None
459 self
._root
_logger
= logging
.getLogger()
461 self
._unconfigure
_logging
()
463 # the ceph log handler is initialized only once
464 self
._mgr
_log
_handler
= CPlusPlusHandler(self
)
465 self
._cluster
_log
_handler
= ClusterLogHandler(self
)
466 self
._file
_log
_handler
= FileHandler(self
)
468 self
.log_to_file
= log_to_file
469 self
.log_to_cluster
= log_to_cluster
471 self
._root
_logger
.addHandler(self
._mgr
_log
_handler
)
473 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
475 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
477 self
._root
_logger
.setLevel(logging
.NOTSET
)
478 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
481 def _unconfigure_logging(self
):
482 # remove existing handlers:
484 h
for h
in self
._root
_logger
.handlers
if isinstance(h
, CPlusPlusHandler
) or isinstance(h
, FileHandler
) or isinstance(h
, ClusterLogHandler
)]
485 for h
in rm_handlers
:
486 self
._root
_logger
.removeHandler(h
)
487 self
.log_to_file
= False
488 self
.log_to_cluster
= False
490 def _set_log_level(self
, mgr_level
, module_level
, cluster_level
):
491 self
._cluster
_log
_handler
.setLevel(cluster_level
.upper())
493 module_level
= module_level
.upper() if module_level
else ''
494 if not self
._module
_level
:
495 # using debug_mgr level
496 if not module_level
and self
._mgr
_level
== mgr_level
:
497 # no change in module level neither in debug_mgr
500 if self
._module
_level
== module_level
:
501 # no change in module level
504 if not self
._module
_level
and not module_level
:
505 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
506 self
.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level
, mgr_level
)
507 elif self
._module
_level
and not module_level
:
508 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
509 self
.getLogger().warning("unsetting module log level, falling back to "
510 "debug_mgr level: %s (%s)", level
, mgr_level
)
513 self
.getLogger().debug("setting log level: %s", level
)
515 self
._module
_level
= module_level
516 self
._mgr
_level
= mgr_level
518 self
._mgr
_log
_handler
.setLevel(level
)
519 self
._file
_log
_handler
.setLevel(level
)
521 def _enable_file_log(self
):
523 self
.getLogger().warning("enabling logging to file")
524 self
.log_to_file
= True
525 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
527 def _disable_file_log(self
):
529 self
.getLogger().warning("disabling logging to file")
530 self
.log_to_file
= False
531 self
._root
_logger
.removeHandler(self
._file
_log
_handler
)
533 def _enable_cluster_log(self
):
535 self
.getLogger().warning("enabling logging to cluster")
536 self
.log_to_cluster
= True
537 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
539 def _disable_cluster_log(self
):
540 # disable cluster log
541 self
.getLogger().warning("disabling logging to cluster")
542 self
.log_to_cluster
= False
543 self
._root
_logger
.removeHandler(self
._cluster
_log
_handler
)
545 def _ceph_log_level_to_python(self
, ceph_log_level
):
548 ceph_log_level
= int(ceph_log_level
.split("/", 1)[0])
555 if ceph_log_level
<= 0:
556 log_level
= "CRITICAL"
557 elif ceph_log_level
<= 1:
558 log_level
= "WARNING"
559 elif ceph_log_level
<= 4:
563 def getLogger(self
, name
=None):
564 return logging
.getLogger(name
)
567 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
, MgrModuleLoggingMixin
):
569 Standby modules only implement a serve and shutdown method, they
570 are not permitted to implement commands and they do not receive
573 They only have access to the mgrmap (for accessing service URI info
574 from their active peer), and to configuration settings (read only).
577 MODULE_OPTIONS
= [] # type: List[Dict[str, Any]]
578 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
580 def __init__(self
, module_name
, capsule
):
581 super(MgrStandbyModule
, self
).__init
__(capsule
)
582 self
.module_name
= module_name
584 # see also MgrModule.__init__()
585 for o
in self
.MODULE_OPTIONS
:
588 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
590 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
592 mgr_level
= self
.get_ceph_option("debug_mgr")
593 log_level
= self
.get_module_option("log_level")
594 cluster_level
= self
.get_module_option('log_to_cluster_level')
595 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
598 # for backwards compatibility
599 self
._logger
= self
.getLogger()
602 self
._unconfigure
_logging
()
605 def _register_options(cls
, module_name
):
606 cls
.MODULE_OPTIONS
.append(
607 Option(name
='log_level', type='str', default
="", runtime
=True,
608 enum_allowed
=['info', 'debug', 'critical', 'error',
610 cls
.MODULE_OPTIONS
.append(
611 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
612 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
613 cls
.MODULE_OPTIONS
.append(
614 Option(name
='log_to_cluster', type='bool', default
=False,
616 cls
.MODULE_OPTIONS
.append(
617 Option(name
='log_to_cluster_level', type='str', default
='info',
619 enum_allowed
=['info', 'debug', 'critical', 'error',
628 The serve method is mandatory for standby modules.
631 raise NotImplementedError()
633 def get_mgr_id(self
):
634 return self
._ceph
_get
_mgr
_id
()
636 def get_module_option(self
, key
, default
=None):
638 Retrieve the value of a persistent configuration setting
641 :param default: the default value of the config if it is not found
644 r
= self
._ceph
_get
_module
_option
(key
)
646 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
650 def get_ceph_option(self
, key
):
651 return self
._ceph
_get
_option
(key
)
653 def get_store(self
, key
):
655 Retrieve the value of a persistent KV store entry
658 :return: Byte string or None
660 return self
._ceph
_get
_store
(key
)
662 def get_active_uri(self
):
663 return self
._ceph
_get
_active
_uri
()
665 def get_localized_module_option(self
, key
, default
=None):
666 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
668 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
673 class MgrModule(ceph_module
.BaseMgrModule
, MgrModuleLoggingMixin
):
674 COMMANDS
= [] # type: List[Any]
675 MODULE_OPTIONS
= [] # type: List[dict]
676 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
678 # Priority definitions for perf counters
682 PRIO_UNINTERESTING
= 2
685 # counter value types
690 PERFCOUNTER_LONGRUNAVG
= 4
691 PERFCOUNTER_COUNTER
= 8
692 PERFCOUNTER_HISTOGRAM
= 0x10
693 PERFCOUNTER_TYPE_MASK
= ~
3
699 # Cluster log priorities
700 CLUSTER_LOG_PRIO_DEBUG
= 0
701 CLUSTER_LOG_PRIO_INFO
= 1
702 CLUSTER_LOG_PRIO_SEC
= 2
703 CLUSTER_LOG_PRIO_WARN
= 3
704 CLUSTER_LOG_PRIO_ERROR
= 4
706 def __init__(self
, module_name
, py_modules_ptr
, this_ptr
):
707 self
.module_name
= module_name
708 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
710 for o
in self
.MODULE_OPTIONS
:
713 # we'll assume the declared type matches the
714 # supplied default value's type.
715 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
717 # module not declaring it's type, so normalize the
718 # default value to be a string for consistent behavior
719 # with default and user-supplied option values.
720 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
722 mgr_level
= self
.get_ceph_option("debug_mgr")
723 log_level
= self
.get_module_option("log_level")
724 cluster_level
= self
.get_module_option('log_to_cluster_level')
725 log_to_file
= self
.get_module_option("log_to_file")
726 log_to_cluster
= self
.get_module_option("log_to_cluster")
727 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
728 log_to_file
, log_to_cluster
)
730 # for backwards compatibility
731 self
._logger
= self
.getLogger()
733 self
._version
= self
._ceph
_get
_version
()
735 self
._perf
_schema
_cache
= None
737 # Keep a librados instance for those that need it.
742 self
._unconfigure
_logging
()
745 def _register_options(cls
, module_name
):
746 cls
.MODULE_OPTIONS
.append(
747 Option(name
='log_level', type='str', default
="", runtime
=True,
748 enum_allowed
=['info', 'debug', 'critical', 'error',
750 cls
.MODULE_OPTIONS
.append(
751 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
752 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
753 cls
.MODULE_OPTIONS
.append(
754 Option(name
='log_to_cluster', type='bool', default
=False,
756 cls
.MODULE_OPTIONS
.append(
757 Option(name
='log_to_cluster_level', type='str', default
='info',
759 enum_allowed
=['info', 'debug', 'critical', 'error',
763 def _register_commands(cls
, module_name
):
764 cls
.COMMANDS
.extend(CLICommand
.dump_cmd_list())
770 def cluster_log(self
, channel
, priority
, message
):
772 :param channel: The log channel. This can be 'cluster', 'audit', ...
774 :param priority: The log message priority. This can be
775 CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
776 CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
777 CLUSTER_LOG_PRIO_ERROR.
779 :param message: The message to log.
782 self
._ceph
_cluster
_log
(channel
, priority
, message
)
789 def release_name(self
):
791 Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.
792 :return: Returns the release name of the Ceph version in lower case.
795 return self
._ceph
_get
_release
_name
()
797 def get_context(self
):
799 :return: a Python capsule containing a C++ CephContext pointer
801 return self
._ceph
_get
_context
()
803 def notify(self
, notify_type
, notify_id
):
805 Called by the ceph-mgr service to notify the Python plugin
806 that new state is available.
808 :param notify_type: string indicating what kind of notification,
809 such as osd_map, mon_map, fs_map, mon_status,
810 health, pg_summary, command, service_map
811 :param notify_id: string (may be empty) that optionally specifies
812 which entity is being notified about. With
813 "command" notifications this is set to the tag
814 ``from send_command``.
818 def _config_notify(self
):
819 # check logging options for changes
820 mgr_level
= self
.get_ceph_option("debug_mgr")
821 module_level
= self
.get_module_option("log_level")
822 cluster_level
= self
.get_module_option("log_to_cluster_level")
823 log_to_file
= self
.get_module_option("log_to_file", False)
824 log_to_cluster
= self
.get_module_option("log_to_cluster", False)
826 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
828 if log_to_file
!= self
.log_to_file
:
830 self
._enable
_file
_log
()
832 self
._disable
_file
_log
()
833 if log_to_cluster
!= self
.log_to_cluster
:
835 self
._enable
_cluster
_log
()
837 self
._disable
_cluster
_log
()
839 # call module subclass implementations
842 def config_notify(self
):
844 Called by the ceph-mgr service to notify the Python plugin
845 that the configuration may have changed. Modules will want to
846 refresh any configuration values stored in config variables.
852 Called by the ceph-mgr service to start any server that
853 is provided by this Python plugin. The implementation
854 of this function should block until ``shutdown`` is called.
856 You *must* implement ``shutdown`` if you implement ``serve``
862 Called by the ceph-mgr service to request that this
863 module drop out of its serve() function. You do not
864 need to implement this if you do not implement serve()
869 addrs
= self
._rados
.get_addrs()
870 self
._rados
.shutdown()
871 self
._ceph
_unregister
_client
(addrs
)
873 def get(self
, data_name
):
875 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
877 :param str data_name: Valid things to fetch are osd_crush_map_text,
878 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
879 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
880 health, mon_status, devices, device <devid>, pg_stats,
881 pool_stats, pg_ready, osd_ping_times.
884 All these structures have their own JSON representations: experiment
885 or look at the C++ ``dump()`` methods to learn about them.
887 return self
._ceph
_get
(data_name
)
889 def _stattype_to_str(self
, stattype
):
891 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
894 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
895 # this lie matches the DaemonState decoding: only val, no counts
897 if typeonly
== self
.PERFCOUNTER_COUNTER
:
899 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
904 def _perfpath_to_path_labels(self
, daemon
, path
):
905 # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
906 label_names
= ("ceph_daemon",) # type: Tuple[str, ...]
907 labels
= (daemon
,) # type: Tuple[str, ...]
909 if daemon
.startswith('rbd-mirror.'):
911 r
'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
915 path
= 'rbd_mirror_image_' + match
.group(4)
916 pool
= match
.group(1)
917 namespace
= match
.group(2) or ''
918 image
= match
.group(3)
919 label_names
+= ('pool', 'namespace', 'image')
920 labels
+= (pool
, namespace
, image
)
922 return path
, label_names
, labels
,
924 def _perfvalue_to_value(self
, stattype
, value
):
925 if stattype
& self
.PERFCOUNTER_TIME
:
926 # Convert from ns to seconds
927 return value
/ 1000000000.0
931 def _unit_to_str(self
, unit
):
932 if unit
== self
.NONE
:
934 elif unit
== self
.BYTES
:
938 def to_pretty_iec(n
):
939 for bits
, suffix
in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
940 (20, 'Mi'), (10, 'Ki')]:
942 return str(n
>> bits
) + ' ' + suffix
946 def get_pretty_row(elems
, width
):
948 Takes an array of elements and returns a string with those elements
949 formatted as a table row. Useful for polling modules.
951 :param elems: the elements to be printed
952 :param width: the width of the terminal
955 column_width
= int(width
/ n
)
959 ret
+= '{0:>{w}} |'.format(elem
, w
=column_width
- 2)
963 def get_pretty_header(self
, elems
, width
):
965 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
967 :param elems: the elements to be printed
968 :param width: the width of the terminal
971 column_width
= int(width
/ n
)
975 for i
in range(0, n
):
976 ret
+= '-' * (column_width
- 1) + '+'
980 ret
+= self
.get_pretty_row(elems
, width
)
985 for i
in range(0, n
):
986 ret
+= '-' * (column_width
- 1) + '+'
991 def get_server(self
, hostname
):
993 Called by the plugin to fetch metadata about a particular hostname from
996 This is information that ceph-mgr has gleaned from the daemon metadata
997 reported by daemons running on a particular server.
999 :param hostname: a hostname
1001 return self
._ceph
_get
_server
(hostname
)
1003 def get_perf_schema(self
, svc_type
, svc_name
):
1005 Called by the plugin to fetch perf counter schema info.
1006 svc_name can be nullptr, as can svc_type, in which case
1009 :param str svc_type:
1010 :param str svc_name:
1011 :return: list of dicts describing the counters requested
1013 return self
._ceph
_get
_perf
_schema
(svc_type
, svc_name
)
1015 def get_counter(self
, svc_type
, svc_name
, path
):
1017 Called by the plugin to fetch the latest performance counter data for a
1018 particular counter on a particular service.
1020 :param str svc_type:
1021 :param str svc_name:
1022 :param str path: a period-separated concatenation of the subsystem and the
1023 counter name, for example "mds.inodes".
1024 :return: A list of two-tuples of (timestamp, value) is returned. This may be
1025 empty if no data is available.
1027 return self
._ceph
_get
_counter
(svc_type
, svc_name
, path
)
1029 def get_latest_counter(self
, svc_type
, svc_name
, path
):
1031 Called by the plugin to fetch only the newest performance counter data
1032 pointfor a particular counter on a particular service.
1034 :param str svc_type:
1035 :param str svc_name:
1036 :param str path: a period-separated concatenation of the subsystem and the
1037 counter name, for example "mds.inodes".
1038 :return: A list of two-tuples of (timestamp, value) is returned. This may be
1039 empty if no data is available.
1041 return self
._ceph
_get
_latest
_counter
(svc_type
, svc_name
, path
)
1043 def list_servers(self
):
1045 Like ``get_server``, but gives information about all servers (i.e. all
1046 unique hostnames that have been mentioned in daemon metadata)
1048 :return: a list of information about all servers
1051 return self
._ceph
_get
_server
(None)
1053 def get_metadata(self
, svc_type
, svc_id
):
1055 Fetch the daemon metadata for a particular service.
1057 ceph-mgr fetches metadata asynchronously, so are windows of time during
1058 addition/removal of services where the metadata is not available to
1059 modules. ``None`` is returned if no metadata is available.
1061 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1062 :param str svc_id: service id. convert OSD integer IDs to strings when
1064 :rtype: dict, or None if no metadata found
1066 return self
._ceph
_get
_metadata
(svc_type
, svc_id
)
1068 def get_daemon_status(self
, svc_type
, svc_id
):
1070 Fetch the latest status for a particular service daemon.
1072 This method may return ``None`` if no status information is
1073 available, for example because the daemon hasn't fully started yet.
1075 :param svc_type: string (e.g., 'rgw')
1076 :param svc_id: string
1077 :return: dict, or None if the service is not found
1079 return self
._ceph
_get
_daemon
_status
(svc_type
, svc_id
)
1081 def mon_command(self
, cmd_dict
):
1083 Helper for modules that do simple, synchronous mon command
1086 See send_command for general case.
1088 :return: status int, out std, err str
1092 result
= CommandResult()
1093 self
.send_command(result
, "mon", "", json
.dumps(cmd_dict
), "")
1097 self
.log
.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1098 cmd_dict
['prefix'], r
[0], t2
- t1
1103 def send_command(self
, *args
, **kwargs
):
1105 Called by the plugin to send a command to the mon
1108 :param CommandResult result: an instance of the ``CommandResult``
1109 class, defined in the same module as MgrModule. This acts as a
1110 completion and stores the output of the command. Use
1111 ``CommandResult.wait()`` if you want to block on completion.
1112 :param str svc_type:
1114 :param str command: a JSON-serialized command. This uses the same
1115 format as the ceph command line, which is a dictionary of command
1116 arguments, with the extra ``prefix`` key containing the command
1117 name itself. Consult MonCommands.h for available commands and
1118 their expected arguments.
1119 :param str tag: used for nonblocking operation: when a command
1120 completes, the ``notify()`` callback on the MgrModule instance is
1121 triggered, with notify_type set to "command", and notify_id set to
1122 the tag of the command.
1124 self
._ceph
_send
_command
(*args
, **kwargs
)
1126 def set_health_checks(self
, checks
):
1128 Set the module's current map of health checks. Argument is a
1129 dict of check names to info, in this form:
1135 'severity': 'warning', # or 'error'
1136 'summary': 'summary string',
1137 'count': 4, # quantify badness
1138 'detail': [ 'list', 'of', 'detail', 'strings' ],
1141 'severity': 'error',
1142 'summary': 'bars are bad',
1143 'detail': [ 'too hard' ],
1147 :param list: dict of health check dicts
1149 self
._ceph
_set
_health
_checks
(checks
)
1151 def _handle_command(self
, inbuf
, cmd
):
1152 if cmd
['prefix'] not in CLICommand
.COMMANDS
:
1153 return self
.handle_command(inbuf
, cmd
)
1155 return CLICommand
.COMMANDS
[cmd
['prefix']].call(self
, cmd
, inbuf
)
1157 def handle_command(self
, inbuf
, cmd
):
1159 Called by ceph-mgr to request the plugin to handle one
1160 of the commands that it declared in self.COMMANDS
1162 Return a status code, an output buffer, and an
1163 output string. The output buffer is for data results,
1164 the output string is for informative text.
1166 :param inbuf: content of any "-i <file>" supplied to ceph cli
1168 :param cmd: from Ceph's cmdmap_t
1171 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1174 # Should never get called if they didn't declare
1176 raise NotImplementedError()
1178 def get_mgr_id(self
):
1180 Retrieve the name of the manager daemon where this plugin
1181 is currently being executed (i.e. the active manager).
1185 return self
._ceph
_get
_mgr
_id
()
1187 def get_ceph_option(self
, key
):
1188 return self
._ceph
_get
_option
(key
)
1190 def _validate_module_option(self
, key
):
1192 Helper: don't allow get/set config callers to
1193 access config options that they didn't declare
1196 if key
not in [o
['name'] for o
in self
.MODULE_OPTIONS
]:
1197 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1198 format(key
, self
.__class
__.__name
__))
1200 def _get_module_option(self
, key
, default
, localized_prefix
=""):
1201 r
= self
._ceph
_get
_module
_option
(self
.module_name
, key
,
1204 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
1208 def get_module_option(self
, key
, default
=None):
1210 Retrieve the value of a persistent configuration setting
1216 self
._validate
_module
_option
(key
)
1217 return self
._get
_module
_option
(key
, default
)
1219 def get_module_option_ex(self
, module
, key
, default
=None):
1221 Retrieve the value of a persistent configuration setting
1222 for the specified module.
1224 :param str module: The name of the module, e.g. 'dashboard'
1226 :param str key: The configuration key, e.g. 'server_addr'.
1227 :param str,None default: The default value to use when the
1228 returned value is ``None``. Defaults to ``None``.
1229 :return: str,int,bool,float,None
1231 if module
== self
.module_name
:
1232 self
._validate
_module
_option
(key
)
1233 r
= self
._ceph
_get
_module
_option
(module
, key
)
1234 return default
if r
is None else r
1236 def get_store_prefix(self
, key_prefix
):
1238 Retrieve a dict of KV store keys to values, where the keys
1239 have the given prefix
1241 :param str key_prefix:
1244 return self
._ceph
_get
_store
_prefix
(key_prefix
)
1246 def _set_localized(self
, key
, val
, setter
):
1247 return setter(_get_localized_key(self
.get_mgr_id(), key
), val
)
1249 def get_localized_module_option(self
, key
, default
=None):
1251 Retrieve localized configuration for this ceph-mgr instance
1256 self
._validate
_module
_option
(key
)
1257 return self
._get
_module
_option
(key
, default
, self
.get_mgr_id())
1259 def _set_module_option(self
, key
, val
):
1260 return self
._ceph
_set
_module
_option
(self
.module_name
, key
,
1261 None if val
is None else str(val
))
1263 def set_module_option(self
, key
, val
):
1265 Set the value of a persistent configuration setting
1268 :type val: str | None
1270 self
._validate
_module
_option
(key
)
1271 return self
._set
_module
_option
(key
, val
)
def set_module_option_ex(self, module, key, val):
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is validated only when ``module`` is this module's own
    name.

    :param str module: name of the module owning the option
    :param str key: option name
    :param val: new value; ``None`` is forwarded unchanged, anything
                else is converted with ``str()``
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Keep None as None (as _set_module_option does) instead of
    # storing the literal string "None".
    encoded = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, encoded)
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance

    The un-localized key is validated, then the write is delegated to
    ``_set_localized`` with ``_set_module_option`` as the setter.

    :param str key: option name
    :param val: new value
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
def set_store(self, key, val):
    """
    Write ``val`` under ``key`` in this module's persistent key value
    store. Passing ``val=None`` removes the key from the store.

    :param key: store key
    :param val: value to store, or ``None`` to delete
    """
    # Nothing is returned to the caller.
    self._ceph_set_store(key, val)
def get_store(self, key, default=None):
    """
    Get a value from this module's persistent key value store

    :param key: store key
    :param default: value returned when the store yields ``None``
    :return: the stored value, or ``default``
    """
    r = self._ceph_get_store(key)
    # A lookup result of None falls back to the caller's default.
    return default if r is None else r
def get_localized_store(self, key, default=None):
    """
    Get a value from the key value store, preferring the entry
    localized to this ceph-mgr instance.

    Lookup order: the localized key (derived from this daemon's mgr
    id), then the plain ``key``, then ``default``.

    :param key: un-localized store key
    :param default: value returned when neither lookup yields a value
    """
    r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
    if r is None:
        # No per-instance value: fall back to the global key.
        r = self._ceph_get_store(key)
        if r is None:
            r = default
    return r
def set_localized_store(self, key, val):
    """
    Store ``val`` in the key value store under the variant of ``key``
    that is localized to this ceph-mgr instance.

    :param key: un-localized store key
    :param val: value to store
    :return: result of ``_set_localized``
    """
    store_setter = self.set_store
    return self._set_localized(key, val, store_setter)
def self_test(self):
    """
    Run a self-test on the module.

    Override this function and implement the best self-test possible
    for (automated) testing of the module.

    Signal failures by raising an exception. The output does not have
    to be pretty: the purpose is to catch regressions during
    development, not to be used in the field.

    :return: None, or an advisory string for developer interest, such
             as a json dump of some state.
    """
def get_osdmap(self):
    """
    Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap()``.

    This method takes no epoch argument.

    :return: OSDMap handle
    """
    osdmap_handle = self._ceph_get_osdmap()
    return osdmap_handle
def get_latest(self, daemon_type, daemon_name, counter):
    """
    Return the most recent value of one perf counter of a daemon.

    :param daemon_type: passed to ``get_latest_counter``
    :param daemon_name: passed to ``get_latest_counter``
    :param counter: counter path; also the key used to pick the
                    datapoint out of the ``get_latest_counter`` result
    :return: element 1 of the latest datapoint, or 0 when there is no
             datapoint
    """
    data = self.get_latest_counter(
        daemon_type, daemon_name, counter)[counter]
    # Element 1 is the slot get_latest_avg also treats as the value.
    return data[1] if data else 0
def get_latest_avg(self, daemon_type, daemon_name, counter):
    """
    Return the most recent (value, count) pair of a long-running
    average perf counter of a daemon.

    :param daemon_type: passed to ``get_latest_counter``
    :param daemon_name: passed to ``get_latest_counter``
    :param counter: counter path; also the key used to pick the
                    datapoint out of the ``get_latest_counter`` result
    :return: 2-tuple of elements 1 and 2 of the latest datapoint, or
             ``(0, 0)`` when there is no datapoint
    """
    data = self.get_latest_counter(
        daemon_type, daemon_name, counter)[counter]
    # Guard against a missing/empty datapoint instead of indexing it.
    if data:
        return data[1], data[2]
    return 0, 0
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
                          services=("mds", "mon", "osd",
                                    "rbd-mirror", "rgw", "tcmu-runner")):
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters. The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value (and a "count" member for long-running averages).

    :param prio_limit: counters whose schema priority is below this
                       are skipped
    :param services: service types to include
    :return: dict of service name to dict of counter path to info
    """
    result = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in server['services']:
            if service['type'] not in services:
                continue

            schema = self.get_perf_schema(service['type'], service['id'])
            if not schema:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    service['type'], service['id']
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(
                service['type'], service['id'])
            schema = schema[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                if counter_schema['priority'] < prio_limit:
                    continue

                # Copy so the schema itself is not modified.
                counter_info = dict(counter_schema)

                if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    # Also populate count for the long running avgs
                    v, c = self.get_latest_avg(
                        service['type'],
                        service['id'],
                        counter_path
                    )
                    counter_info['value'], counter_info['count'] = v, c
                else:
                    counter_info['value'] = self.get_latest(
                        service['type'],
                        service['id'],
                        counter_path
                    )

                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
def set_uri(self, uri):
    """
    Publish the address of a service exposed by this module. Call
    this once the address is available.

    :param uri: address to publish
    :return: result of ``_ceph_set_uri``
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def have_mon_connection(self):
    """
    Report whether this ceph-mgr daemon currently has an open
    connection to a monitor.

    Without one, the information held about the cluster is likely
    out of date, and/or the monitor cluster is down.

    :return: result of ``_ceph_have_mon_connection``
    """
    connected = self._ceph_have_mon_connection()
    return connected
def update_progress_event(self, evid, desc, progress):
    """
    Create or update a progress event.

    :param evid: event id, converted with ``str()``
    :param desc: event description, converted with ``str()``
    :param progress: progress value, converted with ``float()``
    :return: result of ``_ceph_update_progress_event``
    """
    return self._ceph_update_progress_event(str(evid),
                                            str(desc),
                                            float(progress))
def complete_progress_event(self, evid):
    """
    Mark a progress event as complete.

    :param evid: event id, converted with ``str()`` before use
    :return: result of ``_ceph_complete_progress_event``
    """
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
def clear_all_progress_events(self):
    """
    Clear all progress events.

    :return: result of ``_ceph_clear_all_progress_events``
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1465 A librados instance to be shared by any classes within
1466 this mgr module that want one.
1471 ctx_capsule
= self
.get_context()
1472 self
._rados
= rados
.Rados(context
=ctx_capsule
)
1473 self
._rados
.connect()
1474 self
._ceph
_register
_client
(self
._rados
.get_addrs())
1480 Implement this function to report whether the module's dependencies
1481 are met. For example, if the module needs to import a particular
1482 dependency to work, then use a try/except around the import at
1483 file scope, and then report here if the import failed.
1485 This will be called in a blocking way from the C++ code, so do not
1486 do any I/O that could block in this function.
1488 :return: a 2-tuple consisting of a boolean and explanatory string
def remote(self, module_name, method_name, *args, **kwargs):
    """
    Invoke a method on another module. All arguments, and the return
    value from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module. If module isn't loaded,
                        an ImportError exception is raised.
    :param method_name: Method name. If it does not exist, a NameError
                        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    # Positional and keyword arguments are forwarded as a tuple and a
    # dict respectively, not unpacked.
    return self._ceph_dispatch_remote(module_name, method_name,
                                      args, kwargs)
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query. ``query`` is a dict of the query
    parameters. Among its entries are:

    * a list of subkey descriptors, each of the form
      ``{'type': subkey_type, 'regex': regex_pattern}``
    * ``'performance_counter_descriptors'``: a list of descriptor types
    * ``'limit'``:
      ``{'order_by': performance_counter_type, 'max_count': n}``

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
def remove_osd_perf_query(self, query_id):
    """
    Unregister a previously registered OSD perf query.

    :param int query_id: query ID
    :return: result of ``_ceph_remove_osd_perf_query``
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
def get_osd_perf_counters(self, query_id):
    """
    Fetch the stats collected so far for an OSD perf query.

    :param int query_id: query ID
    :return: result of ``_ceph_get_osd_perf_counters``
    """
    collected = self._ceph_get_osd_perf_counters(query_id)
    return collected
def is_authorized(self, arguments):
    """
    Check that the caps of the current session permit executing the
    py service or the current module with the provided arguments.
    This gives modules a generic way to restrict access using more
    fine-grained controls.

    :param arguments: dict of key/value arguments to test
    :return: result of ``_ceph_is_authorized``
    """
    verdict = self._ceph_is_authorized(arguments)
    return verdict