]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
2 import ceph_module
# noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
11 from collections
import defaultdict
14 class CPlusPlusHandler(logging
.Handler
):
def __init__(self, module_inst):
    """
    Create a logging handler bound to a single module instance.

    :param module_inst: object whose ``_ceph_log`` method is called by
        ``emit`` with the formatted record
    """
    super(CPlusPlusHandler, self).__init__()
    # Kept so emit() can forward records through self._module._ceph_log
    self._module = module_inst
19 def emit(self
, record
):
20 if record
.levelno
<= logging
.DEBUG
:
22 elif record
.levelno
<= logging
.INFO
:
24 elif record
.levelno
<= logging
.WARNING
:
29 self
._module
._ceph
_log
(ceph_level
, self
.format(record
))
32 def configure_logger(module_inst
, name
):
33 logger
= logging
.getLogger(name
)
36 # Don't filter any logs at the python level, leave it to C++
37 logger
.setLevel(logging
.DEBUG
)
39 # FIXME: we should learn the log level from C++ land, and then
40 # avoid calling the C++ level log when we know a message is of
41 # an insufficient level to be ultimately output
42 logger
.addHandler(CPlusPlusHandler(module_inst
))
47 def unconfigure_logger(module_inst
, name
):
48 logger
= logging
.getLogger(name
)
49 rm_handlers
= [h
for h
in logger
.handlers
if isinstance(h
, CPlusPlusHandler
)]
51 logger
.removeHandler(h
)
53 class CommandResult(object):
55 Use with MgrModule.send_command
57 def __init__(self
, tag
):
58 self
.ev
= threading
.Event()
63 # This is just a convenience for notifications from
64 # C++ land, to avoid passing addresses around in messages.
67 def complete(self
, r
, outb
, outs
):
75 return self
.r
, self
.outb
, self
.outs
78 class OSDMap(ceph_module
.BasePyOSDMap
):
80 return self
._get
_epoch
()
def get_crush_version(self):
    """
    Return the value produced by the ``_get_crush_version`` binding.
    """
    crush_version = self._get_crush_version()
    return crush_version
def new_incremental(self):
    """
    Return the object produced by the ``_new_incremental`` binding.
    """
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc):
    """
    Hand ``inc`` to the ``_apply_incremental`` binding.

    :param inc: passed through unchanged
    :return: whatever ``_apply_incremental`` returns
    """
    applied = self._apply_incremental(inc)
    return applied
95 return self
._get
_crush
()
def get_pools_by_take(self, take):
    """
    Return the ``'pools'`` entry of the ``_get_pools_by_take`` result.

    :param take: passed through unchanged to the binding
    :return: value stored under ``'pools'``, or an empty list when the
        key is absent
    """
    reply = self._get_pools_by_take(take)
    return reply.get('pools', [])
def calc_pg_upmaps(self, inc,
                   max_deviation=.01, max_iterations=10, pools=None):
    """
    Forward an upmap calculation request to ``_calc_pg_upmaps``.

    :param inc: passed through as the first argument (presumably an
        OSDMapIncremental to be filled in -- confirm against the binding)
    :param max_deviation: passed through unchanged (default .01)
    :param max_iterations: passed through unchanged (default 10)
    :param pools: list passed through to the binding; when omitted, a
        fresh empty list is created for each call
    :return: whatever ``_calc_pg_upmaps`` returns
    """
    # A literal ``[]`` default is a single list object shared by every
    # call, so anything appended to it would leak into later calls.
    if pools is None:
        pools = []
    return self._calc_pg_upmaps(
        inc,
        max_deviation, max_iterations, pools)
def map_pool_pgs_up(self, poolid):
    """
    Return the ``_map_pool_pgs_up`` binding's result for ``poolid``.

    :param poolid: passed through unchanged
    """
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
109 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
111 return self
._get
_epoch
()
def set_osd_reweights(self, weightmap):
    """
    Pass a reweight map to the ``_set_osd_reweights`` binding.

    :param weightmap: dict, int to float, e.g. { 0: .9, 1: 1.0, 3: .997 }
    :return: whatever ``_set_osd_reweights`` returns
    """
    outcome = self._set_osd_reweights(weightmap)
    return outcome
def set_crush_compat_weight_set_weights(self, weightmap):
    """
    Pass a weight map to the
    ``_set_crush_compat_weight_set_weights`` binding.

    :param weightmap: dict, int to float, devices only, e.g.
        { 0: 3.4, 1: 3.3, 2: 3.334 }
    :return: whatever the binding returns
    """
    outcome = self._set_crush_compat_weight_set_weights(weightmap)
    return outcome
129 class CRUSHMap(ceph_module
.BasePyCRUSH
):
130 ITEM_NONE
= 0x7fffffff
def get_item_weight(self, item):
    """
    Return the ``_get_item_weight`` binding's result for ``item``.

    :param item: passed through unchanged
    """
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item):
    """
    Return the ``_get_item_name`` binding's result for ``item``.

    :param item: passed through unchanged
    """
    name = self._get_item_name(item)
    return name
def find_takes(self):
    """
    Return the ``'takes'`` entry of the ``_find_takes`` result.

    :return: value stored under ``'takes'``, or an empty list when the
        key is absent
    """
    reply = self._find_takes()
    return reply.get('takes', [])
def get_take_weight_osd_map(self, root):
    """
    Return the ``'weights'`` entry of the ``_get_take_weight_osd_map``
    result for ``root`` with every key converted to ``int``.

    :param root: passed through unchanged to the binding
    :return: dict of int key to the original value; empty when the
        binding's result has no ``'weights'`` entry
    """
    raw_weights = self._get_take_weight_osd_map(root).get('weights', {})
    # Keys are passed through int() before being handed to the caller
    return dict(
        (int(osd), weight) for osd, weight in six.iteritems(raw_weights))
148 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
):
150 Standby modules only implement a serve and shutdown method, they
151 are not permitted to implement commands and they do not receive
154 They only have access to the mgrmap (for accessing service URI info
155 from their active peer), and to configuration settings (read only).
def __init__(self, module_name, capsule):
    """
    :param module_name: stored on the instance and used as the name of
        this module's logger
    :param capsule: passed through unchanged to the base-class
        constructor
    """
    super(MgrStandbyModule, self).__init__(capsule)
    self.module_name = module_name
    # configure_logger attaches a CPlusPlusHandler for this instance to
    # the logger named after the module
    self._logger = configure_logger(self, module_name)
164 unconfigure_logger(self
, self
.module_name
)
172 The serve method is mandatory for standby modules.
175 raise NotImplementedError()
def get_mgr_id(self):
    """
    Return the value produced by the ``_ceph_get_mgr_id`` binding.
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
180 def get_config(self
, key
, default
=None):
182 Retrieve the value of a persistent configuration setting
185 :param default: the default value of the config if it is not found
188 r
= self
._ceph
_get
_config
(key
)
def get_active_uri(self):
    """
    Return the value produced by the ``_ceph_get_active_uri`` binding.
    """
    active_uri = self._ceph_get_active_uri()
    return active_uri
198 def get_localized_config(self
, key
, default
=None):
199 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
201 r
= self
.get_config(key
)
207 class MgrModule(ceph_module
.BaseMgrModule
):
210 # Priority definitions for perf counters
214 PRIO_UNINTERESTING
= 2
217 # counter value types
222 PERFCOUNTER_LONGRUNAVG
= 4
223 PERFCOUNTER_COUNTER
= 8
224 PERFCOUNTER_HISTOGRAM
= 0x10
225 PERFCOUNTER_TYPE_MASK
= ~
3
def __init__(self, module_name, py_modules_ptr, this_ptr):
    """
    :param module_name: stored on the instance and used as the name of
        this module's logger
    :param py_modules_ptr: passed through unchanged to the base-class
        constructor
    :param this_ptr: passed through unchanged to the base-class
        constructor
    """
    self.module_name = module_name

    # If we're taking over from a standby module, let's make sure
    # its logger was unconfigured before we hook ours up
    unconfigure_logger(self, self.module_name)
    self._logger = configure_logger(self, module_name)

    super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

    # Read once at construction time from the C++ side
    self._version = self._ceph_get_version()

    # Starts empty; see update_perf_schema for the cache this refers to
    self._perf_schema_cache = None
246 unconfigure_logger(self
, self
.module_name
)
248 def update_perf_schema(self
, daemon_type
, daemon_name
):
250 For plugins that use get_all_perf_counters, call this when
251 receiving a notification of type 'perf_schema_update', to
252 prompt MgrModule to update its cache of counter schemas.
def get_context(self):
    """
    Fetch the context object from ``_ceph_get_context``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    context = self._ceph_get_context()
    return context
273 def notify(self
, notify_type
, notify_id
):
275 Called by the ceph-mgr service to notify the Python plugin
276 that new state is available.
282 Called by the ceph-mgr service to start any server that
283 is provided by this Python plugin. The implementation
284 of this function should block until ``shutdown`` is called.
286 You *must* implement ``shutdown`` if you implement ``serve``
292 Called by the ceph-mgr service to request that this
293 module drop out of its serve() function. You do not
294 need to implement this if you do not implement serve()
def get(self, data_name):
    """
    Called by the plugin to load some cluster state from ceph-mgr.

    :param data_name: passed through unchanged to ``_ceph_get``
    :return: whatever ``_ceph_get`` returns
    """
    state = self._ceph_get(data_name)
    return state
306 def _stattype_to_str(self
, stattype
):
308 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
311 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
312 # this lie matches the DaemonState decoding: only val, no counts
314 if typeonly
== self
.PERFCOUNTER_COUNTER
:
316 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
321 def _perfvalue_to_value(self
, stattype
, value
):
322 if stattype
& self
.PERFCOUNTER_TIME
:
323 # Convert from ns to seconds
324 return value
/ 1000000000.0
328 def _unit_to_str(self
, unit
):
329 if unit
== self
.NONE
:
331 elif unit
== self
.BYTES
:
def get_server(self, hostname):
    """
    Called by the plugin to load information about a particular
    server.

    :param hostname: a hostname
    :return: whatever ``_ceph_get_server`` returns for ``hostname``
    """
    server = self._ceph_get_server(hostname)
    return server
def get_perf_schema(self, svc_type, svc_name):
    """
    Called by the plugin to fetch perf counter schema info.

    :param svc_type: may be None
    :param svc_name: may be None
    :return: schema data describing the counters requested, as
        produced by ``_ceph_get_perf_schema``
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
def get_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch data for a particular perf counter
    on a particular service.

    :param svc_type: passed through unchanged
    :param svc_name: passed through unchanged
    :param path: passed through unchanged
    :return: time/value samples as produced by ``_ceph_get_counter``
        (callers in this file index the result by ``path`` first)
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
def list_servers(self):
    """
    Like ``get_server``, but instead of returning information
    about just one node, return all the nodes in an array.

    Implemented by calling ``_ceph_get_server`` with ``None``.
    """
    every_server = self._ceph_get_server(None)
    return every_server
def get_metadata(self, svc_type, svc_id):
    """
    Fetch the metadata for a particular service.

    :param svc_type: string (e.g., 'mds', 'osd', 'mon')
    :param svc_id: string
    :return: whatever ``_ceph_get_metadata`` returns
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for a particular service daemon.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: whatever ``_ceph_get_daemon_status`` returns
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon.

    All positional and keyword arguments are forwarded unchanged to
    ``_ceph_send_command``; nothing is returned.
    """
    dispatch = self._ceph_send_command
    dispatch(*args, **kwargs)
def set_health_checks(self, checks):
    """
    Set the module's current map of health checks.

    The argument is a dict of check names to info, in this form::

       {
         'FOO': {
           'severity': 'warning',           # or 'error'
           'summary': 'summary string',
           'detail': [ 'list', 'of', 'detail', 'strings' ],
         },
         'BAR': {
           'severity': 'error',
           'summary': 'bars are bad',
           'detail': [ 'too hard' ],
         },
       }

    :param checks: dict of health check dicts
    """
    publish = self._ceph_set_health_checks
    publish(checks)
def handle_command(self, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one
    of the commands that it declared in self.COMMANDS.

    Implementations return a status code, an output buffer, and an
    output string.  The output buffer is for data results, the
    output string is for informative text.

    :param cmd: dict, from Ceph's cmdmap_t
    :return: 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """
    # Should never be reached for a module that declared no commands;
    # a module that does declare them must override this method.
    raise NotImplementedError()
443 def get_mgr_id(self
):
449 return self
._ceph
_get
_mgr
_id
()
451 def get_config(self
, key
, default
=None):
453 Retrieve the value of a persistent configuration setting
458 r
= self
._ceph
_get
_config
(key
)
def get_config_prefix(self, key_prefix):
    """
    Retrieve a dict of config values with the given prefix.

    :param key_prefix: str
    :return: whatever ``_ceph_get_config_prefix`` returns
    """
    matching = self._ceph_get_config_prefix(key_prefix)
    return matching
473 def get_localized_config(self
, key
, default
=None):
475 Retrieve localized configuration for this ceph-mgr instance
480 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
482 r
= self
.get_config(key
)
def set_config(self, key, val):
    """
    Set the value of a persistent configuration setting.

    :param key: passed through unchanged to ``_ceph_set_config``
    :param val: passed through unchanged to ``_ceph_set_config``
    """
    store = self._ceph_set_config
    store(key, val)
def set_localized_config(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    The setting is stored under ``<mgr id>/<key>``, where the mgr id
    comes from ``get_mgr_id``.

    :param key: key to be prefixed with this instance's mgr id
    :param val: passed through unchanged to ``_ceph_set_config``
    :return: whatever ``_ceph_set_config`` returns
    """
    localized_key = self.get_mgr_id() + '/' + key
    return self._ceph_set_config(localized_key, val)
def set_config_json(self, key, val):
    """
    Helper for setting json-serialized-config.

    :param key: passed through unchanged to ``_ceph_set_config``
    :param val: json-serializable object; stored as ``json.dumps(val)``
    """
    serialized = json.dumps(val)
    self._ceph_set_config(key, serialized)
515 def get_config_json(self
, key
):
517 Helper for getting json-serialized config
522 raw
= self
.get_config(key
)
526 return json
.loads(raw
)
530 Run a self-test on the module. Override this function and implement
531 a best-effort self-test for (automated) testing of the module
def get_osdmap(self):
    """
    Get a handle to an OSDMap.

    This method takes no epoch argument; it returns whatever
    ``_ceph_get_osdmap`` provides (presumably the latest map --
    confirm against the binding).
    """
    osdmap = self._ceph_get_osdmap()
    return osdmap
544 def get_all_perf_counters(self
, prio_limit
=PRIO_USEFUL
):
546 Return the perf counters currently known to this ceph-mgr
547 instance, filtered by priority equal to or greater than `prio_limit`.
549 The result is a map of string to dict, associating services
550 (like "osd.123") with their counters. The counter
551 dict for each service maps counter paths to a counter
552 info structure, which is the information from
553 the schema, plus an additional "value" member with the latest
557 result
= defaultdict(dict)
559 # TODO: improve C++->Python interface to return just
560 # the latest if that's all we want.
561 def get_latest(daemon_type
, daemon_name
, counter
):
562 data
= self
.get_counter(daemon_type
, daemon_name
, counter
)[counter
]
568 def get_latest_avg(daemon_type
, daemon_name
, counter
):
569 data
= self
.get_counter(daemon_type
, daemon_name
, counter
)[counter
]
571 return (data
[-1][1], data
[-1][2])
575 for server
in self
.list_servers():
576 for service
in server
['services']:
577 if service
['type'] not in ("rgw", "mds", "osd", "mon"):
580 schema
= self
.get_perf_schema(service
['type'], service
['id'])
582 self
.log
.warn("No perf counter schema for {0}.{1}".format(
583 service
['type'], service
['id']
587 # Value is returned in a potentially-multi-service format,
588 # get just the service we're asking about
589 svc_full_name
= "{0}.{1}".format(service
['type'], service
['id'])
590 schema
= schema
[svc_full_name
]
592 # Populate latest values
593 for counter_path
, counter_schema
in schema
.items():
594 # self.log.debug("{0}: {1}".format(
595 # counter_path, json.dumps(counter_schema)
597 if counter_schema
['priority'] < prio_limit
:
600 counter_info
= dict(counter_schema
)
602 # Also populate count for the long running avgs
603 if counter_schema
['type'] & self
.PERFCOUNTER_LONGRUNAVG
:
604 v
, c
= get_latest_avg(
609 counter_info
['value'], counter_info
['count'] = v
, c
610 result
[svc_full_name
][counter_path
] = counter_info
612 counter_info
['value'] = get_latest(
618 result
[svc_full_name
][counter_path
] = counter_info
620 self
.log
.debug("returning {0} counter".format(len(result
)))
def set_uri(self, uri):
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: passed through unchanged to ``_ceph_set_uri``
    :return: whatever ``_ceph_set_uri`` returns
    """
    published = self._ceph_set_uri(uri)
    return published
def have_mon_connection(self):
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.  If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: whatever ``_ceph_have_mon_connection`` returns
    """
    connected = self._ceph_have_mon_connection()
    return connected