]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
554310d9813fc6713866de059f36e150ec3271c6
1 import ceph_module
# noqa
4 from typing
import Set
, Tuple
, Iterator
, Any
, Dict
, Optional
, Callable
, List
6 # just for type checking
13 from collections
import defaultdict
, namedtuple
51 class CommandResult(object):
53 Use with MgrModule.send_command
56 def __init__(self
, tag
=None):
57 self
.ev
= threading
.Event()
62 # This is just a convenience for notifications from
63 # C++ land, to avoid passing addresses around in messages.
64 self
.tag
= tag
if tag
else ""
66 def complete(self
, r
, outb
, outs
):
74 return self
.r
, self
.outb
, self
.outs
class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
    """Named 3-tuple ``(retval, stdout, stderr)`` with defaulted fields."""

    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        Tuple containing the result of `handle_command()`.

        Only write to stderr if there is an error, or in extraordinary
        circumstances. Avoid having `ceph foo bar` commands say
        "did foo bar" on success unless there is critical information to
        include there. Everything programmatically consumable should be
        put on stdout.

        :param retval: return code. E.g. 0 or -errno.EINVAL
        :param stdout: data of this result.
        :param stderr: Typically used for error messages.
        """
        parent = super()
        return parent.__new__(cls, retval, stdout, stderr)
class MonCommandFailed(RuntimeError):
    """RuntimeError subclass used to signal a failed mon command."""
102 class OSDMap(ceph_module
.BasePyOSDMap
):
104 return self
._get
_epoch
()
def get_crush_version(self):
    """Return the value produced by ``self._get_crush_version()``."""
    version = self._get_crush_version()
    return version
113 # FIXME: efficient implementation
115 return dict([(p
['pool'], p
) for p
in d
['pools']])
117 def get_pools_by_name(self
):
118 # FIXME: efficient implementation
120 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
def new_incremental(self):
    """Return the object produced by ``self._new_incremental()``."""
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc):
    """
    Pass ``inc`` to ``self._apply_incremental`` and return its result.

    :param inc: the incremental to apply (presumably obtained from
        ``new_incremental()`` -- confirm against callers)
    """
    applied = self._apply_incremental(inc)
    return applied
129 return self
._get
_crush
()
def get_pools_by_take(self, take):
    """
    Return the 'pools' entry of ``self._get_pools_by_take(take)``.

    :param take: value forwarded unchanged to ``_get_pools_by_take``
    :return: the 'pools' value, or a new empty list when the key is absent
    """
    reply = self._get_pools_by_take(take)
    return reply.get('pools', [])
134 def calc_pg_upmaps(self
, inc
,
135 max_deviation
=.01, max_iterations
=10, pools
=None):
138 return self
._calc
_pg
_upmaps
(
140 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid):
    """
    Return the result of ``self._map_pool_pgs_up(poolid)``.

    :param poolid: pool identifier, forwarded unchanged
    """
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
def pg_to_up_acting_osds(self, pool_id, ps):
    """
    Return the result of ``self._pg_to_up_acting_osds(pool_id, ps)``.

    :param pool_id: forwarded unchanged as the first argument
    :param ps: forwarded unchanged as the second argument
    """
    lookup = self._pg_to_up_acting_osds
    return lookup(pool_id, ps)
def pool_raw_used_rate(self, pool_id):
    """
    Return the result of ``self._pool_raw_used_rate(pool_id)``.

    :param pool_id: pool identifier, forwarded unchanged
    """
    rate = self._pool_raw_used_rate(pool_id)
    return rate
151 def get_ec_profile(self
, name
):
152 # FIXME: efficient implementation
154 return d
['erasure_code_profiles'].get(name
, None)
156 def get_require_osd_release(self
):
158 return d
['require_osd_release']
161 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
163 return self
._get
_epoch
()
def set_osd_reweights(self, weightmap):
    """
    Forward ``weightmap`` to ``self._set_osd_reweights`` and return its result.

    :param weightmap: a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
    """
    outcome = self._set_osd_reweights(weightmap)
    return outcome
def set_crush_compat_weight_set_weights(self, weightmap):
    """
    Forward ``weightmap`` to ``self._set_crush_compat_weight_set_weights``
    and return its result.

    :param weightmap: a dict, int to float. devices only. e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
    """
    outcome = self._set_crush_compat_weight_set_weights(weightmap)
    return outcome
182 class CRUSHMap(ceph_module
.BasePyCRUSH
):
183 ITEM_NONE
= 0x7fffffff
184 DEFAULT_CHOOSE_ARGS
= '-1'
def get_item_weight(self, item):
    """
    Return the result of ``self._get_item_weight(item)``.

    :param item: forwarded unchanged
    """
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item):
    """
    Return the result of ``self._get_item_name(item)``.

    :param item: forwarded unchanged
    """
    name = self._get_item_name(item)
    return name
def find_takes(self):
    """
    Return the 'takes' entry of ``self._find_takes()``.

    :return: the 'takes' value, or a new empty list when the key is absent
    """
    reply = self._find_takes()
    return reply.get('takes', [])
def get_take_weight_osd_map(self, root):
    """
    Return the weight map for ``root`` with integer keys.

    ``self._get_take_weight_osd_map(root)`` is queried and its 'weights'
    entry is re-keyed through ``int()``; a missing 'weights' entry yields
    an empty dict.

    :param root: forwarded unchanged to ``_get_take_weight_osd_map``
    :return: dict mapping ``int(key)`` to the original value
    """
    uglymap = self._get_take_weight_osd_map(root)
    # This file already uses Python-3-only syntax, so plain dict.items()
    # replaces six.iteritems() without changing the result.
    weights = uglymap.get('weights', {})
    return {int(k): v for k, v in weights.items()}
def have_default_choose_args(dump):
    """
    Tell whether ``dump`` carries the default choose_args entry.

    :param dump: mapping with an optional 'choose_args' mapping
    :return: True when ``CRUSHMap.DEFAULT_CHOOSE_ARGS`` is a key of
        ``dump['choose_args']``; False when it is not or the key is absent
    """
    choose_args = dump.get('choose_args', {})
    return CRUSHMap.DEFAULT_CHOOSE_ARGS in choose_args
def get_default_choose_args(dump):
    """
    Return the default choose_args entry of ``dump``.

    :param dump: mapping with an optional 'choose_args' mapping
    :return: ``dump['choose_args'][CRUSHMap.DEFAULT_CHOOSE_ARGS]``, or an
        empty list when either level is missing
    """
    # Default to {} so a dump without 'choose_args' yields [] instead of
    # raising AttributeError on None; this matches have_default_choose_args.
    choose_args = dump.get('choose_args', {})
    return choose_args.get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
210 def get_rule(self
, rule_name
):
211 # TODO efficient implementation
212 for rule
in self
.dump()['rules']:
213 if rule_name
== rule
['rule_name']:
218 def get_rule_by_id(self
, rule_id
):
219 for rule
in self
.dump()['rules']:
220 if rule
['rule_id'] == rule_id
:
225 def get_rule_root(self
, rule_name
):
226 rule
= self
.get_rule(rule_name
)
231 first_take
= [s
for s
in rule
['steps'] if s
['op'] == 'take'][0]
233 logging
.warning("CRUSH rule '{0}' has no 'take' step".format(
237 return first_take
['item']
239 def get_osds_under(self
, root_id
):
240 # TODO don't abuse dump like this
242 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
247 for item
in b
['items']:
249 osd_list
.append(item
['id'])
252 accumulate(buckets
[item
['id']])
256 accumulate(buckets
[root_id
])
260 def device_class_counts(self
):
261 result
= defaultdict(int) # type: Dict[str, int]
262 # TODO don't abuse dump like this
264 for device
in d
['devices']:
265 cls
= device
.get('class', None)
271 class CLICommand(object):
272 COMMANDS
= {} # type: Dict[str, CLICommand]
274 def __init__(self
, prefix
, args
="", desc
="", perm
="rw"):
280 self
.func
= None # type: Optional[Callable]
283 def _parse_args(self
):
286 args
= self
.args
.split(" ")
288 arg_desc
= arg
.strip().split(",")
295 self
.args_dict
[v
] = arg_d
297 def __call__(self
, func
):
299 self
.COMMANDS
[self
.prefix
] = self
302 def call(self
, mgr
, cmd_dict
, inbuf
):
304 for a
, d
in self
.args_dict
.items():
305 if 'req' in d
and d
['req'] == "false" and a
not in cmd_dict
:
307 kwargs
[a
.replace("-", "_")] = cmd_dict
[a
]
309 kwargs
['inbuf'] = inbuf
311 return self
.func(mgr
, **kwargs
)
315 'cmd': '{} {}'.format(self
.prefix
, self
.args
),
def dump_cmd_list(cls):
    """
    Collect ``dump_cmd()`` of every command stored in ``cls.COMMANDS``.

    :return: list of the ``dump_cmd()`` results, in the dict's value order
    """
    dumped = []
    for command in cls.COMMANDS.values():
        dumped.append(command.dump_cmd())
    return dumped
def CLIReadCommand(prefix, args="", desc=""):
    """
    Build a ``CLICommand`` whose ``perm`` is "r".

    :param prefix: command prefix, passed through
    :param args: argument description string, passed through
    :param desc: description, passed through
    """
    return CLICommand(prefix, args=args, desc=desc, perm="r")
def CLIWriteCommand(prefix, args="", desc=""):
    """
    Build a ``CLICommand`` whose ``perm`` is "w".

    :param prefix: command prefix, passed through
    :param args: argument description string, passed through
    :param desc: description, passed through
    """
    return CLICommand(prefix, args=args, desc=desc, perm="w")
def _get_localized_key(prefix, key):
    """
    Join ``prefix`` and ``key`` with a single '/'.

    :return: the string "<prefix>/<key>"
    """
    template = '{}/{}'
    return template.format(prefix, key)
339 Helper class to declare options for MODULE_OPTIONS list.
341 Caveat: it uses argument names matching Python keywords (type, min, max),
342 so any further processing should happen in a separate method.
344 TODO: type validation.
351 desc
=None, longdesc
=None,
358 super(Option
, self
).__init
__(
359 (k
, v
) for k
, v
in vars().items()
360 if k
!= 'self' and v
is not None)
364 Helper class to declare options for COMMANDS list.
366 It also allows to specify prefix and args separately, as well as storing a
370 >>> Command(prefix="example",
371 ... args="name=arg,type=CephInt",
374 {'poll': False, 'cmd': 'example name=arg,type=CephInt', 'perm': 'w', 'desc': 'Blah'}
386 super(Command
, self
).__init
__(
387 cmd
=prefix
+ (' ' + args
if args
else ''),
393 self
.handler
= handler
395 def register(self
, instance
=False):
397 Register a CLICommand handler. It allows an instance to register bound
398 methods. In that case, the mgr instance is not passed, and it's expected
399 to be available in the class instance.
400 It also uses HandleCommandResult helper to return a wrapped a tuple of 3
409 func
=lambda mgr
, *args
, **kwargs
: HandleCommandResult(*self
.handler(
410 *((instance
or mgr
,) + args
), **kwargs
))
class CPlusPlusHandler(logging.Handler):
    """Logging handler that hands formatted records to ``module_inst._ceph_log``."""

    def __init__(self, module_inst):
        """
        :param module_inst: object providing ``module_name`` (embedded in
            the line format) and ``_ceph_log`` (called by ``emit``)
        """
        super().__init__()
        self._module = module_inst
        layout = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(layout))

    def emit(self, record):
        """Format ``record`` and pass it on, unless it is below this handler's level."""
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
425 class ClusterLogHandler(logging
.Handler
):
426 def __init__(self
, module_inst
):
428 self
._module
= module_inst
429 self
.setFormatter(logging
.Formatter("%(message)s"))
431 def emit(self
, record
):
433 'DEBUG': MgrModule
.CLUSTER_LOG_PRIO_DEBUG
,
434 'INFO': MgrModule
.CLUSTER_LOG_PRIO_INFO
,
435 'WARNING': MgrModule
.CLUSTER_LOG_PRIO_WARN
,
436 'ERROR': MgrModule
.CLUSTER_LOG_PRIO_ERROR
,
437 'CRITICAL': MgrModule
.CLUSTER_LOG_PRIO_ERROR
,
439 level
= levelmap
[record
.levelname
]
440 if record
.levelno
>= self
.level
:
441 self
._module
.cluster_log(self
._module
.module_name
,
445 class FileHandler(logging
.FileHandler
):
446 def __init__(self
, module_inst
):
447 path
= module_inst
.get_ceph_option("log_file")
448 idx
= path
.rfind(".log")
450 self
.path
= "{}.{}.log".format(path
[:idx
], module_inst
.module_name
)
452 self
.path
= "{}.{}".format(path
, module_inst
.module_name
)
453 super(FileHandler
, self
).__init
__(self
.path
, delay
=True)
454 self
.setFormatter(logging
.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
457 class MgrModuleLoggingMixin(object):
458 def _configure_logging(self
, mgr_level
, module_level
, cluster_level
,
459 log_to_file
, log_to_cluster
):
460 self
._mgr
_level
= None
461 self
._module
_level
= None
462 self
._root
_logger
= logging
.getLogger()
464 self
._unconfigure
_logging
()
466 # the ceph log handler is initialized only once
467 self
._mgr
_log
_handler
= CPlusPlusHandler(self
)
468 self
._cluster
_log
_handler
= ClusterLogHandler(self
)
469 self
._file
_log
_handler
= FileHandler(self
)
471 self
.log_to_file
= log_to_file
472 self
.log_to_cluster
= log_to_cluster
474 self
._root
_logger
.addHandler(self
._mgr
_log
_handler
)
476 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
478 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
480 self
._root
_logger
.setLevel(logging
.NOTSET
)
481 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
484 def _unconfigure_logging(self
):
485 # remove existing handlers:
487 h
for h
in self
._root
_logger
.handlers
if isinstance(h
, CPlusPlusHandler
) or isinstance(h
, FileHandler
) or isinstance(h
, ClusterLogHandler
)]
488 for h
in rm_handlers
:
489 self
._root
_logger
.removeHandler(h
)
490 self
.log_to_file
= False
491 self
.log_to_cluster
= False
493 def _set_log_level(self
, mgr_level
, module_level
, cluster_level
):
494 self
._cluster
_log
_handler
.setLevel(cluster_level
.upper())
496 module_level
= module_level
.upper() if module_level
else ''
497 if not self
._module
_level
:
498 # using debug_mgr level
499 if not module_level
and self
._mgr
_level
== mgr_level
:
500 # no change in module level neither in debug_mgr
503 if self
._module
_level
== module_level
:
504 # no change in module level
507 if not self
._module
_level
and not module_level
:
508 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
509 self
.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level
, mgr_level
)
510 elif self
._module
_level
and not module_level
:
511 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
512 self
.getLogger().warning("unsetting module log level, falling back to "
513 "debug_mgr level: %s (%s)", level
, mgr_level
)
516 self
.getLogger().debug("setting log level: %s", level
)
518 self
._module
_level
= module_level
519 self
._mgr
_level
= mgr_level
521 self
._mgr
_log
_handler
.setLevel(level
)
522 self
._file
_log
_handler
.setLevel(level
)
524 def _enable_file_log(self
):
526 self
.getLogger().warning("enabling logging to file")
527 self
.log_to_file
= True
528 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
530 def _disable_file_log(self
):
532 self
.getLogger().warning("disabling logging to file")
533 self
.log_to_file
= False
534 self
._root
_logger
.removeHandler(self
._file
_log
_handler
)
536 def _enable_cluster_log(self
):
538 self
.getLogger().warning("enabling logging to cluster")
539 self
.log_to_cluster
= True
540 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
542 def _disable_cluster_log(self
):
543 # disable cluster log
544 self
.getLogger().warning("disabling logging to cluster")
545 self
.log_to_cluster
= False
546 self
._root
_logger
.removeHandler(self
._cluster
_log
_handler
)
548 def _ceph_log_level_to_python(self
, ceph_log_level
):
551 ceph_log_level
= int(ceph_log_level
.split("/", 1)[0])
558 if ceph_log_level
<= 0:
559 log_level
= "CRITICAL"
560 elif ceph_log_level
<= 1:
561 log_level
= "WARNING"
562 elif ceph_log_level
<= 4:
def getLogger(self, name=None):
    """
    Return ``logging.getLogger(name)``.

    :param name: logger name; None selects the root logger
    """
    logger = logging.getLogger(name)
    return logger
570 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
, MgrModuleLoggingMixin
):
572 Standby modules only implement a serve and shutdown method, they
573 are not permitted to implement commands and they do not receive
576 They only have access to the mgrmap (for accessing service URI info
577 from their active peer), and to configuration settings (read only).
580 MODULE_OPTIONS
= [] # type: List[Dict[str, Any]]
581 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
583 def __init__(self
, module_name
, capsule
):
584 super(MgrStandbyModule
, self
).__init
__(capsule
)
585 self
.module_name
= module_name
587 # see also MgrModule.__init__()
588 for o
in self
.MODULE_OPTIONS
:
591 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
593 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
595 mgr_level
= self
.get_ceph_option("debug_mgr")
596 log_level
= self
.get_module_option("log_level")
597 cluster_level
= self
.get_module_option('log_to_cluster_level')
598 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
601 # for backwards compatibility
602 self
._logger
= self
.getLogger()
605 self
._unconfigure
_logging
()
608 def _register_options(cls
, module_name
):
609 cls
.MODULE_OPTIONS
.append(
610 Option(name
='log_level', type='str', default
="", runtime
=True,
611 enum_allowed
=['info', 'debug', 'critical', 'error',
613 cls
.MODULE_OPTIONS
.append(
614 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
615 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
616 cls
.MODULE_OPTIONS
.append(
617 Option(name
='log_to_cluster', type='bool', default
=False,
619 cls
.MODULE_OPTIONS
.append(
620 Option(name
='log_to_cluster_level', type='str', default
='info',
622 enum_allowed
=['info', 'debug', 'critical', 'error',
631 The serve method is mandatory for standby modules.
634 raise NotImplementedError()
def get_mgr_id(self):
    """Return the value produced by ``self._ceph_get_mgr_id()``."""
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
639 def get_module_option(self
, key
, default
=None):
641 Retrieve the value of a persistent configuration setting
644 :param default: the default value of the config if it is not found
647 r
= self
._ceph
_get
_module
_option
(key
)
649 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_ceph_option(self, key):
    """
    Return the result of ``self._ceph_get_option(key)``.

    :param key: option name, forwarded unchanged
    """
    value = self._ceph_get_option(key)
    return value
def get_store(self, key):
    """
    Retrieve the value of a persistent KV store entry.

    :param key: entry name, forwarded to ``self._ceph_get_store``
    :return: Byte string or None
    """
    stored = self._ceph_get_store(key)
    return stored
def get_active_uri(self):
    """Return the value produced by ``self._ceph_get_active_uri()``."""
    uri = self._ceph_get_active_uri()
    return uri
668 def get_localized_module_option(self
, key
, default
=None):
669 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
671 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
676 class MgrModule(ceph_module
.BaseMgrModule
, MgrModuleLoggingMixin
):
677 COMMANDS
= [] # type: List[Any]
678 MODULE_OPTIONS
= [] # type: List[dict]
679 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
681 # Priority definitions for perf counters
685 PRIO_UNINTERESTING
= 2
688 # counter value types
693 PERFCOUNTER_LONGRUNAVG
= 4
694 PERFCOUNTER_COUNTER
= 8
695 PERFCOUNTER_HISTOGRAM
= 0x10
696 PERFCOUNTER_TYPE_MASK
= ~
3
702 # Cluster log priorities
703 CLUSTER_LOG_PRIO_DEBUG
= 0
704 CLUSTER_LOG_PRIO_INFO
= 1
705 CLUSTER_LOG_PRIO_SEC
= 2
706 CLUSTER_LOG_PRIO_WARN
= 3
707 CLUSTER_LOG_PRIO_ERROR
= 4
709 def __init__(self
, module_name
, py_modules_ptr
, this_ptr
):
710 self
.module_name
= module_name
711 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
713 for o
in self
.MODULE_OPTIONS
:
716 # we'll assume the declared type matches the
717 # supplied default value's type.
718 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
720 # module not declaring it's type, so normalize the
721 # default value to be a string for consistent behavior
722 # with default and user-supplied option values.
723 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
725 mgr_level
= self
.get_ceph_option("debug_mgr")
726 log_level
= self
.get_module_option("log_level")
727 cluster_level
= self
.get_module_option('log_to_cluster_level')
728 log_to_file
= self
.get_module_option("log_to_file")
729 log_to_cluster
= self
.get_module_option("log_to_cluster")
730 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
731 log_to_file
, log_to_cluster
)
733 # for backwards compatibility
734 self
._logger
= self
.getLogger()
736 self
._version
= self
._ceph
_get
_version
()
738 self
._perf
_schema
_cache
= None
740 # Keep a librados instance for those that need it.
745 self
._unconfigure
_logging
()
748 def _register_options(cls
, module_name
):
749 cls
.MODULE_OPTIONS
.append(
750 Option(name
='log_level', type='str', default
="", runtime
=True,
751 enum_allowed
=['info', 'debug', 'critical', 'error',
753 cls
.MODULE_OPTIONS
.append(
754 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
755 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
756 cls
.MODULE_OPTIONS
.append(
757 Option(name
='log_to_cluster', type='bool', default
=False,
759 cls
.MODULE_OPTIONS
.append(
760 Option(name
='log_to_cluster_level', type='str', default
='info',
762 enum_allowed
=['info', 'debug', 'critical', 'error',
def _register_commands(cls, module_name):
    """
    Append every ``CLICommand.dump_cmd_list()`` entry to ``cls.COMMANDS``.

    :param module_name: accepted but not used by this body
    """
    commands = cls.COMMANDS
    commands.extend(CLICommand.dump_cmd_list())
def cluster_log(self, channel, priority, message):
    """
    Forward a message to ``self._ceph_cluster_log``.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority. This can be
        CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
        CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
        CLUSTER_LOG_PRIO_ERROR.
    :param message: The message to log.
    """
    emit = self._ceph_cluster_log
    emit(channel, priority, message)
def release_name(self):
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: Returns the release name of the Ceph version in lower case
        (as reported by ``self._ceph_get_release_name()``).
    """
    name = self._ceph_get_release_name()
    return name
def get_context(self):
    """
    :return: a Python capsule containing a C++ CephContext pointer
        (whatever ``self._ceph_get_context()`` returns)
    """
    context = self._ceph_get_context()
    return context
806 def notify(self
, notify_type
, notify_id
):
808 Called by the ceph-mgr service to notify the Python plugin
809 that new state is available.
811 :param notify_type: string indicating what kind of notification,
812 such as osd_map, mon_map, fs_map, mon_status,
813 health, pg_summary, command, service_map
814 :param notify_id: string (may be empty) that optionally specifies
815 which entity is being notified about. With
816 "command" notifications this is set to the tag
817 ``from send_command``.
821 def _config_notify(self
):
822 # check logging options for changes
823 mgr_level
= self
.get_ceph_option("debug_mgr")
824 module_level
= self
.get_module_option("log_level")
825 cluster_level
= self
.get_module_option("log_to_cluster_level")
826 log_to_file
= self
.get_module_option("log_to_file", False)
827 log_to_cluster
= self
.get_module_option("log_to_cluster", False)
829 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
831 if log_to_file
!= self
.log_to_file
:
833 self
._enable
_file
_log
()
835 self
._disable
_file
_log
()
836 if log_to_cluster
!= self
.log_to_cluster
:
838 self
._enable
_cluster
_log
()
840 self
._disable
_cluster
_log
()
842 # call module subclass implementations
845 def config_notify(self
):
847 Called by the ceph-mgr service to notify the Python plugin
848 that the configuration may have changed. Modules will want to
849 refresh any configuration values stored in config variables.
855 Called by the ceph-mgr service to start any server that
856 is provided by this Python plugin. The implementation
857 of this function should block until ``shutdown`` is called.
859 You *must* implement ``shutdown`` if you implement ``serve``
865 Called by the ceph-mgr service to request that this
866 module drop out of its serve() function. You do not
867 need to implement this if you do not implement serve()
872 addrs
= self
._rados
.get_addrs()
873 self
._rados
.shutdown()
874 self
._ceph
_unregister
_client
(addrs
)
def get(self, data_name):
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    All these structures have their own JSON representations: experiment
    or look at the C++ ``dump()`` methods to learn about them.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
        osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
        osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
        health, mon_status, devices, device <devid>, pg_stats,
        pool_stats, pg_ready, osd_ping_times.
    :return: the result of ``self._ceph_get(data_name)``
    """
    fetched = self._ceph_get(data_name)
    return fetched
892 def _stattype_to_str(self
, stattype
):
894 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
897 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
898 # this lie matches the DaemonState decoding: only val, no counts
900 if typeonly
== self
.PERFCOUNTER_COUNTER
:
902 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
907 def _perfpath_to_path_labels(self
, daemon
, path
):
908 # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
909 label_names
= ("ceph_daemon",) # type: Tuple[str, ...]
910 labels
= (daemon
,) # type: Tuple[str, ...]
912 if daemon
.startswith('rbd-mirror.'):
914 r
'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
918 path
= 'rbd_mirror_image_' + match
.group(4)
919 pool
= match
.group(1)
920 namespace
= match
.group(2) or ''
921 image
= match
.group(3)
922 label_names
+= ('pool', 'namespace', 'image')
923 labels
+= (pool
, namespace
, image
)
925 return path
, label_names
, labels
,
927 def _perfvalue_to_value(self
, stattype
, value
):
928 if stattype
& self
.PERFCOUNTER_TIME
:
929 # Convert from ns to seconds
930 return value
/ 1000000000.0
934 def _unit_to_str(self
, unit
):
935 if unit
== self
.NONE
:
937 elif unit
== self
.BYTES
:
941 def to_pretty_iec(n
):
942 for bits
, suffix
in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
943 (20, 'Mi'), (10, 'Ki')]:
945 return str(n
>> bits
) + ' ' + suffix
949 def get_pretty_row(elems
, width
):
951 Takes an array of elements and returns a string with those elements
952 formatted as a table row. Useful for polling modules.
954 :param elems: the elements to be printed
955 :param width: the width of the terminal
958 column_width
= int(width
/ n
)
962 ret
+= '{0:>{w}} |'.format(elem
, w
=column_width
- 2)
966 def get_pretty_header(self
, elems
, width
):
968 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
970 :param elems: the elements to be printed
971 :param width: the width of the terminal
974 column_width
= int(width
/ n
)
978 for i
in range(0, n
):
979 ret
+= '-' * (column_width
- 1) + '+'
983 ret
+= self
.get_pretty_row(elems
, width
)
988 for i
in range(0, n
):
989 ret
+= '-' * (column_width
- 1) + '+'
def get_server(self, hostname):
    """
    Called by the plugin to fetch metadata about a particular hostname.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    :return: the result of ``self._ceph_get_server(hostname)``
    """
    server = self._ceph_get_server(hostname)
    return server
def get_perf_schema(self, svc_type, svc_name):
    """
    Called by the plugin to fetch perf counter schema info.

    The original comment notes that svc_name and svc_type can both be
    "nullptr"; how that case is handled is not visible in this method.

    :param str svc_type:
    :param str svc_name:
    :return: list of dicts describing the counters requested
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
def get_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch the latest performance counter data for a
    particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned. This
        may be empty if no data is available.
    """
    fetch = self._ceph_get_counter
    return fetch(svc_type, svc_name, path)
def get_latest_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned. This
        may be empty if no data is available.
    """
    fetch = self._ceph_get_latest_counter
    return fetch(svc_type, svc_name, path)
def list_servers(self):
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata).

    :return: a list of information about all servers
    """
    # Passing None as the hostname is how this method asks for all servers.
    everything = self._ceph_get_server(None)
    return everything
def get_metadata(self, svc_type, svc_id):
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    during addition/removal of services where the metadata is not
    available to modules. ``None`` is returned if no metadata is available.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings
    :rtype: dict, or None if no metadata found
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
1084 def check_mon_command(self
, cmd_dict
: dict) -> HandleCommandResult
:
1086 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1090 r
= HandleCommandResult(*self
.mon_command(cmd_dict
))
1092 raise MonCommandFailed(f
'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1095 def mon_command(self
, cmd_dict
):
1097 Helper for modules that do simple, synchronous mon command
1100 See send_command for general case.
1102 :return: status int, out std, err str
1106 result
= CommandResult()
1107 self
.send_command(result
, "mon", "", json
.dumps(cmd_dict
), "")
1111 self
.log
.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1112 cmd_dict
['prefix'], r
[0], t2
- t1
def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon.

    All arguments are forwarded untouched to ``self._ceph_send_command``;
    nothing is returned.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule. This acts as a
        completion and stores the output of the command. Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type:
    :param str command: a JSON-serialized command. This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself. Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    """
    dispatch = self._ceph_send_command
    dispatch(*args, **kwargs)
def set_health_checks(self, checks):
    """
    Set the module's current map of health checks.

    ``checks`` is a dict of check names to info dicts. The keys shown in
    the original example are 'severity' ('warning' or 'error'), 'summary'
    (a string), 'count' (a number quantifying badness) and 'detail'
    (a list of strings).

    :param checks: dict of health check dicts, forwarded to
        ``self._ceph_set_health_checks``
    """
    publish = self._ceph_set_health_checks
    publish(checks)
def _handle_command(self, inbuf, cmd):
    """
    Route a command to its handler.

    Commands whose 'prefix' is a key of ``CLICommand.COMMANDS`` are run
    through that entry's ``call``; all others go to ``self.handle_command``.

    :param inbuf: input buffer, passed through
    :param cmd: command dict; must contain 'prefix'
    """
    prefix = cmd['prefix']
    if prefix in CLICommand.COMMANDS:
        return CLICommand.COMMANDS[prefix].call(self, cmd, inbuf)
    return self.handle_command(inbuf, cmd)
def handle_command(self, inbuf, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS.

    Return a status code, an output buffer, and an output string. The
    output buffer is for data results, the output string is for
    informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :param cmd: from Ceph's cmdmap_t
    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """
    # Should never get called if the module didn't declare any commands.
    raise NotImplementedError()
def get_mgr_id(self):
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: the result of ``self._ceph_get_mgr_id()``
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
def get_ceph_option(self, key):
    """
    Return the result of ``self._ceph_get_option(key)``.

    :param key: option name, forwarded unchanged
    """
    value = self._ceph_get_option(key)
    return value
1204 def _validate_module_option(self
, key
):
1206 Helper: don't allow get/set config callers to
1207 access config options that they didn't declare
1210 if key
not in [o
['name'] for o
in self
.MODULE_OPTIONS
]:
1211 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1212 format(key
, self
.__class
__.__name
__))
1214 def _get_module_option(self
, key
, default
, localized_prefix
=""):
1215 r
= self
._ceph
_get
_module
_option
(self
.module_name
, key
,
1218 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_module_option(self, key, default=None):
    """
    Retrieve the value of a persistent configuration setting.

    The key is checked with ``_validate_module_option`` before the lookup
    is handed to ``_get_module_option``.

    :param key: option name; must be declared by the module
    :param default: passed through to ``_get_module_option``
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
def get_module_option_ex(self, module, key, default=None):
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    The key is validated only when ``module`` is this module's own name.

    :param str module: The name of the module, e.g. 'dashboard'
    :param str key: The configuration key, e.g. 'server_addr'.
    :param str,None default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    :return: str,int,bool,float,None
    """
    if module == self.module_name:
        self._validate_module_option(key)
    value = self._ceph_get_module_option(module, key)
    if value is None:
        return default
    return value
def get_store_prefix(self, key_prefix):
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix.

    :param str key_prefix:
    :return: the result of ``self._ceph_get_store_prefix(key_prefix)``
    """
    matches = self._ceph_get_store_prefix(key_prefix)
    return matches
def _set_localized(self, key, val, setter):
    """
    Call ``setter`` with ``key`` prefixed by this mgr's id.

    :param key: unprefixed key
    :param val: value handed to ``setter`` unchanged
    :param setter: callable taking ``(localized_key, val)``
    :return: whatever ``setter`` returns
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
def get_localized_module_option(self, key, default=None):
    """
    Retrieve localized configuration for this ceph-mgr instance.

    The key is validated first; the lookup then goes through
    ``_get_module_option`` with ``self.get_mgr_id()`` as the localized
    prefix.

    :param key: option name; must be declared by the module
    :param default: passed through to ``_get_module_option``
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
1273 def _set_module_option(self
, key
, val
):
1274 return self
._ceph
_set
_module
_option
(self
.module_name
, key
,
1275 None if val
is None else str(val
))
def set_module_option(self, key, val):
    """
    Set the value of a persistent configuration setting.

    The key is validated first, then the write is delegated to
    ``_set_module_option``.

    :param key: option name; must be declared by the module
    :type val: str | None
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
def set_module_option_ex(self, module, key, val):
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is only validated when *module* is this module itself.
    ``None`` is forwarded as ``None`` (the same treatment
    ``_set_module_option`` gives it) instead of being turned into the
    literal string ``"None"``; every other value is converted with
    ``str()``.

    :param str module: name of the module owning the option
    :param str key: option name
    :param val: new value, or ``None``
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Do not stringify None: str(None) would persist the text "None".
    encoded = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, encoded)
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    The key is validated, then written through ``_set_localized``
    using ``_set_module_option`` as the setter.

    :param str key: option name
    :param val: new value
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
def set_store(self, key, val):
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    Nothing is returned; the write is handed to ``_ceph_set_store``.

    :param str key: store key
    :param val: value to store, or ``None`` to remove the key
    """
    store_write = self._ceph_set_store
    store_write(key, val)
def get_store(self, key, default=None):
    """
    Get a value from this module's persistent key value store.

    :param str key: store key
    :param default: returned when the store yields ``None`` for *key*
    """
    value = self._ceph_get_store(key)
    return default if value is None else value
def get_localized_store(self, key, default=None):
    """
    Get a value from the KV store, preferring the localized key.

    The key localized for this daemon's mgr id is tried first, then
    the plain *key*; *default* is returned when both yield ``None``.

    :param str key: un-localized store key
    :param default: value used when neither lookup finds anything
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    value = self._ceph_get_store(localized_key)
    if value is not None:
        return value

    value = self._ceph_get_store(key)
    if value is not None:
        return value

    return default
def set_localized_store(self, key, val):
    """
    Store *val* in the KV store under the localized form of *key*.

    Delegates to ``_set_localized`` with ``set_store`` as the setter.

    :param str key: un-localized store key
    :param val: value to store
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
def self_test(self):
    """
    Run a self-test on the module.

    Override this function and implement a best as possible self-test
    for (automated) testing of the module.  Indicate any failures by
    raising an exception.  This does not have to be pretty, it's
    mainly for picking up regressions during development, rather than
    use in the field.

    This base implementation does nothing.

    :return: None, or an advisory string for developer interest, such
             as a json dump of some state.
    """
    return None
def get_osdmap(self):
    """
    Get a handle to an OSDMap.

    :return: the object produced by ``_ceph_get_osdmap()``
    """
    osdmap = self._ceph_get_osdmap()
    return osdmap
def get_latest(self, daemon_type, daemon_name, counter):
    """
    Return the second element of the latest data point of a counter.

    The data point is taken from ``get_latest_counter`` under the
    *counter* key; ``0`` is returned when it is empty or ``None``.

    :param daemon_type: daemon type passed to ``get_latest_counter``
    :param daemon_name: daemon name passed to ``get_latest_counter``
    :param counter: counter path, also the key read from the result
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    sample = latest[counter]
    return sample[1] if sample else 0
def get_latest_avg(self, daemon_type, daemon_name, counter):
    """
    Return elements 1 and 2 of the latest data point of a counter.

    The data point is taken from ``get_latest_counter`` under the
    *counter* key; ``(0, 0)`` is returned when it is empty or ``None``.

    :param daemon_type: daemon type passed to ``get_latest_counter``
    :param daemon_name: daemon name passed to ``get_latest_counter``
    :param counter: counter path, also the key read from the result
    :return: 2-tuple
    """
    latest = self.get_latest_counter(daemon_type, daemon_name, counter)
    sample = latest[counter]
    if not sample:
        return 0, 0
    return sample[1], sample[2]
def get_all_perf_counters(self, prio_limit=PRIO_USEFUL,
                          services=("mds", "mon", "osd",
                                    "rbd-mirror", "rgw", "tcmu-runner")):
    """
    Return the perf counters currently known to this ceph-mgr
    instance, filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services
    (like "osd.123") with their counters.  The counter
    dict for each service maps counter paths to a counter
    info structure, which is the information from
    the schema, plus an additional "value" member with the latest
    value (and a "count" member for long-running averages).

    :param prio_limit: counters whose schema priority is below this
        are left out
    :param services: service types to include
    """
    result = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in server['services']:
            svc_type = service['type']
            if svc_type not in services:
                continue
            svc_id = service['id']

            schema = self.get_perf_schema(svc_type, svc_id)
            if not schema:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            svc_schema = schema[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in svc_schema.items():
                if counter_schema['priority'] < prio_limit:
                    continue

                counter_info = dict(counter_schema)

                if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                    # Also populate count for the long running avgs
                    counter_info['value'], counter_info['count'] = \
                        self.get_latest_avg(svc_type, svc_id, counter_path)
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                # The per-service entry is only created once a counter
                # actually passes the priority filter.
                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
def set_uri(self, uri):
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: address handed to ``_ceph_set_uri``
    :return: whatever ``_ceph_set_uri`` returns
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def have_mon_connection(self):
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.  If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: the result of ``_ceph_have_mon_connection()``
    """
    connected = self._ceph_have_mon_connection()
    return connected
def update_progress_event(self, evid, desc, progress):
    """
    Forward a progress event update to ``_ceph_update_progress_event``.

    The event id and description are converted with ``str()`` and the
    progress with ``float()`` before the call.

    :param evid: event identifier
    :param desc: event description
    :param progress: progress value
    """
    event_id = str(evid)
    description = str(desc)
    fraction = float(progress)
    return self._ceph_update_progress_event(event_id, description, fraction)
def complete_progress_event(self, evid):
    """
    Forward completion of a progress event to
    ``_ceph_complete_progress_event``.

    :param evid: event identifier; converted with ``str()``
    """
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
def clear_all_progress_events(self):
    """
    Invoke ``_ceph_clear_all_progress_events()`` and return its result.
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1479 A librados instance to be shared by any classes within
1480 this mgr module that want one.
1485 ctx_capsule
= self
.get_context()
1486 self
._rados
= rados
.Rados(context
=ctx_capsule
)
1487 self
._rados
.connect()
1488 self
._ceph
_register
_client
(self
._rados
.get_addrs())
1494 Implement this function to report whether the module's dependencies
1495 are met. For example, if the module needs to import a particular
1496 dependency to work, then use a try/except around the import at
1497 file scope, and then report here if the import failed.
1499 This will be called in a blocking way from the C++ code, so do not
1500 do any I/O that could block in this function.
1502 :return a 2-tuple consisting of a boolean and explanatory string
def remote(self, module_name, method_name, *args, **kwargs):
    """
    Invoke a method on another module.  All arguments, and the return
    value from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module.  If module isn't loaded,
                        an ImportError exception is raised.
    :param method_name: Method name.  If it does not exist, a NameError
                        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
    :raises ImportError: No such module
    """
    dispatch = self._ceph_dispatch_remote
    # Positional and keyword arguments travel as a tuple and a dict.
    return dispatch(module_name, method_name, args, kwargs)
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.  Argument is a
    dict of the query parameters, in this form:

    ::

       {
         'key_descriptor': [
           {'type': subkey_type, 'regex': regex_pattern},
           ...
         ],
         'performance_counter_descriptors': [
           list, of, descriptor, types
         ],
         'limit': {'order_by': performance_counter_type, 'max_count': n},
       }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
def remove_osd_perf_query(self, query_id):
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_osd_perf_query`` returns
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
def get_osd_perf_counters(self, query_id):
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_get_osd_perf_counters`` returns
    """
    counters = self._ceph_get_osd_perf_counters(query_id)
    return counters
def is_authorized(self, arguments):
    """
    Verifies that the current session caps permit executing the py service
    or current module with the provided arguments.  This provides a generic
    way to allow modules to restrict by more fine-grained controls.

    :param arguments: dict of key/value arguments to test
    :return: the result of ``_ceph_is_authorized``
    """
    verdict = self._ceph_is_authorized(arguments)
    return verdict