]>
Commit | Line | Data |
---|---|---|
1 | import ceph_module # noqa | |
2 | ||
3 | try: | |
4 | from typing import Set, Tuple, Iterator, Any | |
5 | except ImportError: | |
6 | # just for type checking | |
7 | pass | |
8 | import logging | |
9 | import json | |
10 | import six | |
11 | import threading | |
12 | from collections import defaultdict, namedtuple | |
13 | import rados | |
14 | import re | |
15 | import time | |
16 | ||
17 | PG_STATES = [ | |
18 | "active", | |
19 | "clean", | |
20 | "down", | |
21 | "recovery_unfound", | |
22 | "backfill_unfound", | |
23 | "scrubbing", | |
24 | "degraded", | |
25 | "inconsistent", | |
26 | "peering", | |
27 | "repair", | |
28 | "recovering", | |
29 | "forced_recovery", | |
30 | "backfill_wait", | |
31 | "incomplete", | |
32 | "stale", | |
33 | "remapped", | |
34 | "deep", | |
35 | "backfilling", | |
36 | "forced_backfill", | |
37 | "backfill_toofull", | |
38 | "recovery_wait", | |
39 | "recovery_toofull", | |
40 | "undersized", | |
41 | "activating", | |
42 | "peered", | |
43 | "snaptrim", | |
44 | "snaptrim_wait", | |
45 | "snaptrim_error", | |
46 | "creating", | |
47 | "unknown"] | |
48 | ||
49 | ||
50 | class CPlusPlusHandler(logging.Handler): | |
51 | def __init__(self, module_inst): | |
52 | super(CPlusPlusHandler, self).__init__() | |
53 | self._module = module_inst | |
54 | ||
55 | def emit(self, record): | |
56 | if record.levelno <= logging.DEBUG: | |
57 | ceph_level = 20 | |
58 | elif record.levelno <= logging.INFO: | |
59 | ceph_level = 4 | |
60 | elif record.levelno <= logging.WARNING: | |
61 | ceph_level = 1 | |
62 | else: | |
63 | ceph_level = 0 | |
64 | ||
65 | self._module._ceph_log(ceph_level, self.format(record)) | |
66 | ||
67 | ||
68 | def configure_logger(module_inst, module_name): | |
69 | """ | |
70 | Create and configure the logger with the specified module. | |
71 | ||
72 | A handler will be added to the root logger which will redirect | |
73 | the messages from all loggers (incl. 3rd party libraries) to the | |
74 | Ceph log. | |
75 | ||
76 | :param module_inst: The module instance. | |
77 | :type module_inst: instance | |
78 | :param module_name: The module name. | |
79 | :type module_name: str | |
80 | :return: Return the logger with the specified name. | |
81 | """ | |
82 | logger = logging.getLogger(module_name) | |
83 | # Don't filter any logs at the python level, leave it to C++. | |
84 | # FIXME: We should learn the log level from C++ land, and then | |
85 | # avoid calling the C++ level log when we know a message | |
86 | # is of an insufficient level to be ultimately output. | |
87 | logger.setLevel(logging.DEBUG) # Don't use NOTSET | |
88 | ||
89 | root_logger = logging.getLogger() | |
90 | # Add handler to the root logger, thus this module and all | |
91 | # 3rd party libraries will log their messages to the Ceph log. | |
92 | root_logger.addHandler(CPlusPlusHandler(module_inst)) | |
93 | # Set the log level to ``ERROR`` to ensure that we only get | |
94 | # those message from 3rd party libraries (only effective if | |
95 | # they use the default log level ``NOTSET``). | |
96 | # Check https://docs.python.org/3/library/logging.html#logging.Logger.setLevel | |
97 | # for more information about how the effective log level is | |
98 | # determined. | |
99 | root_logger.setLevel(logging.ERROR) | |
100 | ||
101 | return logger | |
102 | ||
103 | ||
104 | def unconfigure_logger(module_name=None): | |
105 | """ | |
106 | :param module_name: The module name. Defaults to ``None``. | |
107 | :type module_name: str or None | |
108 | """ | |
109 | logger = logging.getLogger(module_name) | |
110 | rm_handlers = [ | |
111 | h for h in logger.handlers if isinstance(h, CPlusPlusHandler)] | |
112 | for h in rm_handlers: | |
113 | logger.removeHandler(h) | |
114 | ||
115 | ||
116 | class CommandResult(object): | |
117 | """ | |
118 | Use with MgrModule.send_command | |
119 | """ | |
120 | ||
121 | def __init__(self, tag=None): | |
122 | self.ev = threading.Event() | |
123 | self.outs = "" | |
124 | self.outb = "" | |
125 | self.r = 0 | |
126 | ||
127 | # This is just a convenience for notifications from | |
128 | # C++ land, to avoid passing addresses around in messages. | |
129 | self.tag = tag if tag else "" | |
130 | ||
131 | def complete(self, r, outb, outs): | |
132 | self.r = r | |
133 | self.outb = outb | |
134 | self.outs = outs | |
135 | self.ev.set() | |
136 | ||
137 | def wait(self): | |
138 | self.ev.wait() | |
139 | return self.r, self.outb, self.outs | |
140 | ||
141 | ||
142 | class HandleCommandResult(namedtuple('HandleCommandResult', ['retval', 'stdout', 'stderr'])): | |
143 | def __new__(cls, retval=0, stdout="", stderr=""): | |
144 | """ | |
145 | Tuple containing the result of `handle_command()` | |
146 | ||
147 | Only write to stderr if there is an error, or in extraordinary circumstances | |
148 | ||
149 | Avoid having `ceph foo bar` commands say "did foo bar" on success unless there | |
150 | is critical information to include there. | |
151 | ||
152 | Everything programmatically consumable should be put on stdout | |
153 | ||
154 | :param retval: return code. E.g. 0 or -errno.EINVAL | |
155 | :type retval: int | |
156 | :param stdout: data of this result. | |
157 | :type stdout: str | |
158 | :param stderr: Typically used for error messages. | |
159 | :type stderr: str | |
160 | """ | |
161 | return super(HandleCommandResult, cls).__new__(cls, retval, stdout, stderr) | |
162 | ||
163 | ||
164 | class OSDMap(ceph_module.BasePyOSDMap): | |
165 | def get_epoch(self): | |
166 | return self._get_epoch() | |
167 | ||
168 | def get_crush_version(self): | |
169 | return self._get_crush_version() | |
170 | ||
171 | def dump(self): | |
172 | return self._dump() | |
173 | ||
174 | def get_pools(self): | |
175 | # FIXME: efficient implementation | |
176 | d = self._dump() | |
177 | return dict([(p['pool'], p) for p in d['pools']]) | |
178 | ||
179 | def get_pools_by_name(self): | |
180 | # FIXME: efficient implementation | |
181 | d = self._dump() | |
182 | return dict([(p['pool_name'], p) for p in d['pools']]) | |
183 | ||
184 | def new_incremental(self): | |
185 | return self._new_incremental() | |
186 | ||
187 | def apply_incremental(self, inc): | |
188 | return self._apply_incremental(inc) | |
189 | ||
190 | def get_crush(self): | |
191 | return self._get_crush() | |
192 | ||
193 | def get_pools_by_take(self, take): | |
194 | return self._get_pools_by_take(take).get('pools', []) | |
195 | ||
196 | def calc_pg_upmaps(self, inc, | |
197 | max_deviation=.01, max_iterations=10, pools=None): | |
198 | if pools is None: | |
199 | pools = [] | |
200 | return self._calc_pg_upmaps( | |
201 | inc, | |
202 | max_deviation, max_iterations, pools) | |
203 | ||
204 | def map_pool_pgs_up(self, poolid): | |
205 | return self._map_pool_pgs_up(poolid) | |
206 | ||
207 | def pg_to_up_acting_osds(self, pool_id, ps): | |
208 | return self._pg_to_up_acting_osds(pool_id, ps) | |
209 | ||
210 | def pool_raw_used_rate(self, pool_id): | |
211 | return self._pool_raw_used_rate(pool_id) | |
212 | ||
213 | def get_ec_profile(self, name): | |
214 | # FIXME: efficient implementation | |
215 | d = self._dump() | |
216 | return d['erasure_code_profiles'].get(name, None) | |
217 | ||
218 | ||
219 | class OSDMapIncremental(ceph_module.BasePyOSDMapIncremental): | |
220 | def get_epoch(self): | |
221 | return self._get_epoch() | |
222 | ||
223 | def dump(self): | |
224 | return self._dump() | |
225 | ||
226 | def set_osd_reweights(self, weightmap): | |
227 | """ | |
228 | weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 } | |
229 | """ | |
230 | return self._set_osd_reweights(weightmap) | |
231 | ||
232 | def set_crush_compat_weight_set_weights(self, weightmap): | |
233 | """ | |
234 | weightmap is a dict, int to float. devices only. e.g., | |
235 | { 0: 3.4, 1: 3.3, 2: 3.334 } | |
236 | """ | |
237 | return self._set_crush_compat_weight_set_weights(weightmap) | |
238 | ||
239 | ||
240 | class CRUSHMap(ceph_module.BasePyCRUSH): | |
241 | ITEM_NONE = 0x7fffffff | |
242 | DEFAULT_CHOOSE_ARGS = '-1' | |
243 | ||
244 | def dump(self): | |
245 | return self._dump() | |
246 | ||
247 | def get_item_weight(self, item): | |
248 | return self._get_item_weight(item) | |
249 | ||
250 | def get_item_name(self, item): | |
251 | return self._get_item_name(item) | |
252 | ||
253 | def find_takes(self): | |
254 | return self._find_takes().get('takes', []) | |
255 | ||
256 | def get_take_weight_osd_map(self, root): | |
257 | uglymap = self._get_take_weight_osd_map(root) | |
258 | return {int(k): v for k, v in six.iteritems(uglymap.get('weights', {}))} | |
259 | ||
260 | @staticmethod | |
261 | def have_default_choose_args(dump): | |
262 | return CRUSHMap.DEFAULT_CHOOSE_ARGS in dump.get('choose_args', {}) | |
263 | ||
264 | @staticmethod | |
265 | def get_default_choose_args(dump): | |
266 | return dump.get('choose_args').get(CRUSHMap.DEFAULT_CHOOSE_ARGS, []) | |
267 | ||
268 | def get_rule(self, rule_name): | |
269 | # TODO efficient implementation | |
270 | for rule in self.dump()['rules']: | |
271 | if rule_name == rule['rule_name']: | |
272 | return rule | |
273 | ||
274 | return None | |
275 | ||
276 | def get_rule_by_id(self, rule_id): | |
277 | for rule in self.dump()['rules']: | |
278 | if rule['rule_id'] == rule_id: | |
279 | return rule | |
280 | ||
281 | return None | |
282 | ||
283 | def get_rule_root(self, rule_name): | |
284 | rule = self.get_rule(rule_name) | |
285 | if rule is None: | |
286 | return None | |
287 | ||
288 | try: | |
289 | first_take = [s for s in rule['steps'] if s['op'] == 'take'][0] | |
290 | except IndexError: | |
291 | self.log.warn("CRUSH rule '{0}' has no 'take' step".format( | |
292 | rule_name)) | |
293 | return None | |
294 | else: | |
295 | return first_take['item'] | |
296 | ||
297 | def get_osds_under(self, root_id): | |
298 | # TODO don't abuse dump like this | |
299 | d = self.dump() | |
300 | buckets = dict([(b['id'], b) for b in d['buckets']]) | |
301 | ||
302 | osd_list = [] | |
303 | ||
304 | def accumulate(b): | |
305 | for item in b['items']: | |
306 | if item['id'] >= 0: | |
307 | osd_list.append(item['id']) | |
308 | else: | |
309 | try: | |
310 | accumulate(buckets[item['id']]) | |
311 | except KeyError: | |
312 | pass | |
313 | ||
314 | accumulate(buckets[root_id]) | |
315 | ||
316 | return osd_list | |
317 | ||
318 | def device_class_counts(self): | |
319 | result = defaultdict(int) | |
320 | # TODO don't abuse dump like this | |
321 | d = self.dump() | |
322 | for device in d['devices']: | |
323 | cls = device.get('class', None) | |
324 | result[cls] += 1 | |
325 | ||
326 | return dict(result) | |
327 | ||
328 | ||
329 | class CLICommand(object): | |
330 | COMMANDS = {} | |
331 | ||
332 | def __init__(self, prefix, args="", desc="", perm="rw"): | |
333 | self.prefix = prefix | |
334 | self.args = args | |
335 | self.args_dict = {} | |
336 | self.desc = desc | |
337 | self.perm = perm | |
338 | self.func = None | |
339 | self._parse_args() | |
340 | ||
341 | def _parse_args(self): | |
342 | if not self.args: | |
343 | return | |
344 | args = self.args.split(" ") | |
345 | for arg in args: | |
346 | arg_desc = arg.strip().split(",") | |
347 | arg_d = {} | |
348 | for kv in arg_desc: | |
349 | k, v = kv.split("=") | |
350 | if k != "name": | |
351 | arg_d[k] = v | |
352 | else: | |
353 | self.args_dict[v] = arg_d | |
354 | ||
355 | def __call__(self, func): | |
356 | self.func = func | |
357 | self.COMMANDS[self.prefix] = self | |
358 | return self.func | |
359 | ||
360 | def call(self, mgr, cmd_dict, inbuf): | |
361 | kwargs = {} | |
362 | for a, d in self.args_dict.items(): | |
363 | if 'req' in d and d['req'] == "false" and a not in cmd_dict: | |
364 | continue | |
365 | kwargs[a.replace("-", "_")] = cmd_dict[a] | |
366 | if inbuf: | |
367 | kwargs['inbuf'] = inbuf | |
368 | return self.func(mgr, **kwargs) | |
369 | ||
370 | @classmethod | |
371 | def dump_cmd_list(cls): | |
372 | return [{ | |
373 | 'cmd': '{} {}'.format(cmd.prefix, cmd.args), | |
374 | 'desc': cmd.desc, | |
375 | 'perm': cmd.perm | |
376 | } for _, cmd in cls.COMMANDS.items()] | |
377 | ||
378 | ||
379 | def CLIReadCommand(prefix, args="", desc=""): | |
380 | return CLICommand(prefix, args, desc, "r") | |
381 | ||
382 | ||
383 | def CLIWriteCommand(prefix, args="", desc=""): | |
384 | return CLICommand(prefix, args, desc, "w") | |
385 | ||
386 | ||
387 | def _get_localized_key(prefix, key): | |
388 | return '{}/{}'.format(prefix, key) | |
389 | ||
390 | ||
391 | class Option(dict): | |
392 | """ | |
393 | Helper class to declare options for MODULE_OPTIONS list. | |
394 | ||
395 | Caveat: it uses argument names matching Python keywords (type, min, max), | |
396 | so any further processing should happen in a separate method. | |
397 | ||
398 | TODO: type validation. | |
399 | """ | |
400 | ||
401 | def __init__( | |
402 | self, name, | |
403 | default=None, | |
404 | type='str', | |
405 | desc=None, longdesc=None, | |
406 | min=None, max=None, | |
407 | enum_allowed=None, | |
408 | see_also=None, | |
409 | tags=None, | |
410 | runtime=False, | |
411 | ): | |
412 | super(Option, self).__init__( | |
413 | (k, v) for k, v in vars().items() | |
414 | if k != 'self' and v is not None) | |
415 | ||
416 | ||
417 | class MgrStandbyModule(ceph_module.BaseMgrStandbyModule): | |
418 | """ | |
419 | Standby modules only implement a serve and shutdown method, they | |
420 | are not permitted to implement commands and they do not receive | |
421 | any notifications. | |
422 | ||
423 | They only have access to the mgrmap (for accessing service URI info | |
424 | from their active peer), and to configuration settings (read only). | |
425 | """ | |
426 | ||
427 | MODULE_OPTIONS = [] | |
428 | MODULE_OPTION_DEFAULTS = {} | |
429 | ||
430 | def __init__(self, module_name, capsule): | |
431 | super(MgrStandbyModule, self).__init__(capsule) | |
432 | self.module_name = module_name | |
433 | self._logger = configure_logger(self, module_name) | |
434 | # see also MgrModule.__init__() | |
435 | for o in self.MODULE_OPTIONS: | |
436 | if 'default' in o: | |
437 | if 'type' in o: | |
438 | self.MODULE_OPTION_DEFAULTS[o['name']] = o['default'] | |
439 | else: | |
440 | self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default']) | |
441 | ||
442 | def __del__(self): | |
443 | unconfigure_logger() | |
444 | ||
445 | @property | |
446 | def log(self): | |
447 | return self._logger | |
448 | ||
449 | def serve(self): | |
450 | """ | |
451 | The serve method is mandatory for standby modules. | |
452 | :return: | |
453 | """ | |
454 | raise NotImplementedError() | |
455 | ||
456 | def get_mgr_id(self): | |
457 | return self._ceph_get_mgr_id() | |
458 | ||
459 | def get_module_option(self, key, default=None): | |
460 | """ | |
461 | Retrieve the value of a persistent configuration setting | |
462 | ||
463 | :param str key: | |
464 | :param default: the default value of the config if it is not found | |
465 | :return: str | |
466 | """ | |
467 | r = self._ceph_get_module_option(key) | |
468 | if r is None: | |
469 | return self.MODULE_OPTION_DEFAULTS.get(key, default) | |
470 | else: | |
471 | return r | |
472 | ||
473 | def get_ceph_option(self, key): | |
474 | return self._ceph_get_option(key) | |
475 | ||
476 | def get_store(self, key): | |
477 | """ | |
478 | Retrieve the value of a persistent KV store entry | |
479 | ||
480 | :param key: String | |
481 | :return: Byte string or None | |
482 | """ | |
483 | return self._ceph_get_store(key) | |
484 | ||
485 | def get_active_uri(self): | |
486 | return self._ceph_get_active_uri() | |
487 | ||
488 | def get_localized_module_option(self, key, default=None): | |
489 | r = self._ceph_get_module_option(key, self.get_mgr_id()) | |
490 | if r is None: | |
491 | return self.MODULE_OPTION_DEFAULTS.get(key, default) | |
492 | else: | |
493 | return r | |
494 | ||
495 | ||
496 | class MgrModule(ceph_module.BaseMgrModule): | |
497 | COMMANDS = [] | |
498 | MODULE_OPTIONS = [] | |
499 | MODULE_OPTION_DEFAULTS = {} | |
500 | ||
501 | # Priority definitions for perf counters | |
502 | PRIO_CRITICAL = 10 | |
503 | PRIO_INTERESTING = 8 | |
504 | PRIO_USEFUL = 5 | |
505 | PRIO_UNINTERESTING = 2 | |
506 | PRIO_DEBUGONLY = 0 | |
507 | ||
508 | # counter value types | |
509 | PERFCOUNTER_TIME = 1 | |
510 | PERFCOUNTER_U64 = 2 | |
511 | ||
512 | # counter types | |
513 | PERFCOUNTER_LONGRUNAVG = 4 | |
514 | PERFCOUNTER_COUNTER = 8 | |
515 | PERFCOUNTER_HISTOGRAM = 0x10 | |
516 | PERFCOUNTER_TYPE_MASK = ~3 | |
517 | ||
518 | # units supported | |
519 | BYTES = 0 | |
520 | NONE = 1 | |
521 | ||
522 | # Cluster log priorities | |
523 | CLUSTER_LOG_PRIO_DEBUG = 0 | |
524 | CLUSTER_LOG_PRIO_INFO = 1 | |
525 | CLUSTER_LOG_PRIO_SEC = 2 | |
526 | CLUSTER_LOG_PRIO_WARN = 3 | |
527 | CLUSTER_LOG_PRIO_ERROR = 4 | |
528 | ||
529 | def __init__(self, module_name, py_modules_ptr, this_ptr): | |
530 | self.module_name = module_name | |
531 | ||
532 | # If we're taking over from a standby module, let's make sure | |
533 | # its logger was unconfigured before we hook ours up | |
534 | unconfigure_logger() | |
535 | self._logger = configure_logger(self, module_name) | |
536 | ||
537 | super(MgrModule, self).__init__(py_modules_ptr, this_ptr) | |
538 | ||
539 | self._version = self._ceph_get_version() | |
540 | ||
541 | self._perf_schema_cache = None | |
542 | ||
543 | # Keep a librados instance for those that need it. | |
544 | self._rados = None | |
545 | ||
546 | for o in self.MODULE_OPTIONS: | |
547 | if 'default' in o: | |
548 | if 'type' in o: | |
549 | # we'll assume the declared type matches the | |
550 | # supplied default value's type. | |
551 | self.MODULE_OPTION_DEFAULTS[o['name']] = o['default'] | |
552 | else: | |
553 | # module not declaring it's type, so normalize the | |
554 | # default value to be a string for consistent behavior | |
555 | # with default and user-supplied option values. | |
556 | self.MODULE_OPTION_DEFAULTS[o['name']] = str(o['default']) | |
557 | ||
558 | def __del__(self): | |
559 | unconfigure_logger() | |
560 | ||
561 | @classmethod | |
562 | def _register_commands(cls): | |
563 | cls.COMMANDS.extend(CLICommand.dump_cmd_list()) | |
564 | ||
565 | @property | |
566 | def log(self): | |
567 | return self._logger | |
568 | ||
569 | def cluster_log(self, channel, priority, message): | |
570 | """ | |
571 | :param channel: The log channel. This can be 'cluster', 'audit', ... | |
572 | :type channel: str | |
573 | :param priority: The log message priority. This can be | |
574 | CLUSTER_LOG_PRIO_DEBUG, CLUSTER_LOG_PRIO_INFO, | |
575 | CLUSTER_LOG_PRIO_SEC, CLUSTER_LOG_PRIO_WARN or | |
576 | CLUSTER_LOG_PRIO_ERROR. | |
577 | :type priority: int | |
578 | :param message: The message to log. | |
579 | :type message: str | |
580 | """ | |
581 | self._ceph_cluster_log(channel, priority, message) | |
582 | ||
583 | @property | |
584 | def version(self): | |
585 | return self._version | |
586 | ||
587 | def get_context(self): | |
588 | """ | |
589 | :return: a Python capsule containing a C++ CephContext pointer | |
590 | """ | |
591 | return self._ceph_get_context() | |
592 | ||
593 | def notify(self, notify_type, notify_id): | |
594 | """ | |
595 | Called by the ceph-mgr service to notify the Python plugin | |
596 | that new state is available. | |
597 | ||
598 | :param notify_type: string indicating what kind of notification, | |
599 | such as osd_map, mon_map, fs_map, mon_status, | |
600 | health, pg_summary, command, service_map | |
601 | :param notify_id: string (may be empty) that optionally specifies | |
602 | which entity is being notified about. With | |
603 | "command" notifications this is set to the tag | |
604 | ``from send_command``. | |
605 | """ | |
606 | pass | |
607 | ||
608 | def config_notify(self): | |
609 | """ | |
610 | Called by the ceph-mgr service to notify the Python plugin | |
611 | that the configuration may have changed. Modules will want to | |
612 | refresh any configuration values stored in config variables. | |
613 | """ | |
614 | pass | |
615 | ||
616 | def serve(self): | |
617 | """ | |
618 | Called by the ceph-mgr service to start any server that | |
619 | is provided by this Python plugin. The implementation | |
620 | of this function should block until ``shutdown`` is called. | |
621 | ||
622 | You *must* implement ``shutdown`` if you implement ``serve`` | |
623 | """ | |
624 | pass | |
625 | ||
626 | def shutdown(self): | |
627 | """ | |
628 | Called by the ceph-mgr service to request that this | |
629 | module drop out of its serve() function. You do not | |
630 | need to implement this if you do not implement serve() | |
631 | ||
632 | :return: None | |
633 | """ | |
634 | if self._rados: | |
635 | self._rados.shutdown() | |
636 | ||
637 | def get(self, data_name): | |
638 | """ | |
639 | Called by the plugin to fetch named cluster-wide objects from ceph-mgr. | |
640 | ||
641 | :param str data_name: Valid things to fetch are osd_crush_map_text, | |
642 | osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map, | |
643 | osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats, | |
644 | health, mon_status, devices, device <devid>. | |
645 | ||
646 | Note: | |
647 | All these structures have their own JSON representations: experiment | |
648 | or look at the C++ ``dump()`` methods to learn about them. | |
649 | """ | |
650 | return self._ceph_get(data_name) | |
651 | ||
652 | def _stattype_to_str(self, stattype): | |
653 | ||
654 | typeonly = stattype & self.PERFCOUNTER_TYPE_MASK | |
655 | if typeonly == 0: | |
656 | return 'gauge' | |
657 | if typeonly == self.PERFCOUNTER_LONGRUNAVG: | |
658 | # this lie matches the DaemonState decoding: only val, no counts | |
659 | return 'counter' | |
660 | if typeonly == self.PERFCOUNTER_COUNTER: | |
661 | return 'counter' | |
662 | if typeonly == self.PERFCOUNTER_HISTOGRAM: | |
663 | return 'histogram' | |
664 | ||
665 | return '' | |
666 | ||
667 | def _perfpath_to_path_labels(self, daemon, path): | |
668 | label_names = ("ceph_daemon",) | |
669 | labels = (daemon,) | |
670 | ||
671 | if daemon.startswith('rbd-mirror.'): | |
672 | match = re.match( | |
673 | r'^rbd_mirror_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$', | |
674 | path | |
675 | ) | |
676 | if match: | |
677 | path = 'rbd_mirror_' + match.group(4) | |
678 | pool = match.group(1) | |
679 | namespace = match.group(2) or '' | |
680 | image = match.group(3) | |
681 | label_names += ('pool', 'namespace', 'image') | |
682 | labels += (pool, namespace, image) | |
683 | ||
684 | return path, label_names, labels, | |
685 | ||
686 | def _perfvalue_to_value(self, stattype, value): | |
687 | if stattype & self.PERFCOUNTER_TIME: | |
688 | # Convert from ns to seconds | |
689 | return value / 1000000000.0 | |
690 | else: | |
691 | return value | |
692 | ||
693 | def _unit_to_str(self, unit): | |
694 | if unit == self.NONE: | |
695 | return "/s" | |
696 | elif unit == self.BYTES: | |
697 | return "B/s" | |
698 | ||
699 | @staticmethod | |
700 | def to_pretty_iec(n): | |
701 | for bits, suffix in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'), | |
702 | (20, 'Mi'), (10, 'Ki')]: | |
703 | if n > 10 << bits: | |
704 | return str(n >> bits) + ' ' + suffix | |
705 | return str(n) + ' ' | |
706 | ||
707 | @staticmethod | |
708 | def get_pretty_row(elems, width): | |
709 | """ | |
710 | Takes an array of elements and returns a string with those elements | |
711 | formatted as a table row. Useful for polling modules. | |
712 | ||
713 | :param elems: the elements to be printed | |
714 | :param width: the width of the terminal | |
715 | """ | |
716 | n = len(elems) | |
717 | column_width = int(width / n) | |
718 | ||
719 | ret = '|' | |
720 | for elem in elems: | |
721 | ret += '{0:>{w}} |'.format(elem, w=column_width - 2) | |
722 | ||
723 | return ret | |
724 | ||
725 | def get_pretty_header(self, elems, width): | |
726 | """ | |
727 | Like ``get_pretty_row`` but adds dashes, to be used as a table title. | |
728 | ||
729 | :param elems: the elements to be printed | |
730 | :param width: the width of the terminal | |
731 | """ | |
732 | n = len(elems) | |
733 | column_width = int(width / n) | |
734 | ||
735 | # dash line | |
736 | ret = '+' | |
737 | for i in range(0, n): | |
738 | ret += '-' * (column_width - 1) + '+' | |
739 | ret += '\n' | |
740 | ||
741 | # title | |
742 | ret += self.get_pretty_row(elems, width) | |
743 | ret += '\n' | |
744 | ||
745 | # dash line | |
746 | ret += '+' | |
747 | for i in range(0, n): | |
748 | ret += '-' * (column_width - 1) + '+' | |
749 | ret += '\n' | |
750 | ||
751 | return ret | |
752 | ||
753 | def get_server(self, hostname): | |
754 | """ | |
755 | Called by the plugin to fetch metadata about a particular hostname from | |
756 | ceph-mgr. | |
757 | ||
758 | This is information that ceph-mgr has gleaned from the daemon metadata | |
759 | reported by daemons running on a particular server. | |
760 | ||
761 | :param hostname: a hostname | |
762 | """ | |
763 | return self._ceph_get_server(hostname) | |
764 | ||
765 | def get_perf_schema(self, svc_type, svc_name): | |
766 | """ | |
767 | Called by the plugin to fetch perf counter schema info. | |
768 | svc_name can be nullptr, as can svc_type, in which case | |
769 | they are wildcards | |
770 | ||
771 | :param str svc_type: | |
772 | :param str svc_name: | |
773 | :return: list of dicts describing the counters requested | |
774 | """ | |
775 | return self._ceph_get_perf_schema(svc_type, svc_name) | |
776 | ||
777 | def get_counter(self, svc_type, svc_name, path): | |
778 | """ | |
779 | Called by the plugin to fetch the latest performance counter data for a | |
780 | particular counter on a particular service. | |
781 | ||
782 | :param str svc_type: | |
783 | :param str svc_name: | |
784 | :param str path: a period-separated concatenation of the subsystem and the | |
785 | counter name, for example "mds.inodes". | |
786 | :return: A list of two-tuples of (timestamp, value) is returned. This may be | |
787 | empty if no data is available. | |
788 | """ | |
789 | return self._ceph_get_counter(svc_type, svc_name, path) | |
790 | ||
791 | def get_latest_counter(self, svc_type, svc_name, path): | |
792 | """ | |
793 | Called by the plugin to fetch only the newest performance counter data | |
794 | pointfor a particular counter on a particular service. | |
795 | ||
796 | :param str svc_type: | |
797 | :param str svc_name: | |
798 | :param str path: a period-separated concatenation of the subsystem and the | |
799 | counter name, for example "mds.inodes". | |
800 | :return: A list of two-tuples of (timestamp, value) is returned. This may be | |
801 | empty if no data is available. | |
802 | """ | |
803 | return self._ceph_get_latest_counter(svc_type, svc_name, path) | |
804 | ||
805 | def list_servers(self): | |
806 | """ | |
807 | Like ``get_server``, but gives information about all servers (i.e. all | |
808 | unique hostnames that have been mentioned in daemon metadata) | |
809 | ||
810 | :return: a list of information about all servers | |
811 | :rtype: list | |
812 | """ | |
813 | return self._ceph_get_server(None) | |
814 | ||
815 | def get_metadata(self, svc_type, svc_id): | |
816 | """ | |
817 | Fetch the daemon metadata for a particular service. | |
818 | ||
819 | ceph-mgr fetches metadata asynchronously, so are windows of time during | |
820 | addition/removal of services where the metadata is not available to | |
821 | modules. ``None`` is returned if no metadata is available. | |
822 | ||
823 | :param str svc_type: service type (e.g., 'mds', 'osd', 'mon') | |
824 | :param str svc_id: service id. convert OSD integer IDs to strings when | |
825 | calling this | |
826 | :rtype: dict, or None if no metadata found | |
827 | """ | |
828 | return self._ceph_get_metadata(svc_type, svc_id) | |
829 | ||
830 | def get_daemon_status(self, svc_type, svc_id): | |
831 | """ | |
832 | Fetch the latest status for a particular service daemon. | |
833 | ||
834 | This method may return ``None`` if no status information is | |
835 | available, for example because the daemon hasn't fully started yet. | |
836 | ||
837 | :param svc_type: string (e.g., 'rgw') | |
838 | :param svc_id: string | |
839 | :return: dict, or None if the service is not found | |
840 | """ | |
841 | return self._ceph_get_daemon_status(svc_type, svc_id) | |
842 | ||
843 | def mon_command(self, cmd_dict): | |
844 | """ | |
845 | Helper for modules that do simple, synchronous mon command | |
846 | execution. | |
847 | ||
848 | See send_command for general case. | |
849 | ||
850 | :return: status int, out std, err str | |
851 | """ | |
852 | ||
853 | t1 = time.time() | |
854 | result = CommandResult() | |
855 | self.send_command(result, "mon", "", json.dumps(cmd_dict), "") | |
856 | r = result.wait() | |
857 | t2 = time.time() | |
858 | ||
859 | self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format( | |
860 | cmd_dict['prefix'], r[0], t2 - t1 | |
861 | )) | |
862 | ||
863 | return r | |
864 | ||
865 | def send_command(self, *args, **kwargs): | |
866 | """ | |
867 | Called by the plugin to send a command to the mon | |
868 | cluster. | |
869 | ||
870 | :param CommandResult result: an instance of the ``CommandResult`` | |
871 | class, defined in the same module as MgrModule. This acts as a | |
872 | completion and stores the output of the command. Use | |
873 | ``CommandResult.wait()`` if you want to block on completion. | |
874 | :param str svc_type: | |
875 | :param str svc_id: | |
876 | :param str command: a JSON-serialized command. This uses the same | |
877 | format as the ceph command line, which is a dictionary of command | |
878 | arguments, with the extra ``prefix`` key containing the command | |
879 | name itself. Consult MonCommands.h for available commands and | |
880 | their expected arguments. | |
881 | :param str tag: used for nonblocking operation: when a command | |
882 | completes, the ``notify()`` callback on the MgrModule instance is | |
883 | triggered, with notify_type set to "command", and notify_id set to | |
884 | the tag of the command. | |
885 | """ | |
886 | self._ceph_send_command(*args, **kwargs) | |
887 | ||
888 | def set_health_checks(self, checks): | |
889 | """ | |
890 | Set the module's current map of health checks. Argument is a | |
891 | dict of check names to info, in this form: | |
892 | ||
893 | :: | |
894 | ||
895 | { | |
896 | 'CHECK_FOO': { | |
897 | 'severity': 'warning', # or 'error' | |
898 | 'summary': 'summary string', | |
899 | 'detail': [ 'list', 'of', 'detail', 'strings' ], | |
900 | }, | |
901 | 'CHECK_BAR': { | |
902 | 'severity': 'error', | |
903 | 'summary': 'bars are bad', | |
904 | 'detail': [ 'too hard' ], | |
905 | }, | |
906 | } | |
907 | ||
908 | :param list: dict of health check dicts | |
909 | """ | |
910 | self._ceph_set_health_checks(checks) | |
911 | ||
912 | def _handle_command(self, inbuf, cmd): | |
913 | if cmd['prefix'] not in CLICommand.COMMANDS: | |
914 | return self.handle_command(inbuf, cmd) | |
915 | return CLICommand.COMMANDS[cmd['prefix']].call(self, cmd, inbuf) | |
916 | ||
917 | def handle_command(self, inbuf, cmd): | |
918 | """ | |
919 | Called by ceph-mgr to request the plugin to handle one | |
920 | of the commands that it declared in self.COMMANDS | |
921 | ||
922 | Return a status code, an output buffer, and an | |
923 | output string. The output buffer is for data results, | |
924 | the output string is for informative text. | |
925 | ||
926 | :param inbuf: content of any "-i <file>" supplied to ceph cli | |
927 | :type inbuf: str | |
928 | :param cmd: from Ceph's cmdmap_t | |
929 | :type cmd: dict | |
930 | ||
931 | :return: HandleCommandResult or a 3-tuple of (int, str, str) | |
932 | """ | |
933 | ||
934 | # Should never get called if they didn't declare | |
935 | # any ``COMMANDS`` | |
936 | raise NotImplementedError() | |
937 | ||
938 | def get_mgr_id(self): | |
939 | """ | |
940 | Retrieve the name of the manager daemon where this plugin | |
941 | is currently being executed (i.e. the active manager). | |
942 | ||
943 | :return: str | |
944 | """ | |
945 | return self._ceph_get_mgr_id() | |
946 | ||
947 | def get_ceph_option(self, key): | |
948 | return self._ceph_get_option(key) | |
949 | ||
950 | def _validate_module_option(self, key): | |
951 | """ | |
952 | Helper: don't allow get/set config callers to | |
953 | access config options that they didn't declare | |
954 | in their schema. | |
955 | """ | |
956 | if key not in [o['name'] for o in self.MODULE_OPTIONS]: | |
957 | raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS". | |
958 | format(key, self.__class__.__name__)) | |
959 | ||
960 | def _get_module_option(self, key, default, localized_prefix=""): | |
961 | r = self._ceph_get_module_option(self.module_name, key, | |
962 | localized_prefix) | |
963 | if r is None: | |
964 | return self.MODULE_OPTION_DEFAULTS.get(key, default) | |
965 | else: | |
966 | return r | |
967 | ||
968 | def get_module_option(self, key, default=None): | |
969 | """ | |
970 | Retrieve the value of a persistent configuration setting | |
971 | ||
972 | :param str key: | |
973 | :param str default: | |
974 | :return: str | |
975 | """ | |
976 | self._validate_module_option(key) | |
977 | return self._get_module_option(key, default) | |
978 | ||
979 | def get_module_option_ex(self, module, key, default=None): | |
980 | """ | |
981 | Retrieve the value of a persistent configuration setting | |
982 | for the specified module. | |
983 | ||
984 | :param str module: The name of the module, e.g. 'dashboard' | |
985 | or 'telemetry'. | |
986 | :param str key: The configuration key, e.g. 'server_addr'. | |
987 | :param str,None default: The default value to use when the | |
988 | returned value is ``None``. Defaults to ``None``. | |
989 | :return: str,int,bool,float,None | |
990 | """ | |
991 | if module == self.module_name: | |
992 | self._validate_module_option(key) | |
993 | r = self._ceph_get_module_option(module, key) | |
994 | return default if r is None else r | |
995 | ||
996 | def get_store_prefix(self, key_prefix): | |
997 | """ | |
998 | Retrieve a dict of KV store keys to values, where the keys | |
999 | have the given prefix | |
1000 | ||
1001 | :param str key_prefix: | |
1002 | :return: str | |
1003 | """ | |
1004 | return self._ceph_get_store_prefix(key_prefix) | |
1005 | ||
1006 | def _set_localized(self, key, val, setter): | |
1007 | return setter(_get_localized_key(self.get_mgr_id(), key), val) | |
1008 | ||
1009 | def get_localized_module_option(self, key, default=None): | |
1010 | """ | |
1011 | Retrieve localized configuration for this ceph-mgr instance | |
1012 | :param str key: | |
1013 | :param str default: | |
1014 | :return: str | |
1015 | """ | |
1016 | self._validate_module_option(key) | |
1017 | return self._get_module_option(key, default, self.get_mgr_id()) | |
1018 | ||
1019 | def _set_module_option(self, key, val): | |
1020 | return self._ceph_set_module_option(self.module_name, key, str(val)) | |
1021 | ||
1022 | def set_module_option(self, key, val): | |
1023 | """ | |
1024 | Set the value of a persistent configuration setting | |
1025 | ||
1026 | :param str key: | |
1027 | :type val: str | None | |
1028 | """ | |
1029 | self._validate_module_option(key) | |
1030 | return self._set_module_option(key, val) | |
1031 | ||
1032 | def set_module_option_ex(self, module, key, val): | |
1033 | """ | |
1034 | Set the value of a persistent configuration setting | |
1035 | for the specified module. | |
1036 | ||
1037 | :param str module: | |
1038 | :param str key: | |
1039 | :param str val: | |
1040 | """ | |
1041 | if module == self.module_name: | |
1042 | self._validate_module_option(key) | |
1043 | return self._ceph_set_module_option(module, key, str(val)) | |
1044 | ||
1045 | def set_localized_module_option(self, key, val): | |
1046 | """ | |
1047 | Set localized configuration for this ceph-mgr instance | |
1048 | :param str key: | |
1049 | :param str val: | |
1050 | :return: str | |
1051 | """ | |
1052 | self._validate_module_option(key) | |
1053 | return self._set_localized(key, val, self._set_module_option) | |
1054 | ||
1055 | def set_store(self, key, val): | |
1056 | """ | |
1057 | Set a value in this module's persistent key value store. | |
1058 | If val is None, remove key from store | |
1059 | ||
1060 | :param str key: | |
1061 | :param str val: | |
1062 | """ | |
1063 | self._ceph_set_store(key, val) | |
1064 | ||
1065 | def get_store(self, key, default=None): | |
1066 | """ | |
1067 | Get a value from this module's persistent key value store | |
1068 | """ | |
1069 | r = self._ceph_get_store(key) | |
1070 | if r is None: | |
1071 | return default | |
1072 | else: | |
1073 | return r | |
1074 | ||
1075 | def get_localized_store(self, key, default=None): | |
1076 | r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key)) | |
1077 | if r is None: | |
1078 | r = self._ceph_get_store(key) | |
1079 | if r is None: | |
1080 | r = default | |
1081 | return r | |
1082 | ||
1083 | def set_localized_store(self, key, val): | |
1084 | return self._set_localized(key, val, self.set_store) | |
1085 | ||
1086 | def self_test(self): | |
1087 | """ | |
1088 | Run a self-test on the module. Override this function and implement | |
1089 | a best as possible self-test for (automated) testing of the module | |
1090 | ||
1091 | Indicate any failures by raising an exception. This does not have | |
1092 | to be pretty, it's mainly for picking up regressions during | |
1093 | development, rather than use in the field. | |
1094 | ||
1095 | :return: None, or an advisory string for developer interest, such | |
1096 | as a json dump of some state. | |
1097 | """ | |
1098 | pass | |
1099 | ||
1100 | def get_osdmap(self): | |
1101 | """ | |
1102 | Get a handle to an OSDMap. If epoch==0, get a handle for the latest | |
1103 | OSDMap. | |
1104 | :return: OSDMap | |
1105 | """ | |
1106 | return self._ceph_get_osdmap() | |
1107 | ||
1108 | def get_latest(self, daemon_type, daemon_name, counter): | |
1109 | data = self.get_latest_counter( | |
1110 | daemon_type, daemon_name, counter)[counter] | |
1111 | if data: | |
1112 | return data[1] | |
1113 | else: | |
1114 | return 0 | |
1115 | ||
1116 | def get_latest_avg(self, daemon_type, daemon_name, counter): | |
1117 | data = self.get_latest_counter( | |
1118 | daemon_type, daemon_name, counter)[counter] | |
1119 | if data: | |
1120 | return data[1], data[2] | |
1121 | else: | |
1122 | return 0, 0 | |
1123 | ||
1124 | def get_all_perf_counters(self, prio_limit=PRIO_USEFUL, | |
1125 | services=("mds", "mon", "osd", | |
1126 | "rbd-mirror", "rgw", "tcmu-runner")): | |
1127 | """ | |
1128 | Return the perf counters currently known to this ceph-mgr | |
1129 | instance, filtered by priority equal to or greater than `prio_limit`. | |
1130 | ||
1131 | The result is a map of string to dict, associating services | |
1132 | (like "osd.123") with their counters. The counter | |
1133 | dict for each service maps counter paths to a counter | |
1134 | info structure, which is the information from | |
1135 | the schema, plus an additional "value" member with the latest | |
1136 | value. | |
1137 | """ | |
1138 | ||
1139 | result = defaultdict(dict) | |
1140 | ||
1141 | for server in self.list_servers(): | |
1142 | for service in server['services']: | |
1143 | if service['type'] not in services: | |
1144 | continue | |
1145 | ||
1146 | schema = self.get_perf_schema(service['type'], service['id']) | |
1147 | if not schema: | |
1148 | self.log.warn("No perf counter schema for {0}.{1}".format( | |
1149 | service['type'], service['id'] | |
1150 | )) | |
1151 | continue | |
1152 | ||
1153 | # Value is returned in a potentially-multi-service format, | |
1154 | # get just the service we're asking about | |
1155 | svc_full_name = "{0}.{1}".format( | |
1156 | service['type'], service['id']) | |
1157 | schema = schema[svc_full_name] | |
1158 | ||
1159 | # Populate latest values | |
1160 | for counter_path, counter_schema in schema.items(): | |
1161 | # self.log.debug("{0}: {1}".format( | |
1162 | # counter_path, json.dumps(counter_schema) | |
1163 | # )) | |
1164 | if counter_schema['priority'] < prio_limit: | |
1165 | continue | |
1166 | ||
1167 | counter_info = dict(counter_schema) | |
1168 | ||
1169 | # Also populate count for the long running avgs | |
1170 | if counter_schema['type'] & self.PERFCOUNTER_LONGRUNAVG: | |
1171 | v, c = self.get_latest_avg( | |
1172 | service['type'], | |
1173 | service['id'], | |
1174 | counter_path | |
1175 | ) | |
1176 | counter_info['value'], counter_info['count'] = v, c | |
1177 | result[svc_full_name][counter_path] = counter_info | |
1178 | else: | |
1179 | counter_info['value'] = self.get_latest( | |
1180 | service['type'], | |
1181 | service['id'], | |
1182 | counter_path | |
1183 | ) | |
1184 | ||
1185 | result[svc_full_name][counter_path] = counter_info | |
1186 | ||
1187 | self.log.debug("returning {0} counter".format(len(result))) | |
1188 | ||
1189 | return result | |
1190 | ||
1191 | def set_uri(self, uri): | |
1192 | """ | |
1193 | If the module exposes a service, then call this to publish the | |
1194 | address once it is available. | |
1195 | ||
1196 | :return: a string | |
1197 | """ | |
1198 | return self._ceph_set_uri(uri) | |
1199 | ||
1200 | def have_mon_connection(self): | |
1201 | """ | |
1202 | Check whether this ceph-mgr daemon has an open connection | |
1203 | to a monitor. If it doesn't, then it's likely that the | |
1204 | information we have about the cluster is out of date, | |
1205 | and/or the monitor cluster is down. | |
1206 | """ | |
1207 | ||
1208 | return self._ceph_have_mon_connection() | |
1209 | ||
1210 | def update_progress_event(self, evid, desc, progress): | |
1211 | return self._ceph_update_progress_event(str(evid), str(desc), float(progress)) | |
1212 | ||
1213 | def complete_progress_event(self, evid): | |
1214 | return self._ceph_complete_progress_event(str(evid)) | |
1215 | ||
1216 | def clear_all_progress_events(self): | |
1217 | return self._ceph_clear_all_progress_events() | |
1218 | ||
1219 | @property | |
1220 | def rados(self): | |
1221 | """ | |
1222 | A librados instance to be shared by any classes within | |
1223 | this mgr module that want one. | |
1224 | """ | |
1225 | if self._rados: | |
1226 | return self._rados | |
1227 | ||
1228 | ctx_capsule = self.get_context() | |
1229 | self._rados = rados.Rados(context=ctx_capsule) | |
1230 | self._rados.connect() | |
1231 | ||
1232 | return self._rados | |
1233 | ||
1234 | @staticmethod | |
1235 | def can_run(): | |
1236 | """ | |
1237 | Implement this function to report whether the module's dependencies | |
1238 | are met. For example, if the module needs to import a particular | |
1239 | dependency to work, then use a try/except around the import at | |
1240 | file scope, and then report here if the import failed. | |
1241 | ||
1242 | This will be called in a blocking way from the C++ code, so do not | |
1243 | do any I/O that could block in this function. | |
1244 | ||
1245 | :return a 2-tuple consisting of a boolean and explanatory string | |
1246 | """ | |
1247 | ||
1248 | return True, "" | |
1249 | ||
1250 | def remote(self, module_name, method_name, *args, **kwargs): | |
1251 | """ | |
1252 | Invoke a method on another module. All arguments, and the return | |
1253 | value from the other module must be serializable. | |
1254 | ||
1255 | Limitation: Do not import any modules within the called method. | |
1256 | Otherwise you will get an error in Python 2:: | |
1257 | ||
1258 | RuntimeError('cannot unmarshal code objects in restricted execution mode',) | |
1259 | ||
1260 | ||
1261 | ||
1262 | :param module_name: Name of other module. If module isn't loaded, | |
1263 | an ImportError exception is raised. | |
1264 | :param method_name: Method name. If it does not exist, a NameError | |
1265 | exception is raised. | |
1266 | :param args: Argument tuple | |
1267 | :param kwargs: Keyword argument dict | |
1268 | :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError | |
1269 | :raises ImportError: No such module | |
1270 | """ | |
1271 | return self._ceph_dispatch_remote(module_name, method_name, | |
1272 | args, kwargs) | |
1273 | ||
1274 | def add_osd_perf_query(self, query): | |
1275 | """ | |
1276 | Register an OSD perf query. Argument is a | |
1277 | dict of the query parameters, in this form: | |
1278 | ||
1279 | :: | |
1280 | ||
1281 | { | |
1282 | 'key_descriptor': [ | |
1283 | {'type': subkey_type, 'regex': regex_pattern}, | |
1284 | ... | |
1285 | ], | |
1286 | 'performance_counter_descriptors': [ | |
1287 | list, of, descriptor, types | |
1288 | ], | |
1289 | 'limit': {'order_by': performance_counter_type, 'max_count': n}, | |
1290 | } | |
1291 | ||
1292 | Valid subkey types: | |
1293 | 'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id', | |
1294 | 'pg_id', 'object_name', 'snap_id' | |
1295 | Valid performance counter types: | |
1296 | 'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes', | |
1297 | 'latency', 'write_latency', 'read_latency' | |
1298 | ||
1299 | :param object query: query | |
1300 | :rtype: int (query id) | |
1301 | """ | |
1302 | return self._ceph_add_osd_perf_query(query) | |
1303 | ||
1304 | def remove_osd_perf_query(self, query_id): | |
1305 | """ | |
1306 | Unregister an OSD perf query. | |
1307 | ||
1308 | :param int query_id: query ID | |
1309 | """ | |
1310 | return self._ceph_remove_osd_perf_query(query_id) | |
1311 | ||
1312 | def get_osd_perf_counters(self, query_id): | |
1313 | """ | |
1314 | Get stats collected for an OSD perf query. | |
1315 | ||
1316 | :param int query_id: query ID | |
1317 | """ | |
1318 | return self._ceph_get_osd_perf_counters(query_id) | |
1319 | ||
1320 | ||
1321 | class PersistentStoreDict(object): | |
1322 | def __init__(self, mgr, prefix): | |
1323 | # type: (MgrModule, str) -> None | |
1324 | self.mgr = mgr | |
1325 | self.prefix = prefix + '.' | |
1326 | ||
1327 | def _mk_store_key(self, key): | |
1328 | return self.prefix + key | |
1329 | ||
1330 | def __missing__(self, key): | |
1331 | # KeyError won't work for the `in` operator. | |
1332 | # https://docs.python.org/3/reference/expressions.html#membership-test-details | |
1333 | raise IndexError('PersistentStoreDict: "{}" not found'.format(key)) | |
1334 | ||
1335 | def clear(self): | |
1336 | # Don't make any assumptions about the content of the values. | |
1337 | for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)): | |
1338 | k, _ = item | |
1339 | self.mgr.set_store(k, None) | |
1340 | ||
1341 | def __getitem__(self, item): | |
1342 | # type: (str) -> Any | |
1343 | key = self._mk_store_key(item) | |
1344 | try: | |
1345 | val = self.mgr.get_store(key) | |
1346 | if val is None: | |
1347 | self.__missing__(key) | |
1348 | return json.loads(val) | |
1349 | except (KeyError, AttributeError, IndexError, ValueError, TypeError): | |
1350 | logging.getLogger(__name__).exception('failed to deserialize') | |
1351 | self.mgr.set_store(key, None) | |
1352 | raise | |
1353 | ||
1354 | def __setitem__(self, item, value): | |
1355 | # type: (str, Any) -> None | |
1356 | """ | |
1357 | value=None is not allowed, as it will remove the key. | |
1358 | """ | |
1359 | key = self._mk_store_key(item) | |
1360 | self.mgr.set_store(key, json.dumps(value) if value is not None else None) | |
1361 | ||
1362 | def __delitem__(self, item): | |
1363 | self[item] = None | |
1364 | ||
1365 | def __len__(self): | |
1366 | return len(self.keys()) | |
1367 | ||
1368 | def items(self): | |
1369 | # type: () -> Iterator[Tuple[str, Any]] | |
1370 | prefix_len = len(self.prefix) | |
1371 | try: | |
1372 | for item in six.iteritems(self.mgr.get_store_prefix(self.prefix)): | |
1373 | k, v = item | |
1374 | yield k[prefix_len:], json.loads(v) | |
1375 | except (KeyError, AttributeError, IndexError, ValueError, TypeError): | |
1376 | logging.getLogger(__name__).exception('failed to deserialize') | |
1377 | self.clear() | |
1378 | ||
1379 | def keys(self): | |
1380 | # type: () -> Set[str] | |
1381 | return {item[0] for item in self.items()} | |
1382 | ||
1383 | def __iter__(self): | |
1384 | return iter(self.keys()) |