]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
2 import ceph_module
# noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
10 from collections
import defaultdict
13 class CPlusPlusHandler(logging
.Handler
):
14 def __init__(self
, module_inst
):
15 super(CPlusPlusHandler
, self
).__init
__()
16 self
._module
= module_inst
18 def emit(self
, record
):
19 if record
.levelno
<= logging
.DEBUG
:
21 elif record
.levelno
<= logging
.INFO
:
23 elif record
.levelno
<= logging
.WARNING
:
28 self
._module
._ceph
_log
(ceph_level
, self
.format(record
))
31 def configure_logger(module_inst
, name
):
32 logger
= logging
.getLogger(name
)
35 # Don't filter any logs at the python level, leave it to C++
36 logger
.setLevel(logging
.DEBUG
)
38 # FIXME: we should learn the log level from C++ land, and then
39 # avoid calling the C++ level log when we know a message is of
40 # an insufficient level to be ultimately output
41 logger
.addHandler(CPlusPlusHandler(module_inst
))
46 def unconfigure_logger(module_inst
, name
):
47 logger
= logging
.getLogger(name
)
48 rm_handlers
= [h
for h
in logger
.handlers
if isinstance(h
, CPlusPlusHandler
)]
50 logger
.removeHandler(h
)
52 class CommandResult(object):
54 Use with MgrModule.send_command
56 def __init__(self
, tag
):
57 self
.ev
= threading
.Event()
62 # This is just a convenience for notifications from
63 # C++ land, to avoid passing addresses around in messages.
66 def complete(self
, r
, outb
, outs
):
74 return self
.r
, self
.outb
, self
.outs
77 class OSDMap(ceph_module
.BasePyOSDMap
):
79 return self
._get
_epoch
()
81 def get_crush_version(self
):
82 return self
._get
_crush
_version
()
87 def new_incremental(self
):
88 return self
._new
_incremental
()
90 def apply_incremental(self
, inc
):
91 return self
._apply
_incremental
(inc
)
94 return self
._get
_crush
()
96 def get_pools_by_take(self
, take
):
97 return self
._get
_pools
_by
_take
(take
).get('pools', [])
99 def calc_pg_upmaps(self
, inc
,
100 max_deviation
=.01, max_iterations
=10, pools
=[]):
101 return self
._calc
_pg
_upmaps
(
103 max_deviation
, max_iterations
, pools
)
105 def map_pool_pgs_up(self
, poolid
):
106 return self
._map
_pool
_pgs
_up
(poolid
)
108 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
110 return self
._get
_epoch
()
115 def set_osd_reweights(self
, weightmap
):
117 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
119 return self
._set
_osd
_reweights
(weightmap
)
121 def set_crush_compat_weight_set_weights(self
, weightmap
):
123 weightmap is a dict, int to float. devices only. e.g.,
124 { 0: 3.4, 1: 3.3, 2: 3.334 }
126 return self
._set
_crush
_compat
_weight
_set
_weights
(weightmap
)
128 class CRUSHMap(ceph_module
.BasePyCRUSH
):
132 def get_item_weight(self
, item
):
133 return self
._get
_item
_weight
(item
)
135 def get_item_name(self
, item
):
136 return self
._get
_item
_name
(item
)
138 def find_takes(self
):
139 return self
._find
_takes
().get('takes', [])
141 def get_take_weight_osd_map(self
, root
):
142 uglymap
= self
._get
_take
_weight
_osd
_map
(root
)
143 return { int(k
): v
for k
, v
in uglymap
.get('weights', {}).iteritems() }
145 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
):
147 Standby modules only implement a serve and shutdown method, they
148 are not permitted to implement commands and they do not receive
151 They only have access to the mgrmap (for acecssing service URI info
152 from their active peer), and to configuration settings (read only).
155 def __init__(self
, module_name
, capsule
):
156 super(MgrStandbyModule
, self
).__init
__(capsule
)
157 self
.module_name
= module_name
158 self
._logger
= configure_logger(self
, module_name
)
161 unconfigure_logger(self
, self
.module_name
)
169 The serve method is mandatory for standby modules.
172 raise NotImplementedError()
174 def get_mgr_id(self
):
175 return self
._ceph
_get
_mgr
_id
()
177 def get_config(self
, key
):
178 return self
._ceph
_get
_config
(key
)
180 def get_active_uri(self
):
181 return self
._ceph
_get
_active
_uri
()
183 def get_localized_config(self
, key
, default
=None):
184 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
186 r
= self
.get_config(key
)
192 class MgrModule(ceph_module
.BaseMgrModule
):
195 # Priority definitions for perf counters
199 PRIO_UNINTERESTING
= 2
202 # counter value types
207 PERFCOUNTER_LONGRUNAVG
= 4
208 PERFCOUNTER_COUNTER
= 8
209 PERFCOUNTER_HISTOGRAM
= 0x10
210 PERFCOUNTER_TYPE_MASK
= ~
2
212 def __init__(self
, module_name
, py_modules_ptr
, this_ptr
):
213 self
.module_name
= module_name
215 # If we're taking over from a standby module, let's make sure
216 # its logger was unconfigured before we hook ours up
217 unconfigure_logger(self
, self
.module_name
)
218 self
._logger
= configure_logger(self
, module_name
)
220 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
222 self
._version
= self
._ceph
_get
_version
()
224 self
._perf
_schema
_cache
= None
227 unconfigure_logger(self
, self
.module_name
)
229 def update_perf_schema(self
, daemon_type
, daemon_name
):
231 For plugins that use get_all_perf_counters, call this when
232 receiving a notification of type 'perf_schema_update', to
233 prompt MgrModule to update its cache of counter schemas.
248 def get_context(self
):
250 :return: a Python capsule containing a C++ CephContext pointer
252 return self
._ceph
_get
_context
()
254 def notify(self
, notify_type
, notify_id
):
256 Called by the ceph-mgr service to notify the Python plugin
257 that new state is available.
263 Called by the ceph-mgr service to start any server that
264 is provided by this Python plugin. The implementation
265 of this function should block until ``shutdown`` is called.
267 You *must* implement ``shutdown`` if you implement ``serve``
273 Called by the ceph-mgr service to request that this
274 module drop out of its serve() function. You do not
275 need to implement this if you do not implement serve()
281 def get(self
, data_name
):
283 Called by the plugin to load some cluster state from ceph-mgr
285 return self
._ceph
_get
(data_name
)
287 def get_server(self
, hostname
):
289 Called by the plugin to load information about a particular
292 :param hostname: a hostame
294 return self
._ceph
_get
_server
(hostname
)
296 def get_perf_schema(self
, svc_type
, svc_name
):
298 Called by the plugin to fetch perf counter schema info.
299 svc_name can be nullptr, as can svc_type, in which case
304 :return: list of dicts describing the counters requested
306 return self
._ceph
_get
_perf
_schema
(svc_type
, svc_name
)
308 def get_counter(self
, svc_type
, svc_name
, path
):
310 Called by the plugin to fetch data for a particular perf counter
311 on a particular service.
316 :return: A list of two-element lists containing time and value
318 return self
._ceph
_get
_counter
(svc_type
, svc_name
, path
)
320 def list_servers(self
):
322 Like ``get_server``, but instead of returning information
323 about just one node, return all the nodes in an array.
325 return self
._ceph
_get
_server
(None)
327 def get_metadata(self
, svc_type
, svc_id
):
329 Fetch the metadata for a particular service.
331 :param svc_type: string (e.g., 'mds', 'osd', 'mon')
332 :param svc_id: string
335 return self
._ceph
_get
_metadata
(svc_type
, svc_id
)
337 def get_daemon_status(self
, svc_type
, svc_id
):
339 Fetch the latest status for a particular service daemon.
341 :param svc_type: string (e.g., 'rgw')
342 :param svc_id: string
345 return self
._ceph
_get
_daemon
_status
(svc_type
, svc_id
)
347 def send_command(self
, *args
, **kwargs
):
349 Called by the plugin to send a command to the mon
352 self
._ceph
_send
_command
(*args
, **kwargs
)
354 def set_health_checks(self
, checks
):
356 Set module's health checks
358 Set the module's current map of health checks. Argument is a
359 dict of check names to info, in this form:
363 'severity': 'warning', # or 'error'
364 'summary': 'summary string',
365 'detail': [ 'list', 'of', 'detail', 'strings' ],
369 'summary': 'bars are bad',
370 'detail': [ 'too hard' ],
374 :param list: dict of health check dicts
376 self
._ceph
_set
_health
_checks
(checks
)
378 def handle_command(self
, cmd
):
380 Called by ceph-mgr to request the plugin to handle one
381 of the commands that it declared in self.COMMANDS
383 Return a status code, an output buffer, and an
384 output string. The output buffer is for data results,
385 the output string is for informative text.
387 :param cmd: dict, from Ceph's cmdmap_t
389 :return: 3-tuple of (int, str, str)
392 # Should never get called if they didn't declare
394 raise NotImplementedError()
396 def get_mgr_id(self
):
402 return self
._ceph
_get
_mgr
_id
()
404 def get_config(self
, key
, default
=None):
406 Retrieve the value of a persistent configuration setting
411 r
= self
._ceph
_get
_config
(key
)
417 def get_config_prefix(self
, key_prefix
):
419 Retrieve a dict of config values with the given prefix
421 :param key_prefix: str
424 return self
._ceph
_get
_config
_prefix
(key_prefix
)
426 def get_localized_config(self
, key
, default
=None):
428 Retrieve localized configuration for this ceph-mgr instance
433 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
435 r
= self
.get_config(key
)
441 def set_config(self
, key
, val
):
443 Set the value of a persistent configuration setting
448 self
._ceph
_set
_config
(key
, val
)
450 def set_localized_config(self
, key
, val
):
452 Set localized configuration for this ceph-mgr instance
457 return self
._ceph
_set
_config
(self
.get_mgr_id() + '/' + key
, val
)
459 def set_config_json(self
, key
, val
):
461 Helper for setting json-serialized-config
464 :param val: json-serializable object
466 self
._ceph
_set
_config
(key
, json
.dumps(val
))
468 def get_config_json(self
, key
):
470 Helper for getting json-serialized config
475 raw
= self
.get_config(key
)
479 return json
.loads(raw
)
483 Run a self-test on the module. Override this function and implement
484 a best as possible self-test for (automated) testing of the module
489 def get_osdmap(self
):
491 Get a handle to an OSDMap. If epoch==0, get a handle for the latest
495 return self
._ceph
_get
_osdmap
()
497 def get_all_perf_counters(self
, prio_limit
=PRIO_USEFUL
):
499 Return the perf counters currently known to this ceph-mgr
500 instance, filtered by priority equal to or greater than `prio_limit`.
502 The result us a map of string to dict, associating services
503 (like "osd.123") with their counters. The counter
504 dict for each service maps counter paths to a counter
505 info structure, which is the information from
506 the schema, plus an additional "value" member with the latest
510 result
= defaultdict(dict)
512 # TODO: improve C++->Python interface to return just
513 # the latest if that's all we want.
514 def get_latest(daemon_type
, daemon_name
, counter
):
515 data
= self
.get_counter(daemon_type
, daemon_name
, counter
)[counter
]
521 for server
in self
.list_servers():
522 for service
in server
['services']:
523 if service
['type'] not in ("mds", "osd", "mon"):
526 schema
= self
.get_perf_schema(service
['type'], service
['id'])
528 self
.log
.warn("No perf counter schema for {0}.{1}".format(
529 service
['type'], service
['id']
533 # Value is returned in a potentially-multi-service format,
534 # get just the service we're asking about
535 svc_full_name
= "{0}.{1}".format(service
['type'], service
['id'])
536 schema
= schema
[svc_full_name
]
538 # Populate latest values
539 for counter_path
, counter_schema
in schema
.items():
540 # self.log.debug("{0}: {1}".format(
541 # counter_path, json.dumps(counter_schema)
543 if counter_schema
['priority'] < prio_limit
:
546 counter_info
= counter_schema
547 counter_info
['value'] = get_latest(service
['type'], service
['id'], counter_path
)
548 result
[svc_full_name
][counter_path
] = counter_info
550 self
.log
.debug("returning {0} counter".format(len(result
)))
554 def set_uri(self
, uri
):
556 If the module exposes a service, then call this to publish the
557 address once it is available.
561 return self
._ceph
_set
_uri
(uri
)