]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | |
3efd9988 FG |
2 | import ceph_module # noqa |
3 | #import ceph_osdmap #noqa | |
4 | #import ceph_osdmap_incremental #noqa | |
5 | #import ceph_crushmap #noqa | |
6 | ||
7c673cae FG |
7 | import json |
8 | import logging | |
9 | import threading | |
3efd9988 FG |
10 | from collections import defaultdict |
11 | ||
12 | ||
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that hands formatted records to a module's
    ``_ceph_log`` method, translating Python levels to Ceph levels.
    """

    # (highest Python level covered, Ceph level), checked in order.
    # Anything above WARNING falls through to Ceph level 0.
    _LEVEL_TABLE = (
        (logging.DEBUG, 20),
        (logging.INFO, 4),
        (logging.WARNING, 1),
    )

    def __init__(self, module_inst):
        """
        :param module_inst: object providing ``_ceph_log(level, message)``
        """
        super(CPlusPlusHandler, self).__init__()
        self._module = module_inst

    def emit(self, record):
        """
        Format ``record`` and forward it with the matching Ceph level.

        :param record: the ``logging.LogRecord`` to forward
        """
        ceph_level = 0
        for ceiling, mapped_level in self._LEVEL_TABLE:
            if record.levelno <= ceiling:
                ceph_level = mapped_level
                break

        self._module._ceph_log(ceph_level, self.format(record))
29 | ||
30 | ||
def configure_logger(module_inst, name):
    """
    Attach a C++-forwarding handler to the named Python logger.

    Any CPlusPlusHandler already attached to that logger is removed
    first, so configuring the same logger name more than once does not
    cause each record to be forwarded several times.

    :param module_inst: object providing ``_ceph_log``; passed to the
                        new CPlusPlusHandler
    :param str name: name of the logger to configure
    :return: the configured ``logging.Logger``
    """
    logger = logging.getLogger(name)

    # Drop handlers left behind by an earlier configuration of this
    # logger. Iterate over a copy: removeHandler mutates the list.
    for stale in list(logger.handlers):
        if isinstance(stale, CPlusPlusHandler):
            logger.removeHandler(stale)

    # Don't filter any logs at the python level, leave it to C++
    logger.setLevel(logging.DEBUG)

    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    logger.addHandler(CPlusPlusHandler(module_inst))

    return logger
7c673cae FG |
44 | |
45 | ||
3efd9988 FG |
def unconfigure_logger(module_inst, name):
    """
    Detach every CPlusPlusHandler from the named logger.

    :param module_inst: not used by this function
    :param str name: name of the logger to clean up
    """
    target = logging.getLogger(name)

    # Walk a snapshot of the handler list, since removeHandler edits
    # the live list in place.
    for handler in list(target.handlers):
        if isinstance(handler, CPlusPlusHandler):
            target.removeHandler(handler)
51 | ||
7c673cae FG |
class CommandResult(object):
    """
    Use with MgrModule.send_command

    Holds the result triple of a command and an event that is set
    once ``complete`` has stored that result.
    """
    def __init__(self, tag):
        """
        :param tag: opaque value stored on the instance as ``self.tag``
        """
        # This is just a convenience for notifications from
        # C++ land, to avoid passing addresses around in messages.
        self.tag = tag

        # Placeholder result, replaced by complete()
        self.r = 0
        self.outb = ""
        self.outs = ""

        # Signalled by complete(); wait() blocks on it
        self.ev = threading.Event()

    def complete(self, r, outb, outs):
        """
        Record the command result and wake up any caller of ``wait``.

        :param r: status code
        :param outb: output buffer
        :param outs: output string
        """
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """
        Block until ``complete`` has been called.

        :return: 3-tuple of (r, outb, outs)
        """
        self.ev.wait()
        return (self.r, self.outb, self.outs)
75 | ||
76 | ||
3efd9988 FG |
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python-facing wrapper around the ``_``-prefixed methods provided by
    ``ceph_module.BasePyOSDMap``.
    """

    def get_epoch(self):
        """Return the result of the underlying ``_get_epoch`` call."""
        return self._get_epoch()

    def get_crush_version(self):
        """Return the result of the underlying ``_get_crush_version`` call."""
        return self._get_crush_version()

    def dump(self):
        """Return the result of the underlying ``_dump`` call."""
        return self._dump()

    def new_incremental(self):
        """Return the result of the underlying ``_new_incremental`` call."""
        return self._new_incremental()

    def apply_incremental(self, inc):
        """
        Pass ``inc`` to the underlying ``_apply_incremental`` call.

        :param inc: the incremental to apply
        :return: whatever ``_apply_incremental`` returns
        """
        return self._apply_incremental(inc)

    def get_crush(self):
        """Return the result of the underlying ``_get_crush`` call."""
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        :param take: passed through to ``_get_pools_by_take``
        :return: the ``'pools'`` entry of the underlying result, or an
                 empty list when that key is absent
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Forward the arguments to the underlying ``_calc_pg_upmaps`` call.

        :param inc: incremental passed through unchanged
        :param max_deviation: passed through unchanged
        :param max_iterations: passed through unchanged
        :param pools: list passed through; ``None`` (the default) is
                      replaced by a fresh empty list
        :return: whatever ``_calc_pg_upmaps`` returns
        """
        # A fresh list per call avoids sharing one mutable default
        # object between every invocation.
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        """
        :param poolid: passed through to ``_map_pool_pgs_up``
        :return: whatever ``_map_pool_pgs_up`` returns
        """
        return self._map_pool_pgs_up(poolid)
107 | ||
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python-facing wrapper around the ``_``-prefixed methods provided by
    ``ceph_module.BasePyOSDMapIncremental``.
    """

    def get_epoch(self):
        """Return the result of the underlying ``_get_epoch`` call."""
        epoch = self._get_epoch()
        return epoch

    def dump(self):
        """Return the result of the underlying ``_dump`` call."""
        dumped = self._dump()
        return dumped

    def set_osd_reweights(self, weightmap):
        """
        Hand an OSD reweight map to ``_set_osd_reweights``.

        weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
        """
        outcome = self._set_osd_reweights(weightmap)
        return outcome

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        Hand a weight map to ``_set_crush_compat_weight_set_weights``.

        weightmap is a dict, int to float.  devices only.  e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }
        """
        outcome = self._set_crush_compat_weight_set_weights(weightmap)
        return outcome
127 | ||
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python-facing wrapper around the ``_``-prefixed methods provided by
    ``ceph_module.BasePyCRUSH``.
    """

    ITEM_NONE = 0x7fffffff

    def dump(self):
        """Return the result of the underlying ``_dump`` call."""
        return self._dump()

    def get_item_weight(self, item):
        """
        :param item: passed through to ``_get_item_weight``
        :return: whatever ``_get_item_weight`` returns
        """
        return self._get_item_weight(item)

    def get_item_name(self, item):
        """
        :param item: passed through to ``_get_item_name``
        :return: whatever ``_get_item_name`` returns
        """
        return self._get_item_name(item)

    def find_takes(self):
        """
        :return: the ``'takes'`` entry of the underlying ``_find_takes``
                 result, or an empty list when that key is absent
        """
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        :param root: passed through to ``_get_take_weight_osd_map``
        :return: dict built from the ``'weights'`` entry of the
                 underlying result, with every key converted to int;
                 empty when that entry is absent
        """
        uglymap = self._get_take_weight_osd_map(root)
        # items() rather than iteritems(): the latter only exists on
        # Python 2 dicts, items() is available on both 2 and 3.
        return {int(k): v for k, v in uglymap.get('weights', {}).items()}
146 | ||
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    def __init__(self, module_name, capsule):
        """
        :param str module_name: also used as the name of this module's logger
        :param capsule: handed to the base class constructor
        """
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        # Detach the C++ log handler that __init__ attached
        unconfigure_logger(self, self.module_name)

    @property
    def log(self):
        """The logger set up for this module in ``__init__``."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """Return the result of the underlying ``_ceph_get_mgr_id`` call."""
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: str
        """
        value = self._ceph_get_config(key)
        return default if value is None else value

    def get_active_uri(self):
        """Return the result of the underlying ``_ceph_get_active_uri`` call."""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Look up ``<mgr id>/<key>`` first, then plain ``key``, and fall
        back to ``default`` when neither lookup yields a value.

        :param str key:
        :param default: returned when both lookups give None
        :return: the first value found, or ``default``
        """
        localized_key = self.get_mgr_id() + '/' + key
        value = self.get_config(localized_key)
        if value is None:
            value = self.get_config(key)

        return default if value is None else value
205 | ||
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for ceph-mgr Python modules.  Wraps the ``_ceph_*``
    methods provided by ``ceph_module.BaseMgrModule`` and defines the
    hooks (``notify``, ``serve``, ``shutdown``, ``handle_command``) that
    subclasses may override.
    """

    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Clears the two low bits, i.e. the value-type flags above (1 and 2)
    PERFCOUNTER_TYPE_MASK = ~3

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        """
        :param str module_name: also used as the name of this module's logger
        :param py_modules_ptr: handed to the base class constructor
        :param this_ptr: handed to the base class constructor
        """
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        self._perf_schema_cache = None

    def __del__(self):
        # Detach the C++ log handler that __init__ attached
        unconfigure_logger(self, self.module_name)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        NOTE(review): the body is currently empty, so calling this
        does nothing.

        :param daemon_type:
        :param daemon_name:
        :return:
        """

    @property
    def log(self):
        """The logger set up for this module in ``__init__``."""
        return self._logger

    @property
    def version(self):
        """The value obtained from ``_ceph_get_version`` in ``__init__``."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin.  The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function.  You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr

        :param data_name: passed through to ``_ceph_get``
        """
        return self._ceph_get(data_name)

    def _stattype_to_str(self, stattype):
        """
        Name the counter type encoded in ``stattype``.

        :param int stattype: counter type flags; the value-type bits
                             are masked off before comparison
        :return: 'gauge', 'counter', 'histogram', or '' when the masked
                 value matches none of the known counter types
        """
        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
        if typeonly == 0:
            return 'gauge'
        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
            # this lie matches the DaemonState decoding: only val, no counts
            return 'counter'
        if typeonly == self.PERFCOUNTER_COUNTER:
            return 'counter'
        if typeonly == self.PERFCOUNTER_HISTOGRAM:
            return 'histogram'

        return ''

    def _perfvalue_to_value(self, stattype, value):
        """
        Scale ``value`` when ``stattype`` has the PERFCOUNTER_TIME bit set.

        :param int stattype: counter type flags
        :param value: raw counter value
        :return: ``value / 1e9`` for time counters, otherwise ``value``
        """
        if stattype & self.PERFCOUNTER_TIME:
            # Convert from ns to seconds
            return value / 1000000000.0
        else:
            return value

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be nullptr, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: list of dicts describing the counters requested
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: A list of two-element lists containing time and value
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.

        All arguments are forwarded unchanged to ``_ceph_send_command``;
        nothing is returned.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks.  Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: returned when the underlying lookup gives None
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance

        Looks up ``<mgr id>/<key>`` first, then plain ``key``, and
        falls back to ``default`` when neither yields a value.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration for this ceph-mgr instance

        The value is stored under ``<mgr id>/<key>``.

        :param key: str
        :param val: str
        :return: whatever ``_ceph_set_config`` returns
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object, or None when the key has no value
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module. Override this function and implement
        a best as possible self-test for (automated) testing of the module
        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap``.

        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value.  Counters whose type has the PERFCOUNTER_LONGRUNAVG bit
        set additionally get a "count" member.

        Only services of type rgw, mds, osd and mon are considered.

        :param int prio_limit: counters with a lower priority are skipped
        :return: defaultdict mapping service name to counter dict
        """

        result = defaultdict(dict)

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            # Second element of the newest sample, or 0 without samples
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return data[-1][1]
            else:
                return 0

        def get_latest_avg(daemon_type, daemon_name, counter):
            # Second and third elements of the newest sample, or (0, 0)
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return (data[-1][1], data[-1][2])
            else:
                return (0, 0)

        for server in self.list_servers():
            for service in server['services']:
                if service['type'] not in ("rgw", "mds", "osd", "mon"):
                    continue

                schema = self.get_perf_schema(service['type'], service['id'])
                if not schema:
                    # Logger.warn is a deprecated alias of warning()
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about
                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
                schema = schema[svc_full_name]

                # Populate latest values
                for counter_path, counter_schema in schema.items():
                    # self.log.debug("{0}: {1}".format(
                    #     counter_path, json.dumps(counter_schema)
                    # ))
                    if counter_schema['priority'] < prio_limit:
                        continue

                    counter_info = dict(counter_schema)

                    # Also populate count for the long running avgs
                    if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG:
                        v, c = get_latest_avg(
                            service['type'],
                            service['id'],
                            counter_path
                        )
                        counter_info['value'], counter_info['count'] = v, c
                    else:
                        counter_info['value'] = get_latest(
                            service['type'],
                            service['id'],
                            counter_path
                        )

                    # Stored once here for both branches
                    result[svc_full_name][counter_path] = counter_info

        self.log.debug("returning {0} counter".format(len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :param uri: passed through to ``_ceph_set_uri``
        :return: a string
        """
        return self._ceph_set_uri(uri)

    def have_mon_connection(self):
        """
        Check whether this ceph-mgr daemon has an open connection
        to a monitor.  If it doesn't, then it's likely that the
        information we have about the cluster is out of date,
        and/or the monitor cluster is down.
        """

        return self._ceph_have_mon_connection()