]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
2 import ceph_module
# noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
10 from collections
import defaultdict
13 class CPlusPlusHandler(logging
.Handler
):
def __init__(self, module_inst):
    """
    Create a logging handler bound to one module instance.

    :param module_inst: object kept as ``self._module``; ``emit`` forwards
        formatted records to its ``_ceph_log`` method
    """
    # Base Handler state must exist before we attach our own attribute
    super(CPlusPlusHandler, self).__init__()
    self._module = module_inst
18 def emit(self
, record
):
19 if record
.levelno
<= logging
.DEBUG
:
21 elif record
.levelno
<= logging
.INFO
:
23 elif record
.levelno
<= logging
.WARNING
:
28 self
._module
._ceph
_log
(ceph_level
, self
.format(record
))
31 def configure_logger(module_inst
, name
):
32 logger
= logging
.getLogger(name
)
35 # Don't filter any logs at the python level, leave it to C++
36 logger
.setLevel(logging
.DEBUG
)
38 # FIXME: we should learn the log level from C++ land, and then
39 # avoid calling the C++ level log when we know a message is of
40 # an insufficient level to be ultimately output
41 logger
.addHandler(CPlusPlusHandler(module_inst
))
46 def unconfigure_logger(module_inst
, name
):
47 logger
= logging
.getLogger(name
)
48 rm_handlers
= [h
for h
in logger
.handlers
if isinstance(h
, CPlusPlusHandler
)]
50 logger
.removeHandler(h
)
52 class CommandResult(object):
54 Use with MgrModule.send_command
56 def __init__(self
, tag
):
57 self
.ev
= threading
.Event()
62 # This is just a convenience for notifications from
63 # C++ land, to avoid passing addresses around in messages.
66 def complete(self
, r
, outb
, outs
):
74 return self
.r
, self
.outb
, self
.outs
77 class OSDMap(ceph_module
.BasePyOSDMap
):
79 return self
._get
_epoch
()
def get_crush_version(self):
    """
    :return: the value reported by the ``_get_crush_version`` binding
    """
    crush_version = self._get_crush_version()
    return crush_version
def new_incremental(self):
    """
    :return: the object produced by the ``_new_incremental`` binding
    """
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc):
    """
    Hand ``inc`` to the ``_apply_incremental`` binding.

    :param inc: passed through unchanged
    :return: whatever ``_apply_incremental`` returns
    """
    applied = self._apply_incremental(inc)
    return applied
94 return self
._get
_crush
()
def get_pools_by_take(self, take):
    """
    Look up the pools associated with a take.

    :param take: passed through unchanged to ``_get_pools_by_take``
    :return: the 'pools' entry of the binding's result, or an empty list
        when that key is absent
    """
    reply = self._get_pools_by_take(take)
    return reply.get('pools', [])
99 def calc_pg_upmaps(self
, inc
,
100 max_deviation
=.01, max_iterations
=10, pools
=[]):
101 return self
._calc
_pg
_upmaps
(
103 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid):
    """
    Delegate to the ``_map_pool_pgs_up`` binding.

    :param poolid: passed through unchanged
    :return: whatever ``_map_pool_pgs_up`` returns
    """
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
108 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
110 return self
._get
_epoch
()
def set_osd_reweights(self, weightmap):
    """
    Pass a reweight map to the ``_set_osd_reweights`` binding.

    :param weightmap: dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
    :return: whatever ``_set_osd_reweights`` returns
    """
    outcome = self._set_osd_reweights(weightmap)
    return outcome
def set_crush_compat_weight_set_weights(self, weightmap):
    """
    Pass a weight map to the ``_set_crush_compat_weight_set_weights`` binding.

    :param weightmap: dict, int to float, devices only.
        e.g. { 0: 3.4, 1: 3.3, 2: 3.334 }
    :return: whatever the binding returns
    """
    outcome = self._set_crush_compat_weight_set_weights(weightmap)
    return outcome
128 class CRUSHMap(ceph_module
.BasePyCRUSH
):
129 ITEM_NONE
= 0x7fffffff
def get_item_weight(self, item):
    """
    :param item: passed through unchanged to ``_get_item_weight``
    :return: whatever ``_get_item_weight`` returns for ``item``
    """
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item):
    """
    :param item: passed through unchanged to ``_get_item_name``
    :return: whatever ``_get_item_name`` returns for ``item``
    """
    item_name = self._get_item_name(item)
    return item_name
def find_takes(self):
    """
    :return: the 'takes' entry of the ``_find_takes`` result, or an empty
        list when that key is absent
    """
    reply = self._find_takes()
    return reply.get('takes', [])
def get_take_weight_osd_map(self, root):
    """
    Return the weights below a take root, keyed by integer id.

    The ``_get_take_weight_osd_map`` binding returns a dict whose optional
    'weights' entry has keys that need converting with ``int()``.

    :param root: passed through unchanged to ``_get_take_weight_osd_map``
    :return: dict of ``int(key)`` to weight; empty when the binding's
        result has no 'weights' entry
    """
    uglymap = self._get_take_weight_osd_map(root)
    # items() exists on both Python 2 and 3; iteritems() is Python 2 only
    # and raises AttributeError on Python 3.
    return {int(k): v for k, v in uglymap.get('weights', {}).items()}
147 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
):
149 Standby modules only implement a serve and shutdown method, they
150 are not permitted to implement commands and they do not receive
153 They only have access to the mgrmap (for accessing service URI info
154 from their active peer), and to configuration settings (read only).
def __init__(self, module_name, capsule):
    """
    Set up a standby module instance.

    :param module_name: stored as ``self.module_name`` and used as the
        logger name
    :param capsule: forwarded unchanged to the base class constructor
    """
    super(MgrStandbyModule, self).__init__(capsule)
    self.module_name = module_name
    # The logger is configured after the base class is initialised, since
    # configure_logger receives this instance.
    module_logger = configure_logger(self, module_name)
    self._logger = module_logger
163 unconfigure_logger(self
, self
.module_name
)
171 The serve method is mandatory for standby modules.
174 raise NotImplementedError()
def get_mgr_id(self):
    """
    :return: the value reported by the ``_ceph_get_mgr_id`` binding
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
179 def get_config(self
, key
, default
=None):
181 Retrieve the value of a persistent configuration setting
184 :param default: the default value of the config if it is not found
187 r
= self
._ceph
_get
_config
(key
)
def get_active_uri(self):
    """
    :return: the value reported by the ``_ceph_get_active_uri`` binding
    """
    active_uri = self._ceph_get_active_uri()
    return active_uri
197 def get_localized_config(self
, key
, default
=None):
198 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
200 r
= self
.get_config(key
)
206 class MgrModule(ceph_module
.BaseMgrModule
):
209 # Priority definitions for perf counters
213 PRIO_UNINTERESTING
= 2
216 # counter value types
221 PERFCOUNTER_LONGRUNAVG
= 4
222 PERFCOUNTER_COUNTER
= 8
223 PERFCOUNTER_HISTOGRAM
= 0x10
224 PERFCOUNTER_TYPE_MASK
= ~
3
226 def __init__(self
, module_name
, py_modules_ptr
, this_ptr
):
227 self
.module_name
= module_name
229 # If we're taking over from a standby module, let's make sure
230 # its logger was unconfigured before we hook ours up
231 unconfigure_logger(self
, self
.module_name
)
232 self
._logger
= configure_logger(self
, module_name
)
234 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
236 self
._version
= self
._ceph
_get
_version
()
238 self
._perf
_schema
_cache
= None
241 unconfigure_logger(self
, self
.module_name
)
243 def update_perf_schema(self
, daemon_type
, daemon_name
):
245 For plugins that use get_all_perf_counters, call this when
246 receiving a notification of type 'perf_schema_update', to
247 prompt MgrModule to update its cache of counter schemas.
def get_context(self):
    """
    Fetch the context object from ``_ceph_get_context``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    context_capsule = self._ceph_get_context()
    return context_capsule
268 def notify(self
, notify_type
, notify_id
):
270 Called by the ceph-mgr service to notify the Python plugin
271 that new state is available.
277 Called by the ceph-mgr service to start any server that
278 is provided by this Python plugin. The implementation
279 of this function should block until ``shutdown`` is called.
281 You *must* implement ``shutdown`` if you implement ``serve``
287 Called by the ceph-mgr service to request that this
288 module drop out of its serve() function. You do not
289 need to implement this if you do not implement serve()
def get(self, data_name):
    """
    Called by the plugin to load some cluster state from ceph-mgr.

    :param data_name: passed through unchanged to ``_ceph_get``
    :return: whatever ``_ceph_get`` returns for ``data_name``
    """
    state = self._ceph_get(data_name)
    return state
301 def _stattype_to_str(self
, stattype
):
303 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
306 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
307 # this lie matches the DaemonState decoding: only val, no counts
309 if typeonly
== self
.PERFCOUNTER_COUNTER
:
311 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
316 def _perfvalue_to_value(self
, stattype
, value
):
317 if stattype
& self
.PERFCOUNTER_TIME
:
318 # Convert from ns to seconds
319 return value
/ 1000000000.0
def get_server(self, hostname):
    """
    Called by the plugin to load information about one particular node.

    :param hostname: a hostname
    :return: whatever ``_ceph_get_server`` returns for ``hostname``
    """
    server_info = self._ceph_get_server(hostname)
    return server_info
def get_perf_schema(self, svc_type, svc_name):
    """
    Called by the plugin to fetch perf counter schema info.

    :param svc_type: service type; may be None
    :param svc_name: service name; may be None
    :return: schema data describing the counters requested, exactly as
        returned by ``_ceph_get_perf_schema``
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
def get_counter(self, svc_type, svc_name, path):
    """
    Called by the plugin to fetch data for a particular perf counter
    on a particular service.

    :param svc_type: service type
    :param svc_name: service name
    :param path: counter path
    :return: counter samples exactly as returned by ``_ceph_get_counter``
        (upstream docs: two-element lists containing time and value)
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
def list_servers(self):
    """
    Like ``get_server``, but instead of returning information about
    just one node, return all the nodes in an array.

    :return: whatever ``_ceph_get_server`` returns when given ``None``
    """
    every_server = self._ceph_get_server(None)
    return every_server
def get_metadata(self, svc_type, svc_id):
    """
    Fetch the metadata for a particular service.

    :param svc_type: string (e.g., 'mds', 'osd', 'mon')
    :param svc_id: string
    :return: whatever ``_ceph_get_metadata`` returns
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata
def get_daemon_status(self, svc_type, svc_id):
    """
    Fetch the latest status for a particular service daemon.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: whatever ``_ceph_get_daemon_status`` returns
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
def send_command(self, *args, **kwargs):
    """
    Called by the plugin to send a command to the mon.

    All positional and keyword arguments are forwarded unchanged to
    ``_ceph_send_command``; its result is discarded.
    """
    forward = self._ceph_send_command
    forward(*args, **kwargs)
def set_health_checks(self, checks):
    """
    Set the module's current map of health checks.

    The argument is a dict of check names to info, each info dict in
    this form::

       {
         'severity': 'warning',         # or 'error'
         'summary': 'summary string',
         'detail': [ 'list', 'of', 'detail', 'strings' ],
       }

    :param checks: dict of health check dicts
    """
    publish = self._ceph_set_health_checks
    publish(checks)
def handle_command(self, cmd):
    """
    Called by ceph-mgr to request the plugin to handle one of the
    commands that it declared in self.COMMANDS.

    Implementations return a status code, an output buffer (for data
    results) and an output string (for informative text).

    :param cmd: dict, from Ceph's cmdmap_t
    :return: 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """
    # Reaching this default means a command arrived for a module that
    # did not override the handler.
    raise NotImplementedError()
def get_mgr_id(self):
    """
    Return the identifier reported by the ``_ceph_get_mgr_id`` binding.

    The localized config helpers in this class use it as the
    ``<id>/`` key prefix.
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
440 def get_config(self
, key
, default
=None):
442 Retrieve the value of a persistent configuration setting
447 r
= self
._ceph
_get
_config
(key
)
def get_config_prefix(self, key_prefix):
    """
    Retrieve a dict of config values with the given prefix.

    :param key_prefix: str
    :return: whatever ``_ceph_get_config_prefix`` returns
    """
    matches = self._ceph_get_config_prefix(key_prefix)
    return matches
462 def get_localized_config(self
, key
, default
=None):
464 Retrieve localized configuration for this ceph-mgr instance
469 r
= self
.get_config(self
.get_mgr_id() + '/' + key
)
471 r
= self
.get_config(key
)
def set_config(self, key, val):
    """
    Set the value of a persistent configuration setting.

    :param key: passed through unchanged to ``_ceph_set_config``
    :param val: passed through unchanged to ``_ceph_set_config``
    """
    store = self._ceph_set_config
    store(key, val)
def set_localized_config(self, key, val):
    """
    Set localized configuration for this ceph-mgr instance.

    The value is stored under ``<mgr id>/<key>``.

    :param key: key suffix appended after the mgr id and a '/'
    :param val: passed through unchanged to ``_ceph_set_config``
    :return: whatever ``_ceph_set_config`` returns
    """
    localized_key = self.get_mgr_id() + '/' + key
    return self._ceph_set_config(localized_key, val)
def set_config_json(self, key, val):
    """
    Helper for setting json-serialized config.

    :param key: passed through unchanged to ``_ceph_set_config``
    :param val: json-serializable object
    """
    serialized = json.dumps(val)
    self._ceph_set_config(key, serialized)
504 def get_config_json(self
, key
):
506 Helper for getting json-serialized config
511 raw
= self
.get_config(key
)
515 return json
.loads(raw
)
519 Run a self-test on the module. Override this function and implement
520 as thorough a self-test as possible for (automated) testing of the module
def get_osdmap(self):
    """
    Get a handle to an OSDMap.

    :return: whatever the ``_ceph_get_osdmap`` binding returns
    """
    osdmap = self._ceph_get_osdmap()
    return osdmap
533 def get_all_perf_counters(self
, prio_limit
=PRIO_USEFUL
):
535 Return the perf counters currently known to this ceph-mgr
536 instance, filtered by priority equal to or greater than `prio_limit`.
538 The result is a map of string to dict, associating services
539 (like "osd.123") with their counters. The counter
540 dict for each service maps counter paths to a counter
541 info structure, which is the information from
542 the schema, plus an additional "value" member with the latest
546 result
= defaultdict(dict)
548 # TODO: improve C++->Python interface to return just
549 # the latest if that's all we want.
550 def get_latest(daemon_type
, daemon_name
, counter
):
551 data
= self
.get_counter(daemon_type
, daemon_name
, counter
)[counter
]
557 def get_latest_avg(daemon_type
, daemon_name
, counter
):
558 data
= self
.get_counter(daemon_type
, daemon_name
, counter
)[counter
]
560 return (data
[-1][1], data
[-1][2])
564 for server
in self
.list_servers():
565 for service
in server
['services']:
566 if service
['type'] not in ("rgw", "mds", "osd", "mon"):
569 schema
= self
.get_perf_schema(service
['type'], service
['id'])
571 self
.log
.warn("No perf counter schema for {0}.{1}".format(
572 service
['type'], service
['id']
576 # Value is returned in a potentially-multi-service format,
577 # get just the service we're asking about
578 svc_full_name
= "{0}.{1}".format(service
['type'], service
['id'])
579 schema
= schema
[svc_full_name
]
581 # Populate latest values
582 for counter_path
, counter_schema
in schema
.items():
583 # self.log.debug("{0}: {1}".format(
584 # counter_path, json.dumps(counter_schema)
586 if counter_schema
['priority'] < prio_limit
:
589 counter_info
= dict(counter_schema
)
591 # Also populate count for the long running avgs
592 if counter_schema
['type'] & self
.PERFCOUNTER_LONGRUNAVG
:
593 v
, c
= get_latest_avg(
598 counter_info
['value'], counter_info
['count'] = v
, c
599 result
[svc_full_name
][counter_path
] = counter_info
601 counter_info
['value'] = get_latest(
607 result
[svc_full_name
][counter_path
] = counter_info
609 self
.log
.debug("returning {0} counter".format(len(result
)))
def set_uri(self, uri):
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: passed through unchanged to ``_ceph_set_uri``
    :return: whatever ``_ceph_set_uri`` returns
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def have_mon_connection(self):
    """
    Check whether this ceph-mgr daemon has an open connection to a
    monitor.  If it doesn't, then it's likely that the information we
    have about the cluster is out of date, and/or the monitor cluster
    is down.

    :return: whatever ``_ceph_have_mon_connection`` returns
    """
    connected = self._ceph_have_mon_connection()
    return connected