]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mgr_module.py
update sources to 12.2.7
[ceph.git] / ceph / src / pybind / mgr / mgr_module.py
1
2 import ceph_module # noqa
3 #import ceph_osdmap #noqa
4 #import ceph_osdmap_incremental #noqa
5 #import ceph_crushmap #noqa
6
7 import json
8 import logging
9 import threading
10 from collections import defaultdict
11
12
class CPlusPlusHandler(logging.Handler):
    """
    logging.Handler that forwards each formatted record to the C++ side
    by calling ``module_inst._ceph_log(ceph_level, message)``.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        # Object exposing ``_ceph_log(level, message)``.
        self._module = module_inst

    def emit(self, record):
        """
        Map the record's Python level onto a Ceph debug level and forward
        the formatted message.

        Mapping: <= DEBUG -> 20, <= INFO -> 4, <= WARNING -> 1,
        anything higher -> 0.
        """
        # (inclusive Python level ceiling, Ceph level), checked in order.
        thresholds = (
            (logging.DEBUG, 20),
            (logging.INFO, 4),
            (logging.WARNING, 1),
        )
        ceph_level = 0
        for ceiling, mapped in thresholds:
            if record.levelno <= ceiling:
                ceph_level = mapped
                break

        self._module._ceph_log(ceph_level, self.format(record))
29
30
def configure_logger(module_inst, name):
    """
    Set up the Python logger called ``name`` so that its records are
    forwarded to C++ through a CPlusPlusHandler bound to ``module_inst``.

    :param module_inst: object exposing ``_ceph_log(level, message)``
    :param name: logger name (the mgr module's name)
    :return: the configured ``logging.Logger``
    """
    target = logging.getLogger(name)

    # Don't filter any logs at the python level, leave it to C++
    target.setLevel(logging.DEBUG)

    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    handler = CPlusPlusHandler(module_inst)
    target.addHandler(handler)

    return target
44
45
def unconfigure_logger(module_inst, name):
    """
    Detach every CPlusPlusHandler from the Python logger called ``name``.

    ``module_inst`` is accepted for symmetry with configure_logger but is
    not consulted: handlers are removed regardless of which instance they
    were created for.
    """
    target = logging.getLogger(name)
    # Iterate over a snapshot, since removeHandler mutates target.handlers.
    for handler in list(target.handlers):
        if isinstance(handler, CPlusPlusHandler):
            target.removeHandler(handler)
51
class CommandResult(object):
    """
    Use with MgrModule.send_command

    Holds the (return code, output buffer, output string) of a command
    and lets a caller block until ``complete`` has been called.
    """

    def __init__(self, tag):
        # Signalled by complete(); wait() blocks on it.
        self.ev = threading.Event()
        self.r, self.outb, self.outs = 0, "", ""

        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag

    def complete(self, r, outb, outs):
        """Record the command outcome and wake any waiter."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """
        Block until ``complete`` has been called.

        :return: 3-tuple of (r, outb, outs)
        """
        self.ev.wait()
        return (self.r, self.outb, self.outs)
75
76
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python wrapper around ``ceph_module.BasePyOSDMap``; each method
    delegates to the underscore-prefixed C++ binding of the same name.
    """

    def get_epoch(self):
        """Return the result of the ``_get_epoch`` binding."""
        return self._get_epoch()

    def get_crush_version(self):
        """Return the result of the ``_get_crush_version`` binding."""
        return self._get_crush_version()

    def dump(self):
        """Return the result of the ``_dump`` binding."""
        return self._dump()

    def new_incremental(self):
        """Return the result of the ``_new_incremental`` binding."""
        return self._new_incremental()

    def apply_incremental(self, inc):
        """
        Apply the incremental ``inc`` via the ``_apply_incremental`` binding.

        :param inc: presumably an OSDMapIncremental -- confirm with callers
        """
        return self._apply_incremental(inc)

    def get_crush(self):
        """Return the result of the ``_get_crush`` binding."""
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        Return the ``'pools'`` entry reported for CRUSH take ``take``,
        or an empty list when the binding reports none.
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Delegate to the ``_calc_pg_upmaps`` binding.

        :param inc: incremental passed through to the binding
        :param max_deviation: float, passed through
        :param max_iterations: int, passed through
        :param pools: list of pools to consider; defaults to an empty list
        """
        # A fresh list per call instead of a shared mutable default, so
        # nothing can leak between calls if the list is ever mutated.
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        """Return the result of the ``_map_pool_pgs_up`` binding for ``poolid``."""
        return self._map_pool_pgs_up(poolid)
107
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python wrapper around ``ceph_module.BasePyOSDMapIncremental``; every
    method forwards to the underscore-prefixed C++ binding.
    """

    def get_epoch(self):
        """Forward to the ``_get_epoch`` binding."""
        epoch = self._get_epoch()
        return epoch

    def dump(self):
        """Forward to the ``_dump`` binding."""
        dumped = self._dump()
        return dumped

    def set_osd_reweights(self, weightmap):
        """
        weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        outcome = self._set_osd_reweights(weightmap)
        return outcome

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        weightmap is a dict, int to float. devices only. e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        outcome = self._set_crush_compat_weight_set_weights(weightmap)
        return outcome
127
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python wrapper around ``ceph_module.BasePyCRUSH``; methods delegate
    to the underscore-prefixed C++ bindings.
    """

    # Sentinel item id; presumably mirrors CRUSH_ITEM_NONE in C++ -- confirm.
    ITEM_NONE = 0x7fffffff

    def dump(self):
        """Return the result of the ``_dump`` binding."""
        return self._dump()

    def get_item_weight(self, item):
        """Return the weight the ``_get_item_weight`` binding reports for ``item``."""
        return self._get_item_weight(item)

    def get_item_name(self, item):
        """Return the name the ``_get_item_name`` binding reports for ``item``."""
        return self._get_item_name(item)

    def find_takes(self):
        """
        Return the ``'takes'`` entry from the ``_find_takes`` binding,
        or an empty list when absent.
        """
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        Return the ``'weights'`` entry reported for ``root`` as a dict
        keyed by int (empty if the binding reports no weights).

        :param root: the CRUSH take/root to query
        :return: dict of int -> weight
        """
        uglymap = self._get_take_weight_osd_map(root)
        # Keys are converted with int(), presumably because the binding
        # emits them as strings. .items() works on both Python 2 and 3,
        # unlike the Python-2-only .iteritems().
        return {int(k): v for k, v in uglymap.get('weights', {}).items()}
146
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        # The C++ base initializer runs before module_name is assigned; if
        # it raised, module_name does not exist.  Don't raise from __del__.
        module_name = getattr(self, 'module_name', None)
        if module_name is not None:
            unconfigure_logger(self, module_name)

    @property
    def log(self):
        """The ``logging.Logger`` configured for this module."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_active_uri(self):
        """Return the result of the ``_ceph_get_active_uri`` binding."""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Retrieve configuration for this ceph-mgr instance.

        Looks up ``<mgr_id>/<key>`` first, then falls back to ``key``,
        then to ``default``.

        :param key: str
        :param default: value returned when neither key is set
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r
205
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for active ceph-mgr Python modules.  Most methods are thin
    wrappers over the ``_ceph_*`` bindings provided by the C++ base class.
    """
    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Clears the two value-type bits (TIME=1, U64=2), leaving the type bits.
    PERFCOUNTER_TYPE_MASK = ~3

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        self._perf_schema_cache = None

    def __del__(self):
        unconfigure_logger(self, self.module_name)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        The base implementation is currently a no-op.

        :param daemon_type:
        :param daemon_name:
        :return: None
        """

    @property
    def log(self):
        """The ``logging.Logger`` configured for this module."""
        return self._logger

    @property
    def version(self):
        """The value returned by ``_ceph_get_version`` at construction."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin. The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function. You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr
        """
        return self._ceph_get(data_name)

    def _stattype_to_str(self, stattype):
        """
        Map a perf counter type bitfield to 'gauge', 'counter',
        'histogram', or '' when the type bits are unrecognised.
        """
        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
        if typeonly == 0:
            return 'gauge'
        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
            # this lie matches the DaemonState decoding: only val, no counts
            return 'counter'
        if typeonly == self.PERFCOUNTER_COUNTER:
            return 'counter'
        if typeonly == self.PERFCOUNTER_HISTOGRAM:
            return 'histogram'

        return ''

    def _perfvalue_to_value(self, stattype, value):
        """
        Convert a raw counter value; TIME-typed values are converted
        from nanoseconds to seconds, everything else is returned as-is.
        """
        if stattype & self.PERFCOUNTER_TIME:
            # Convert from ns to seconds
            return value / 1000000000.0
        else:
            return value

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be nullptr, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: dict keyed by "<svc_type>.<svc_name>", each value a dict
                 mapping counter paths to their schema
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: dict mapping ``path`` to a list of samples; each sample
                 starts with [time, value] (long-running averages appear
                 to carry a third element, the count -- confirm in C++)
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.

        Arguments are passed straight through to the C++ binding; see
        CommandResult, which is intended for use with this method.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks. Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: value returned when the setting is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance.

        Looks up ``<mgr_id>/<key>`` first, then ``key``, then falls back
        to ``default``.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration for this ceph-mgr instance
        (stored under ``<mgr_id>/<key>``).

        :param key: str
        :param val: str
        :return: the value returned by the ``_ceph_set_config`` binding
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object, or None when the key is not set
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module. Override this function and implement
        a best as possible self-test for (automated) testing of the module
        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to the OSDMap provided by the ``_ceph_get_osdmap``
        binding.
        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value (and, for long-running averages, a "count" member).

        Only rgw, mds, osd and mon services are considered.

        :param prio_limit: int, minimum counter priority to include
        :return: defaultdict(dict) keyed by "<type>.<id>"
        """

        result = defaultdict(dict)

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            # Value of the newest sample, or 0 when there are none.
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return data[-1][1]
            else:
                return 0

        def get_latest_avg(daemon_type, daemon_name, counter):
            # (value, count) of the newest sample, or (0, 0).
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return (data[-1][1], data[-1][2])
            else:
                return (0, 0)

        for server in self.list_servers():
            for service in server['services']:
                if service['type'] not in ("rgw", "mds", "osd", "mon"):
                    continue

                schema = self.get_perf_schema(service['type'], service['id'])
                if not schema:
                    # Logger.warn is a deprecated alias of Logger.warning.
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about
                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
                schema = schema[svc_full_name]

                # Populate latest values
                for counter_path, counter_schema in schema.items():
                    if counter_schema['priority'] < prio_limit:
                        continue

                    counter_info = dict(counter_schema)

                    # Also populate count for the long running avgs
                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                        v, c = get_latest_avg(
                            service['type'],
                            service['id'],
                            counter_path
                        )
                        counter_info['value'], counter_info['count'] = v, c
                    else:
                        counter_info['value'] = get_latest(
                            service['type'],
                            service['id'],
                            counter_path
                        )

                    # Stored once for both branches.
                    result[svc_full_name][counter_path] = counter_info

        # NOTE: len(result) counts services, not individual counters.
        self.log.debug("returning {0} counter".format(len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :return: the value returned by the ``_ceph_set_uri`` binding
        """
        return self._ceph_set_uri(uri)

    def have_mon_connection(self):
        """
        Check whether this ceph-mgr daemon has an open connection
        to a monitor.  If it doesn't, then it's likely that the
        information we have about the cluster is out of date,
        and/or the monitor cluster is down.
        """

        return self._ceph_have_mon_connection()
628 """
629
630 return self._ceph_have_mon_connection()