1 import ceph_module
# noqa
3 from typing
import cast
, Tuple
, Any
, Dict
, Generic
, Optional
, Callable
, List
, \
4 Mapping
, NamedTuple
, Sequence
, Union
, Set
, TYPE_CHECKING
7 if sys
.version_info
>= (3, 8):
8 from typing
import Literal
10 from typing_extensions
import Literal
19 from collections
import defaultdict
20 from enum
import IntEnum
, Enum
27 from ceph_argparse
import CephArgtype
28 from mgr_util
import profile_method
30 if sys
.version_info
>= (3, 8):
31 from typing
import get_args
, get_origin
33 def get_args(tp
: Any
) -> Any
:
37 return getattr(tp
, '__args__', ())
def get_origin(tp: Any) -> Any:
    """Return the ``__origin__`` attribute of *tp*, or ``None`` when absent."""
    try:
        return tp.__origin__
    except AttributeError:
        return None
43 ERROR_MSG_EMPTY_INPUT_FILE
= 'Empty input file'
44 ERROR_MSG_NO_INPUT_FILE
= 'Input file not specified'
45 # Full list of strings in "osd_types.cc:pg_state_string()"
83 NFS_GANESHA_SUPPORTED_FSALS
= ['CEPH', 'RGW']
84 NFS_POOL_NAME
= '.nfs'
87 class NotifyType(str, Enum
):
89 pg_summary
= 'pg_summary'
96 # these are disabled because there are no users.
98 # service_map = 'service_map'
99 # mon_status = 'mon_status'
100 # see DaemonServer.cc:
101 # perf_schema_update = 'perf_schema_update'
104 class CommandResult(object):
106 Use with MgrModule.send_command
109 def __init__(self
, tag
: Optional
[str] = None):
110 self
.ev
= threading
.Event()
115 # This is just a convenience for notifications from
116 # C++ land, to avoid passing addresses around in messages.
117 self
.tag
= tag
if tag
else ""
119 def complete(self
, r
: int, outb
: str, outs
: str) -> None:
125 def wait(self
) -> Tuple
[int, str, str]:
127 return self
.r
, self
.outb
, self
.outs
130 class HandleCommandResult(NamedTuple
):
132 Tuple containing the result of `handle_command()`
134 Only write to stderr if there is an error, or in extraordinary circumstances
136 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
137 is critical information to include there.
139 Everything programmatically consumable should be put on stdout
141 retval
: int = 0 # return code. E.g. 0 or -errno.EINVAL
142 stdout
: str = "" # data of this result.
143 stderr
: str = "" # Typically used for error messages.
146 class MonCommandFailed(RuntimeError): pass
147 class MgrDBNotReady(RuntimeError): pass
150 class OSDMap(ceph_module
.BasePyOSDMap
):
def get_epoch(self) -> int:
    """Return the value produced by ``self._get_epoch()``."""
    epoch = self._get_epoch()
    return epoch
def get_crush_version(self) -> int:
    """Return the value produced by ``self._get_crush_version()``."""
    crush_version = self._get_crush_version()
    return crush_version
157 def dump(self
) -> Dict
[str, Any
]:
160 def get_pools(self
) -> Dict
[int, Dict
[str, Any
]]:
161 # FIXME: efficient implementation
163 return dict([(p
['pool'], p
) for p
in d
['pools']])
165 def get_pools_by_name(self
) -> Dict
[str, Dict
[str, Any
]]:
166 # FIXME: efficient implementation
168 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
def new_incremental(self) -> 'OSDMapIncremental':
    """Return the object created by ``self._new_incremental()``."""
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
    """Pass *inc* to ``self._apply_incremental()`` and return its result."""
    updated = self._apply_incremental(inc)
    return updated
def get_crush(self) -> 'CRUSHMap':
    """Return the object produced by ``self._get_crush()``."""
    crush = self._get_crush()
    return crush
def get_pools_by_take(self, take: int) -> List[int]:
    """Return the ``'pools'`` entry of ``self._get_pools_by_take(take)``.

    An empty list is returned when the result has no ``'pools'`` key.
    """
    reply = self._get_pools_by_take(take)
    return reply.get('pools', [])
182 def calc_pg_upmaps(self
, inc
: 'OSDMapIncremental',
184 max_iterations
: int = 10,
185 pools
: Optional
[List
[str]] = None) -> int:
188 return self
._calc
_pg
_upmaps
(
190 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid: int) -> List[int]:
    """Pass *poolid* to ``self._map_pool_pgs_up()`` and return its result."""
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
    """Pass *pool_id* and *ps* to ``self._pg_to_up_acting_osds()`` and return its result."""
    mapping = self._pg_to_up_acting_osds(pool_id, ps)
    return mapping
def pool_raw_used_rate(self, pool_id: int) -> float:
    """Pass *pool_id* to ``self._pool_raw_used_rate()`` and return its result."""
    rate = self._pool_raw_used_rate(pool_id)
    return rate
202 def build_simple(cls
, epoch
: int = 1, uuid
: Optional
[str] = None, num_osd
: int = -1) -> 'ceph_module.BasePyOSDMap':
203 return cls
._build
_simple
(epoch
, uuid
, num_osd
)
205 def get_ec_profile(self
, name
: str) -> Optional
[List
[Dict
[str, str]]]:
206 # FIXME: efficient implementation
208 return d
['erasure_code_profiles'].get(name
, None)
210 def get_require_osd_release(self
) -> str:
212 return d
['require_osd_release']
215 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
def get_epoch(self) -> int:
    """Return the value produced by ``self._get_epoch()``."""
    epoch = self._get_epoch()
    return epoch
219 def dump(self
) -> Dict
[str, Any
]:
222 def set_osd_reweights(self
, weightmap
: Dict
[int, float]) -> None:
224 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
226 return self
._set
_osd
_reweights
(weightmap
)
228 def set_crush_compat_weight_set_weights(self
, weightmap
: Dict
[str, float]) -> None:
230 weightmap is a dict, int to float. devices only. e.g.,
231 { 0: 3.4, 1: 3.3, 2: 3.334 }
233 return self
._set
_crush
_compat
_weight
_set
_weights
(weightmap
)
236 class CRUSHMap(ceph_module
.BasePyCRUSH
):
237 ITEM_NONE
= 0x7fffffff
238 DEFAULT_CHOOSE_ARGS
= '-1'
240 def dump(self
) -> Dict
[str, Any
]:
def get_item_weight(self, item: int) -> Optional[int]:
    """Pass *item* to ``self._get_item_weight()`` and return its result."""
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item: int) -> Optional[str]:
    """Pass *item* to ``self._get_item_name()`` and return its result."""
    item_name = self._get_item_name(item)
    return item_name
def find_takes(self) -> List[int]:
    """Return the ``'takes'`` entry of ``self._find_takes()``.

    An empty list is returned when the result has no ``'takes'`` key.
    """
    reply = self._find_takes()
    return reply.get('takes', [])
def find_roots(self) -> List[int]:
    """Return the ``'roots'`` entry of ``self._find_roots()``.

    An empty list is returned when the result has no ``'roots'`` key.
    """
    reply = self._find_roots()
    return reply.get('roots', [])
def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
    """Return the ``'weights'`` of ``self._get_take_weight_osd_map(root)`` keyed by int.

    Each key of the ``'weights'`` mapping is converted with ``int()``; values
    are passed through unchanged. A missing ``'weights'`` entry yields ``{}``.
    """
    raw = self._get_take_weight_osd_map(root)
    weights: Dict[int, float] = {}
    for osd, weight in raw.get('weights', {}).items():
        weights[int(osd)] = weight
    return weights
260 def have_default_choose_args(dump
: Dict
[str, Any
]) -> bool:
261 return CRUSHMap
.DEFAULT_CHOOSE_ARGS
in dump
.get('choose_args', {})
264 def get_default_choose_args(dump
: Dict
[str, Any
]) -> List
[Dict
[str, Any
]]:
265 choose_args
= dump
.get('choose_args')
266 assert isinstance(choose_args
, dict)
267 return choose_args
.get(CRUSHMap
.DEFAULT_CHOOSE_ARGS
, [])
269 def get_rule(self
, rule_name
: str) -> Optional
[Dict
[str, Any
]]:
270 # TODO efficient implementation
271 for rule
in self
.dump()['rules']:
272 if rule_name
== rule
['rule_name']:
277 def get_rule_by_id(self
, rule_id
: int) -> Optional
[Dict
[str, Any
]]:
278 for rule
in self
.dump()['rules']:
279 if rule
['rule_id'] == rule_id
:
284 def get_rule_root(self
, rule_name
: str) -> Optional
[int]:
285 rule
= self
.get_rule(rule_name
)
290 first_take
= next(s
for s
in rule
['steps'] if s
.get('op') == 'take')
291 except StopIteration:
292 logging
.warning("CRUSH rule '{0}' has no 'take' step".format(
296 return first_take
['item']
298 def get_osds_under(self
, root_id
: int) -> List
[int]:
299 # TODO don't abuse dump like this
301 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
305 def accumulate(b
: Dict
[str, Any
]) -> None:
306 for item
in b
['items']:
308 osd_list
.append(item
['id'])
311 accumulate(buckets
[item
['id']])
315 accumulate(buckets
[root_id
])
319 def device_class_counts(self
) -> Dict
[str, int]:
320 result
= defaultdict(int) # type: Dict[str, int]
321 # TODO don't abuse dump like this
323 for device
in d
['devices']:
324 cls
= device
.get('class', None)
330 HandlerFuncType
= Callable
[..., Tuple
[int, str, str]]
332 def _extract_target_func(
334 ) -> Tuple
[HandlerFuncType
, Dict
[str, Any
]]:
335 """In order to interoperate with other decorated functions,
336 we need to find the original function which will provide
337 the main set of arguments. While we descend through the
338 stack of wrapped functions, gather additional arguments
339 the decorators may want to provide.
341 # use getattr to keep mypy happy
342 wrapped
= getattr(f
, "__wrapped__", None)
345 extra_args
: Dict
[str, Any
] = {}
346 while wrapped
is not None:
347 extra_args
.update(getattr(f
, "extra_args", {}))
349 wrapped
= getattr(f
, "__wrapped__", None)
353 class CLICommand(object):
354 COMMANDS
= {} # type: Dict[str, CLICommand]
363 self
.func
= None # type: Optional[Callable]
364 self
.arg_spec
= {} # type: Dict[str, Any]
365 self
.first_default
= -1
367 KNOWN_ARGS
= '_', 'self', 'mgr', 'inbuf', 'return'
370 def _load_func_metadata(cls
: Any
, f
: HandlerFuncType
) -> Tuple
[str, Dict
[str, Any
], int, str]:
371 f
, extra_args
= _extract_target_func(f
)
372 desc
= (inspect
.getdoc(f
) or '').replace('\n', ' ')
373 full_argspec
= inspect
.getfullargspec(f
)
374 arg_spec
= full_argspec
.annotations
375 first_default
= len(arg_spec
)
376 if full_argspec
.defaults
:
377 first_default
-= len(full_argspec
.defaults
)
380 for index
, arg
in enumerate(full_argspec
.args
):
381 if arg
in cls
.KNOWN_ARGS
:
382 # record that this function takes an inbuf if it is present
383 # in the full_argspec and not already in the arg_spec
384 if arg
== 'inbuf' and 'inbuf' not in arg_spec
:
385 arg_spec
['inbuf'] = 'str'
387 if arg
== '_end_positional_':
392 or arg_spec
[arg
] is Optional
[bool]
393 or arg_spec
[arg
] is bool
395 # implicit switch to non-positional on any
396 # Optional[bool] or the --format option
398 assert arg
in arg_spec
, \
399 f
"'{arg}' is not annotated for {f}: {full_argspec}"
400 has_default
= index
>= first_default
401 args
.append(CephArgtype
.to_argdesc(arg_spec
[arg
],
405 for argname
, argtype
in extra_args
.items():
406 # avoid shadowing args from the function
407 if argname
in arg_spec
:
409 arg_spec
[argname
] = argtype
410 args
.append(CephArgtype
.to_argdesc(
411 argtype
, dict(name
=argname
), has_default
=True, positional
=False
413 return desc
, arg_spec
, first_default
, ' '.join(args
)
def store_func_metadata(self, f: HandlerFuncType) -> None:
    """Store the 4-tuple returned by ``self._load_func_metadata(f)`` on the instance.

    The tuple is unpacked, in order, into ``desc``, ``arg_spec``,
    ``first_default`` and ``args``.
    """
    desc, arg_spec, first_default, args = self._load_func_metadata(f)
    self.desc = desc
    self.arg_spec = arg_spec
    self.first_default = first_default
    self.args = args
419 def __call__(self
, func
: HandlerFuncType
) -> HandlerFuncType
:
420 self
.store_func_metadata(func
)
422 self
.COMMANDS
[self
.prefix
] = self
425 def _get_arg_value(self
, kwargs_switch
: bool, key
: str, val
: Any
) -> Tuple
[bool, str, Any
]:
426 def start_kwargs() -> bool:
427 if isinstance(val
, str) and '=' in val
:
428 k
, v
= val
.split('=', 1)
429 if k
in self
.arg_spec
:
433 if not kwargs_switch
:
434 kwargs_switch
= start_kwargs()
437 k
, v
= val
.split('=', 1)
440 return kwargs_switch
, k
.replace('-', '_'), v
442 def _collect_args_by_argspec(self
, cmd_dict
: Dict
[str, Any
]) -> Tuple
[Dict
[str, Any
], Set
[str]]:
445 kwargs_switch
= False
446 for index
, (name
, tp
) in enumerate(self
.arg_spec
.items()):
447 if name
in CLICommand
.KNOWN_ARGS
:
448 special_args
.add(name
)
450 assert self
.first_default
>= 0
451 raw_v
= cmd_dict
.get(name
)
452 if index
>= self
.first_default
:
455 kwargs_switch
, k
, v
= self
._get
_arg
_value
(kwargs_switch
,
457 kwargs
[k
] = CephArgtype
.cast_to(tp
, v
)
458 return kwargs
, special_args
462 cmd_dict
: Dict
[str, Any
],
463 inbuf
: Optional
[str] = None) -> HandleCommandResult
:
464 kwargs
, specials
= self
._collect
_args
_by
_argspec
(cmd_dict
)
466 if 'inbuf' not in specials
:
467 return HandleCommandResult(
470 'Invalid command: Input file data (-i) not supported',
472 kwargs
['inbuf'] = inbuf
474 return self
.func(mgr
, **kwargs
)
476 def dump_cmd(self
) -> Dict
[str, Union
[str, bool]]:
478 'cmd': '{} {}'.format(self
.prefix
, self
.args
),
485 def dump_cmd_list(cls
) -> List
[Dict
[str, Union
[str, bool]]]:
486 return [cmd
.dump_cmd() for cmd
in cls
.COMMANDS
.values()]
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a ``CLICommand`` for *prefix* with ``"r"`` as its second argument.

    NOTE(review): ``"r"`` is presumably the read permission; confirm against
    ``CLICommand.__init__``.
    """
    command = CLICommand(prefix, "r", poll)
    return command
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a ``CLICommand`` for *prefix* with ``"w"`` as its second argument.

    NOTE(review): ``"w"`` is presumably the write permission; confirm against
    ``CLICommand.__init__``.
    """
    command = CLICommand(prefix, "w", poll)
    return command
497 def CLICheckNonemptyFileInput(desc
: str) -> Callable
[[HandlerFuncType
], HandlerFuncType
]:
498 def CheckFileInput(func
: HandlerFuncType
) -> HandlerFuncType
:
499 @functools.wraps(func
)
500 def check(*args
: Any
, **kwargs
: Any
) -> Tuple
[int, str, str]:
501 if 'inbuf' not in kwargs
:
502 return -errno
.EINVAL
, '', f
'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
503 f
'containing {desc} with "-i" option'
504 if isinstance(kwargs
['inbuf'], str):
505 # Delete new line separator at EOF (it may have been added by a text editor).
506 kwargs
['inbuf'] = kwargs
['inbuf'].rstrip('\r\n').rstrip('\n')
507 if not kwargs
['inbuf'] or not kwargs
['inbuf'].strip():
508 return -errno
.EINVAL
, '', f
'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
510 return func(*args
, **kwargs
)
511 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
513 return CheckFileInput
515 # If the mgr loses its lock on the database because e.g. the pgs were
516 # transiently down, then close it and allow it to be reopened.
517 MAX_DBCLEANUP_RETRIES
= 3
518 def MgrModuleRecoverDB(func
: Callable
) -> Callable
:
519 @functools.wraps(func
)
520 def check(self
: MgrModule
, *args
: Any
, **kwargs
: Any
) -> Any
:
524 return func(self
, *args
, **kwargs
)
525 except sqlite3
.DatabaseError
as e
:
526 self
.log
.error(f
"Caught fatal database error: {e}")
528 if retries
> MAX_DBCLEANUP_RETRIES
:
530 self
.log
.debug(f
"attempting reopen of database")
533 # allow retry of func(...)
534 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
537 def CLIRequiresDB(func
: HandlerFuncType
) -> HandlerFuncType
:
538 @functools.wraps(func
)
539 def check(self
: MgrModule
, *args
: Any
, **kwargs
: Any
) -> Tuple
[int, str, str]:
540 if not self
.db_ready():
541 return -errno
.EAGAIN
, "", "mgr db not yet available"
542 return func(self
, *args
, **kwargs
)
543 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
546 def _get_localized_key(prefix
: str, key
: str) -> str:
547 return '{}/{}'.format(prefix
, key
)
551 MODULE_OPTIONS types and Option Class
554 OptionTypeLabel
= Literal
[
555 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
558 # common/options.h: value_t
559 OptionValue
= Optional
[Union
[bool, int, float, str]]
564 Helper class to declare options for MODULE_OPTIONS list.
565 TODO: Replace with typing.TypedDict when in python_version >= 3.8
571 default
: OptionValue
= None,
572 type: 'OptionTypeLabel' = 'str',
573 desc
: Optional
[str] = None,
574 long_desc
: Optional
[str] = None,
575 min: OptionValue
= None,
576 max: OptionValue
= None,
577 enum_allowed
: Optional
[List
[str]] = None,
578 tags
: Optional
[List
[str]] = None,
579 see_also
: Optional
[List
[str]] = None,
580 runtime
: bool = False,
582 super(Option
, self
).__init
__(
583 (k
, v
) for k
, v
in vars().items()
584 if k
!= 'self' and v
is not None)
589 Helper class to declare options for COMMANDS list.
591 It also allows to specify prefix and args separately, as well as storing a
595 >>> def handler(): return 0, "", ""
596 >>> Command(prefix="example",
599 {'perm': 'w', 'poll': False}
605 handler
: HandlerFuncType
,
609 super().__init
__(perm
=perm
,
612 self
.handler
= handler
615 def returns_command_result(instance
: Any
,
616 f
: HandlerFuncType
) -> Callable
[..., HandleCommandResult
]:
618 def wrapper(mgr
: Any
, *args
: Any
, **kwargs
: Any
) -> HandleCommandResult
:
619 retval
, stdout
, stderr
= f(instance
or mgr
, *args
, **kwargs
)
620 return HandleCommandResult(retval
, stdout
, stderr
)
621 wrapper
.__signature
__ = inspect
.signature(f
) # type: ignore[attr-defined]
624 def register(self
, instance
: bool = False) -> HandlerFuncType
:
626 Register a CLICommand handler. It allows an instance to register bound
627 methods. In that case, the mgr instance is not passed, and it's expected
628 to be available in the class instance.
629 It also uses HandleCommandResult helper to return a wrapped a tuple of 3
632 cmd
= CLICommand(prefix
=self
.prefix
, perm
=self
['perm'])
633 return cmd(self
.returns_command_result(instance
, self
.handler
))
class CPlusPlusHandler(logging.Handler):
    """Logging handler that passes formatted records to the module's ``_ceph_log``."""

    def __init__(self, module_inst: Any):
        """Remember *module_inst* and install a formatter prefixed with its name."""
        super().__init__()
        self._module = module_inst
        # The module name is baked into the format string once, up front.
        layout = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(layout))

    def emit(self, record: logging.LogRecord) -> None:
        """Forward *record* to ``_ceph_log`` unless it is below this handler's level."""
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
648 class ClusterLogHandler(logging
.Handler
):
649 def __init__(self
, module_inst
: Any
):
651 self
._module
= module_inst
652 self
.setFormatter(logging
.Formatter("%(message)s"))
654 def emit(self
, record
: logging
.LogRecord
) -> None:
656 logging
.DEBUG
: MgrModule
.ClusterLogPrio
.DEBUG
,
657 logging
.INFO
: MgrModule
.ClusterLogPrio
.INFO
,
658 logging
.WARNING
: MgrModule
.ClusterLogPrio
.WARN
,
659 logging
.ERROR
: MgrModule
.ClusterLogPrio
.ERROR
,
660 logging
.CRITICAL
: MgrModule
.ClusterLogPrio
.ERROR
,
662 level
= levelmap
[record
.levelno
]
663 if record
.levelno
>= self
.level
:
664 self
._module
.cluster_log(self
._module
.module_name
,
669 class FileHandler(logging
.FileHandler
):
670 def __init__(self
, module_inst
: Any
):
671 path
= module_inst
.get_ceph_option("log_file")
672 idx
= path
.rfind(".log")
674 self
.path
= "{}.{}.log".format(path
[:idx
], module_inst
.module_name
)
676 self
.path
= "{}.{}".format(path
, module_inst
.module_name
)
677 super(FileHandler
, self
).__init
__(self
.path
, delay
=True)
678 self
.setFormatter(logging
.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
681 class MgrModuleLoggingMixin(object):
682 def _configure_logging(self
,
687 log_to_cluster
: bool) -> None:
688 self
._mgr
_level
: Optional
[str] = None
689 self
._module
_level
: Optional
[str] = None
690 self
._root
_logger
= logging
.getLogger()
692 self
._unconfigure
_logging
()
694 # the ceph log handler is initialized only once
695 self
._mgr
_log
_handler
= CPlusPlusHandler(self
)
696 self
._cluster
_log
_handler
= ClusterLogHandler(self
)
697 self
._file
_log
_handler
= FileHandler(self
)
699 self
.log_to_file
= log_to_file
700 self
.log_to_cluster
= log_to_cluster
702 self
._root
_logger
.addHandler(self
._mgr
_log
_handler
)
704 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
706 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
708 self
._root
_logger
.setLevel(logging
.NOTSET
)
709 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
711 def _unconfigure_logging(self
) -> None:
712 # remove existing handlers:
714 h
for h
in self
._root
_logger
.handlers
715 if (isinstance(h
, CPlusPlusHandler
) or
716 isinstance(h
, FileHandler
) or
717 isinstance(h
, ClusterLogHandler
))]
718 for h
in rm_handlers
:
719 self
._root
_logger
.removeHandler(h
)
720 self
.log_to_file
= False
721 self
.log_to_cluster
= False
723 def _set_log_level(self
,
726 cluster_level
: str) -> None:
727 self
._cluster
_log
_handler
.setLevel(cluster_level
.upper())
729 module_level
= module_level
.upper() if module_level
else ''
730 if not self
._module
_level
:
731 # using debug_mgr level
732 if not module_level
and self
._mgr
_level
== mgr_level
:
733 # no change in module level nor in debug_mgr
736 if self
._module
_level
== module_level
:
737 # no change in module level
740 if not self
._module
_level
and not module_level
:
741 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
742 self
.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
744 elif self
._module
_level
and not module_level
:
745 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
746 self
.getLogger().warning("unsetting module log level, falling back to "
747 "debug_mgr level: %s (%s)", level
, mgr_level
)
750 self
.getLogger().debug("setting log level: %s", level
)
752 self
._module
_level
= module_level
753 self
._mgr
_level
= mgr_level
755 self
._mgr
_log
_handler
.setLevel(level
)
756 self
._file
_log
_handler
.setLevel(level
)
758 def _enable_file_log(self
) -> None:
760 self
.getLogger().warning("enabling logging to file")
761 self
.log_to_file
= True
762 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
764 def _disable_file_log(self
) -> None:
766 self
.getLogger().warning("disabling logging to file")
767 self
.log_to_file
= False
768 self
._root
_logger
.removeHandler(self
._file
_log
_handler
)
770 def _enable_cluster_log(self
) -> None:
772 self
.getLogger().warning("enabling logging to cluster")
773 self
.log_to_cluster
= True
774 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
776 def _disable_cluster_log(self
) -> None:
777 # disable cluster log
778 self
.getLogger().warning("disabling logging to cluster")
779 self
.log_to_cluster
= False
780 self
._root
_logger
.removeHandler(self
._cluster
_log
_handler
)
782 def _ceph_log_level_to_python(self
, log_level
: str) -> str:
785 ceph_log_level
= int(log_level
.split("/", 1)[0])
792 if ceph_log_level
<= 0:
793 log_level
= "CRITICAL"
794 elif ceph_log_level
<= 1:
795 log_level
= "WARNING"
796 elif ceph_log_level
<= 4:
def getLogger(self, name: Optional[str] = None) -> logging.Logger:
    """Return ``logging.getLogger(name)``; with no *name* this is the root logger."""
    logger = logging.getLogger(name)
    return logger
804 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
, MgrModuleLoggingMixin
):
806 Standby modules only implement a serve and shutdown method, they
807 are not permitted to implement commands and they do not receive
810 They only have access to the mgrmap (for accessing service URI info
811 from their active peer), and to configuration settings (read only).
814 MODULE_OPTIONS
: List
[Option
] = []
815 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
817 def __init__(self
, module_name
: str, capsule
: Any
):
818 super(MgrStandbyModule
, self
).__init
__(capsule
)
819 self
.module_name
= module_name
821 # see also MgrModule.__init__()
822 for o
in self
.MODULE_OPTIONS
:
825 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
827 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
829 # mock does not return a str
830 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
831 log_level
= cast(str, self
.get_module_option("log_level"))
832 cluster_level
= cast(str, self
.get_module_option('log_to_cluster_level'))
833 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
836 # for backwards compatibility
837 self
._logger
= self
.getLogger()
839 def __del__(self
) -> None:
841 self
._unconfigure
_logging
()
843 def _cleanup(self
) -> None:
847 def _register_options(cls
, module_name
: str) -> None:
848 cls
.MODULE_OPTIONS
.append(
849 Option(name
='log_level', type='str', default
="", runtime
=True,
850 enum_allowed
=['info', 'debug', 'critical', 'error',
852 cls
.MODULE_OPTIONS
.append(
853 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
854 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
855 cls
.MODULE_OPTIONS
.append(
856 Option(name
='log_to_cluster', type='bool', default
=False,
858 cls
.MODULE_OPTIONS
.append(
859 Option(name
='log_to_cluster_level', type='str', default
='info',
861 enum_allowed
=['info', 'debug', 'critical', 'error',
865 def log(self
) -> logging
.Logger
:
868 def serve(self
) -> None:
870 The serve method is mandatory for standby modules.
873 raise NotImplementedError()
def get_mgr_id(self) -> str:
    """Return the value produced by ``self._ceph_get_mgr_id()``."""
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
878 def get_module_option(self
, key
: str, default
: OptionValue
= None) -> OptionValue
:
880 Retrieve the value of a persistent configuration setting
882 :param default: the default value of the config if it is not found
884 r
= self
._ceph
_get
_module
_option
(key
)
886 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_ceph_option(self, key: str) -> OptionValue:
    """Pass *key* to ``self._ceph_get_option()`` and return its result."""
    value = self._ceph_get_option(key)
    return value
893 def get_store(self
, key
: str) -> Optional
[str]:
895 Retrieve the value of a persistent KV store entry
898 :return: Byte string or None
900 return self
._ceph
_get
_store
(key
)
902 def get_localized_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
903 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
905 r
= self
._ceph
_get
_store
(key
)
def get_active_uri(self) -> str:
    """Return the value produced by ``self._ceph_get_active_uri()``."""
    uri = self._ceph_get_active_uri()
    return uri
def get(self, data_name: str) -> Dict[str, Any]:
    """Pass *data_name* to ``self._ceph_get()`` and return its result."""
    data = self._ceph_get(data_name)
    return data
916 def get_mgr_ip(self
) -> str:
917 ips
= self
.get("mgr_ips").get('ips', [])
919 return socket
.gethostname()
def get_hostname(self) -> str:
    """Return the local host name as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
925 def get_localized_module_option(self
, key
: str, default
: OptionValue
= None) -> OptionValue
:
926 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
928 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
933 HealthChecksT
= Mapping
[str, Mapping
[str, Union
[int, str, Sequence
[str]]]]
934 # {"type": service_type, "id": service_id}
935 ServiceInfoT
= Dict
[str, str]
936 # {"hostname": hostname,
937 # "ceph_version": version,
938 # "services": [service_info, ..]}
939 ServerInfoT
= Dict
[str, Union
[str, List
[ServiceInfoT
]]]
940 PerfCounterT
= Dict
[str, Any
]
944 def DecoratorFactory(attr
: str, default
: Any
): # type: ignore
945 class DecoratorClass
:
946 _ATTR_TOKEN
= f
'__ATTR_{attr.upper()}__'
948 def __init__(self
, value
: Any
=default
) -> None:
951 def __call__(self
, func
: Callable
) -> Any
:
952 setattr(func
, self
._ATTR
_TOKEN
, self
.value
)
956 def get(cls
, func
: Callable
) -> Any
:
957 return getattr(func
, cls
._ATTR
_TOKEN
, default
)
959 return DecoratorClass
961 perm
= DecoratorFactory('perm', default
='r')
962 expose
= DecoratorFactory('expose', default
=False)(True)
965 class MgrModule(ceph_module
.BaseMgrModule
, MgrModuleLoggingMixin
):
966 MGR_POOL_NAME
= ".mgr"
968 COMMANDS
= [] # type: List[Any]
969 MODULE_OPTIONS
: List
[Option
] = []
970 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
973 SCHEMA
= None # type: Optional[str]
974 SCHEMA_VERSIONED
= None # type: Optional[List[str]]
976 # Priority definitions for perf counters
980 PRIO_UNINTERESTING
= 2
983 # counter value types
988 PERFCOUNTER_LONGRUNAVG
= 4
989 PERFCOUNTER_COUNTER
= 8
990 PERFCOUNTER_HISTOGRAM
= 0x10
991 PERFCOUNTER_TYPE_MASK
= ~
3
997 # Cluster log priorities
998 class ClusterLogPrio(IntEnum
):
1005 def __init__(self
, module_name
: str, py_modules_ptr
: object, this_ptr
: object):
1006 self
.module_name
= module_name
1007 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
1009 for o
in self
.MODULE_OPTIONS
:
1012 # we'll assume the declared type matches the
1013 # supplied default value's type.
1014 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
1016 # module not declaring its type, so normalize the
1017 # default value to be a string for consistent behavior
1018 # with default and user-supplied option values.
1019 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
1021 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
1022 log_level
= cast(str, self
.get_module_option("log_level"))
1023 cluster_level
= cast(str, self
.get_module_option('log_to_cluster_level'))
1024 log_to_file
= self
.get_module_option("log_to_file")
1025 assert isinstance(log_to_file
, bool)
1026 log_to_cluster
= self
.get_module_option("log_to_cluster")
1027 assert isinstance(log_to_cluster
, bool)
1028 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
1029 log_to_file
, log_to_cluster
)
1031 # for backwards compatibility
1032 self
._logger
= self
.getLogger()
1034 self
._db
= None # type: Optional[sqlite3.Connection]
1036 self
._version
= self
._ceph
_get
_version
()
1038 self
._perf
_schema
_cache
= None
1040 # Keep a librados instance for those that need it.
1041 self
._rados
: Optional
[rados
.Rados
] = None
1043 # this does not change over the lifetime of an active mgr
1044 self
._mgr
_ips
: Optional
[str] = None
1046 self
._db
_lock
= threading
.Lock()
1048 def __del__(self
) -> None:
1049 self
._unconfigure
_logging
()
1052 def _register_options(cls
, module_name
: str) -> None:
1053 cls
.MODULE_OPTIONS
.append(
1054 Option(name
='log_level', type='str', default
="", runtime
=True,
1055 enum_allowed
=['info', 'debug', 'critical', 'error',
1057 cls
.MODULE_OPTIONS
.append(
1058 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
1059 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
1060 cls
.MODULE_OPTIONS
.append(
1061 Option(name
='log_to_cluster', type='bool', default
=False,
1063 cls
.MODULE_OPTIONS
.append(
1064 Option(name
='log_to_cluster_level', type='str', default
='info',
1066 enum_allowed
=['info', 'debug', 'critical', 'error',
1070 def _register_commands(cls
, module_name
: str) -> None:
1071 cls
.COMMANDS
.extend(CLICommand
.dump_cmd_list())
1074 def log(self
) -> logging
.Logger
:
1077 def cluster_log(self
, channel
: str, priority
: ClusterLogPrio
, message
: str) -> None:
1079 :param channel: The log channel. This can be 'cluster', 'audit', ...
1080 :param priority: The log message priority.
1081 :param message: The message to log.
1083 self
._ceph
_cluster
_log
(channel
, priority
.value
, message
)
1086 def version(self
) -> str:
1087 return self
._version
def pool_exists(self, name: str) -> bool:
    """Return whether *name* is a ``pool_name`` in ``self.get('osd_map')['pools']``."""
    osd_map = self.get('osd_map')
    known_names = []
    for pool in osd_map['pools']:
        known_names.append(pool['pool_name'])
    return name in known_names
1095 def have_enough_osds(self
) -> bool:
1096 # wait until we have enough OSDs to allow the pool to be healthy
1098 for osd
in self
.get("osd_map")["osds"]:
1099 if osd
["up"] and osd
["in"]:
1102 need
= cast(int, self
.get_ceph_option("osd_pool_default_size"))
1103 return ready
>= need
1107 def rename_pool(self
, srcpool
: str, destpool
: str) -> None:
1109 'prefix': 'osd pool rename',
1112 'destpool': destpool
,
1113 'yes_i_really_mean_it': True
1115 self
.check_mon_command(c
)
1119 def create_pool(self
, pool
: str) -> None:
1121 'prefix': 'osd pool create',
1127 'yes_i_really_mean_it': True
1129 self
.check_mon_command(c
)
1133 def appify_pool(self
, pool
: str, app
: str) -> None:
1135 'prefix': 'osd pool application enable',
1139 'yes_i_really_mean_it': True
1141 self
.check_mon_command(c
)
1145 def create_mgr_pool(self
) -> None:
1146 self
.log
.info("creating mgr pool")
1148 ov
= self
.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
1149 devhealth
= cast(str, ov
)
1150 if devhealth
is not None and self
.pool_exists(devhealth
):
1151 self
.log
.debug("reusing devicehealth pool")
1152 self
.rename_pool(devhealth
, self
.MGR_POOL_NAME
)
1153 self
.appify_pool(self
.MGR_POOL_NAME
, 'mgr')
1155 self
.log
.debug("creating new mgr pool")
1156 self
.create_pool(self
.MGR_POOL_NAME
)
1157 self
.appify_pool(self
.MGR_POOL_NAME
, 'mgr')
1159 def create_skeleton_schema(self
, db
: sqlite3
.Connection
) -> None:
1161 CREATE TABLE IF NOT EXISTS MgrModuleKV (
1162 key TEXT PRIMARY KEY,
1165 INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
1168 db
.executescript(SQL
)
def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
    """Set the ``__version`` row of ``MgrModuleKV`` to *version* using *db*."""
    params = (version,)
    db.execute(
        "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';",
        params)
1175 def set_kv(self
, key
: str, value
: Any
) -> None:
1176 SQL
= "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
1178 assert key
[:2] != "__"
1180 self
.log
.debug(f
"set_kv('{key}', '{value}')")
1182 with self
._db
_lock
, self
.db
:
1183 self
.db
.execute(SQL
, (key
, value
))
1186 def get_kv(self
, key
: str) -> Any
:
1187 SQL
= "SELECT value FROM MgrModuleKV WHERE key = ?;"
1189 assert key
[:2] != "__"
1191 self
.log
.debug(f
"get_kv('{key}')")
1193 with self
._db
_lock
, self
.db
:
1194 cur
= self
.db
.execute(SQL
, (key
,))
1195 row
= cur
.fetchone()
1200 self
.log
.debug(f
" = {v}")
def maybe_upgrade(self, db: sqlite3.Connection, version: int) -> None:
    """
    Bring the module database up to the schema the module ships.

    A database with no recorded version is created from ``self.SCHEMA``
    and stamped as version 1. Otherwise every script in
    ``self.SCHEMA_VERSIONED`` from index ``version`` onward is run and the
    recorded version is raised to ``len(self.SCHEMA_VERSIONED)``.

    :param db: open SQLite connection
    :param version: schema version currently recorded in the database
    :raises RuntimeError: if the database is newer than the module
    """
    if version <= 0:
        self.log.info(f"creating main.db for {self.module_name}")
        assert self.SCHEMA is not None
        db.executescript(self.SCHEMA)
        self.update_schema_version(db, 1)
        return

    assert self.SCHEMA_VERSIONED is not None
    latest = len(self.SCHEMA_VERSIONED)
    if version > latest:
        raise RuntimeError(f"main.db version is newer ({version}) than module ({latest})")
    i = version
    while i < latest:
        self.log.info(f"upgrading main.db for {self.module_name} from {i-1}:{i}")
        db.executescript(self.SCHEMA_VERSIONED[i])
        i += 1
    if version != latest:
        self.update_schema_version(db, latest)
def load_schema(self, db: sqlite3.Connection) -> None:
    """
    Ensure the skeleton table exists, read the recorded schema version and
    hand it to ``maybe_upgrade``.

    Exactly one ``__version`` row is expected; a second one trips an
    assertion.

    :param db: open SQLite connection whose rows support access by column name
    """
    version_query = """
    SELECT value FROM MgrModuleKV WHERE key = '__version';
    """

    with db:
        self.create_skeleton_schema(db)
        cursor = db.execute(version_query)
        version_row = cursor.fetchone()
        recorded = int(version_row['value'])
        self.maybe_upgrade(db, recorded)
        assert cursor.fetchone() is None
        cursor.close()
def configure_db(self, db: sqlite3.Connection) -> None:
    """
    Apply the connection-level settings, switch the row factory to
    ``sqlite3.Row`` and then load the schema.

    :param db: freshly opened SQLite connection
    """
    pragmas = (
        'PRAGMA FOREIGN_KEYS = 1',
        'PRAGMA JOURNAL_MODE = PERSIST',
        'PRAGMA PAGE_SIZE = 65536',
        'PRAGMA CACHE_SIZE = 64',
        'PRAGMA TEMP_STORE = memory',
    )
    # Issued one by one, in the listed order.
    for pragma in pragmas:
        db.execute(pragma)
    db.row_factory = sqlite3.Row
    self.load_schema(db)
def close_db(self) -> None:
    """
    Close the cached database connection, if any, and forget it.

    Runs while holding ``self._db_lock``.
    """
    with self._db_lock:
        if self._db is None:
            return
        self._db.close()
        self._db = None
def open_db(self) -> Optional[sqlite3.Connection]:
    """
    Open this module's ``main.db`` in the mgr pool through the ``ceph``
    SQLite VFS, creating the pool first when it does not exist.

    :return: a configured connection, or ``None`` when the pool is missing
        and ``have_enough_osds()`` reports that it cannot be created yet
    """
    if not self.pool_exists(self.MGR_POOL_NAME):
        if not self.have_enough_osds():
            return None
        self.create_mgr_pool()

    uri = f"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph"
    self.log.debug(f"using uri {uri}")
    connection = sqlite3.connect(uri, check_same_thread=False, uri=True)

    # if libcephsqlite reconnects, update the addrv for blocklist
    with connection:
        status_cursor = connection.execute('SELECT json_extract(ceph_status(), "$.addr");')
        (addrv,) = status_cursor.fetchone()
        assert addrv is not None
        self.log.debug(f"new libcephsqlite addrv = {addrv}")
        self._ceph_register_client("libcephsqlite", addrv, True)

    self.configure_db(connection)
    return connection
def db_ready(self) -> bool:
    """
    Report whether the module database can be used right now.

    :return: ``True`` when ``self.db`` yields a connection, ``False`` when
        accessing it raises ``MgrDBNotReady``
    """
    with self._db_lock:
        try:
            connection = self.db
        except MgrDBNotReady:
            return False
        return connection is not None
def db(self) -> sqlite3.Connection:
    """
    Return the module's database connection, opening it on first use.

    The caller must already hold ``self._db_lock``; this is asserted.

    :raises MgrDBNotReady: if the ``mgr_pool`` option is falsy or the
        database could not be opened
    """
    assert self._db_lock.locked()
    if self._db is None:
        if not self.get_ceph_option("mgr_pool"):
            raise MgrDBNotReady()
        self._db = self.open_db()
        if self._db is None:
            raise MgrDBNotReady()
    return self._db
def release_name(self) -> str:
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: the release name of the Ceph version in lower case, as
        provided by ``_ceph_get_release_name``
    """
    name = self._ceph_get_release_name()
    return name
def lookup_release_name(self, major: int) -> str:
    """
    Translate a major version number into a release name.

    :param major: major version number to look up
    :return: whatever ``_ceph_lookup_release_name`` returns for ``major``
    """
    name = self._ceph_lookup_release_name(major)
    return name
def get_context(self) -> object:
    """
    Fetch the context handle from ``_ceph_get_context``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    capsule = self._ceph_get_context()
    return capsule
def notify(self, notify_type: NotifyType, notify_id: str) -> None:
    """
    Hook called by the ceph-mgr service to tell the Python plugin that new
    state is available. It is *only* called for notify_types listed in the
    NOTIFY_TYPES string list member of the module class.

    The base implementation does nothing.

    :param notify_type: string indicating what kind of notification,
        such as osd_map, mon_map, fs_map, mon_status, health, pg_summary,
        command, service_map
    :param notify_id: string (may be empty) that optionally specifies
        which entity is being notified about. With "command"
        notifications this is set to the tag from ``send_command``.
    """
    return None
def _config_notify(self) -> None:
    """
    Re-read the logging options, apply any change to the log level and to
    the file/cluster log switches, then call ``config_notify`` so the
    module subclass can refresh its own settings.
    """
    # check logging options for changes
    mgr_level = cast(str, self.get_ceph_option("debug_mgr"))
    module_level = cast(str, self.get_module_option("log_level"))
    cluster_level = cast(str, self.get_module_option("log_to_cluster_level"))
    assert isinstance(cluster_level, str)
    want_file_log = self.get_module_option("log_to_file", False)
    assert isinstance(want_file_log, bool)
    want_cluster_log = self.get_module_option("log_to_cluster", False)
    assert isinstance(want_cluster_log, bool)
    self._set_log_level(mgr_level, module_level, cluster_level)

    # Only flip a log sink when the requested state differs from the current one.
    if want_file_log != self.log_to_file:
        switch_file = self._enable_file_log if want_file_log else self._disable_file_log
        switch_file()
    if want_cluster_log != self.log_to_cluster:
        switch_cluster = (self._enable_cluster_log if want_cluster_log
                          else self._disable_cluster_log)
        switch_cluster()

    # call module subclass implementations
    self.config_notify()
def config_notify(self) -> None:
    """
    Hook called by the ceph-mgr service to tell the Python plugin that the
    configuration may have changed. Modules will want to refresh any
    configuration values stored in config variables.

    The base implementation does nothing.
    """
    return None
def serve(self) -> None:
    """
    Hook called by the ceph-mgr service to start any server provided by
    this Python plugin. An implementation should block until ``shutdown``
    is called.

    You *must* implement ``shutdown`` if you implement ``serve``.

    The base implementation does nothing.
    """
    return None
def shutdown(self) -> None:
    """
    Hook called by the ceph-mgr service to request that this module drop
    out of its serve() function. You do not need to implement this if you
    do not implement serve().

    The base implementation tears down the shared librados handle, if one
    was created, and unregisters its addresses.
    """
    if not self._rados:
        return
    # Capture the addresses before shutdown so they can still be unregistered.
    addrs = self._rados.get_addrs()
    self._rados.shutdown()
    self._ceph_unregister_client(None, addrs)
    self._rados = None
def get(self, data_name: str) -> Any:
    """
    Called by the plugin to fetch named cluster-wide objects from ceph-mgr.

    :param str data_name: Valid things to fetch are osdmap_crush_map_text,
        osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
        osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
        health, mon_status, devices, device <devid>, pg_stats,
        pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
        modified_config_options, service_map, mds_metadata,
        have_local_config_map, osd_pool_stats, pg_status.

    All these structures have their own JSON representations: experiment
    or look at the C++ ``dump()`` methods to learn about them.

    :return: the fetched object; a ``bytes`` payload is decoded as JSON
        before being returned
    """
    raw = self._ceph_get(data_name)
    return json.loads(raw) if isinstance(raw, bytes) else raw
def _stattype_to_str(self, stattype: int) -> str:
    """
    Map a perf counter type bitfield to a type name.

    Only the bits selected by ``PERFCOUNTER_TYPE_MASK`` are considered.

    :param stattype: perf counter type bitfield
    :return: 'gauge', 'counter', 'histogram', or '' when nothing matches
    """
    typeonly = stattype & self.PERFCOUNTER_TYPE_MASK
    if typeonly == 0:
        return 'gauge'

    names = (
        # this lie matches the DaemonState decoding: only val, no counts
        (self.PERFCOUNTER_LONGRUNAVG, 'counter'),
        (self.PERFCOUNTER_COUNTER, 'counter'),
        (self.PERFCOUNTER_HISTOGRAM, 'histogram'),
    )
    for flag, label in names:
        if typeonly == flag:
            return label
    return ''
def _perfpath_to_path_labels(self, daemon: str,
                             path: str) -> Tuple[str, Tuple[str, ...], Tuple[str, ...]]:
    """
    Split a daemon name and perf counter path into a path plus labels.

    ``rgw.`` daemons are labelled ``instance_id`` (with the prefix
    stripped); every other daemon is labelled ``ceph_daemon``. For
    ``rbd-mirror.`` daemons a per-image replay counter path is collapsed
    to ``rbd_mirror_image_<counter>`` and the pool, namespace and image
    taken from the path are appended as labels.

    :param daemon: daemon name, e.g. ``osd.0``
    :param path: perf counter path
    :return: tuple of (path, label names, label values)
    """
    rgw_prefix = 'rgw.'
    if daemon.startswith(rgw_prefix):
        label_name = 'instance_id'
        daemon = daemon[len(rgw_prefix):]
    else:
        label_name = 'ceph_daemon'

    label_names = (label_name,)  # type: Tuple[str, ...]
    labels = (daemon,)  # type: Tuple[str, ...]

    if not daemon.startswith('rbd-mirror.'):
        return path, label_names, labels

    match = re.match(
        r'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
        path
    )
    if match:
        pool, namespace, image, counter = match.groups()
        path = 'rbd_mirror_image_' + counter
        label_names = label_names + ('pool', 'namespace', 'image')
        # The namespace group is optional; a missing one becomes ''.
        labels = labels + (pool, namespace or '', image)

    return path, label_names, labels
def _perfvalue_to_value(self, stattype: int, value: Union[int, float]) -> Union[float, int]:
    """
    Convert a raw perf counter value for presentation.

    :param stattype: perf counter type bitfield
    :param value: raw counter value
    :return: ``value`` divided by 1e9 (ns to seconds) when the
        ``PERFCOUNTER_TIME`` bit is set, otherwise ``value`` unchanged
    """
    is_time = stattype & self.PERFCOUNTER_TIME
    if not is_time:
        return value
    # Convert from ns to seconds
    return value / 1000000000.0
def _unit_to_str(self, unit: int) -> str:
    """
    Return the rate suffix for a perf counter unit.

    :param unit: ``self.NONE`` or ``self.BYTES``
    :return: the suffix string for that unit
    :raises ValueError: for any other unit
    """
    if unit == self.NONE:
        return "/s"
    if unit == self.BYTES:
        return "B/s"
    raise ValueError(f'bad unit "{unit}"')
def to_pretty_iec(n: int) -> str:
    """
    Format ``n`` with the largest IEC prefix for which it exceeds ten
    units of that prefix.

    :param n: value to format
    :return: e.g. ``'20 Ki'``; values too small for any prefix are
        returned as the plain number followed by a space
    """
    scales = ((60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
              (20, 'Mi'), (10, 'Ki'))
    for bits, suffix in scales:
        threshold = 10 << bits
        if n > threshold:
            return str(n >> bits) + ' ' + suffix
    return str(n) + ' '
def get_pretty_row(elems: Sequence[str], width: int) -> str:
    """
    Takes an array of elements and returns a string with those elements
    formatted as a table row. Useful for polling modules.

    Each element is right-aligned in a cell of ``int(width / len(elems))``
    characters, the last two of which are the `` |`` separator.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    :return: the formatted row; just ``'|'`` when ``elems`` is empty
    """
    n = len(elems)
    if n == 0:
        # No columns to lay out; also avoids dividing the width by zero.
        return '|'
    column_width = int(width / n)
    cell_width = column_width - 2

    cells = ('{0:>{w}} |'.format(elem, w=cell_width) for elem in elems)
    return '|' + ''.join(cells)
def get_pretty_header(self, elems: Sequence[str], width: int) -> str:
    """
    Like ``get_pretty_row`` but adds dashes, to be used as a table title.

    :param elems: the elements to be printed
    :param width: the width of the terminal
    :return: three newline-terminated lines: dash line, row, dash line
    """
    n = len(elems)
    column_width = int(width / n)

    # One '+'-terminated run of dashes per column.
    dash_line = '+' + ('-' * (column_width - 1) + '+') * n + '\n'

    pieces = [
        dash_line,
        self.get_pretty_row(elems, width),
        '\n',
        dash_line,
    ]
    return ''.join(pieces)
def get_server(self, hostname: str) -> ServerInfoT:
    """
    Called by the plugin to fetch metadata about a particular hostname
    from ceph-mgr.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    :return: the result of ``_ceph_get_server`` for that hostname
    """
    server_info = self._ceph_get_server(hostname)
    return cast(ServerInfoT, server_info)
def get_perf_schema(self,
                    svc_type: str,
                    svc_name: str) -> Dict[str,
                                           Dict[str, Dict[str, Union[str, int]]]]:
    """
    Called by the plugin to fetch perf counter schema info.
    svc_name can be nullptr, as can svc_type, in which case they are
    presumably treated as wildcards (NOTE(review): confirm against the
    C++ side).

    :param str svc_type:
    :param str svc_name:
    :return: the schema returned by ``_ceph_get_perf_schema``, describing
        the counters requested
    """
    schema = self._ceph_get_perf_schema(svc_type, svc_name)
    return schema
def get_rocksdb_version(self) -> str:
    """
    Called by the plugin to fetch the latest RocksDB version number.

    :return: str representing the major, minor, and patch RocksDB version
        numbers, as given by ``_ceph_get_rocksdb_version``
    """
    version = self._ceph_get_rocksdb_version()
    return version
def get_counter(self,
                svc_type: str,
                svc_name: str,
                path: str) -> Dict[str, List[Tuple[float, int]]]:
    """
    Called by the plugin to fetch the latest performance counter data for
    a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: A dict of counter names to their values. Each value is a list
        of two-tuples of (timestamp, value). This may be empty if no data
        is available.
    """
    samples = self._ceph_get_counter(svc_type, svc_name, path)
    return samples
def get_latest_counter(self,
                       svc_type: str,
                       svc_name: str,
                       path: str) -> Dict[str, Union[Tuple[float, int],
                                                     Tuple[float, int, int]]]:
    """
    Called by the plugin to fetch only the newest performance counter data
    point for a particular counter on a particular service.

    :param str svc_type:
    :param str svc_name:
    :param str path: a period-separated concatenation of the subsystem and
        the counter name, for example "mds.inodes".
    :return: A dict of counter names to either a two-tuple of
        (timestamp, value) or a three-tuple of (timestamp, value, count).
        This may be empty if no data is available.
    """
    newest = self._ceph_get_latest_counter(svc_type, svc_name, path)
    return newest
def list_servers(self) -> List[ServerInfoT]:
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata).

    Implemented by calling ``_ceph_get_server`` with ``None`` as hostname.

    :return: a list of information about all servers
    """
    every_server = self._ceph_get_server(None)
    return cast(List[ServerInfoT], every_server)
def get_metadata(self,
                 svc_type: str,
                 svc_id: str,
                 default: Optional[Dict[str, str]] = None) -> Optional[Dict[str, str]]:
    """
    Fetch the daemon metadata for a particular service.

    ceph-mgr fetches metadata asynchronously, so there are windows of time
    during addition/removal of services where the metadata is not
    available to modules. ``default`` (``None`` unless given) is returned
    if no metadata is available.

    :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
    :param str svc_id: service id. convert OSD integer IDs to strings when
        passing them in
    :param default: value returned when the metadata is missing or empty
    :rtype: dict, or ``default`` if no metadata found
    """
    metadata = self._ceph_get_metadata(svc_type, svc_id)
    return metadata if metadata else default
def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
def check_mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> HandleCommandResult:
    """
    Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises
    if the command's return value is non-zero.

    :param cmd_dict: command dictionary; its ``prefix`` entry is used in
        the error message
    :param inbuf: optional input buffer passed through to ``mon_command``
    :return: the command result
    :raises MonCommandFailed: if the return value is non-zero
    """
    outcome = HandleCommandResult(*self.mon_command(cmd_dict, inbuf))
    r = outcome
    if not r.retval:
        return r
    raise MonCommandFailed(f'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
def mon_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for modules that do simple, synchronous mon command execution.

    See send_command for general case.

    :param cmd_dict: command dictionary; serialized to JSON before sending
    :param inbuf: optional input buffer for the command
    :return: status int, out std, err str
    """
    started = time.time()
    result = CommandResult()
    payload = json.dumps(cmd_dict)
    self.send_command(result, "mon", "", payload, "", inbuf)
    # Blocks until the command completes.
    r = result.wait()
    finished = time.time()

    self.log.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], r[0], finished - started
    ))

    return r
def osd_command(self, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for osd command execution.

    See send_command for general case. Also, see osd/OSD.cc for available
    commands.

    :param dict cmd_dict: expects a prefix and an osd id, e.g. a dict with
        ``'prefix': 'perf histogram dump'`` and an ``'id'`` entry; the
        ``id`` selects the OSD the command is sent to
    :param inbuf: optional input buffer for the command
    :return: status int, out std, err str
    """
    started = time.time()
    result = CommandResult()
    target_osd = cmd_dict['id']
    self.send_command(result, "osd", target_osd, json.dumps(cmd_dict), "", inbuf)
    # Blocks until the command completes.
    r = result.wait()
    finished = time.time()

    self.log.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
        cmd_dict['prefix'], r[0], finished - started
    ))

    return r
def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
    """
    Helper for `ceph tell` command execution.

    See send_command for general case.

    :param daemon_type: type of the daemon the command is sent to
    :param daemon_id: id of the daemon the command is sent to
    :param dict cmd_dict: expects a ``prefix`` entry naming the command
    :param inbuf: optional input buffer for the command
    :return: status int, out std, err str
    """
    started = time.time()
    result = CommandResult()
    payload = json.dumps(cmd_dict)
    self.send_command(result, daemon_type, daemon_id, payload, "", inbuf)
    # Blocks until the command completes.
    r = result.wait()
    finished = time.time()

    self.log.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
        daemon_type, daemon_id, cmd_dict['prefix'], r[0], finished - started
    ))

    return r
def send_command(
        self,
        result: CommandResult,
        svc_type: str,
        svc_id: str,
        command: str,
        tag: str,
        inbuf: Optional[str] = None) -> None:
    """
    Called by the plugin to send a command to the mon cluster or to
    another daemon.

    :param CommandResult result: an instance of the ``CommandResult``
        class, defined in the same module as MgrModule. This acts as a
        completion and stores the output of the command. Use
        ``CommandResult.wait()`` if you want to block on completion.
    :param str svc_type:
    :param str svc_id:
    :param str command: a JSON-serialized command. This uses the same
        format as the ceph command line, which is a dictionary of command
        arguments, with the extra ``prefix`` key containing the command
        name itself. Consult MonCommands.h for available commands and
        their expected arguments.
    :param str tag: used for nonblocking operation: when a command
        completes, the ``notify()`` callback on the MgrModule instance is
        triggered, with notify_type set to "command", and notify_id set to
        the tag of the command.
    :param str inbuf: input buffer for sending additional data.
    """
    dispatch_args = (result, svc_type, svc_id, command, tag, inbuf)
    self._ceph_send_command(*dispatch_args)
1755 stdin
: Optional
[bytes
] = None
1756 ) -> Tuple
[int, str, str]:
1761 '-k', str(self
.get_ceph_option('keyring')),
1762 '-n', f
'mgr.{self.get_mgr_id()}',
1764 self
.log
.debug('exec: ' + ' '.join(cmd
))
1768 stdout
=subprocess
.PIPE
,
1769 stderr
=subprocess
.PIPE
,
1772 except subprocess
.TimeoutExpired
as ex
:
1774 return -errno
.ETIMEDOUT
, '', str(ex
)
1776 self
.log
.error(f
'Non-zero return from {cmd}: {p.stderr.decode()}')
1777 return p
.returncode
, p
.stdout
.decode(), p
.stderr
.decode()
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Set the module's current map of health checks. Argument is a dict of
    check names to info. Each info dict has the form::

        {
            'severity': 'warning',  # or 'error'
            'summary': 'summary string',
            'count': 4,  # quantify badness
            'detail': ['list', 'of', 'detail', 'strings'],
        }

    for example ``{'severity': 'error', 'summary': 'bars are bad',
    'detail': ['too hard']}``.

    :param checks: dict of health check dicts
    """
    health_map = checks
    self._ceph_set_health_checks(health_map)
def _handle_command(self,
                    inbuf: str,
                    cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                  Tuple[int, str, str]]:
    """
    Route a command either to the handler registered in
    ``CLICommand.COMMANDS`` for its prefix or, when none is registered,
    to ``handle_command``.

    :param inbuf: input buffer supplied with the command
    :param cmd: command dictionary; ``cmd['prefix']`` selects the handler
    :return: whatever the selected handler returns
    """
    prefix = cmd['prefix']
    if prefix in CLICommand.COMMANDS:
        return CLICommand.COMMANDS[prefix].call(self, cmd, inbuf)
    return self.handle_command(inbuf, cmd)
def handle_command(self,
                   inbuf: str,
                   cmd: Dict[str, Any]) -> Union[HandleCommandResult,
                                                 Tuple[int, str, str]]:
    """
    Called by ceph-mgr to request the plugin to handle one of the commands
    that it declared in self.COMMANDS.

    Return a status code, an output buffer, and an output string. The
    output buffer is for data results, the output string is for
    informative text.

    :param inbuf: content of any "-i <file>" supplied to ceph cli
    :param cmd: from Ceph's cmdmap_t
    :return: HandleCommandResult or a 3-tuple of (int, str, str)
    :raises NotImplementedError: always, in this base implementation
    """
    # Should never get called if the module did not declare any commands.
    raise NotImplementedError()
def get_mgr_id(self) -> str:
    """
    Retrieve the name of the manager daemon where this plugin is currently
    being executed (i.e. the active manager).

    :return: the id given by ``_ceph_get_mgr_id``
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
def get_ceph_conf_path(self) -> str:
    """
    Return the configuration file path given by
    ``_ceph_get_ceph_conf_path``.
    """
    conf_path = self._ceph_get_ceph_conf_path()
    return conf_path
def get_mgr_ip(self) -> str:
    """
    Return an address for this manager.

    The first entry of ``ips`` in the ``mgr_ips`` data is cached in
    ``self._mgr_ips`` and returned. While that list is empty the local
    hostname is returned instead and nothing is cached.
    """
    if self._mgr_ips:
        return self._mgr_ips

    ips = self.get("mgr_ips").get('ips', [])
    if not ips:
        return socket.gethostname()
    self._mgr_ips = ips[0]
    assert self._mgr_ips is not None
    return self._mgr_ips
def get_hostname(self) -> str:
    """
    Return the local host name as reported by ``socket.gethostname()``.
    """
    hostname = socket.gethostname()
    return hostname
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Read a Ceph configuration option through ``_ceph_get_option``.

    :param key: name of the option
    :return: the option's value
    """
    value = self._ceph_get_option(key)
    return value
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Read a configuration option for another entity through
    ``_ceph_get_foreign_option``.

    :param entity: entity whose option is requested
    :param key: name of the option
    :return: the option's value
    """
    value = self._ceph_get_foreign_option(entity, key)
    return value
def _validate_module_option(self, key: str) -> None:
    """
    Helper: don't allow get/set config callers to access config options
    that they didn't declare.

    :param key: option name to check against ``self.MODULE_OPTIONS``
    :raises RuntimeError: if no declared option is named ``key``
    """
    declared_names = [option['name'] for option in self.MODULE_OPTIONS]
    if key in declared_names:
        return
    raise RuntimeError("Config option '{0}' is not in {1}.MODULE_OPTIONS".
                       format(key, self.__class__.__name__))
def _get_module_option(self,
                       key: str,
                       default: OptionValue,
                       localized_prefix: str = "") -> OptionValue:
    """
    Read one of this module's options, falling back to its declared
    default.

    :param key: option name
    :param default: value used when the option is unset and
        ``MODULE_OPTION_DEFAULTS`` has no entry for ``key``
    :param localized_prefix: prefix passed through to
        ``_ceph_get_module_option``
    :return: the stored value, else the declared default, else ``default``
    """
    stored = self._ceph_get_module_option(self.module_name, key,
                                          localized_prefix)
    if stored is not None:
        return stored
    return self.MODULE_OPTION_DEFAULTS.get(key, default)
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting.

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param default: fallback passed on to ``_get_module_option``
    :return: the option's value
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
def get_module_option_ex(self, module: str,
                         key: str,
                         default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting for the
    specified module.

    The key is validated against ``MODULE_OPTIONS`` only when ``module``
    is this module.

    :param module: The name of the module, e.g. 'dashboard'
    :param key: The configuration key, e.g. 'server_addr'.
    :param default: The default value to use when the returned value is
        ``None``. Defaults to ``None``.
    """
    is_own_module = module == self.module_name
    if is_own_module:
        self._validate_module_option(key)
    r = self._ceph_get_module_option(module, key)
    if r is None:
        return default
    return r
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Retrieve a dict of KV store keys to values, where the keys have the
    given prefix.

    :param str key_prefix:
    :return: the mapping returned by ``_ceph_get_store_prefix``
    """
    matches = self._ceph_get_store_prefix(key_prefix)
    return matches
def _set_localized(self,
                   key: str,
                   val: Optional[str],
                   setter: Callable[[str, Optional[str]], None]) -> None:
    """
    Call ``setter`` with ``key`` rewritten into its localized form for
    this manager.

    :param key: un-localized key
    :param val: value handed to ``setter``
    :param setter: callable taking (localized key, value)
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    return setter(localized_key, val)
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve localized configuration for this ceph-mgr instance.

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param default: fallback passed on to ``_get_module_option``
    :return: the option's value, looked up with this manager's id as the
        localized prefix
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
def _set_module_option(self, key: str, val: Any) -> None:
    """
    Store one of this module's options through
    ``_ceph_set_module_option``.

    :param key: option name
    :param val: new value; converted with ``str()`` unless it is ``None``,
        which is passed through unchanged
    """
    if val is None:
        serialized = None
    else:
        serialized = str(val)
    return self._ceph_set_module_option(self.module_name, key, serialized)
def set_module_option(self, key: str, val: Any) -> None:
    """
    Set the value of a persistent configuration setting.

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :type val: str | None
    :raises ValueError: if `val` cannot be parsed or it is out of the
        specified range
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting for the specified
    module.

    The key is validated against ``MODULE_OPTIONS`` only when ``module``
    is this module.

    :param str module: name of the module owning the option
    :param str key: option name
    :param val: new value; converted with ``str()`` unless it is ``None``,
        which is passed through unchanged
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Keep None as None, as _set_module_option does: str(None) would store
    # the literal text "None" as the option value.
    serialized = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, serialized)
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set localized configuration for this ceph-mgr instance.

    :param key: option name; must be declared in ``MODULE_OPTIONS``
    :param val: new value
    """
    self._validate_module_option(key)
    option_setter = self._set_module_option
    return self._set_localized(key, val, option_setter)
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    :param key: key to write
    :param val: value to store, or ``None`` to remove the key
    """
    entry = (key, val)
    self._ceph_set_store(*entry)
def get_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from this module's persistent key value store.

    :param key: key to read
    :param default: value returned when the key has no stored value
    :return: the stored value, or ``default``
    """
    stored = self._ceph_get_store(key)
    return default if stored is None else stored
def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
    """
    Get a value from the key value store, preferring the entry localized
    to this manager.

    :param key: un-localized key
    :param default: value returned when neither entry exists
    :return: the localized value, else the plain value, else ``default``
    """
    localized_key = _get_localized_key(self.get_mgr_id(), key)
    localized = self._ceph_get_store(localized_key)
    if localized is not None:
        return localized
    shared = self._ceph_get_store(key)
    if shared is not None:
        return shared
    return default
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in the key value store under the key localized to this
    manager.

    :param key: un-localized key
    :param val: value to store
    """
    store_setter = self.set_store
    return self._set_localized(key, val, store_setter)
def self_test(self) -> Optional[str]:
    """
    Run a self-test on the module. Override this function and implement a
    best as possible self-test for (automated) testing of the module.

    Indicate any failures by raising an exception. This does not have to
    be pretty, it's mainly for picking up regressions during development,
    rather than use in the field.

    The base implementation does nothing.

    :return: None, or an advisory string for developer interest, such as a
        json dump of some state.
    """
    return None
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to an OSDMap, as returned by ``_ceph_get_osdmap``.

    :return: OSDMap
    """
    handle = self._ceph_get_osdmap()
    return cast(OSDMap, handle)
def get_latest(self, daemon_type: str, daemon_name: str, counter: str) -> int:
    """
    Return the newest value of a perf counter.

    :param daemon_type: service type of the daemon
    :param daemon_name: service name of the daemon
    :param counter: perf counter path
    :return: the value element of the newest data point, or 0 when there
        is no data
    """
    newest = self.get_latest_counter(daemon_type, daemon_name, counter)
    data = newest[counter]
    if not data:
        return 0
    return data[1]
def get_latest_avg(self, daemon_type: str, daemon_name: str, counter: str) -> Tuple[int, int]:
    """
    Return the newest value and count of a long-running-average counter.

    :param daemon_type: service type of the daemon
    :param daemon_name: service name of the daemon
    :param counter: perf counter path
    :return: (value, count) of the newest data point, or (0, 0) when there
        is no data
    """
    newest = self.get_latest_counter(daemon_type, daemon_name, counter)
    data = newest[counter]
    if not data:
        return 0, 0
    # https://github.com/python/mypy/issues/1178
    _, value, count = cast(Tuple[float, int, int], data)
    return value, count
def get_unlabeled_perf_counters(self, prio_limit: int = PRIO_USEFUL,
                                services: Sequence[str] = ("mds", "mon", "osd",
                                                           "rbd-mirror", "rgw",
                                                           "tcmu-runner")) -> Dict[str, dict]:
    """
    Return the perf counters currently known to this ceph-mgr instance,
    filtered by priority equal to or greater than `prio_limit`.

    The result is a map of string to dict, associating services (like
    "osd.123") with their counters. The counter dict for each service
    maps counter paths to a counter info structure, which is the
    information from the schema, plus an additional "value" member with
    the latest value. Long-running averages also get a "count" member.

    :param prio_limit: counters with a lower priority are left out
    :param services: service types to include
    """
    result = defaultdict(dict)  # type: Dict[str, dict]

    for server in self.list_servers():
        for service in cast(List[ServiceInfoT], server['services']):
            svc_type, svc_id = service['type'], service['id']
            if svc_type not in services:
                continue

            schemas = self.get_perf_schema(svc_type, svc_id)
            if not schemas:
                self.log.warning("No perf counter schema for {0}.{1}".format(
                    svc_type, svc_id
                ))
                continue

            # Value is returned in a potentially-multi-service format,
            # get just the service we're asking about
            svc_full_name = "{0}.{1}".format(svc_type, svc_id)
            schema = schemas[svc_full_name]

            # Populate latest values
            for counter_path, counter_schema in schema.items():
                priority = counter_schema['priority']
                assert isinstance(priority, int)
                if priority < prio_limit:
                    continue

                tp = counter_schema['type']
                assert isinstance(tp, int)
                counter_info = dict(counter_schema)
                if tp & self.PERFCOUNTER_LONGRUNAVG:
                    # Also populate count for the long running avgs
                    v, c = self.get_latest_avg(svc_type, svc_id, counter_path)
                    counter_info['value'], counter_info['count'] = v, c
                else:
                    counter_info['value'] = self.get_latest(
                        svc_type, svc_id, counter_path)

                result[svc_full_name][counter_path] = counter_info

    self.log.debug("returning {0} counter".format(len(result)))

    return result
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, then call this to publish the address
    once it is available.

    :param uri: address to publish, handed to ``_ceph_set_uri``
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """
    Record the wear level of a device through
    ``_ceph_set_device_wear_level``.

    :param devid: device id
    :param wear_level: wear level to record
    """
    outcome = self._ceph_set_device_wear_level(devid, wear_level)
    return outcome
def have_mon_connection(self) -> bool:
    """
    Check whether this ceph-mgr daemon has an open connection to a
    monitor. If it doesn't, then it's likely that the information we have
    about the cluster is out of date, and/or the monitor cluster is down.

    :return: the answer given by ``_ceph_have_mon_connection``
    """
    connected = self._ceph_have_mon_connection()
    return connected
def update_progress_event(self,
                          evid: str,
                          desc: str,
                          progress: float,
                          add_to_ceph_s: bool) -> None:
    """
    Report the state of a progress event through
    ``_ceph_update_progress_event``.

    :param evid: event id
    :param desc: description of the event
    :param progress: progress value for the event
    :param add_to_ceph_s: flag forwarded unchanged; presumably controls
        whether the event is shown in ``ceph -s`` (TODO confirm)
    """
    event = (evid, desc, progress, add_to_ceph_s)
    return self._ceph_update_progress_event(*event)
def complete_progress_event(self, evid: str) -> None:
    """
    Mark a progress event as complete through
    ``_ceph_complete_progress_event``.

    :param evid: event id
    """
    outcome = self._ceph_complete_progress_event(evid)
    return outcome
def clear_all_progress_events(self) -> None:
    """
    Clear every progress event through
    ``_ceph_clear_all_progress_events``.
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
def rados(self) -> rados.Rados:
    """
    A librados instance to be shared by any classes within this mgr module
    that want one.

    The handle is created on first use from the module's context, then
    connected, registered as a client and cached in ``self._rados``.
    """
    if not self._rados:
        ctx_capsule = self.get_context()
        self._rados = rados.Rados(context=ctx_capsule)
        self._rados.connect()
        self._ceph_register_client(None, self._rados.get_addrs(), False)
    return self._rados
def can_run() -> Tuple[bool, str]:
    """
    Implement this function to report whether the module's dependencies
    are met. For example, if the module needs to import a particular
    dependency to work, then use a try/except around the import at file
    scope, and then report here if the import failed.

    This will be called in a blocking way from the C++ code, so do not do
    any I/O that could block in this function.

    The base implementation always reports success with an empty message.

    :return: a 2-tuple consisting of a boolean and explanatory string
    """
    runnable, explanation = True, ""
    return runnable, explanation
def remote(self, module_name: str, method_name: str, *args: Any, **kwargs: Any) -> Any:
    """
    Invoke a method on another module. All arguments, and the return value
    from the other module must be serializable.

    Limitation: Do not import any modules within the called method.
    Otherwise you will get an error in Python 2::

        RuntimeError('cannot unmarshal code objects in restricted execution mode',)

    :param module_name: Name of other module. If module isn't loaded, an
        ImportError exception is raised.
    :param method_name: Method name. If it does not exist, a NameError
        exception is raised.
    :param args: Argument tuple
    :param kwargs: Keyword argument dict
    :raises RuntimeError: **Any** error raised within the method is
        converted to a RuntimeError
    :raises ImportError: No such module
    """
    # Positional and keyword arguments travel as one tuple and one dict.
    outcome = self._ceph_dispatch_remote(module_name, method_name,
                                         args, kwargs)
    return outcome
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query. Argument is a dict of the query
    parameters, in this form::

        {
          'key_descriptor': [
            {'type': subkey_type, 'regex': regex_pattern},
            ...
          ],
          'performance_counter_descriptors': [
            list, of, descriptor, types
          ],
          'limit': {'order_by': performance_counter_type, 'max_count': n},
        }

    Valid subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: the result of ``_ceph_get_osd_perf_counters`` for that query
    """
    counters = self._ceph_get_osd_perf_counters(query_id)
    return counters
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query. Argument is a dict of the query
    parameters, in this form::

        {
          'key_descriptor': [
            {'type': subkey_type, 'regex': regex_pattern},
            ...
          ],
          'performance_counter_descriptors': [
            list, of, descriptor, types
          ],
        }

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Valid subkey types:
       'mds_rank', 'client_id'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_mds_perf_query(query)
    return query_id
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister an MDS perf query.

    :param int query_id: query ID
    """
    outcome = self._ceph_remove_mds_perf_query(query_id)
    return outcome
def reregister_mds_perf_queries(self) -> None:
    """
    Re-register MDS perf queries through
    ``_ceph_reregister_mds_perf_queries``.
    """
    outcome = self._ceph_reregister_mds_perf_queries()
    return outcome
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an MDS perf query.

    :param int query_id: query ID
    :return: the result of ``_ceph_get_mds_perf_counters`` for that query
    """
    counters = self._ceph_get_mds_perf_counters(query_id)
    return counters
def get_daemon_health_metrics(self) -> Dict[str, List[Dict[str, Any]]]:
    """
    Get the list of health metrics per daemon. This includes SLOW_OPS
    health metrics in MON and OSD daemons, and PENDING_CREATING_PGS health
    metrics for OSDs.

    :return: the mapping returned by ``_ceph_get_daemon_health_metrics``
    """
    metrics = self._ceph_get_daemon_health_metrics()
    return metrics
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Verifies that the current session caps permit executing the py service
    or current module with the provided arguments. This provides a generic
    way to allow modules to restrict by more fine-grained controls.

    :param arguments: dict of key/value arguments to test
    :return: the verdict given by ``_ceph_is_authorized``
    """
    permitted = self._ceph_is_authorized(arguments)
    return permitted
2353 def send_rgwadmin_command(self
, args
: List
[str],
2354 stdout_as_json
: bool = True) -> Tuple
[int, Union
[str, dict], str]:
2358 '-c', str(self
.get_ceph_conf_path()),
2359 '-k', str(self
.get_ceph_option('keyring')),
2360 '-n', f
'mgr.{self.get_mgr_id()}',
2362 self
.log
.debug('Executing %s', str(cmd
))
2363 result
= subprocess
.run( # pylint: disable=subprocess-run-check
2365 stdout
=subprocess
.PIPE
,
2366 stderr
=subprocess
.PIPE
,
2369 stdout
= result
.stdout
.decode('utf-8')
2370 stderr
= result
.stderr
.decode('utf-8')
2371 if stdout
and stdout_as_json
:
2372 stdout
= json
.loads(stdout
)
2373 if result
.returncode
:
2374 self
.log
.debug('Error %s executing %s: %s', result
.returncode
, str(cmd
), stderr
)
2375 return result
.returncode
, stdout
, stderr
2376 except subprocess
.CalledProcessError
as ex
:
2377 self
.log
.exception('Error executing radosgw-admin %s: %s', str(ex
.cmd
), str(ex
.output
))
2379 except subprocess
.TimeoutExpired
as ex
:
2380 self
.log
.error('Timeout (10s) executing radosgw-admin %s', str(ex
.cmd
))