]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | |
3efd9988 FG |
2 | import ceph_module # noqa |
3 | #import ceph_osdmap #noqa | |
4 | #import ceph_osdmap_incremental #noqa | |
5 | #import ceph_crushmap #noqa | |
6 | ||
7c673cae FG |
7 | import json |
8 | import logging | |
1adf2230 | 9 | import six |
7c673cae | 10 | import threading |
3efd9988 FG |
11 | from collections import defaultdict |
12 | ||
13 | ||
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that forwards Python log records to the C++ side.

    Every record is formatted with the handler's formatter and passed to
    ``module_inst._ceph_log`` together with a numeric Ceph log level
    derived from the Python level.
    """

    # (highest Python level covered, Ceph level), checked in ascending
    # order.  Anything above WARNING maps to Ceph level 0.
    _LEVEL_MAP = (
        (logging.DEBUG, 20),
        (logging.INFO, 4),
        (logging.WARNING, 1),
    )

    def __init__(self, module_inst):
        """
        :param module_inst: object exposing ``_ceph_log(level, message)``
        """
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst

    def emit(self, record):
        """
        Map the record's Python level to a Ceph level and hand the
        formatted message to the module's ``_ceph_log``.

        :param record: the ``logging.LogRecord`` to forward
        """
        ceph_level = 0
        for py_level, mapped_level in self._LEVEL_MAP:
            if record.levelno <= py_level:
                ceph_level = mapped_level
                break

        self._module._ceph_log(ceph_level, self.format(record))
30 | ||
31 | ||
def configure_logger(module_inst, name):
    """
    Fetch the logger called ``name`` and route its output to C++.

    :param module_inst: module instance handed to the CPlusPlusHandler
    :param name: name of the logger to configure
    :return: the configured ``logging.Logger``
    """
    log = logging.getLogger(name)

    # Don't filter any logs at the python level, leave it to C++
    log.setLevel(logging.DEBUG)

    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    cpp_handler = CPlusPlusHandler(module_inst)
    log.addHandler(cpp_handler)

    return log
7c673cae FG |
45 | |
46 | ||
3efd9988 FG |
def unconfigure_logger(module_inst, name):
    """
    Detach every CPlusPlusHandler from the logger called ``name``.

    Handlers of any other type are left in place.

    :param module_inst: unused by this function
    :param name: name of the logger to clean up
    """
    log = logging.getLogger(name)
    # Walk a snapshot so that removing handlers does not disturb iteration.
    for handler in list(log.handlers):
        if isinstance(handler, CPlusPlusHandler):
            log.removeHandler(handler)
52 | ||
7c673cae FG |
class CommandResult(object):
    """
    Completion object for use with MgrModule.send_command.

    ``complete`` records the outcome and sets the internal event;
    ``wait`` blocks on that event and then returns the recorded outcome.
    """

    def __init__(self, tag):
        """
        :param tag: stored as ``self.tag``
        """
        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag

        # Result fields, overwritten by complete()
        self.r = 0
        self.outb = ""
        self.outs = ""

        self.ev = threading.Event()

    def complete(self, r, outb, outs):
        """
        Store the command outcome and wake up anyone blocked in wait().

        :param r: return code
        :param outb: output buffer
        :param outs: output string
        """
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """
        Block until complete() has been called.

        :return: 3-tuple ``(r, outb, outs)``
        """
        self.ev.wait()
        return (self.r, self.outb, self.outs)
76 | ||
77 | ||
3efd9988 FG |
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python-facing wrapper around ``ceph_module.BasePyOSDMap``.

    Each public method delegates to the underscore-prefixed primitive of
    the same name provided by the base class.
    """

    def get_epoch(self):
        """:return: result of the base class ``_get_epoch``"""
        return self._get_epoch()

    def get_crush_version(self):
        """:return: result of the base class ``_get_crush_version``"""
        return self._get_crush_version()

    def dump(self):
        """:return: result of the base class ``_dump``"""
        return self._dump()

    def new_incremental(self):
        """:return: result of the base class ``_new_incremental``"""
        return self._new_incremental()

    def apply_incremental(self, inc):
        """
        :param inc: incremental to apply (presumably an OSDMapIncremental
                    -- confirm against the C++ binding)
        :return: result of the base class ``_apply_incremental``
        """
        return self._apply_incremental(inc)

    def get_crush(self):
        """:return: result of the base class ``_get_crush``"""
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        :param take: passed through to ``_get_pools_by_take``
        :return: the ``'pools'`` entry of the result, or ``[]`` if absent
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Delegate to the base class ``_calc_pg_upmaps``.

        :param inc: passed through unchanged
        :param max_deviation: passed through unchanged (default .01)
        :param max_iterations: passed through unchanged (default 10)
        :param pools: list passed through to the binding; when omitted
                      (or None) a fresh empty list is used
        :return: result of ``_calc_pg_upmaps``
        """
        # A fresh list per call: a literal ``[]`` default would be one
        # object shared by every call and by the binding it is handed to.
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        """
        :param poolid: passed through to ``_map_pool_pgs_up``
        :return: result of the base class ``_map_pool_pgs_up``
        """
        return self._map_pool_pgs_up(poolid)
108 | ||
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python-facing wrapper around ``ceph_module.BasePyOSDMapIncremental``.

    Every method forwards to the underscore-prefixed primitive of the
    same name on the base class.
    """

    def get_epoch(self):
        """:return: whatever the base class ``_get_epoch`` returns"""
        return self._get_epoch()

    def dump(self):
        """:return: whatever the base class ``_dump`` returns"""
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        Forward ``weightmap`` to ``_set_osd_reweights``.

        :param weightmap: dict, int to float,
                          e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        return self._set_osd_reweights(weightmap)

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        Forward ``weightmap`` to ``_set_crush_compat_weight_set_weights``.

        :param weightmap: dict, int to float, devices only,
                          e.g. { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        return self._set_crush_compat_weight_set_weights(weightmap)
128 | ||
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python-facing wrapper around ``ceph_module.BasePyCRUSH``.

    Methods forward to the underscore-prefixed primitives of the base
    class, unwrapping the result where noted.
    """

    ITEM_NONE = 0x7fffffff

    def dump(self):
        """:return: whatever the base class ``_dump`` returns"""
        return self._dump()

    def get_item_weight(self, item):
        """
        :param item: passed through to ``_get_item_weight``
        """
        return self._get_item_weight(item)

    def get_item_name(self, item):
        """
        :param item: passed through to ``_get_item_name``
        """
        return self._get_item_name(item)

    def find_takes(self):
        """
        :return: the ``'takes'`` entry of ``_find_takes()``, or ``[]``
                 when that entry is missing
        """
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        :param root: passed through to ``_get_take_weight_osd_map``
        :return: dict built from the ``'weights'`` entry of the result,
                 with every key converted to ``int``
        """
        raw_weights = self._get_take_weight_osd_map(root).get('weights', {})
        return dict(
            (int(osd), weight) for osd, weight in six.iteritems(raw_weights)
        )
3efd9988 FG |
147 | |
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    def __init__(self, module_name, capsule):
        """
        :param module_name: stored as ``self.module_name`` and used as
                            the logger name
        :param capsule: forwarded to the base class constructor
        """
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        # Detach our C++ log handler from the module's logger
        unconfigure_logger(self, self.module_name)

    @property
    def log(self):
        """The logger created for this module in ``__init__``."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """:return: whatever ``_ceph_get_mgr_id`` returns"""
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: str
        """
        value = self._ceph_get_config(key)
        return default if value is None else value

    def get_active_uri(self):
        """:return: whatever ``_ceph_get_active_uri`` returns"""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Look up ``<mgr id>/<key>`` first, then plain ``key``.

        :param key: str
        :param default: returned when neither lookup yields a value
        :return: the first value that is not None, else ``default``
        """
        for candidate in (self.get_mgr_id() + '/' + key, key):
            value = self.get_config(candidate)
            if value is not None:
                return value
        return default
206 | ||
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for ceph-mgr Python plugins.

    Wraps the ``_ceph_*`` primitives inherited from
    ``ceph_module.BaseMgrModule`` in documented methods, and provides the
    hooks (``notify``, ``serve``, ``shutdown``, ``handle_command``,
    ``self_test``) that plugins override.
    """

    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Clears the two value-type bits above, leaving only the counter type
    PERFCOUNTER_TYPE_MASK = ~3

    # units supported
    BYTES = 0
    NONE = 1

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        """
        :param module_name: stored as ``self.module_name`` and used as
                            the logger name
        :param py_modules_ptr: forwarded to the base class constructor
        :param this_ptr: forwarded to the base class constructor
        """
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        self._perf_schema_cache = None

    def __del__(self):
        # Detach our C++ log handler from the module's logger
        unconfigure_logger(self, self.module_name)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        The implementation here has no body beyond this docstring, so
        calling it currently does nothing.

        :param daemon_type:
        :param daemon_name:
        :return:
        """

    @property
    def log(self):
        """The logger created for this module in ``__init__``."""
        return self._logger

    @property
    def version(self):
        """The value ``_ceph_get_version`` returned during ``__init__``."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin.  The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function.  You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr
        """
        return self._ceph_get(data_name)

    def _stattype_to_str(self, stattype):
        """
        Name the counter type encoded in ``stattype``.

        :param stattype: int bit field combining a value type and a
                         counter type
        :return: 'gauge', 'counter', 'histogram', or '' when the counter
                 type bits match none of the known types
        """
        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
        if typeonly == 0:
            return 'gauge'
        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
            # this lie matches the DaemonState decoding: only val, no counts
            return 'counter'
        if typeonly == self.PERFCOUNTER_COUNTER:
            return 'counter'
        if typeonly == self.PERFCOUNTER_HISTOGRAM:
            return 'histogram'

        return ''

    def _perfvalue_to_value(self, stattype, value):
        """
        Scale a raw counter value according to its value type.

        :param stattype: int bit field; PERFCOUNTER_TIME marks time values
        :param value: raw counter value
        :return: ``value`` divided by 1e9 for time counters, otherwise
                 ``value`` unchanged
        """
        if stattype & self.PERFCOUNTER_TIME:
            # Convert from ns to seconds
            return value / 1000000000.0
        else:
            return value

    def _unit_to_str(self, unit):
        """
        :param unit: one of the unit constants (``NONE``, ``BYTES``)
        :return: "/s" for NONE, "B/s" for BYTES; None for anything else
        """
        if unit == self.NONE:
            return "/s"
        elif unit == self.BYTES:
            return "B/s"

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be nullptr, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: list of dicts describing the counters requested
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: as consumed by ``get_all_perf_counters``: a mapping
                 indexed by ``path`` whose entry is a list of samples;
                 in each sample, element [1] is the value and, for
                 long-running averages, element [2] is the count
                 (element [0] is presumably the time -- confirm against
                 the C++ binding)
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.

        All arguments are forwarded unchanged to ``_ceph_send_command``;
        nothing is returned.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks.  Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: returned when the setting has no value
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance

        ``<mgr id>/<key>`` is looked up first, then plain ``key``.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration for this ceph-mgr instance

        The value is stored under ``<mgr id>/<key>``.

        :param key: str
        :param val: str
        :return: whatever ``_ceph_set_config`` returns
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object, or None when the setting has no value
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module. Override this function and implement
        a best-effort self-test for (automated) testing of the module
        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to an OSDMap.  If epoch==0, get a handle for the latest
        OSDMap.
        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value (and a "count" member for long-running averages).

        Only services of type rgw, mds, osd and mon are considered.
        """

        result = defaultdict(dict)

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            # Value of the newest sample, or 0 when there are no samples
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return data[-1][1]
            else:
                return 0

        def get_latest_avg(daemon_type, daemon_name, counter):
            # (value, count) of the newest sample, or (0, 0) when empty
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return (data[-1][1], data[-1][2])
            else:
                return (0, 0)

        for server in self.list_servers():
            for service in server['services']:
                if service['type'] not in ("rgw", "mds", "osd", "mon"):
                    continue

                schema = self.get_perf_schema(service['type'], service['id'])
                if not schema:
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about
                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
                schema = schema[svc_full_name]

                # Populate latest values
                for counter_path, counter_schema in schema.items():
                    if counter_schema['priority'] < prio_limit:
                        continue

                    # Copy so the schema entry itself is not modified
                    counter_info = dict(counter_schema)

                    # Also populate count for the long running avgs
                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                        v, c = get_latest_avg(
                            service['type'],
                            service['id'],
                            counter_path
                        )
                        counter_info['value'], counter_info['count'] = v, c
                    else:
                        counter_info['value'] = get_latest(
                            service['type'],
                            service['id'],
                            counter_path
                        )

                    result[svc_full_name][counter_path] = counter_info

        self.log.debug("returning {0} counter".format(len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :return: a string
        """
        return self._ceph_set_uri(uri)

    def have_mon_connection(self):
        """
        Check whether this ceph-mgr daemon has an open connection
        to a monitor.  If it doesn't, then it's likely that the
        information we have about the cluster is out of date,
        and/or the monitor cluster is down.
        """

        return self._ceph_have_mon_connection()