]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | |
3efd9988 FG |
2 | import ceph_module # noqa |
3 | #import ceph_osdmap #noqa | |
4 | #import ceph_osdmap_incremental #noqa | |
5 | #import ceph_crushmap #noqa | |
6 | ||
7c673cae FG |
7 | import json |
8 | import logging | |
9 | import threading | |
3efd9988 FG |
10 | from collections import defaultdict |
11 | ||
12 | ||
class CPlusPlusHandler(logging.Handler):
    """
    Logging handler that forwards Python log records to C++ through
    ``module_inst._ceph_log(level, message)``.
    """

    def __init__(self, module_inst):
        super(CPlusPlusHandler, self).__init__()
        # Object providing ``_ceph_log(level, message)``.
        self._module = module_inst

    def emit(self, record):
        """
        Map the record's Python level to a Ceph debug level and pass the
        formatted message to C++.

        levelno <= DEBUG -> 20, <= INFO -> 4, <= WARNING -> 1, otherwise 0.
        """
        # Ordered (python threshold, ceph level) pairs: the first threshold
        # the record's level does not exceed decides the Ceph level.
        thresholds = (
            (logging.DEBUG, 20),
            (logging.INFO, 4),
            (logging.WARNING, 1),
        )
        ceph_level = 0
        for py_level, mapped in thresholds:
            if record.levelno <= py_level:
                ceph_level = mapped
                break

        self._module._ceph_log(ceph_level, self.format(record))
29 | ||
30 | ||
def configure_logger(module_inst, name):
    """
    Attach a new CPlusPlusHandler for ``module_inst`` to the logger called
    ``name`` and return that logger.

    The logger level is forced to DEBUG so every record reaches the
    handler; filtering is left to the C++ side. Each call adds a new
    handler instance, so callers should pair it with unconfigure_logger.
    """
    target = logging.getLogger(name)

    # Don't filter any logs at the python level, leave it to C++
    target.setLevel(logging.DEBUG)

    # FIXME: we should learn the log level from C++ land, and then
    # avoid calling the C++ level log when we know a message is of
    # an insufficient level to be ultimately output
    handler = CPlusPlusHandler(module_inst)
    target.addHandler(handler)

    return target
7c673cae FG |
44 | |
45 | ||
3efd9988 FG |
def unconfigure_logger(module_inst, name):
    """
    Detach every CPlusPlusHandler from the logger called ``name``.

    ``module_inst`` is accepted for symmetry with configure_logger but is
    not consulted: CPlusPlusHandlers installed for any instance are removed.
    """
    target = logging.getLogger(name)
    # Take a snapshot first, because removeHandler mutates target.handlers.
    stale = list(filter(lambda h: isinstance(h, CPlusPlusHandler),
                        target.handlers))
    for handler in stale:
        target.removeHandler(handler)
51 | ||
7c673cae FG |
class CommandResult(object):
    """
    Use with MgrModule.send_command

    Holds a command's return code, output buffer and output string, and
    lets a caller block until ``complete`` has been invoked.
    """

    def __init__(self, tag):
        # The tag is just a convenience for notifications from C++ land,
        # to avoid passing addresses around in messages.
        self.tag = tag
        self.r = 0
        self.outb = ""
        self.outs = ""
        self.ev = threading.Event()

    def complete(self, r, outb, outs):
        """Store the command's results and wake any thread in wait()."""
        self.r, self.outb, self.outs = r, outb, outs
        self.ev.set()

    def wait(self):
        """Block until complete() has run, then return (r, outb, outs)."""
        self.ev.wait()
        return (self.r, self.outb, self.outs)
75 | ||
76 | ||
3efd9988 FG |
class OSDMap(ceph_module.BasePyOSDMap):
    """
    Python wrapper around the native OSDMap binding. Each public method
    delegates to the matching ``_``-prefixed native method.
    """

    def get_epoch(self):
        return self._get_epoch()

    def get_crush_version(self):
        return self._get_crush_version()

    def dump(self):
        return self._dump()

    def new_incremental(self):
        return self._new_incremental()

    def apply_incremental(self, inc):
        return self._apply_incremental(inc)

    def get_crush(self):
        return self._get_crush()

    def get_pools_by_take(self, take):
        """
        Return the ``'pools'`` entry of ``_get_pools_by_take(take)``,
        or an empty list when that key is absent.
        """
        return self._get_pools_by_take(take).get('pools', [])

    def calc_pg_upmaps(self, inc,
                       max_deviation=.01, max_iterations=10, pools=None):
        """
        Delegate to the native ``_calc_pg_upmaps``.

        :param inc: incremental handed through to the native call
        :param max_deviation: handed through (default .01)
        :param max_iterations: handed through (default 10)
        :param pools: list handed through; ``None`` (the default) is
            replaced by a fresh empty list. This avoids one list object
            being shared by every call, as a ``[]`` default would be.
        """
        if pools is None:
            pools = []
        return self._calc_pg_upmaps(
            inc,
            max_deviation, max_iterations, pools)

    def map_pool_pgs_up(self, poolid):
        return self._map_pool_pgs_up(poolid)
107 | ||
class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental):
    """
    Python wrapper around the native OSDMap incremental binding; every
    method forwards to its ``_``-prefixed native counterpart.
    """

    def get_epoch(self):
        """Return the value of the native ``_get_epoch``."""
        return self._get_epoch()

    def dump(self):
        """Return the value of the native ``_dump``."""
        return self._dump()

    def set_osd_reweights(self, weightmap):
        """
        weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }

        Forwarded unchanged to the native ``_set_osd_reweights``.
        """
        result = self._set_osd_reweights(weightmap)
        return result

    def set_crush_compat_weight_set_weights(self, weightmap):
        """
        weightmap is a dict, int to float. devices only. e.g.,
        { 0: 3.4, 1: 3.3, 2: 3.334 }

        Forwarded unchanged to the native
        ``_set_crush_compat_weight_set_weights``.
        """
        result = self._set_crush_compat_weight_set_weights(weightmap)
        return result
127 | ||
class CRUSHMap(ceph_module.BasePyCRUSH):
    """
    Python wrapper around the native CRUSH map binding.
    """

    # Sentinel item id; presumably mirrors CRUSH_ITEM_NONE on the C++
    # side -- confirm.
    ITEM_NONE = 0x7fffffff

    def dump(self):
        return self._dump()

    def get_item_weight(self, item):
        return self._get_item_weight(item)

    def get_item_name(self, item):
        return self._get_item_name(item)

    def find_takes(self):
        """
        Return the ``'takes'`` entry of ``_find_takes()``, or an empty
        list when that key is absent.
        """
        return self._find_takes().get('takes', [])

    def get_take_weight_osd_map(self, root):
        """
        Return the ``'weights'`` mapping of
        ``_get_take_weight_osd_map(root)`` with its keys converted to int.
        The native keys are presumably strings (JSON-style) -- confirm.

        Uses ``dict.items()`` instead of the Python 2-only ``iteritems()``
        so it works on both Python 2 and Python 3.
        """
        uglymap = self._get_take_weight_osd_map(root)
        return {int(k): v for k, v in uglymap.get('weights', {}).items()}
146 | ||
class MgrStandbyModule(ceph_module.BaseMgrStandbyModule):
    """
    Standby modules only implement a serve and shutdown method, they
    are not permitted to implement commands and they do not receive
    any notifications.

    They only have access to the mgrmap (for accessing service URI info
    from their active peer), and to configuration settings (read only).
    """

    def __init__(self, module_name, capsule):
        super(MgrStandbyModule, self).__init__(capsule)
        self.module_name = module_name
        self._logger = configure_logger(self, module_name)

    def __del__(self):
        # If the base-class __init__ raised, module_name was never set and
        # no logger was configured, so there is nothing to undo. Without
        # this guard __del__ would raise AttributeError during teardown.
        if hasattr(self, 'module_name'):
            unconfigure_logger(self, self.module_name)

    @property
    def log(self):
        """The logger configured for this module in __init__."""
        return self._logger

    def serve(self):
        """
        The serve method is mandatory for standby modules.
        :return:
        """
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param str key:
        :param default: the default value of the config if it is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_active_uri(self):
        """Return the value of the native ``_ceph_get_active_uri``."""
        return self._ceph_get_active_uri()

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance.

        Looks up ``<mgr_id>/<key>`` first, then falls back to ``key``,
        then to ``default``.

        :param key: str
        :param default: value returned when neither key is set
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r
205 | ||
class MgrModule(ceph_module.BaseMgrModule):
    """
    Base class for active ceph-mgr Python modules. Subclasses may declare
    COMMANDS and override notify/serve/shutdown/handle_command.
    """
    COMMANDS = []

    # Priority definitions for perf counters
    PRIO_CRITICAL = 10
    PRIO_INTERESTING = 8
    PRIO_USEFUL = 5
    PRIO_UNINTERESTING = 2
    PRIO_DEBUGONLY = 0

    # counter value types
    PERFCOUNTER_TIME = 1
    PERFCOUNTER_U64 = 2

    # counter types
    PERFCOUNTER_LONGRUNAVG = 4
    PERFCOUNTER_COUNTER = 8
    PERFCOUNTER_HISTOGRAM = 0x10
    # Must clear *both* value-type bits (TIME | U64 == 3) so that only the
    # counter-type bits remain. Clearing just U64 would leave the TIME bit
    # set, and _stattype_to_str would not classify time-valued counters.
    PERFCOUNTER_TYPE_MASK = ~(PERFCOUNTER_TIME | PERFCOUNTER_U64)

    def __init__(self, module_name, py_modules_ptr, this_ptr):
        self.module_name = module_name

        # If we're taking over from a standby module, let's make sure
        # its logger was unconfigured before we hook ours up
        unconfigure_logger(self, self.module_name)
        self._logger = configure_logger(self, module_name)

        super(MgrModule, self).__init__(py_modules_ptr, this_ptr)

        self._version = self._ceph_get_version()

        self._perf_schema_cache = None

    def __del__(self):
        unconfigure_logger(self, self.module_name)

    def update_perf_schema(self, daemon_type, daemon_name):
        """
        For plugins that use get_all_perf_counters, call this when
        receiving a notification of type 'perf_schema_update', to
        prompt MgrModule to update its cache of counter schemas.

        NOTE(review): currently a no-op (docstring only).

        :param daemon_type:
        :param daemon_name:
        :return:
        """

    @property
    def log(self):
        """The logger configured for this module in __init__."""
        return self._logger

    @property
    def version(self):
        """The value of ``_ceph_get_version()`` captured in __init__."""
        return self._version

    def get_context(self):
        """
        :return: a Python capsule containing a C++ CephContext pointer
        """
        return self._ceph_get_context()

    def notify(self, notify_type, notify_id):
        """
        Called by the ceph-mgr service to notify the Python plugin
        that new state is available.
        """
        pass

    def serve(self):
        """
        Called by the ceph-mgr service to start any server that
        is provided by this Python plugin. The implementation
        of this function should block until ``shutdown`` is called.

        You *must* implement ``shutdown`` if you implement ``serve``
        """
        pass

    def shutdown(self):
        """
        Called by the ceph-mgr service to request that this
        module drop out of its serve() function. You do not
        need to implement this if you do not implement serve()

        :return: None
        """
        pass

    def get(self, data_name):
        """
        Called by the plugin to load some cluster state from ceph-mgr
        """
        return self._ceph_get(data_name)

    def _stattype_to_str(self, stattype):
        """
        Map a perf counter type bitfield to 'gauge', 'counter',
        'histogram', or '' when the type is not recognised. The
        value-type bits are masked off first.
        """
        typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
        if typeonly == 0:
            return 'gauge'
        if typeonly == self.PERFCOUNTER_LONGRUNAVG:
            # this lie matches the DaemonState decoding: only val, no counts
            return 'counter'
        if typeonly == self.PERFCOUNTER_COUNTER:
            return 'counter'
        if typeonly == self.PERFCOUNTER_HISTOGRAM:
            return 'histogram'

        return ''

    def get_server(self, hostname):
        """
        Called by the plugin to load information about a particular
        node from ceph-mgr.

        :param hostname: a hostname
        """
        return self._ceph_get_server(hostname)

    def get_perf_schema(self, svc_type, svc_name):
        """
        Called by the plugin to fetch perf counter schema info.
        svc_name can be nullptr, as can svc_type, in which case
        they are wildcards

        :param svc_type:
        :param svc_name:
        :return: dict keyed by "<type>.<id>", each value a dict mapping
            counter path to that counter's schema dict (the shape
            get_all_perf_counters relies on)
        """
        return self._ceph_get_perf_schema(svc_type, svc_name)

    def get_counter(self, svc_type, svc_name, path):
        """
        Called by the plugin to fetch data for a particular perf counter
        on a particular service.

        :param svc_type:
        :param svc_name:
        :param path:
        :return: dict mapping ``path`` to a list of two-element lists
            containing time and value
        """
        return self._ceph_get_counter(svc_type, svc_name, path)

    def list_servers(self):
        """
        Like ``get_server``, but instead of returning information
        about just one node, return all the nodes in an array.
        """
        return self._ceph_get_server(None)

    def get_metadata(self, svc_type, svc_id):
        """
        Fetch the metadata for a particular service.

        :param svc_type: string (e.g., 'mds', 'osd', 'mon')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_metadata(svc_type, svc_id)

    def get_daemon_status(self, svc_type, svc_id):
        """
        Fetch the latest status for a particular service daemon.

        :param svc_type: string (e.g., 'rgw')
        :param svc_id: string
        :return: dict
        """
        return self._ceph_get_daemon_status(svc_type, svc_id)

    def send_command(self, *args, **kwargs):
        """
        Called by the plugin to send a command to the mon
        cluster.
        """
        self._ceph_send_command(*args, **kwargs)

    def set_health_checks(self, checks):
        """
        Set module's health checks

        Set the module's current map of health checks. Argument is a
        dict of check names to info, in this form:

           {
             'CHECK_FOO': {
               'severity': 'warning',           # or 'error'
               'summary': 'summary string',
               'detail': [ 'list', 'of', 'detail', 'strings' ],
              },
             'CHECK_BAR': {
               'severity': 'error',
               'summary': 'bars are bad',
               'detail': [ 'too hard' ],
             },
           }

        :param checks: dict of health check dicts
        """
        self._ceph_set_health_checks(checks)

    def handle_command(self, cmd):
        """
        Called by ceph-mgr to request the plugin to handle one
        of the commands that it declared in self.COMMANDS

        Return a status code, an output buffer, and an
        output string.  The output buffer is for data results,
        the output string is for informative text.

        :param cmd: dict, from Ceph's cmdmap_t

        :return: 3-tuple of (int, str, str)
        """

        # Should never get called if they didn't declare
        # any ``COMMANDS``
        raise NotImplementedError()

    def get_mgr_id(self):
        """
        Retrieve the mgr id.

        :return: str
        """
        return self._ceph_get_mgr_id()

    def get_config(self, key, default=None):
        """
        Retrieve the value of a persistent configuration setting

        :param key: str
        :param default: value returned when the setting is not found
        :return: str
        """
        r = self._ceph_get_config(key)
        if r is None:
            return default
        else:
            return r

    def get_config_prefix(self, key_prefix):
        """
        Retrieve a dict of config values with the given prefix

        :param key_prefix: str
        :return: dict
        """
        return self._ceph_get_config_prefix(key_prefix)

    def get_localized_config(self, key, default=None):
        """
        Retrieve localized configuration for this ceph-mgr instance.

        Looks up ``<mgr_id>/<key>`` first, then falls back to ``key``,
        then to ``default``.

        :param key: str
        :param default: str
        :return: str
        """
        r = self.get_config(self.get_mgr_id() + '/' + key)
        if r is None:
            r = self.get_config(key)

        if r is None:
            r = default
        return r

    def set_config(self, key, val):
        """
        Set the value of a persistent configuration setting

        :param key: str
        :param val: str
        """
        self._ceph_set_config(key, val)

    def set_localized_config(self, key, val):
        """
        Set localized configuration for this ceph-mgr instance, stored
        under ``<mgr_id>/<key>``.

        :param key: str
        :param val: str
        :return: whatever the native ``_ceph_set_config`` returns
        """
        return self._ceph_set_config(self.get_mgr_id() + '/' + key, val)

    def set_config_json(self, key, val):
        """
        Helper for setting json-serialized-config

        :param key: str
        :param val: json-serializable object
        """
        self._ceph_set_config(key, json.dumps(val))

    def get_config_json(self, key):
        """
        Helper for getting json-serialized config

        :param key: str
        :return: object, or None when the setting is not found
        """
        raw = self.get_config(key)
        if raw is None:
            return None
        else:
            return json.loads(raw)

    def self_test(self):
        """
        Run a self-test on the module. Override this function and implement
        a best as possible self-test for (automated) testing of the module
        :return: bool
        """
        pass

    def get_osdmap(self):
        """
        Get a handle to an OSDMap.

        NOTE(review): this method takes no epoch argument; presumably it
        returns the latest map -- confirm against the C++ binding.

        :return: OSDMap
        """
        return self._ceph_get_osdmap()

    def get_all_perf_counters(self, prio_limit=PRIO_USEFUL):
        """
        Return the perf counters currently known to this ceph-mgr
        instance, filtered by priority equal to or greater than `prio_limit`.

        The result is a map of string to dict, associating services
        (like "osd.123") with their counters.  The counter
        dict for each service maps counter paths to a counter
        info structure, which is the information from
        the schema, plus an additional "value" member with the latest
        value.

        Only rgw, mds, osd and mon services are considered; services
        without a schema are skipped with a warning.

        :param prio_limit: minimum counter priority to include
        :return: defaultdict(dict) as described above
        """

        result = defaultdict(dict)

        # TODO: improve C++->Python interface to return just
        # the latest if that's all we want.
        def get_latest(daemon_type, daemon_name, counter):
            data = self.get_counter(daemon_type, daemon_name, counter)[counter]
            if data:
                return data[-1][1]
            else:
                return 0

        for server in self.list_servers():
            for service in server['services']:
                if service['type'] not in ("rgw", "mds", "osd", "mon"):
                    continue

                schema = self.get_perf_schema(service['type'], service['id'])
                if not schema:
                    self.log.warning("No perf counter schema for {0}.{1}".format(
                        service['type'], service['id']
                    ))
                    continue

                # Value is returned in a potentially-multi-service format,
                # get just the service we're asking about
                svc_full_name = "{0}.{1}".format(service['type'], service['id'])
                schema = schema[svc_full_name]

                # Populate latest values
                for counter_path, counter_schema in schema.items():
                    if counter_schema['priority'] < prio_limit:
                        continue

                    # Shallow copy so the schema dict we were handed is
                    # not mutated when 'value' is added.
                    counter_info = dict(counter_schema)
                    counter_info['value'] = get_latest(
                        service['type'], service['id'], counter_path)
                    result[svc_full_name][counter_path] = counter_info

        self.log.debug("returning {0} counter".format(len(result)))

        return result

    def set_uri(self, uri):
        """
        If the module exposes a service, then call this to publish the
        address once it is available.

        :return: whatever the native ``_ceph_set_uri`` returns
        """
        return self._ceph_set_uri(uri)

    def have_mon_connection(self):
        """
        Check whether this ceph-mgr daemon has an open connection
        to a monitor.  If it doesn't, then it's likely that the
        information we have about the cluster is out of date,
        and/or the monitor cluster is down.
        """

        return self._ceph_have_mon_connection()