]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
6f3e799187724b3dd81c362baf80359bfa02e5f3
1 import ceph_module
# noqa
4 from typing
import Set
, Tuple
, Iterator
, Any
, Dict
, Optional
, Callable
, List
6 # just for type checking
13 from collections
import defaultdict
, namedtuple
17 from mgr_util
import profile_method
19 # Full list of strings in "osd_types.cc:pg_state_string()"
58 class CommandResult(object):
60 Use with MgrModule.send_command
63 def __init__(self
, tag
=None):
64 self
.ev
= threading
.Event()
69 # This is just a convenience for notifications from
70 # C++ land, to avoid passing addresses around in messages.
71 self
.tag
= tag
if tag
else ""
73 def complete(self
, r
, outb
, outs
):
81 return self
.r
, self
.outb
, self
.outs
class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])):
    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        Build the 3-tuple that `handle_command()` hands back.

        Guidelines for callers:

        * write to stderr only on error, or in extraordinary circumstances;
        * a successful `ceph foo bar` should not echo "did foo bar" unless
          there is critical information to report;
        * anything meant to be consumed programmatically goes on stdout.

        :param retval: return code. E.g. 0 or -errno.EINVAL
        :param stdout: data of this result.
        :param stderr: Typically used for error messages.
        """
        fields = (retval, stdout, stderr)
        return super().__new__(cls, *fields)
class MonCommandFailed(RuntimeError):
    """RuntimeError subclass with no behaviour of its own; it only gives
    callers a distinct exception type to catch."""
109 class OSDMap(ceph_module
.BasePyOSDMap
):
111 return self
._get
_epoch
()
def get_crush_version(self):
    """Return whatever ``self._get_crush_version()`` reports."""
    version = self._get_crush_version()
    return version
120 # FIXME: efficient implementation
122 return dict([(p
['pool'], p
) for p
in d
['pools']])
124 def get_pools_by_name(self
):
125 # FIXME: efficient implementation
127 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
def new_incremental(self):
    """Return the object produced by ``self._new_incremental()``."""
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc):
    """
    Forward ``inc`` to ``self._apply_incremental`` and return its result.

    :param inc: the incremental to apply
    """
    applied = self._apply_incremental(inc)
    return applied
136 return self
._get
_crush
()
def get_pools_by_take(self, take):
    """
    Look up ``take`` via ``self._get_pools_by_take`` and return the value
    stored under its ``'pools'`` key, or an empty list when that key is absent.
    """
    reply = self._get_pools_by_take(take)
    return reply.get('pools', [])
141 def calc_pg_upmaps(self
, inc
,
142 max_deviation
=.01, max_iterations
=10, pools
=None):
145 return self
._calc
_pg
_upmaps
(
147 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid):
    """Return the result of ``self._map_pool_pgs_up(poolid)``."""
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
def pg_to_up_acting_osds(self, pool_id, ps):
    """
    Pass ``pool_id`` and ``ps`` to ``self._pg_to_up_acting_osds`` and return
    its result unchanged.
    """
    osds = self._pg_to_up_acting_osds(pool_id, ps)
    return osds
def pool_raw_used_rate(self, pool_id):
    """Return the result of ``self._pool_raw_used_rate(pool_id)``."""
    rate = self._pool_raw_used_rate(pool_id)
    return rate
158 def get_ec_profile(self
, name
):
159 # FIXME: efficient implementation
161 return d
['erasure_code_profiles'].get(name
, None)
163 def get_require_osd_release(self
):
165 return d
['require_osd_release']
168 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
170 return self
._get
_epoch
()
def set_osd_reweights(self, weightmap):
    """
    Hand ``weightmap`` to ``self._set_osd_reweights`` and return its result.

    weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
    """
    outcome = self._set_osd_reweights(weightmap)
    return outcome
def set_crush_compat_weight_set_weights(self, weightmap):
    """
    Hand ``weightmap`` to ``self._set_crush_compat_weight_set_weights`` and
    return its result.

    weightmap is a dict, int to float.  devices only.  e.g.,
    { 0: 3.4, 1: 3.3, 2: 3.334 }
    """
    outcome = self._set_crush_compat_weight_set_weights(weightmap)
    return outcome
189 class CRUSHMap(ceph_module
.BasePyCRUSH
):
190 ITEM_NONE
= 0x7fffffff
191 DEFAULT_CHOOSE_ARGS
= '-1'
def get_item_weight(self, item):
    """Return the result of ``self._get_item_weight(item)``."""
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item):
    """Return the result of ``self._get_item_name(item)``."""
    name = self._get_item_name(item)
    return name
def find_takes(self):
    """
    Call ``self._find_takes()`` and return the value under its ``'takes'``
    key, or an empty list when that key is absent.
    """
    reply = self._find_takes()
    return reply.get('takes', [])
def get_take_weight_osd_map(self, root):
    """
    Return the ``'weights'`` mapping reported by
    ``self._get_take_weight_osd_map(root)`` with every key converted to int.

    An absent ``'weights'`` entry yields an empty dict.

    :param root: passed through to ``self._get_take_weight_osd_map``
    :return: dict of int key to the original value
    """
    uglymap = self._get_take_weight_osd_map(root)
    # Plain dict iteration: this file already relies on Python 3 syntax, so
    # the six compatibility shim is unnecessary here.
    return {int(k): v for k, v in uglymap.get('weights', {}).items()}
210 def have_default_choose_args(dump
):
211 return CRUSHMap
.DEFAULT_CHOOSE_ARGS
in dump
.get('choose_args', {})
214 def get_default_choose_args(dump
):
215 return dump
.get('choose_args').get(CRUSHMap
.DEFAULT_CHOOSE_ARGS
, [])
217 def get_rule(self
, rule_name
):
218 # TODO efficient implementation
219 for rule
in self
.dump()['rules']:
220 if rule_name
== rule
['rule_name']:
225 def get_rule_by_id(self
, rule_id
):
226 for rule
in self
.dump()['rules']:
227 if rule
['rule_id'] == rule_id
:
232 def get_rule_root(self
, rule_name
):
233 rule
= self
.get_rule(rule_name
)
238 first_take
= [s
for s
in rule
['steps'] if s
['op'] == 'take'][0]
240 logging
.warning("CRUSH rule '{0}' has no 'take' step".format(
244 return first_take
['item']
246 def get_osds_under(self
, root_id
):
247 # TODO don't abuse dump like this
249 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
254 for item
in b
['items']:
256 osd_list
.append(item
['id'])
259 accumulate(buckets
[item
['id']])
263 accumulate(buckets
[root_id
])
267 def device_class_counts(self
):
268 result
= defaultdict(int) # type: Dict[str, int]
269 # TODO don't abuse dump like this
271 for device
in d
['devices']:
272 cls
= device
.get('class', None)
278 class CLICommand(object):
279 COMMANDS
= {} # type: Dict[str, CLICommand]
281 def __init__(self
, prefix
, args
="", desc
="", perm
="rw"):
287 self
.func
= None # type: Optional[Callable]
290 def _parse_args(self
):
293 args
= self
.args
.split(" ")
295 arg_desc
= arg
.strip().split(",")
302 self
.args_dict
[v
] = arg_d
304 def __call__(self
, func
):
306 self
.COMMANDS
[self
.prefix
] = self
309 def call(self
, mgr
, cmd_dict
, inbuf
):
311 for a
, d
in self
.args_dict
.items():
312 if 'req' in d
and d
['req'] == "false" and a
not in cmd_dict
:
314 kwargs
[a
.replace("-", "_")] = cmd_dict
[a
]
316 kwargs
['inbuf'] = inbuf
318 return self
.func(mgr
, **kwargs
)
322 'cmd': '{} {}'.format(self
.prefix
, self
.args
),
def dump_cmd_list(cls):
    """
    Return a list holding ``dump_cmd()`` of every value in ``cls.COMMANDS``.

    NOTE(review): first parameter is ``cls`` -- presumably a classmethod
    whose decorator is outside this view; confirm.
    """
    dumped = []
    for command in cls.COMMANDS.values():
        dumped.append(command.dump_cmd())
    return dumped
def CLIReadCommand(prefix, args="", desc=""):
    """Build a ``CLICommand`` for ``prefix``/``args``/``desc`` with perm "r"."""
    return CLICommand(prefix, args=args, desc=desc, perm="r")
def CLIWriteCommand(prefix, args="", desc=""):
    """Build a ``CLICommand`` for ``prefix``/``args``/``desc`` with perm "w"."""
    return CLICommand(prefix, args=args, desc=desc, perm="w")
340 def _get_localized_key(prefix
, key
):
341 return '{}/{}'.format(prefix
, key
)
346 Helper class to declare options for MODULE_OPTIONS list.
348 Caveat: it uses argument names matching Python keywords (type, min, max),
349 so any further processing should happen in a separate method.
351 TODO: type validation.
358 desc
=None, longdesc
=None,
365 super(Option
, self
).__init
__(
366 (k
, v
) for k
, v
in vars().items()
367 if k
!= 'self' and v
is not None)
371 Helper class to declare options for COMMANDS list.
373 It also allows to specify prefix and args separately, as well as storing a
377 >>> Command(prefix="example",
378 ... args="name=arg,type=CephInt",
381 {'poll': False, 'cmd': 'example name=arg,type=CephInt', 'perm': 'w', 'desc': 'Blah'}
393 super(Command
, self
).__init
__(
394 cmd
=prefix
+ (' ' + args
if args
else ''),
400 self
.handler
= handler
402 def register(self
, instance
=False):
404 Register a CLICommand handler. It allows an instance to register bound
405 methods. In that case, the mgr instance is not passed, and it's expected
406 to be available in the class instance.
407 It also uses HandleCommandResult helper to return a wrapped a tuple of 3
416 func
=lambda mgr
, *args
, **kwargs
: HandleCommandResult(*self
.handler(
417 *((instance
or mgr
,) + args
), **kwargs
))
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that formats each record and passes the resulting text
    to the ``_ceph_log`` method of the module instance it was built with.
    """

    def __init__(self, module_inst):
        super().__init__()
        self._module = module_inst
        # The module name is baked into the format string once, up front.
        layout = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(layout))

    def emit(self, record):
        # Records below this handler's level are dropped.
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
432 class ClusterLogHandler(logging
.Handler
):
433 def __init__(self
, module_inst
):
435 self
._module
= module_inst
436 self
.setFormatter(logging
.Formatter("%(message)s"))
438 def emit(self
, record
):
440 'DEBUG': MgrModule
.CLUSTER_LOG_PRIO_DEBUG
,
441 'INFO': MgrModule
.CLUSTER_LOG_PRIO_INFO
,
442 'WARNING': MgrModule
.CLUSTER_LOG_PRIO_WARN
,
443 'ERROR': MgrModule
.CLUSTER_LOG_PRIO_ERROR
,
444 'CRITICAL': MgrModule
.CLUSTER_LOG_PRIO_ERROR
,
446 level
= levelmap
[record
.levelname
]
447 if record
.levelno
>= self
.level
:
448 self
._module
.cluster_log(self
._module
.module_name
,
452 class FileHandler(logging
.FileHandler
):
453 def __init__(self
, module_inst
):
454 path
= module_inst
.get_ceph_option("log_file")
455 idx
= path
.rfind(".log")
457 self
.path
= "{}.{}.log".format(path
[:idx
], module_inst
.module_name
)
459 self
.path
= "{}.{}".format(path
, module_inst
.module_name
)
460 super(FileHandler
, self
).__init
__(self
.path
, delay
=True)
461 self
.setFormatter(logging
.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
464 class MgrModuleLoggingMixin(object):
465 def _configure_logging(self
, mgr_level
, module_level
, cluster_level
,
466 log_to_file
, log_to_cluster
):
467 self
._mgr
_level
= None
468 self
._module
_level
= None
469 self
._root
_logger
= logging
.getLogger()
471 self
._unconfigure
_logging
()
473 # the ceph log handler is initialized only once
474 self
._mgr
_log
_handler
= CPlusPlusHandler(self
)
475 self
._cluster
_log
_handler
= ClusterLogHandler(self
)
476 self
._file
_log
_handler
= FileHandler(self
)
478 self
.log_to_file
= log_to_file
479 self
.log_to_cluster
= log_to_cluster
481 self
._root
_logger
.addHandler(self
._mgr
_log
_handler
)
483 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
485 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
487 self
._root
_logger
.setLevel(logging
.NOTSET
)
488 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
491 def _unconfigure_logging(self
):
492 # remove existing handlers:
494 h
for h
in self
._root
_logger
.handlers
if isinstance(h
, CPlusPlusHandler
) or isinstance(h
, FileHandler
) or isinstance(h
, ClusterLogHandler
)]
495 for h
in rm_handlers
:
496 self
._root
_logger
.removeHandler(h
)
497 self
.log_to_file
= False
498 self
.log_to_cluster
= False
500 def _set_log_level(self
, mgr_level
, module_level
, cluster_level
):
501 self
._cluster
_log
_handler
.setLevel(cluster_level
.upper())
503 module_level
= module_level
.upper() if module_level
else ''
504 if not self
._module
_level
:
505 # using debug_mgr level
506 if not module_level
and self
._mgr
_level
== mgr_level
:
507 # no change in module level neither in debug_mgr
510 if self
._module
_level
== module_level
:
511 # no change in module level
514 if not self
._module
_level
and not module_level
:
515 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
516 self
.getLogger().debug("setting log level based on debug_mgr: %s (%s)", level
, mgr_level
)
517 elif self
._module
_level
and not module_level
:
518 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
519 self
.getLogger().warning("unsetting module log level, falling back to "
520 "debug_mgr level: %s (%s)", level
, mgr_level
)
523 self
.getLogger().debug("setting log level: %s", level
)
525 self
._module
_level
= module_level
526 self
._mgr
_level
= mgr_level
528 self
._mgr
_log
_handler
.setLevel(level
)
529 self
._file
_log
_handler
.setLevel(level
)
531 def _enable_file_log(self
):
533 self
.getLogger().warning("enabling logging to file")
534 self
.log_to_file
= True
535 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
537 def _disable_file_log(self
):
539 self
.getLogger().warning("disabling logging to file")
540 self
.log_to_file
= False
541 self
._root
_logger
.removeHandler(self
._file
_log
_handler
)
543 def _enable_cluster_log(self
):
545 self
.getLogger().warning("enabling logging to cluster")
546 self
.log_to_cluster
= True
547 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
549 def _disable_cluster_log(self
):
550 # disable cluster log
551 self
.getLogger().warning("disabling logging to cluster")
552 self
.log_to_cluster
= False
553 self
._root
_logger
.removeHandler(self
._cluster
_log
_handler
)
555 def _ceph_log_level_to_python(self
, ceph_log_level
):
558 ceph_log_level
= int(ceph_log_level
.split("/", 1)[0])
565 if ceph_log_level
<= 0:
566 log_level
= "CRITICAL"
567 elif ceph_log_level
<= 1:
568 log_level
= "WARNING"
569 elif ceph_log_level
<= 4:
def getLogger(self, name=None):
    """
    Return ``logging.getLogger(name)``; with the default ``name=None`` that
    is the root logger.
    """
    logger = logging.getLogger(name)
    return logger
577 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
, MgrModuleLoggingMixin
):
579 Standby modules only implement a serve and shutdown method, they
580 are not permitted to implement commands and they do not receive
583 They only have access to the mgrmap (for accessing service URI info
584 from their active peer), and to configuration settings (read only).
587 MODULE_OPTIONS
= [] # type: List[Dict[str, Any]]
588 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
590 def __init__(self
, module_name
, capsule
):
591 super(MgrStandbyModule
, self
).__init
__(capsule
)
592 self
.module_name
= module_name
594 # see also MgrModule.__init__()
595 for o
in self
.MODULE_OPTIONS
:
598 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
600 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
602 mgr_level
= self
.get_ceph_option("debug_mgr")
603 log_level
= self
.get_module_option("log_level")
604 cluster_level
= self
.get_module_option('log_to_cluster_level')
605 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
608 # for backwards compatibility
609 self
._logger
= self
.getLogger()
612 self
._unconfigure
_logging
()
615 def _register_options(cls
, module_name
):
616 cls
.MODULE_OPTIONS
.append(
617 Option(name
='log_level', type='str', default
="", runtime
=True,
618 enum_allowed
=['info', 'debug', 'critical', 'error',
620 cls
.MODULE_OPTIONS
.append(
621 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
622 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
623 cls
.MODULE_OPTIONS
.append(
624 Option(name
='log_to_cluster', type='bool', default
=False,
626 cls
.MODULE_OPTIONS
.append(
627 Option(name
='log_to_cluster_level', type='str', default
='info',
629 enum_allowed
=['info', 'debug', 'critical', 'error',
638 The serve method is mandatory for standby modules.
641 raise NotImplementedError()
def get_mgr_id(self):
    """Return the value reported by ``self._ceph_get_mgr_id()``."""
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
646 def get_module_option(self
, key
, default
=None):
648 Retrieve the value of a persistent configuration setting
651 :param default: the default value of the config if it is not found
654 r
= self
._ceph
_get
_module
_option
(key
)
656 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_ceph_option(self, key):
    """Return the result of ``self._ceph_get_option(key)``."""
    value = self._ceph_get_option(key)
    return value
def get_store(self, key):
    """
    Retrieve the value of a persistent KV store entry, as returned by
    ``self._ceph_get_store(key)``.

    :return: Byte string or None
    """
    stored = self._ceph_get_store(key)
    return stored
def get_active_uri(self):
    """Return the value reported by ``self._ceph_get_active_uri()``."""
    uri = self._ceph_get_active_uri()
    return uri
675 def get_localized_module_option(self
, key
, default
=None):
676 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
678 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
683 class MgrModule(ceph_module
.BaseMgrModule
, MgrModuleLoggingMixin
):
684 COMMANDS
= [] # type: List[Any]
685 MODULE_OPTIONS
= [] # type: List[dict]
686 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
688 # Priority definitions for perf counters
692 PRIO_UNINTERESTING
= 2
695 # counter value types
700 PERFCOUNTER_LONGRUNAVG
= 4
701 PERFCOUNTER_COUNTER
= 8
702 PERFCOUNTER_HISTOGRAM
= 0x10
703 PERFCOUNTER_TYPE_MASK
= ~
3
709 # Cluster log priorities
710 CLUSTER_LOG_PRIO_DEBUG
= 0
711 CLUSTER_LOG_PRIO_INFO
= 1
712 CLUSTER_LOG_PRIO_SEC
= 2
713 CLUSTER_LOG_PRIO_WARN
= 3
714 CLUSTER_LOG_PRIO_ERROR
= 4
716 def __init__(self
, module_name
, py_modules_ptr
, this_ptr
):
717 self
.module_name
= module_name
718 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
720 for o
in self
.MODULE_OPTIONS
:
723 # we'll assume the declared type matches the
724 # supplied default value's type.
725 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
727 # module not declaring it's type, so normalize the
728 # default value to be a string for consistent behavior
729 # with default and user-supplied option values.
730 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
732 mgr_level
= self
.get_ceph_option("debug_mgr")
733 log_level
= self
.get_module_option("log_level")
734 cluster_level
= self
.get_module_option('log_to_cluster_level')
735 log_to_file
= self
.get_module_option("log_to_file")
736 log_to_cluster
= self
.get_module_option("log_to_cluster")
737 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
738 log_to_file
, log_to_cluster
)
740 # for backwards compatibility
741 self
._logger
= self
.getLogger()
743 self
._version
= self
._ceph
_get
_version
()
745 self
._perf
_schema
_cache
= None
747 # Keep a librados instance for those that need it.
752 self
._unconfigure
_logging
()
755 def _register_options(cls
, module_name
):
756 cls
.MODULE_OPTIONS
.append(
757 Option(name
='log_level', type='str', default
="", runtime
=True,
758 enum_allowed
=['info', 'debug', 'critical', 'error',
760 cls
.MODULE_OPTIONS
.append(
761 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
762 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
763 cls
.MODULE_OPTIONS
.append(
764 Option(name
='log_to_cluster', type='bool', default
=False,
766 cls
.MODULE_OPTIONS
.append(
767 Option(name
='log_to_cluster_level', type='str', default
='info',
769 enum_allowed
=['info', 'debug', 'critical', 'error',
def _register_commands(cls, module_name):
    """
    Append every entry of ``CLICommand.dump_cmd_list()`` to ``cls.COMMANDS``.

    ``module_name`` is accepted but not used here.

    NOTE(review): first parameter is ``cls`` -- presumably a classmethod
    whose decorator is outside this view; confirm.
    """
    dumped = CLICommand.dump_cmd_list()
    cls.COMMANDS.extend(dumped)
def cluster_log(self, channel, priority, message):
    """
    Pass a message to ``self._ceph_cluster_log``.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority. This can be
                     CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
                     CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
                     CLUSTER_LOG_PRIO_ERROR.
    :param message: The message to log.
    """
    emit = self._ceph_cluster_log
    emit(channel, priority, message)
def release_name(self):
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus',
    as reported by ``self._ceph_get_release_name()``.

    :return: Returns the release name of the Ceph version in lower case.
    """
    release = self._ceph_get_release_name()
    return release
def get_context(self):
    """
    Return the result of ``self._ceph_get_context()``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    context = self._ceph_get_context()
    return context
813 def notify(self
, notify_type
, notify_id
):
815 Called by the ceph-mgr service to notify the Python plugin
816 that new state is available.
818 :param notify_type: string indicating what kind of notification,
819 such as osd_map, mon_map, fs_map, mon_status,
820 health, pg_summary, command, service_map
821 :param notify_id: string (may be empty) that optionally specifies
822 which entity is being notified about. With
823 "command" notifications this is set to the tag
824 ``from send_command``.
828 def _config_notify(self
):
829 # check logging options for changes
830 mgr_level
= self
.get_ceph_option("debug_mgr")
831 module_level
= self
.get_module_option("log_level")
832 cluster_level
= self
.get_module_option("log_to_cluster_level")
833 log_to_file
= self
.get_module_option("log_to_file", False)
834 log_to_cluster
= self
.get_module_option("log_to_cluster", False)
836 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
838 if log_to_file
!= self
.log_to_file
:
840 self
._enable
_file
_log
()
842 self
._disable
_file
_log
()
843 if log_to_cluster
!= self
.log_to_cluster
:
845 self
._enable
_cluster
_log
()
847 self
._disable
_cluster
_log
()
849 # call module subclass implementations
852 def config_notify(self
):
854 Called by the ceph-mgr service to notify the Python plugin
855 that the configuration may have changed. Modules will want to
856 refresh any configuration values stored in config variables.
862 Called by the ceph-mgr service to start any server that
863 is provided by this Python plugin. The implementation
864 of this function should block until ``shutdown`` is called.
866 You *must* implement ``shutdown`` if you implement ``serve``
872 Called by the ceph-mgr service to request that this
873 module drop out of its serve() function. You do not
874 need to implement this if you do not implement serve()
879 addrs
= self
._rados
.get_addrs()
880 self
._rados
.shutdown()
881 self
._ceph
_unregister
_client
(addrs
)
def get(self, data_name):
    """
    Fetch a named cluster-wide object from ceph-mgr via ``self._ceph_get``.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>, pg_stats,
            pool_stats, pg_ready, osd_ping_times.

    All these structures have their own JSON representations: experiment
    or look at the C++ ``dump()`` methods to learn about them.
    """
    fetched = self._ceph_get(data_name)
    return fetched
899 def _stattype_to_str(self
, stattype
):
901 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
904 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
905 # this lie matches the DaemonState decoding: only val, no counts
907 if typeonly
== self
.PERFCOUNTER_COUNTER
:
909 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
914 def _perfpath_to_path_labels(self
, daemon
, path
):
915 # type: (str, str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]
916 label_names
= ("ceph_daemon",) # type: Tuple[str, ...]
917 labels
= (daemon
,) # type: Tuple[str, ...]
919 if daemon
.startswith('rbd-mirror.'):
921 r
'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
925 path
= 'rbd_mirror_image_' + match
.group(4)
926 pool
= match
.group(1)
927 namespace
= match
.group(2) or ''
928 image
= match
.group(3)
929 label_names
+= ('pool', 'namespace', 'image')
930 labels
+= (pool
, namespace
, image
)
932 return path
, label_names
, labels
,
934 def _perfvalue_to_value(self
, stattype
, value
):
935 if stattype
& self
.PERFCOUNTER_TIME
:
936 # Convert from ns to seconds
937 return value
/ 1000000000.0
941 def _unit_to_str(self
, unit
):
942 if unit
== self
.NONE
:
944 elif unit
== self
.BYTES
:
948 def to_pretty_iec(n
):
949 for bits
, suffix
in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
950 (20, 'Mi'), (10, 'Ki')]:
952 return str(n
>> bits
) + ' ' + suffix
956 def get_pretty_row(elems
, width
):
958 Takes an array of elements and returns a string with those elements
959 formatted as a table row. Useful for polling modules.
961 :param elems: the elements to be printed
962 :param width: the width of the terminal
965 column_width
= int(width
/ n
)
969 ret
+= '{0:>{w}} |'.format(elem
, w
=column_width
- 2)
973 def get_pretty_header(self
, elems
, width
):
975 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
977 :param elems: the elements to be printed
978 :param width: the width of the terminal
981 column_width
= int(width
/ n
)
985 for i
in range(0, n
):
986 ret
+= '-' * (column_width
- 1) + '+'
990 ret
+= self
.get_pretty_row(elems
, width
)
995 for i
in range(0, n
):
996 ret
+= '-' * (column_width
- 1) + '+'
def get_server(self, hostname):
    """
    Fetch metadata about one hostname through ``self._ceph_get_server``.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    """
    server = self._ceph_get_server(hostname)
    return server
def get_perf_schema(self, svc_type, svc_name):
    """
    Fetch perf counter schema info through ``self._ceph_get_perf_schema``.

    :param str svc_type:
    :param str svc_name:
    :return: list of dicts describing the counters requested
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
def get_counter(self, svc_type, svc_name, path):
    """
    Fetch the latest performance counter data for one counter on one
    service, through ``self._ceph_get_counter``.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This
        may be empty if no data is available.
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
def get_latest_counter(self, svc_type, svc_name, path):
    """
    Fetch only the newest performance counter data point for one counter on
    one service, through ``self._ceph_get_latest_counter``.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This
        may be empty if no data is available.
    """
    newest = self._ceph_get_latest_counter(svc_type, svc_name, path)
    return newest
def list_servers(self):
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata).

    Implemented by calling ``self._ceph_get_server`` with ``None``.

    :return: a list of information about all servers
    """
    every_host = None
    return self._ceph_get_server(every_host)
1063 def get_metadata(self
, svc_type
, svc_id
, default
=None):
1065 Fetch the daemon metadata for a particular service.
1067 ceph-mgr fetches metadata asynchronously, so are windows of time during
1068 addition/removal of services where the metadata is not available to
1069 modules. ``None`` is returned if no metadata is available.
1071 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1072 :param str svc_id: service id. convert OSD integer IDs to strings when
1074 :rtype: dict, or None if no metadata found
1076 metadata
= self
._ceph
_get
_metadata
(svc_type
, svc_id
)
1077 if metadata
is None:
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for one service daemon through
    ``self._ceph_get_daemon_status``.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
1094 def check_mon_command(self
, cmd_dict
: dict) -> HandleCommandResult
:
1096 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1100 r
= HandleCommandResult(*self
.mon_command(cmd_dict
))
1102 raise MonCommandFailed(f
'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1105 def mon_command(self
, cmd_dict
):
1107 Helper for modules that do simple, synchronous mon command
1110 See send_command for general case.
1112 :return: status int, out std, err str
1116 result
= CommandResult()
1117 self
.send_command(result
, "mon", "", json
.dumps(cmd_dict
), "")
1121 self
.log
.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1122 cmd_dict
['prefix'], r
[0], t2
- t1
def send_command(self, *args, **kwargs):
    """
    Send a command by forwarding every positional and keyword argument to
    ``self._ceph_send_command``.  Nothing is returned.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule.  This acts as a
        completion and stores the output of the command.  Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type:
    :param str command: a JSON-serialized command.  This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself.  Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    """
    forward = self._ceph_send_command
    forward(*args, **kwargs)
def set_health_checks(self, checks):
    """
    Set the module's current map of health checks by handing ``checks`` to
    ``self._ceph_set_health_checks``.

    ``checks`` is a dict of check names to info dicts; each info dict may
    carry the keys ``'severity'`` ('warning' or 'error'), ``'summary'``
    (a summary string), ``'count'`` (quantifies badness) and ``'detail'``
    (a list of detail strings).

    :param list: dict of health check dicts
    """
    publish = self._ceph_set_health_checks
    publish(checks)
def _handle_command(self, inbuf, cmd):
    """
    Dispatch a command: when ``cmd['prefix']`` is a key of
    ``CLICommand.COMMANDS`` the registered entry's ``call`` is used,
    otherwise the command goes to ``self.handle_command``.
    """
    prefix = cmd['prefix']
    if prefix in CLICommand.COMMANDS:
        return CLICommand.COMMANDS[prefix].call(self, cmd, inbuf)
    return self.handle_command(inbuf, cmd)
def handle_command(self, inbuf, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS.

    Implementations return a status code, an output buffer, and an
    output string.  The output buffer is for data results,
    the output string is for informative text.

    This base version always raises.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :param cmd: from Ceph's cmdmap_t
    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always
    """
    # Reaching this point means a command arrived but the method was not
    # overridden to handle it.
    raise NotImplementedError()
def get_mgr_id(self):
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager), as reported by
    ``self._ceph_get_mgr_id()``.
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
def get_ceph_option(self, key):
    """Return the result of ``self._ceph_get_option(key)``."""
    value = self._ceph_get_option(key)
    return value
1214 def _validate_module_option(self
, key
):
1216 Helper: don't allow get/set config callers to
1217 access config options that they didn't declare
1220 if key
not in [o
['name'] for o
in self
.MODULE_OPTIONS
]:
1221 raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
1222 format(key
, self
.__class
__.__name
__))
1224 def _get_module_option(self
, key
, default
, localized_prefix
=""):
1225 r
= self
._ceph
_get
_module
_option
(self
.module_name
, key
,
1228 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_module_option(self, key, default=None):
    """
    Retrieve the value of a persistent configuration setting.

    ``key`` is first checked with ``self._validate_module_option``; the
    lookup itself is done by ``self._get_module_option(key, default)``.
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
def get_module_option_ex(self, module, key, default=None):
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    The key is validated against this module's declared options only when
    ``module`` equals ``self.module_name``.

    :param str module: The name of the module, e.g. 'dashboard'
    :param str key: The configuration key, e.g. 'server_addr'.
    :param str,None default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    :return: str,int,bool,float,None
    """
    if module == self.module_name:
        self._validate_module_option(key)
    value = self._ceph_get_module_option(module, key)
    if value is None:
        return default
    return value
def get_store_prefix(self, key_prefix):
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix, via ``self._ceph_get_store_prefix``.

    :param str key_prefix:
    """
    entries = self._ceph_get_store_prefix(key_prefix)
    return entries
def _set_localized(self, key, val, setter):
    """
    Call ``setter`` with a key built by ``_get_localized_key`` from
    ``self.get_mgr_id()`` and ``key``, plus ``val``; return what the setter
    returns.
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
def get_localized_module_option(self, key, default=None):
    """
    Retrieve localized configuration for this ceph-mgr instance.

    ``key`` is validated with ``self._validate_module_option`` and then
    looked up by ``self._get_module_option`` with ``self.get_mgr_id()`` as
    the third argument.
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
1283 def _set_module_option(self
, key
, val
):
1284 return self
._ceph
_set
_module
_option
(self
.module_name
, key
,
1285 None if val
is None else str(val
))
def set_module_option(self, key, val):
    """
    Set the value of a persistent configuration setting.

    The key is validated with ``_validate_module_option()`` before the
    write is delegated to ``_set_module_option()``.

    :param key: option name
    :type val: str | None
    :return: whatever ``_set_module_option()`` returns
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
def set_module_option_ex(self, module, key, val):
    """
    Set the value of a persistent configuration setting
    for the specified module.

    The key is only validated against this module's declared options
    when ``module`` names this module itself.

    :param module: name of the module whose option is written
    :param key: option name
    :param val: new value, or ``None``; any non-``None`` value is
        converted with ``str()``
    :return: whatever ``_ceph_set_module_option()`` returns
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Forward None unchanged, exactly as _set_module_option() does;
    # str(None) would store the literal text "None" as the value.
    stored = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, stored)
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    The key is validated with ``_validate_module_option()``; the write
    goes through ``_set_localized()`` with ``_set_module_option`` as the
    setter.

    :param key: un-localized option name
    :param val: value handed on unchanged
    :return: whatever ``_set_localized()`` returns
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
def set_store(self, key, val):
    """
    Write a value into this module's persistent key value store.

    Passing ``None`` as ``val`` removes ``key`` from the store.

    :param key: store key
    :param val: value to store, or ``None`` to remove the key
    :return: None (the result of ``_ceph_set_store()`` is discarded)
    """
    self._ceph_set_store(key, val)
    return None
1330 def get_store(self
, key
, default
=None):
1332 Get a value from this module's persistent key value store
1334 r
= self
._ceph
_get
_store
(key
)
1340 def get_localized_store(self
, key
, default
=None):
1341 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
1343 r
= self
._ceph
_get
_store
(key
)
def set_localized_store(self, key, val):
    """
    Store ``val`` in the key value store under the localized form of
    ``key``.

    Delegates to ``_set_localized()`` with ``set_store`` as the setter.

    :param key: un-localized store key
    :param val: value handed on unchanged
    :return: whatever ``_set_localized()`` returns
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
1351 def self_test(self
):
1353 Run a self-test on the module. Override this function and implement
1354 a best as possible self-test for (automated) testing of the module
1356 Indicate any failures by raising an exception. This does not have
1357 to be pretty, it's mainly for picking up regressions during
1358 development, rather than use in the field.
1360 :return: None, or an advisory string for developer interest, such
1361 as a json dump of some state.
def get_osdmap(self):
    """
    Get a handle to an OSDMap.

    NOTE(review): the earlier wording said "If epoch==0, get a handle
    for the latest", but this method takes no epoch argument; it simply
    returns the result of ``_ceph_get_osdmap()``. Confirm the intent.

    :return: the handle returned by ``_ceph_get_osdmap()``
    """
    osdmap = self._ceph_get_osdmap()
    return osdmap
1373 def get_latest(self
, daemon_type
, daemon_name
, counter
):
1374 data
= self
.get_latest_counter(
1375 daemon_type
, daemon_name
, counter
)[counter
]
1381 def get_latest_avg(self
, daemon_type
, daemon_name
, counter
):
1382 data
= self
.get_latest_counter(
1383 daemon_type
, daemon_name
, counter
)[counter
]
1385 return data
[1], data
[2]
1390 def get_all_perf_counters(self
, prio_limit
=PRIO_USEFUL
,
1391 services
=("mds", "mon", "osd",
1392 "rbd-mirror", "rgw", "tcmu-runner")):
1394 Return the perf counters currently known to this ceph-mgr
1395 instance, filtered by priority equal to or greater than `prio_limit`.
1397 The result is a map of string to dict, associating services
1398 (like "osd.123") with their counters. The counter
1399 dict for each service maps counter paths to a counter
1400 info structure, which is the information from
1401 the schema, plus an additional "value" member with the latest
1405 result
= defaultdict(dict) # type: Dict[str, dict]
1407 for server
in self
.list_servers():
1408 for service
in server
['services']:
1409 if service
['type'] not in services
:
1412 schema
= self
.get_perf_schema(service
['type'], service
['id'])
1414 self
.log
.warning("No perf counter schema for {0}.{1}".format(
1415 service
['type'], service
['id']
1419 # Value is returned in a potentially-multi-service format,
1420 # get just the service we're asking about
1421 svc_full_name
= "{0}.{1}".format(
1422 service
['type'], service
['id'])
1423 schema
= schema
[svc_full_name
]
1425 # Populate latest values
1426 for counter_path
, counter_schema
in schema
.items():
1427 # self.log.debug("{0}: {1}".format(
1428 # counter_path, json.dumps(counter_schema)
1430 if counter_schema
['priority'] < prio_limit
:
1433 counter_info
= dict(counter_schema
)
1435 # Also populate count for the long running avgs
1436 if counter_schema
['type'] & self
.PERFCOUNTER_LONGRUNAVG
:
1437 v
, c
= self
.get_latest_avg(
1442 counter_info
['value'], counter_info
['count'] = v
, c
1443 result
[svc_full_name
][counter_path
] = counter_info
1445 counter_info
['value'] = self
.get_latest(
1451 result
[svc_full_name
][counter_path
] = counter_info
1453 self
.log
.debug("returning {0} counter".format(len(result
)))
def set_uri(self, uri):
    """
    Publish the address of a service exposed by this module.

    Call this once the address is available.

    :param uri: address handed to ``_ceph_set_uri()``
    :return: whatever ``_ceph_set_uri()`` returns
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def have_mon_connection(self):
    """
    Report whether this ceph-mgr daemon has an open connection to a
    monitor.

    Without one, the information held about the cluster is likely out
    of date, and/or the monitor cluster is down.

    :return: whatever ``_ceph_have_mon_connection()`` returns
    """
    connected = self._ceph_have_mon_connection()
    return connected
1476 def update_progress_event(self
, evid
, desc
, progress
):
1477 return self
._ceph
_update
_progress
_event
(str(evid
),
def complete_progress_event(self, evid):
    """
    Mark the progress event identified by ``evid`` as complete.

    :param evid: event id; converted with ``str()`` before being passed
        to ``_ceph_complete_progress_event()``
    :return: whatever ``_ceph_complete_progress_event()`` returns
    """
    event_id = str(evid)
    return self._ceph_complete_progress_event(event_id)
def clear_all_progress_events(self):
    """
    Clear all progress events.

    :return: whatever ``_ceph_clear_all_progress_events()`` returns
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
1490 A librados instance to be shared by any classes within
1491 this mgr module that want one.
1496 ctx_capsule
= self
.get_context()
1497 self
._rados
= rados
.Rados(context
=ctx_capsule
)
1498 self
._rados
.connect()
1499 self
._ceph
_register
_client
(self
._rados
.get_addrs())
1505 Implement this function to report whether the module's dependencies
1506 are met. For example, if the module needs to import a particular
1507 dependency to work, then use a try/except around the import at
1508 file scope, and then report here if the import failed.
1510 This will be called in a blocking way from the C++ code, so do not
1511 do any I/O that could block in this function.
1513 :return a 2-tuple consisting of a boolean and explanatory string
1518 def remote(self
, module_name
, method_name
, *args
, **kwargs
):
1520 Invoke a method on another module. All arguments, and the return
1521 value from the other module must be serializable.
1523 Limitation: Do not import any modules within the called method.
1524 Otherwise you will get an error in Python 2::
1526 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
1530 :param module_name: Name of other module. If module isn't loaded,
1531 an ImportError exception is raised.
1532 :param method_name: Method name. If it does not exist, a NameError
1533 exception is raised.
1534 :param args: Argument tuple
1535 :param kwargs: Keyword argument dict
1536 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
1537 :raises ImportError: No such module
1539 return self
._ceph
_dispatch
_remote
(module_name
, method_name
,
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.

    ``query`` is a dict of the query parameters. The parts of its
    layout that are documented here:

    * subkey descriptors of the form
      ``{'type': subkey_type, 'regex': regex_pattern}``
    * ``'performance_counter_descriptors'``: a list of descriptor types
    * ``'limit'``:
      ``{'order_by': performance_counter_type, 'max_count': n}``

    Subkey types (NOTE(review): the heading for this list is missing
    from this copy of the file; confirm):
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
def remove_osd_perf_query(self, query_id):
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_osd_perf_query()`` returns
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
def get_osd_perf_counters(self, query_id):
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_get_osd_perf_counters()`` returns
    """
    counters = self._ceph_get_osd_perf_counters(query_id)
    return counters
def is_authorized(self, arguments):
    """
    Verify that the current session caps permit executing the py
    service or current module with the provided arguments.

    This gives modules a generic way to restrict access by more
    fine-grained controls.

    :param arguments: dict of key/value arguments to test
    :return: whatever ``_ceph_is_authorized()`` returns
    """
    verdict = self._ceph_is_authorized(arguments)
    return verdict