]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
1 import ceph_module
# noqa
4 from typing
import Set
, Tuple
, Iterator
, Any
6 # just for type checking
12 from collections
import defaultdict
, namedtuple
50 class CPlusPlusHandler(logging
.Handler
):
51 def __init__(self
, module_inst
):
52 super(CPlusPlusHandler
, self
).__init
__()
53 self
._module
= module_inst
55 def emit(self
, record
):
56 if record
.levelno
<= logging
.DEBUG
:
58 elif record
.levelno
<= logging
.INFO
:
60 elif record
.levelno
<= logging
.WARNING
:
65 self
._module
._ceph
_log
(ceph_level
, self
.format(record
))
def configure_logger(module_inst, module_name):
    """
    Create and configure the logger with the specified module.

    A ``CPlusPlusHandler`` is added to the root logger, so messages from
    this module and from all other loggers (incl. 3rd party libraries)
    are redirected to the Ceph log.

    :param module_inst: The module instance.
    :type module_inst: instance
    :param module_name: The module name.
    :type module_name: str
    :return: Return the logger with the specified name.
    """
    logger = logging.getLogger(module_name)

    # Don't filter any logs at the python level, leave it to C++.
    # FIXME: We should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message
    # is of an insufficient level to be ultimately output.
    logger.setLevel(logging.DEBUG)  # Don't use NOTSET

    root_logger = logging.getLogger()
    # Add handler to the root logger, thus this module and all
    # 3rd party libraries will log their messages to the Ceph log.
    root_logger.addHandler(CPlusPlusHandler(module_inst))
    # Set the log level to ``ERROR`` to ensure that we only get
    # those message from 3rd party libraries (only effective if
    # they use the default log level ``NOTSET``).
    # Check https://docs.python.org/3/library/logging.html#logging.Logger.setLevel
    # for more information about how the effective log level is
    # determined.
    root_logger.setLevel(logging.ERROR)

    # Callers assign the result (``self._logger = configure_logger(...)``).
    return logger
def unconfigure_logger(module_name=None):
    """
    Remove every ``CPlusPlusHandler`` attached to the named logger.

    :param module_name: The module name. Defaults to ``None``.
    :type module_name: str or None
    """
    logger = logging.getLogger(module_name)
    # Collect first, then remove: mutating ``logger.handlers`` while
    # iterating over it would skip entries.
    rm_handlers = [
        h for h in logger.handlers if isinstance(h, CPlusPlusHandler)]
    for h in rm_handlers:
        logger.removeHandler(h)
116 class CommandResult(object):
118 Use with MgrModule.send_command
121 def __init__(self
, tag
=None):
122 self
.ev
= threading
.Event()
127 # This is just a convenience for notifications from
128 # C++ land, to avoid passing addresses around in messages.
129 self
.tag
= tag
if tag
else ""
131 def complete(self
, r
, outb
, outs
):
139 return self
.r
, self
.outb
, self
.outs
class HandleCommandResult(namedtuple('HandleCommandResult',
                                     ['retval', 'stdout', 'stderr'])):
    def __new__(cls, retval=0, stdout="", stderr=""):
        """
        Tuple containing the result of `handle_command()`

        Only write to stderr if there is an error, or in extraordinary
        circumstances.

        Avoid having `ceph foo bar` commands say "did foo bar" on success
        unless there is critical information to include there.

        Everything programmatically consumable should be put on stdout.

        :param retval: return code. E.g. 0 or -errno.EINVAL
        :param stdout: data of this result.
        :param stderr: Typically used for error messages.
        """
        return super(HandleCommandResult, cls).__new__(
            cls, retval, stdout, stderr)
164 class OSDMap(ceph_module
.BasePyOSDMap
):
166 return self
._get
_epoch
()
def get_crush_version(self):
    """Return whatever ``self._get_crush_version()`` reports."""
    return self._get_crush_version()
175 # FIXME: efficient implementation
177 return dict([(p
['pool'], p
) for p
in d
['pools']])
179 def get_pools_by_name(self
):
180 # FIXME: efficient implementation
182 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
def new_incremental(self):
    """Return the result of ``self._new_incremental()``."""
    return self._new_incremental()
def apply_incremental(self, inc):
    """
    Forward ``inc`` to ``self._apply_incremental()`` and return its result.

    :param inc: the incremental to apply
    """
    return self._apply_incremental(inc)
191 return self
._get
_crush
()
def get_pools_by_take(self, take):
    """
    Return the ``'pools'`` entry of ``self._get_pools_by_take(take)``.

    :param take: forwarded unchanged to ``_get_pools_by_take``
    :return: the ``'pools'`` value, or an empty list when the key is absent
    """
    return self._get_pools_by_take(take).get('pools', [])
196 def calc_pg_upmaps(self
, inc
,
197 max_deviation
=.01, max_iterations
=10, pools
=None):
200 return self
._calc
_pg
_upmaps
(
202 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid):
    """
    Return the result of ``self._map_pool_pgs_up(poolid)``.

    :param poolid: forwarded unchanged to ``_map_pool_pgs_up``
    """
    return self._map_pool_pgs_up(poolid)
def pg_to_up_acting_osds(self, pool_id, ps):
    """
    Return the result of ``self._pg_to_up_acting_osds(pool_id, ps)``.

    :param pool_id: forwarded unchanged as the first argument
    :param ps: forwarded unchanged as the second argument
    """
    return self._pg_to_up_acting_osds(pool_id, ps)
def pool_raw_used_rate(self, pool_id):
    """
    Return the result of ``self._pool_raw_used_rate(pool_id)``.

    :param pool_id: forwarded unchanged to ``_pool_raw_used_rate``
    """
    return self._pool_raw_used_rate(pool_id)
213 def get_ec_profile(self
, name
):
214 # FIXME: efficient implementation
216 return d
['erasure_code_profiles'].get(name
, None)
219 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
221 return self
._get
_epoch
()
def set_osd_reweights(self, weightmap):
    """
    weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }

    :return: the result of ``self._set_osd_reweights(weightmap)``
    """
    return self._set_osd_reweights(weightmap)
def set_crush_compat_weight_set_weights(self, weightmap):
    """
    weightmap is a dict, int to float.  devices only.  e.g.,
    { 0: 3.4, 1: 3.3, 2: 3.334 }

    :return: the result of
        ``self._set_crush_compat_weight_set_weights(weightmap)``
    """
    return self._set_crush_compat_weight_set_weights(weightmap)
240 class CRUSHMap(ceph_module
.BasePyCRUSH
):
241 ITEM_NONE
= 0x7fffffff
242 DEFAULT_CHOOSE_ARGS
= '-1'
def get_item_weight(self, item):
    """
    Return the result of ``self._get_item_weight(item)``.

    :param item: forwarded unchanged to ``_get_item_weight``
    """
    return self._get_item_weight(item)
def get_item_name(self, item):
    """
    Return the result of ``self._get_item_name(item)``.

    :param item: forwarded unchanged to ``_get_item_name``
    """
    return self._get_item_name(item)
def find_takes(self):
    """
    Return the ``'takes'`` entry of ``self._find_takes()``.

    :return: the ``'takes'`` value, or an empty list when the key is absent
    """
    return self._find_takes().get('takes', [])
def get_take_weight_osd_map(self, root):
    """
    Return the ``'weights'`` mapping for ``root`` with integer keys.

    ``self._get_take_weight_osd_map(root)`` yields a dict whose
    ``'weights'`` entry is converted here by applying ``int()`` to each
    key.

    :param root: forwarded unchanged to ``_get_take_weight_osd_map``
    :return: dict of ``int(key)`` to value; empty when ``'weights'`` is
        absent
    """
    uglymap = self._get_take_weight_osd_map(root)
    # Plain ``.items()`` works on both Python 2 and 3 for a one-shot
    # iteration, so ``six`` is not needed here.
    return {int(k): v for k, v in uglymap.get('weights', {}).items()}
def have_default_choose_args(dump):
    """
    Tell whether ``dump`` carries the default ``choose_args`` entry.

    NOTE(review): takes no ``self``; presumably declared as a static
    method of ``CRUSHMap`` -- the decorator is not visible here, confirm.

    :param dump: dict that may contain a ``'choose_args'`` mapping
    :return: True if ``CRUSHMap.DEFAULT_CHOOSE_ARGS`` is a key of
        ``dump['choose_args']``; False when it is not or when
        ``'choose_args'`` is absent
    """
    return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {})
def get_default_choose_args(dump):
    """
    Return the default ``choose_args`` entry of ``dump``.

    NOTE(review): takes no ``self``; presumably declared as a static
    method of ``CRUSHMap`` -- the decorator is not visible here, confirm.

    :param dump: dict that may contain a ``'choose_args'`` mapping
    :return: ``dump['choose_args'][CRUSHMap.DEFAULT_CHOOSE_ARGS]``, or an
        empty list when either level is missing
    """
    # Default to {} like have_default_choose_args() does; a bare
    # ``dump.get('choose_args')`` returns None when the key is absent and
    # the chained ``.get`` would then raise AttributeError.
    return dump.get('choose_args', {}).get(CRUSHMap.DEFAULT_CHOOSE_ARGS, [])
268 def get_rule(self
, rule_name
):
269 # TODO efficient implementation
270 for rule
in self
.dump()['rules']:
271 if rule_name
== rule
['rule_name']:
276 def get_rule_by_id(self
, rule_id
):
277 for rule
in self
.dump()['rules']:
278 if rule
['rule_id'] == rule_id
:
283 def get_rule_root(self
, rule_name
):
284 rule
= self
.get_rule(rule_name
)
289 first_take
= [s
for s
in rule
['steps'] if s
['op'] == 'take'][0]
291 self
.log
.warn("CRUSH rule '{0}' has no 'take' step".format(
295 return first_take
['item']
297 def get_osds_under(self
, root_id
):
298 # TODO don't abuse dump like this
300 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
305 for item
in b
['items']:
307 osd_list
.append(item
['id'])
310 accumulate(buckets
[item
['id']])
314 accumulate(buckets
[root_id
])
318 def device_class_counts(self
):
319 result
= defaultdict(int)
320 # TODO don't abuse dump like this
322 for device
in d
['devices']:
323 cls
= device
.get('class', None)
329 class CLICommand(object):
332 def __init__(self
, prefix
, args
="", desc
="", perm
="rw"):
341 def _parse_args(self
):
344 args
= self
.args
.split(" ")
346 arg_desc
= arg
.strip().split(",")
353 self
.args_dict
[v
] = arg_d
355 def __call__(self
, func
):
357 self
.COMMANDS
[self
.prefix
] = self
360 def call(self
, mgr
, cmd_dict
, inbuf
):
362 for a
, d
in self
.args_dict
.items():
363 if 'req' in d
and d
['req'] == "false" and a
not in cmd_dict
:
365 kwargs
[a
.replace("-", "_")] = cmd_dict
[a
]
367 kwargs
['inbuf'] = inbuf
368 return self
.func(mgr
, **kwargs
)
371 def dump_cmd_list(cls
):
373 'cmd': '{} {}'.format(cmd
.prefix
, cmd
.args
),
376 } for _
, cmd
in cls
.COMMANDS
.items()]
def CLIReadCommand(prefix, args="", desc=""):
    """
    Build a ``CLICommand`` whose ``perm`` argument is ``"r"``.

    :param prefix: passed through as the command prefix
    :param args: passed through as the argument description string
    :param desc: passed through as the command description
    :return: the new ``CLICommand`` instance
    """
    return CLICommand(prefix, args, desc, "r")
def CLIWriteCommand(prefix, args="", desc=""):
    """
    Build a ``CLICommand`` whose ``perm`` argument is ``"w"``.

    :param prefix: passed through as the command prefix
    :param args: passed through as the argument description string
    :param desc: passed through as the command description
    :return: the new ``CLICommand`` instance
    """
    return CLICommand(prefix, args, desc, "w")
def _get_localized_key(prefix, key):
    """
    Join ``prefix`` and ``key`` with a ``/`` separator.

    :param prefix: left-hand part of the resulting key
    :param key: right-hand part of the resulting key
    :return: the string ``"<prefix>/<key>"``
    """
    return '{}/{}'.format(prefix, key)
393 Helper class to declare options for MODULE_OPTIONS list.
395 Caveat: it uses argument names matching Python keywords (type, min, max),
396 so any further processing should happen in a separate method.
398 TODO: type validation.
405 desc
=None, longdesc
=None,
412 super(Option
, self
).__init
__(
413 (k
, v
) for k
, v
in vars().items()
414 if k
!= 'self' and v
is not None)
417 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
):
419 Standby modules only implement a serve and shutdown method, they
420 are not permitted to implement commands and they do not receive
423 They only have access to the mgrmap (for accessing service URI info
424 from their active peer), and to configuration settings (read only).
428 MODULE_OPTION_DEFAULTS
= {}
430 def __init__(self
, module_name
, capsule
):
431 super(MgrStandbyModule
, self
).__init
__(capsule
)
432 self
.module_name
= module_name
433 self
._logger
= configure_logger(self
, module_name
)
434 # see also MgrModule.__init__()
435 for o
in self
.MODULE_OPTIONS
:
438 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
440 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
451 The serve method is mandatory for standby modules.
454 raise NotImplementedError()
def get_mgr_id(self):
    """Return the result of ``self._ceph_get_mgr_id()``."""
    return self._ceph_get_mgr_id()
459 def get_module_option(self
, key
, default
=None):
461 Retrieve the value of a persistent configuration setting
464 :param default: the default value of the config if it is not found
467 r
= self
._ceph
_get
_module
_option
(key
)
469 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_ceph_option(self, key):
    """
    Return the result of ``self._ceph_get_option(key)``.

    :param key: forwarded unchanged to ``_ceph_get_option``
    """
    return self._ceph_get_option(key)
def get_store(self, key):
    """
    Retrieve the value of a persistent KV store entry

    :param key: forwarded unchanged to ``_ceph_get_store``
    :return: Byte string or None
    """
    return self._ceph_get_store(key)
def get_active_uri(self):
    """Return the result of ``self._ceph_get_active_uri()``."""
    return self._ceph_get_active_uri()
488 def get_localized_module_option(self
, key
, default
=None):
489 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
491 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
496 class MgrModule(ceph_module
.BaseMgrModule
):
499 MODULE_OPTION_DEFAULTS
= {}
501 # Priority definitions for perf counters
505 PRIO_UNINTERESTING
= 2
508 # counter value types
513 PERFCOUNTER_LONGRUNAVG
= 4
514 PERFCOUNTER_COUNTER
= 8
515 PERFCOUNTER_HISTOGRAM
= 0x10
516 PERFCOUNTER_TYPE_MASK
= ~
3
522 # Cluster log priorities
523 CLUSTER_LOG_PRIO_DEBUG
= 0
524 CLUSTER_LOG_PRIO_INFO
= 1
525 CLUSTER_LOG_PRIO_SEC
= 2
526 CLUSTER_LOG_PRIO_WARN
= 3
527 CLUSTER_LOG_PRIO_ERROR
= 4
529 def __init__(self
, module_name
, py_modules_ptr
, this_ptr
):
530 self
.module_name
= module_name
532 # If we're taking over from a standby module, let's make sure
533 # its logger was unconfigured before we hook ours up
535 self
._logger
= configure_logger(self
, module_name
)
537 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
539 self
._version
= self
._ceph
_get
_version
()
541 self
._perf
_schema
_cache
= None
543 # Keep a librados instance for those that need it.
546 for o
in self
.MODULE_OPTIONS
:
549 # we'll assume the declared type matches the
550 # supplied default value's type.
551 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
553 # module not declaring its type, so normalize the
554 # default value to be a string for consistent behavior
555 # with default and user-supplied option values.
556 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
def _register_commands(cls):
    """
    Append every command known to ``CLICommand`` to ``cls.COMMANDS``.

    NOTE(review): the first parameter is ``cls``; presumably this is
    declared as a class method -- the decorator is not visible here,
    confirm.
    """
    cls.COMMANDS.extend(CLICommand.dump_cmd_list())
def cluster_log(self, channel, priority, message):
    """
    Hand a message to ``self._ceph_cluster_log()``.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority. This can be
                     CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO,
                     CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or
                     CLUSTER_LOG_PRIO_ERROR.
    :param message: The message to log.
    """
    self._ceph_cluster_log(channel, priority, message)
def get_context(self):
    """
    :return: a Python capsule containing a C++ CephContext pointer
    """
    return self._ceph_get_context()
593 def notify(self
, notify_type
, notify_id
):
595 Called by the ceph-mgr service to notify the Python plugin
596 that new state is available.
598 :param notify_type: string indicating what kind of notification,
599 such as osd_map, mon_map, fs_map, mon_status,
600 health, pg_summary, command, service_map
601 :param notify_id: string (may be empty) that optionally specifies
602 which entity is being notified about. With
603 "command" notifications this is set to the tag
604 ``from send_command``.
608 def config_notify(self
):
610 Called by the ceph-mgr service to notify the Python plugin
611 that the configuration may have changed. Modules will want to
612 refresh any configuration values stored in config variables.
618 Called by the ceph-mgr service to start any server that
619 is provided by this Python plugin. The implementation
620 of this function should block until ``shutdown`` is called.
622 You *must* implement ``shutdown`` if you implement ``serve``
628 Called by the ceph-mgr service to request that this
629 module drop out of its serve() function. You do not
630 need to implement this if you do not implement serve()
635 self
._rados
.shutdown()
def get(self, data_name):
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osd_crush_map_text,
            osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
            osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
            health, mon_status, devices, device <devid>.

    All these structures have their own JSON representations: experiment
    or look at the C++ ``dump()`` methods to learn about them.

    :return: the result of ``self._ceph_get(data_name)``
    """
    return self._ceph_get(data_name)
652 def _stattype_to_str(self
, stattype
):
654 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
657 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
658 # this lie matches the DaemonState decoding: only val, no counts
660 if typeonly
== self
.PERFCOUNTER_COUNTER
:
662 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
667 def _perfpath_to_path_labels(self
, daemon
, path
):
668 label_names
= ("ceph_daemon",)
671 if daemon
.startswith('rbd-mirror.'):
673 r
'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
677 path
= 'rbd_mirror_' + match
.group(4)
678 pool
= match
.group(1)
679 namespace
= match
.group(2) or ''
680 image
= match
.group(3)
681 label_names
+= ('pool', 'namespace', 'image')
682 labels
+= (pool
, namespace
, image
)
684 return path
, label_names
, labels
,
686 def _perfvalue_to_value(self
, stattype
, value
):
687 if stattype
& self
.PERFCOUNTER_TIME
:
688 # Convert from ns to seconds
689 return value
/ 1000000000.0
693 def _unit_to_str(self
, unit
):
694 if unit
== self
.NONE
:
696 elif unit
== self
.BYTES
:
700 def to_pretty_iec(n
):
701 for bits
, suffix
in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
702 (20, 'Mi'), (10, 'Ki')]:
704 return str(n
>> bits
) + ' ' + suffix
708 def get_pretty_row(elems
, width
):
710 Takes an array of elements and returns a string with those elements
711 formatted as a table row. Useful for polling modules.
713 :param elems: the elements to be printed
714 :param width: the width of the terminal
717 column_width
= int(width
/ n
)
721 ret
+= '{0:>{w}} |'.format(elem
, w
=column_width
- 2)
725 def get_pretty_header(self
, elems
, width
):
727 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
729 :param elems: the elements to be printed
730 :param width: the width of the terminal
733 column_width
= int(width
/ n
)
737 for i
in range(0, n
):
738 ret
+= '-' * (column_width
- 1) + '+'
742 ret
+= self
.get_pretty_row(elems
, width
)
747 for i
in range(0, n
):
748 ret
+= '-' * (column_width
- 1) + '+'
def get_server(self, hostname):
    """
    Called by the plugin to fetch metadata about a particular hostname.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    :return: the result of ``self._ceph_get_server(hostname)``
    """
    return self._ceph_get_server(hostname)
def get_perf_schema(self, svc_type, svc_name):
    """
    Called by the plugin to fetch perf counter schema info.
    svc_name can be nullptr, as can svc_type.

    :param svc_type: forwarded unchanged as the first argument
    :param svc_name: forwarded unchanged as the second argument
    :return: list of dicts describing the counters requested
    """
    return self._ceph_get_perf_schema(svc_type, svc_name)
def get_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch the latest performance counter data for a
    particular counter on a particular service.

    :param svc_type: forwarded unchanged as the first argument
    :param svc_name: forwarded unchanged as the second argument
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This may be
        empty if no data is available.
    """
    return self._ceph_get_counter(svc_type, svc_name, path)
def get_latest_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param svc_type: forwarded unchanged as the first argument
    :param svc_name: forwarded unchanged as the second argument
    :param str path: a period-separated concatenation of the subsystem and the
        counter name, for example "mds.inodes".
    :return: A list of two-tuples of (timestamp, value) is returned.  This may be
        empty if no data is available.
    """
    return self._ceph_get_latest_counter(svc_type, svc_name, path)
def list_servers(self):
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata)

    :return: a list of information about all servers
    """
    # Passing None instead of a hostname requests every server.
    return self._ceph_get_server(None)
def get_metadata(self, svc_type, svc_id):
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    during addition/removal of services where the metadata is not
    available to modules.  ``None`` is returned if no metadata is available.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings
    :rtype: dict, or None if no metadata found
    """
    return self._ceph_get_metadata(svc_type, svc_id)
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    return self._ceph_get_daemon_status(svc_type, svc_id)
843 def mon_command(self
, cmd_dict
):
845 Helper for modules that do simple, synchronous mon command
848 See send_command for general case.
850 :return: status int, out std, err str
854 result
= CommandResult()
855 self
.send_command(result
, "mon", "", json
.dumps(cmd_dict
), "")
859 self
.log
.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
860 cmd_dict
['prefix'], r
[0], t2
- t1
def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon.

    All positional and keyword arguments are forwarded unchanged to
    ``self._ceph_send_command()``; nothing is returned.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule.  This acts as a
        completion and stores the output of the command.  Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str command: a JSON-serialized command.  This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself.  Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    """
    self._ceph_send_command(*args, **kwargs)
def set_health_checks(self, checks):
    """
    Set the module's current map of health checks.  Argument is a
    dict of check names to info, in this form::

        {
            'CHECK_NAME': {
                'severity': 'warning',  # or 'error'
                'summary': 'summary string',
                'detail': ['list', 'of', 'detail', 'strings'],
            },
        }

    :param checks: dict of health check dicts
    """
    self._ceph_set_health_checks(checks)
def _handle_command(self, inbuf, cmd):
    """
    Route a command to its ``CLICommand`` handler when one is registered.

    :param inbuf: input buffer, passed through to the handler
    :param cmd: command dict; its ``'prefix'`` entry selects the handler
    :return: whatever the selected handler returns
    """
    # Prefixes not registered through CLICommand fall back to the
    # module's own handle_command().
    if cmd['prefix'] not in CLICommand.COMMANDS:
        return self.handle_command(inbuf, cmd)
    return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf)
def handle_command(self, inbuf, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS

    Return a status code, an output buffer, and an
    output string.  The output buffer is for data results,
    the output string is for informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :param cmd: from Ceph's cmdmap_t
    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """
    # Should never get called if they didn't declare
    # any ``COMMANDS``
    raise NotImplementedError()
def get_mgr_id(self):
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: the result of ``self._ceph_get_mgr_id()``
    """
    return self._ceph_get_mgr_id()
def get_ceph_option(self, key):
    """
    Return the result of ``self._ceph_get_option(key)``.

    :param key: forwarded unchanged to ``_ceph_get_option``
    """
    return self._ceph_get_option(key)
def _validate_module_option(self, key):
    """
    Helper: don't allow get/set config callers to
    access config options that they didn't declare

    :param key: option name to look up among ``self.MODULE_OPTIONS``
    :raises RuntimeError: if no declared option has ``key`` as its
        ``'name'``
    """
    # any() stops at the first match instead of building a list of
    # every declared name on each call.
    if not any(o['name'] == key for o in self.MODULE_OPTIONS):
        raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
                           format(key, self.__class__.__name__))
960 def _get_module_option(self
, key
, default
, localized_prefix
=""):
961 r
= self
._ceph
_get
_module
_option
(self
.module_name
, key
,
964 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_module_option(self, key, default=None):
    """
    Retrieve the value of a persistent configuration setting

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param default: passed through to ``self._get_module_option()``
    :return: the result of ``self._get_module_option(key, default)``
    """
    # Validation comes first so undeclared keys never reach the lookup.
    self._validate_module_option(key)
    return self._get_module_option(key, default)
def get_module_option_ex(self, module, key, default=None):
    """
    Retrieve the value of a persistent configuration setting
    for the specified module.

    :param str module: The name of the module, e.g. 'dashboard'
    :param str key: The configuration key, e.g. 'server_addr'.
    :param str,None default: The default value to use when the
        returned value is ``None``. Defaults to ``None``.
    :return: str,int,bool,float,None
    """
    # Only our own options can be checked against MODULE_OPTIONS; keys
    # of other modules are passed through unchecked.
    if module == self.module_name:
        self._validate_module_option(key)
    r = self._ceph_get_module_option(module, key)
    return default if r is None else r
def get_store_prefix(self, key_prefix):
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix

    :param str key_prefix:
    :return: the result of ``self._ceph_get_store_prefix(key_prefix)``
    """
    return self._ceph_get_store_prefix(key_prefix)
def _set_localized(self, key, val, setter):
    """
    Store ``val`` under a key prefixed with this manager's id.

    :param key: unlocalized key
    :param val: value handed to ``setter``
    :param setter: callable invoked as ``setter(localized_key, val)``
    :return: whatever ``setter`` returns
    """
    return setter(_get_localized_key(self.get_mgr_id(), key), val)
def get_localized_module_option(self, key, default=None):
    """
    Retrieve localized configuration for this ceph-mgr instance

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param default: passed through to ``self._get_module_option()``
    :return: the result of
        ``self._get_module_option(key, default, self.get_mgr_id())``
    """
    self._validate_module_option(key)
    # The manager id is passed as the localized prefix.
    return self._get_module_option(key, default, self.get_mgr_id())
def _set_module_option(self, key, val):
    """
    Store an option value for this module via
    ``self._ceph_set_module_option()``.

    :param key: option name
    :param val: value to store; converted with ``str()`` unless it is
        ``None``
    :return: the result of ``self._ceph_set_module_option()``
    """
    # ``set_module_option`` documents ``val`` as ``str | None``.  A blind
    # ``str(val)`` would persist the literal text "None", so None is
    # forwarded untouched.
    # NOTE(review): assumes the binding accepts None for the value --
    # confirm against the C++ side.
    return self._ceph_set_module_option(
        self.module_name, key, None if val is None else str(val))
def set_module_option(self, key, val):
    """
    Set the value of a persistent configuration setting

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :type val: str | None
    :return: the result of ``self._set_module_option(key, val)``
    """
    self._validate_module_option(key)
    return self._set_module_option(key, val)
def set_module_option_ex(self, module, key, val):
    """
    Set the value of a persistent configuration setting
    for the specified module.

    :param module: name of the module owning the option
    :param key: option name
    :param val: value to store; converted with ``str()``
    :return: the result of ``self._ceph_set_module_option()``
    """
    # Only our own options can be checked against MODULE_OPTIONS.
    if module == self.module_name:
        self._validate_module_option(key)
    return self._ceph_set_module_option(module, key, str(val))
def set_localized_module_option(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param val: value to store
    :return: the result of ``self._set_localized()``
    """
    self._validate_module_option(key)
    # _set_localized() builds the per-manager key and then calls the
    # setter handed to it.
    return self._set_localized(key, val, self._set_module_option)
def set_store(self, key, val):
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store

    :param key: store key
    :param val: value to store, or None to remove ``key``
    """
    self._ceph_set_store(key, val)
1065 def get_store(self
, key
, default
=None):
1067 Get a value from this module's persistent key value store
1069 r
= self
._ceph
_get
_store
(key
)
1075 def get_localized_store(self
, key
, default
=None):
1076 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
1078 r
= self
._ceph
_get
_store
(key
)
def set_localized_store(self, key, val):
    """
    Store ``val`` in the KV store under a key localized to this manager.

    :param key: unlocalized store key
    :param val: value to store
    :return: the result of ``self._set_localized()``
    """
    return self._set_localized(key, val, self.set_store)
1086 def self_test(self
):
1088 Run a self-test on the module. Override this function and implement
1089 a best-effort self-test for (automated) testing of the module
1091 Indicate any failures by raising an exception. This does not have
1092 to be pretty, it's mainly for picking up regressions during
1093 development, rather than use in the field.
1095 :return: None, or an advisory string for developer interest, such
1096 as a json dump of some state.
def get_osdmap(self):
    """
    Get a handle to an OSDMap.

    NOTE(review): earlier documentation described ``epoch==0`` as
    selecting the latest map, but this method takes no epoch argument.

    :return: the result of ``self._ceph_get_osdmap()``
    """
    return self._ceph_get_osdmap()
1108 def get_latest(self
, daemon_type
, daemon_name
, counter
):
1109 data
= self
.get_latest_counter(
1110 daemon_type
, daemon_name
, counter
)[counter
]
1116 def get_latest_avg(self
, daemon_type
, daemon_name
, counter
):
1117 data
= self
.get_latest_counter(
1118 daemon_type
, daemon_name
, counter
)[counter
]
1120 return data
[1], data
[2]
1124 def get_all_perf_counters(self
, prio_limit
=PRIO_USEFUL
,
1125 services
=("mds", "mon", "osd",
1126 "rbd-mirror", "rgw", "tcmu-runner")):
1128 Return the perf counters currently known to this ceph-mgr
1129 instance, filtered by priority equal to or greater than `prio_limit`.
1131 The result is a map of string to dict, associating services
1132 (like "osd.123") with their counters. The counter
1133 dict for each service maps counter paths to a counter
1134 info structure, which is the information from
1135 the schema, plus an additional "value" member with the latest
1139 result
= defaultdict(dict)
1141 for server
in self
.list_servers():
1142 for service
in server
['services']:
1143 if service
['type'] not in services
:
1146 schema
= self
.get_perf_schema(service
['type'], service
['id'])
1148 self
.log
.warn("No perf counter schema for {0}.{1}".format(
1149 service
['type'], service
['id']
1153 # Value is returned in a potentially-multi-service format,
1154 # get just the service we're asking about
1155 svc_full_name
= "{0}.{1}".format(
1156 service
['type'], service
['id'])
1157 schema
= schema
[svc_full_name
]
1159 # Populate latest values
1160 for counter_path
, counter_schema
in schema
.items():
1161 # self.log.debug("{0}: {1}".format(
1162 # counter_path, json.dumps(counter_schema)
1164 if counter_schema
['priority'] < prio_limit
:
1167 counter_info
= dict(counter_schema
)
1169 # Also populate count for the long running avgs
1170 if counter_schema
['type'] & self
.PERFCOUNTER_LONGRUNAVG
:
1171 v
, c
= self
.get_latest_avg(
1176 counter_info
['value'], counter_info
['count'] = v
, c
1177 result
[svc_full_name
][counter_path
] = counter_info
1179 counter_info
['value'] = self
.get_latest(
1185 result
[svc_full_name
][counter_path
] = counter_info
1187 self
.log
.debug("returning {0} counter".format(len(result
)))
def set_uri(self, uri):
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: the address to publish
    :return: the result of ``self._ceph_set_uri(uri)``
    """
    return self._ceph_set_uri(uri)
def have_mon_connection(self):
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.  If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: the result of ``self._ceph_have_mon_connection()``
    """
    return self._ceph_have_mon_connection()
def update_progress_event(self, evid, desc, progress):
    """
    Forward a progress update to ``self._ceph_update_progress_event()``.

    The arguments are normalized before being passed on: ``evid`` and
    ``desc`` via ``str()``, ``progress`` via ``float()``.

    :param evid: event identifier
    :param desc: event description
    :param progress: progress value; must be convertible by ``float()``
    :return: the result of ``self._ceph_update_progress_event()``
    """
    return self._ceph_update_progress_event(
        str(evid), str(desc), float(progress))
def complete_progress_event(self, evid):
    """
    Forward ``str(evid)`` to ``self._ceph_complete_progress_event()``.

    :param evid: event identifier
    :return: the result of ``self._ceph_complete_progress_event()``
    """
    return self._ceph_complete_progress_event(str(evid))
def clear_all_progress_events(self):
    """Return the result of ``self._ceph_clear_all_progress_events()``."""
    return self._ceph_clear_all_progress_events()
1222 A librados instance to be shared by any classes within
1223 this mgr module that want one.
1228 ctx_capsule
= self
.get_context()
1229 self
._rados
= rados
.Rados(context
=ctx_capsule
)
1230 self
._rados
.connect()
1237 Implement this function to report whether the module's dependencies
1238 are met. For example, if the module needs to import a particular
1239 dependency to work, then use a try/except around the import at
1240 file scope, and then report here if the import failed.
1242 This will be called in a blocking way from the C++ code, so do not
1243 do any I/O that could block in this function.
1245 :return: a 2-tuple consisting of a boolean and an explanatory string
1250 def remote(self
, module_name
, method_name
, *args
, **kwargs
):
1252 Invoke a method on another module. All arguments, and the return
1253 value from the other module must be serializable.
1255 Limitation: Do not import any modules within the called method.
1256 Otherwise you will get an error in Python 2::
1258 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
1262 :param module_name: Name of other module. If module isn't loaded,
1263 an ImportError exception is raised.
1264 :param method_name: Method name. If it does not exist, a NameError
1265 exception is raised.
1266 :param args: Argument tuple
1267 :param kwargs: Keyword argument dict
1268 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
1269 :raises ImportError: No such module
1271 return self
._ceph
_dispatch
_remote
(module_name
, method_name
,
def add_osd_perf_query(self, query):
    """
    Register an OSD perf query.  Argument is a
    dict of the query parameters, whose entries include::

        {'type': subkey_type, 'regex': regex_pattern}
        'performance_counter_descriptors': [
            list, of, descriptor, types
        ]
        'limit': {'order_by': performance_counter_type, 'max_count': n}

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    return self._ceph_add_osd_perf_query(query)
def remove_osd_perf_query(self, query_id):
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: the result of ``self._ceph_remove_osd_perf_query(query_id)``
    """
    return self._ceph_remove_osd_perf_query(query_id)
def get_osd_perf_counters(self, query_id):
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: the result of ``self._ceph_get_osd_perf_counters(query_id)``
    """
    return self._ceph_get_osd_perf_counters(query_id)
1321 class PersistentStoreDict(object):
1322 def __init__(self
, mgr
, prefix
):
1323 # type: (MgrModule, str) -> None
1325 self
.prefix
= prefix
+ '.'
def _mk_store_key(self, key):
    """
    Build the full store key by prepending ``self.prefix`` to ``key``.

    :param key: key without prefix
    :return: ``self.prefix + key``
    """
    return self.prefix + key
def __missing__(self, key):
    """
    Signal that ``key`` is not present.

    :param key: the key that was not found
    :raises IndexError: always
    """
    # KeyError won't work for the `in` operator.
    # https://docs.python.org/3/reference/expressions.html#membership-test-details
    raise IndexError('PersistentStoreDict: "{}" not found'.format(key))
1336 # Don't make any assumptions about the content of the values.
1337 for item
in six
.iteritems(self
.mgr
.get_store_prefix(self
.prefix
)):
1339 self
.mgr
.set_store(k
, None)
1341 def __getitem__(self
, item
):
1342 # type: (str) -> Any
1343 key
= self
._mk
_store
_key
(item
)
1345 val
= self
.mgr
.get_store(key
)
1347 self
.__missing
__(key
)
1348 return json
.loads(val
)
1349 except (KeyError, AttributeError, IndexError, ValueError, TypeError):
1350 logging
.getLogger(__name__
).exception('failed to deserialize')
1351 self
.mgr
.set_store(key
, None)
def __setitem__(self, item, value):
    # type: (str, Any) -> None
    """
    Store ``value`` JSON-encoded under the prefixed form of ``item``.

    value=None is not allowed, as it will remove the key.
    """
    key = self._mk_store_key(item)
    # None is handed to set_store() unencoded, which removes the key.
    self.mgr.set_store(key, json.dumps(value) if value is not None else None)
1362 def __delitem__(self
, item
):
1366 return len(self
.keys())
1369 # type: () -> Iterator[Tuple[str, Any]]
1370 prefix_len
= len(self
.prefix
)
1372 for item
in six
.iteritems(self
.mgr
.get_store_prefix(self
.prefix
)):
1374 yield k
[prefix_len
:], json
.loads(v
)
1375 except (KeyError, AttributeError, IndexError, ValueError, TypeError):
1376 logging
.getLogger(__name__
).exception('failed to deserialize')
1380 # type: () -> Set[str]
1381 return {item
[0] for item
in self
.items()}
1384 return iter(self
.keys())