1 import ceph_module
# noqa
3 from typing
import cast
, Tuple
, Any
, Dict
, Generic
, Optional
, Callable
, List
, \
4 Mapping
, NamedTuple
, Sequence
, Union
, Set
, TYPE_CHECKING
7 if sys
.version_info
>= (3, 8):
8 from typing
import Literal
10 from typing_extensions
import Literal
19 from collections
import defaultdict
20 from enum
import IntEnum
, Enum
27 from ceph_argparse
import CephArgtype
28 from mgr_util
import profile_method
30 if sys
.version_info
>= (3, 8):
31 from typing
import get_args
, get_origin
33 def get_args(tp
: Any
) -> Any
:
37 return getattr(tp
, '__args__', ())
def get_origin(tp: Any) -> Any:
    """Return the ``__origin__`` attribute of *tp*, or ``None`` when absent.

    Local counterpart of ``typing.get_origin``, which this file imports
    directly when running on Python >= 3.8.
    """
    try:
        return tp.__origin__
    except AttributeError:
        return None
43 ERROR_MSG_EMPTY_INPUT_FILE
= 'Empty input file'
44 ERROR_MSG_NO_INPUT_FILE
= 'Input file not specified'
45 # Full list of strings in "osd_types.cc:pg_state_string()"
83 NFS_GANESHA_SUPPORTED_FSALS
= ['CEPH', 'RGW']
84 NFS_POOL_NAME
= '.nfs'
87 class NotifyType(str, Enum
):
89 pg_summary
= 'pg_summary'
96 # these are disabled because there are no users.
98 # service_map = 'service_map'
99 # mon_status = 'mon_status'
100 # see DaemonServer.cc:
101 # perf_schema_update = 'perf_schema_update'
104 class CommandResult(object):
106 Use with MgrModule.send_command
109 def __init__(self
, tag
: Optional
[str] = None):
110 self
.ev
= threading
.Event()
115 # This is just a convenience for notifications from
116 # C++ land, to avoid passing addresses around in messages.
117 self
.tag
= tag
if tag
else ""
119 def complete(self
, r
: int, outb
: str, outs
: str) -> None:
125 def wait(self
) -> Tuple
[int, str, str]:
127 return self
.r
, self
.outb
, self
.outs
130 class HandleCommandResult(NamedTuple
):
132 Tuple containing the result of `handle_command()`
134 Only write to stderr if there is an error, or in extraordinary circumstances
136 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
137 is critical information to include there.
139 Everything programmatically consumable should be put on stdout
141 retval
: int = 0 # return code. E.g. 0 or -errno.EINVAL
142 stdout
: str = "" # data of this result.
143 stderr
: str = "" # Typically used for error messages.
146 class MonCommandFailed(RuntimeError): pass
147 class MgrDBNotReady(RuntimeError): pass
150 class OSDMap(ceph_module
.BasePyOSDMap
):
def get_epoch(self) -> int:
    """Return whatever the underlying ``_get_epoch()`` call reports.

    ``_get_epoch`` is not defined in this view; presumably it is supplied
    by the ``ceph_module.BasePyOSDMap`` base class.
    """
    epoch = self._get_epoch()
    return epoch
def get_crush_version(self) -> int:
    """Return the result of the underlying ``_get_crush_version()`` call."""
    crush_version = self._get_crush_version()
    return crush_version
157 def dump(self
) -> Dict
[str, Any
]:
160 def get_pools(self
) -> Dict
[int, Dict
[str, Any
]]:
161 # FIXME: efficient implementation
163 return dict([(p
['pool'], p
) for p
in d
['pools']])
165 def get_pools_by_name(self
) -> Dict
[str, Dict
[str, Any
]]:
166 # FIXME: efficient implementation
168 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
def new_incremental(self) -> 'OSDMapIncremental':
    """Return the object produced by the underlying ``_new_incremental()``."""
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
    """Pass *inc* to the underlying ``_apply_incremental()`` and return its result.

    :param inc: the incremental to hand to the underlying call
    """
    updated = self._apply_incremental(inc)
    return updated
def get_crush(self) -> 'CRUSHMap':
    """Return the object produced by the underlying ``_get_crush()`` call."""
    crush = self._get_crush()
    return crush
def get_pools_by_take(self, take: int) -> List[int]:
    """Return the ``'pools'`` entry of ``_get_pools_by_take(take)``.

    :param take: value forwarded unchanged to the underlying call
    :return: the ``'pools'`` value, or an empty list when that key is absent
    """
    result = self._get_pools_by_take(take)
    return result.get('pools', [])
182 def calc_pg_upmaps(self
, inc
: 'OSDMapIncremental',
184 max_iterations
: int = 10,
185 pools
: Optional
[List
[str]] = None) -> int:
188 return self
._calc
_pg
_upmaps
(
190 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid: int) -> List[int]:
    """Return the result of the underlying ``_map_pool_pgs_up(poolid)`` call.

    :param poolid: value forwarded unchanged to the underlying call
    """
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
    """Return the result of ``_pg_to_up_acting_osds(pool_id, ps)``.

    Both arguments are forwarded positionally and unchanged.
    """
    osds = self._pg_to_up_acting_osds(pool_id, ps)
    return osds
def pool_raw_used_rate(self, pool_id: int) -> float:
    """Return the result of the underlying ``_pool_raw_used_rate(pool_id)``.

    :param pool_id: value forwarded unchanged to the underlying call
    """
    rate = self._pool_raw_used_rate(pool_id)
    return rate
def build_simple(cls, epoch: int = 1, uuid: Optional[str] = None, num_osd: int = -1) -> 'ceph_module.BasePyOSDMap':
    """Return ``cls._build_simple(epoch, uuid, num_osd)``.

    The three arguments are forwarded positionally, in that order.
    NOTE(review): takes ``cls``, so presumably decorated as a classmethod;
    the decorator line is not visible here.
    """
    built = cls._build_simple(epoch, uuid, num_osd)
    return built
205 def get_ec_profile(self
, name
: str) -> Optional
[List
[Dict
[str, str]]]:
206 # FIXME: efficient implementation
208 return d
['erasure_code_profiles'].get(name
, None)
210 def get_require_osd_release(self
) -> str:
212 return d
['require_osd_release']
215 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
def get_epoch(self) -> int:
    """Return whatever the underlying ``_get_epoch()`` call reports."""
    epoch = self._get_epoch()
    return epoch
219 def dump(self
) -> Dict
[str, Any
]:
def set_osd_reweights(self, weightmap: Dict[int, float]) -> None:
    """
    Hand *weightmap* to the underlying ``_set_osd_reweights()`` call.

    weightmap is a dict, int to float.  e.g. { 0: .9, 1: 1.0, 3: .997 }
    """
    outcome = self._set_osd_reweights(weightmap)
    return outcome
def set_crush_compat_weight_set_weights(self, weightmap: Dict[str, float]) -> None:
    """
    Hand *weightmap* to ``_set_crush_compat_weight_set_weights()``.

    weightmap is a dict, int to float.  devices only.  e.g.,
    { 0: 3.4, 1: 3.3, 2: 3.334 }

    NOTE(review): the annotation declares ``str`` keys while the text
    above describes ``int`` keys -- confirm which one is intended.
    """
    outcome = self._set_crush_compat_weight_set_weights(weightmap)
    return outcome
236 class CRUSHMap(ceph_module
.BasePyCRUSH
):
237 ITEM_NONE
= 0x7fffffff
238 DEFAULT_CHOOSE_ARGS
= '-1'
240 def dump(self
) -> Dict
[str, Any
]:
def get_item_weight(self, item: int) -> Optional[int]:
    """Return the result of the underlying ``_get_item_weight(item)`` call.

    :param item: value forwarded unchanged to the underlying call
    """
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item: int) -> Optional[str]:
    """Return the result of the underlying ``_get_item_name(item)`` call.

    :param item: value forwarded unchanged to the underlying call
    """
    item_name = self._get_item_name(item)
    return item_name
def find_takes(self) -> List[int]:
    """Return the ``'takes'`` entry of ``_find_takes()``.

    :return: the ``'takes'`` value, or an empty list when that key is absent
    """
    found = self._find_takes()
    return found.get('takes', [])
def find_roots(self) -> List[int]:
    """Return the ``'roots'`` entry of ``_find_roots()``.

    :return: the ``'roots'`` value, or an empty list when that key is absent
    """
    found = self._find_roots()
    return found.get('roots', [])
def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
    """Return the ``'weights'`` mapping for *root* with its keys cast to ``int``.

    The mapping is read from ``_get_take_weight_osd_map(root)``; a missing
    ``'weights'`` entry yields an empty dict.
    """
    raw = self._get_take_weight_osd_map(root)
    weights = raw.get('weights', {})
    converted = {}
    for osd, weight in weights.items():
        converted[int(osd)] = weight
    return converted
def have_default_choose_args(dump: Dict[str, Any]) -> bool:
    """Report whether ``dump['choose_args']`` contains the default key.

    The key tested is ``CRUSHMap.DEFAULT_CHOOSE_ARGS``; a dump without a
    ``'choose_args'`` entry is treated as having an empty one.
    """
    choose_args = dump.get('choose_args', {})
    return CRUSHMap.DEFAULT_CHOOSE_ARGS in choose_args
def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the default entry of ``dump['choose_args']``.

    Asserts that ``dump['choose_args']`` is a dict, then looks up
    ``CRUSHMap.DEFAULT_CHOOSE_ARGS`` in it, falling back to an empty list.
    """
    choose_args = dump.get('choose_args')
    assert isinstance(choose_args, dict)
    default_key = CRUSHMap.DEFAULT_CHOOSE_ARGS
    return choose_args.get(default_key, [])
269 def get_rule(self
, rule_name
: str) -> Optional
[Dict
[str, Any
]]:
270 # TODO efficient implementation
271 for rule
in self
.dump()['rules']:
272 if rule_name
== rule
['rule_name']:
277 def get_rule_by_id(self
, rule_id
: int) -> Optional
[Dict
[str, Any
]]:
278 for rule
in self
.dump()['rules']:
279 if rule
['rule_id'] == rule_id
:
284 def get_rule_root(self
, rule_name
: str) -> Optional
[int]:
285 rule
= self
.get_rule(rule_name
)
290 first_take
= next(s
for s
in rule
['steps'] if s
.get('op') == 'take')
291 except StopIteration:
292 logging
.warning("CRUSH rule '{0}' has no 'take' step".format(
296 return first_take
['item']
298 def get_osds_under(self
, root_id
: int) -> List
[int]:
299 # TODO don't abuse dump like this
301 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
305 def accumulate(b
: Dict
[str, Any
]) -> None:
306 for item
in b
['items']:
308 osd_list
.append(item
['id'])
311 accumulate(buckets
[item
['id']])
315 accumulate(buckets
[root_id
])
319 def device_class_counts(self
) -> Dict
[str, int]:
320 result
= defaultdict(int) # type: Dict[str, int]
321 # TODO don't abuse dump like this
323 for device
in d
['devices']:
324 cls
= device
.get('class', None)
330 HandlerFuncType
= Callable
[..., Tuple
[int, str, str]]
332 def _extract_target_func(
334 ) -> Tuple
[HandlerFuncType
, Dict
[str, Any
]]:
335 """In order to interoperate with other decorated functions,
336 we need to find the original function which will provide
337 the main set of arguments. While we descend through the
338 stack of wrapped functions, gather additional arguments
339 the decorators may want to provide.
341 # use getattr to keep mypy happy
342 wrapped
= getattr(f
, "__wrapped__", None)
345 extra_args
: Dict
[str, Any
] = {}
346 while wrapped
is not None:
347 extra_args
.update(getattr(f
, "extra_args", {}))
349 wrapped
= getattr(f
, "__wrapped__", None)
353 class CLICommand(object):
354 COMMANDS
= {} # type: Dict[str, CLICommand]
363 self
.func
= None # type: Optional[Callable]
364 self
.arg_spec
= {} # type: Dict[str, Any]
365 self
.first_default
= -1
367 KNOWN_ARGS
= '_', 'self', 'mgr', 'inbuf', 'return'
370 def _load_func_metadata(cls
: Any
, f
: HandlerFuncType
) -> Tuple
[str, Dict
[str, Any
], int, str]:
371 f
, extra_args
= _extract_target_func(f
)
372 desc
= (inspect
.getdoc(f
) or '').replace('\n', ' ')
373 full_argspec
= inspect
.getfullargspec(f
)
374 arg_spec
= full_argspec
.annotations
375 first_default
= len(arg_spec
)
376 if full_argspec
.defaults
:
377 first_default
-= len(full_argspec
.defaults
)
380 for index
, arg
in enumerate(full_argspec
.args
):
381 if arg
in cls
.KNOWN_ARGS
:
382 # record that this function takes an inbuf if it is present
383 # in the full_argspec and not already in the arg_spec
384 if arg
== 'inbuf' and 'inbuf' not in arg_spec
:
385 arg_spec
['inbuf'] = 'str'
387 if arg
== '_end_positional_':
392 or arg_spec
[arg
] is Optional
[bool]
393 or arg_spec
[arg
] is bool
395 # implicit switch to non-positional on any
396 # Optional[bool] or the --format option
398 assert arg
in arg_spec
, \
399 f
"'{arg}' is not annotated for {f}: {full_argspec}"
400 has_default
= index
>= first_default
401 args
.append(CephArgtype
.to_argdesc(arg_spec
[arg
],
405 for argname
, argtype
in extra_args
.items():
406 # avoid shadowing args from the function
407 if argname
in arg_spec
:
409 arg_spec
[argname
] = argtype
410 args
.append(CephArgtype
.to_argdesc(
411 argtype
, dict(name
=argname
), has_default
=True, positional
=False
413 return desc
, arg_spec
, first_default
, ' '.join(args
)
415 def store_func_metadata(self
, f
: HandlerFuncType
) -> None:
416 self
.desc
, self
.arg_spec
, self
.first_default
, self
.args
= \
417 self
._load
_func
_metadata
(f
)
419 def __call__(self
, func
: HandlerFuncType
) -> HandlerFuncType
:
420 self
.store_func_metadata(func
)
422 self
.COMMANDS
[self
.prefix
] = self
425 def _get_arg_value(self
, kwargs_switch
: bool, key
: str, val
: Any
) -> Tuple
[bool, str, Any
]:
426 def start_kwargs() -> bool:
427 if isinstance(val
, str) and '=' in val
:
428 k
, v
= val
.split('=', 1)
429 if k
in self
.arg_spec
:
433 if not kwargs_switch
:
434 kwargs_switch
= start_kwargs()
437 k
, v
= val
.split('=', 1)
440 return kwargs_switch
, k
.replace('-', '_'), v
442 def _collect_args_by_argspec(self
, cmd_dict
: Dict
[str, Any
]) -> Tuple
[Dict
[str, Any
], Set
[str]]:
445 kwargs_switch
= False
446 for index
, (name
, tp
) in enumerate(self
.arg_spec
.items()):
447 if name
in CLICommand
.KNOWN_ARGS
:
448 special_args
.add(name
)
450 assert self
.first_default
>= 0
451 raw_v
= cmd_dict
.get(name
)
452 if index
>= self
.first_default
:
455 kwargs_switch
, k
, v
= self
._get
_arg
_value
(kwargs_switch
,
457 kwargs
[k
] = CephArgtype
.cast_to(tp
, v
)
458 return kwargs
, special_args
462 cmd_dict
: Dict
[str, Any
],
463 inbuf
: Optional
[str] = None) -> HandleCommandResult
:
464 kwargs
, specials
= self
._collect
_args
_by
_argspec
(cmd_dict
)
466 if 'inbuf' not in specials
:
467 return HandleCommandResult(
470 'Invalid command: Input file data (-i) not supported',
472 kwargs
['inbuf'] = inbuf
474 return self
.func(mgr
, **kwargs
)
476 def dump_cmd(self
) -> Dict
[str, Union
[str, bool]]:
478 'cmd': '{} {}'.format(self
.prefix
, self
.args
),
485 def dump_cmd_list(cls
) -> List
[Dict
[str, Union
[str, bool]]]:
486 return [cmd
.dump_cmd() for cmd
in cls
.COMMANDS
.values()]
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a ``CLICommand`` for *prefix* with the permission string ``"r"``.

    :param prefix: passed as the first positional argument
    :param poll: passed as the third positional argument
    """
    permission = "r"
    return CLICommand(prefix, permission, poll)
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a ``CLICommand`` for *prefix* with the permission string ``"w"``.

    :param prefix: passed as the first positional argument
    :param poll: passed as the third positional argument
    """
    permission = "w"
    return CLICommand(prefix, permission, poll)
497 def CLICheckNonemptyFileInput(desc
: str) -> Callable
[[HandlerFuncType
], HandlerFuncType
]:
498 def CheckFileInput(func
: HandlerFuncType
) -> HandlerFuncType
:
499 @functools.wraps(func
)
500 def check(*args
: Any
, **kwargs
: Any
) -> Tuple
[int, str, str]:
501 if 'inbuf' not in kwargs
:
502 return -errno
.EINVAL
, '', f
'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
503 f
'containing {desc} with "-i" option'
504 if isinstance(kwargs
['inbuf'], str):
505 # Delete new line separator at EOF (it may have been added by a text editor).
506 kwargs
['inbuf'] = kwargs
['inbuf'].rstrip('\r\n').rstrip('\n')
507 if not kwargs
['inbuf'] or not kwargs
['inbuf'].strip():
508 return -errno
.EINVAL
, '', f
'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
510 return func(*args
, **kwargs
)
511 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
513 return CheckFileInput
515 def CLIRequiresDB(func
: HandlerFuncType
) -> HandlerFuncType
:
516 @functools.wraps(func
)
517 def check(self
: MgrModule
, *args
: Any
, **kwargs
: Any
) -> Tuple
[int, str, str]:
518 if not self
.db_ready():
519 return -errno
.EAGAIN
, "", "mgr db not yet available"
520 return func(self
, *args
, **kwargs
)
521 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
def _get_localized_key(prefix: str, key: str) -> str:
    """Join *prefix* and *key* with a single ``/`` separator."""
    return f'{prefix}/{key}'
529 MODULE_OPTIONS types and Option Class
532 OptionTypeLabel
= Literal
[
533 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
536 # common/options.h: value_t
537 OptionValue
= Optional
[Union
[bool, int, float, str]]
542 Helper class to declare options for MODULE_OPTIONS list.
543 TODO: Replace with typing.TypedDict when in python_version >= 3.8
549 default
: OptionValue
= None,
550 type: 'OptionTypeLabel' = 'str',
551 desc
: Optional
[str] = None,
552 long_desc
: Optional
[str] = None,
553 min: OptionValue
= None,
554 max: OptionValue
= None,
555 enum_allowed
: Optional
[List
[str]] = None,
556 tags
: Optional
[List
[str]] = None,
557 see_also
: Optional
[List
[str]] = None,
558 runtime
: bool = False,
560 super(Option
, self
).__init
__(
561 (k
, v
) for k
, v
in vars().items()
562 if k
!= 'self' and v
is not None)
567 Helper class to declare options for COMMANDS list.
569 It also allows to specify prefix and args separately, as well as storing a
573 >>> def handler(): return 0, "", ""
574 >>> Command(prefix="example",
577 {'perm': 'w', 'poll': False}
583 handler
: HandlerFuncType
,
587 super().__init
__(perm
=perm
,
590 self
.handler
= handler
593 def returns_command_result(instance
: Any
,
594 f
: HandlerFuncType
) -> Callable
[..., HandleCommandResult
]:
596 def wrapper(mgr
: Any
, *args
: Any
, **kwargs
: Any
) -> HandleCommandResult
:
597 retval
, stdout
, stderr
= f(instance
or mgr
, *args
, **kwargs
)
598 return HandleCommandResult(retval
, stdout
, stderr
)
599 wrapper
.__signature
__ = inspect
.signature(f
) # type: ignore[attr-defined]
602 def register(self
, instance
: bool = False) -> HandlerFuncType
:
604 Register a CLICommand handler. It allows an instance to register bound
605 methods. In that case, the mgr instance is not passed, and it's expected
606 to be available in the class instance.
607 It also uses HandleCommandResult helper to return a wrapped tuple of 3
610 cmd
= CLICommand(prefix
=self
.prefix
, perm
=self
['perm'])
611 return cmd(self
.returns_command_result(instance
, self
.handler
))
class CPlusPlusHandler(logging.Handler):
    """Logging handler that passes formatted records to ``module_inst._ceph_log``.

    Each message is prefixed with the module name, the level name and the
    logger name.
    """

    def __init__(self, module_inst: Any):
        super().__init__()
        self._module = module_inst
        # Only the module name is filled in now; the %-style fields are
        # left for logging.Formatter to expand per record.
        template = "[{} %(levelname)-4s %(name)s] %(message)s"
        fmt = template.format(module_inst.module_name)
        self.setFormatter(logging.Formatter(fmt))

    def emit(self, record: logging.LogRecord) -> None:
        """Forward *record* unless it is below this handler's level."""
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
626 class ClusterLogHandler(logging
.Handler
):
627 def __init__(self
, module_inst
: Any
):
629 self
._module
= module_inst
630 self
.setFormatter(logging
.Formatter("%(message)s"))
632 def emit(self
, record
: logging
.LogRecord
) -> None:
634 logging
.DEBUG
: MgrModule
.ClusterLogPrio
.DEBUG
,
635 logging
.INFO
: MgrModule
.ClusterLogPrio
.INFO
,
636 logging
.WARNING
: MgrModule
.ClusterLogPrio
.WARN
,
637 logging
.ERROR
: MgrModule
.ClusterLogPrio
.ERROR
,
638 logging
.CRITICAL
: MgrModule
.ClusterLogPrio
.ERROR
,
640 level
= levelmap
[record
.levelno
]
641 if record
.levelno
>= self
.level
:
642 self
._module
.cluster_log(self
._module
.module_name
,
647 class FileHandler(logging
.FileHandler
):
648 def __init__(self
, module_inst
: Any
):
649 path
= module_inst
.get_ceph_option("log_file")
650 idx
= path
.rfind(".log")
652 self
.path
= "{}.{}.log".format(path
[:idx
], module_inst
.module_name
)
654 self
.path
= "{}.{}".format(path
, module_inst
.module_name
)
655 super(FileHandler
, self
).__init
__(self
.path
, delay
=True)
656 self
.setFormatter(logging
.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
659 class MgrModuleLoggingMixin(object):
660 def _configure_logging(self
,
665 log_to_cluster
: bool) -> None:
666 self
._mgr
_level
: Optional
[str] = None
667 self
._module
_level
: Optional
[str] = None
668 self
._root
_logger
= logging
.getLogger()
670 self
._unconfigure
_logging
()
672 # the ceph log handler is initialized only once
673 self
._mgr
_log
_handler
= CPlusPlusHandler(self
)
674 self
._cluster
_log
_handler
= ClusterLogHandler(self
)
675 self
._file
_log
_handler
= FileHandler(self
)
677 self
.log_to_file
= log_to_file
678 self
.log_to_cluster
= log_to_cluster
680 self
._root
_logger
.addHandler(self
._mgr
_log
_handler
)
682 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
684 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
686 self
._root
_logger
.setLevel(logging
.NOTSET
)
687 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
689 def _unconfigure_logging(self
) -> None:
690 # remove existing handlers:
692 h
for h
in self
._root
_logger
.handlers
693 if (isinstance(h
, CPlusPlusHandler
) or
694 isinstance(h
, FileHandler
) or
695 isinstance(h
, ClusterLogHandler
))]
696 for h
in rm_handlers
:
697 self
._root
_logger
.removeHandler(h
)
698 self
.log_to_file
= False
699 self
.log_to_cluster
= False
701 def _set_log_level(self
,
704 cluster_level
: str) -> None:
705 self
._cluster
_log
_handler
.setLevel(cluster_level
.upper())
707 module_level
= module_level
.upper() if module_level
else ''
708 if not self
._module
_level
:
709 # using debug_mgr level
710 if not module_level
and self
._mgr
_level
== mgr_level
:
711 # no change in module level neither in debug_mgr
714 if self
._module
_level
== module_level
:
715 # no change in module level
718 if not self
._module
_level
and not module_level
:
719 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
720 self
.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
722 elif self
._module
_level
and not module_level
:
723 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
724 self
.getLogger().warning("unsetting module log level, falling back to "
725 "debug_mgr level: %s (%s)", level
, mgr_level
)
728 self
.getLogger().debug("setting log level: %s", level
)
730 self
._module
_level
= module_level
731 self
._mgr
_level
= mgr_level
733 self
._mgr
_log
_handler
.setLevel(level
)
734 self
._file
_log
_handler
.setLevel(level
)
736 def _enable_file_log(self
) -> None:
738 self
.getLogger().warning("enabling logging to file")
739 self
.log_to_file
= True
740 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
742 def _disable_file_log(self
) -> None:
744 self
.getLogger().warning("disabling logging to file")
745 self
.log_to_file
= False
746 self
._root
_logger
.removeHandler(self
._file
_log
_handler
)
748 def _enable_cluster_log(self
) -> None:
750 self
.getLogger().warning("enabling logging to cluster")
751 self
.log_to_cluster
= True
752 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
def _disable_cluster_log(self) -> None:
    """Turn cluster logging off.

    Logs a warning, clears ``log_to_cluster`` and detaches the cluster
    log handler from the root logger, in that order.
    """
    logger = self.getLogger()
    logger.warning("disabling logging to cluster")
    self.log_to_cluster = False
    handler = self._cluster_log_handler
    self._root_logger.removeHandler(handler)
760 def _ceph_log_level_to_python(self
, log_level
: str) -> str:
763 ceph_log_level
= int(log_level
.split("/", 1)[0])
770 if ceph_log_level
<= 0:
771 log_level
= "CRITICAL"
772 elif ceph_log_level
<= 1:
773 log_level
= "WARNING"
774 elif ceph_log_level
<= 4:
def getLogger(self, name: Optional[str] = None) -> logging.Logger:
    """Return ``logging.getLogger(name)``.

    :param name: logger name; ``None`` (the default) selects the root logger
    """
    logger = logging.getLogger(name)
    return logger
782 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
, MgrModuleLoggingMixin
):
784 Standby modules only implement a serve and shutdown method, they
785 are not permitted to implement commands and they do not receive
788 They only have access to the mgrmap (for accessing service URI info
789 from their active peer), and to configuration settings (read only).
792 MODULE_OPTIONS
: List
[Option
] = []
793 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
795 def __init__(self
, module_name
: str, capsule
: Any
):
796 super(MgrStandbyModule
, self
).__init
__(capsule
)
797 self
.module_name
= module_name
799 # see also MgrModule.__init__()
800 for o
in self
.MODULE_OPTIONS
:
803 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
805 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
807 # mock does not return a str
808 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
809 log_level
= cast(str, self
.get_module_option("log_level"))
810 cluster_level
= cast(str, self
.get_module_option('log_to_cluster_level'))
811 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
814 # for backwards compatibility
815 self
._logger
= self
.getLogger()
817 def __del__(self
) -> None:
819 self
._unconfigure
_logging
()
821 def _cleanup(self
) -> None:
825 def _register_options(cls
, module_name
: str) -> None:
826 cls
.MODULE_OPTIONS
.append(
827 Option(name
='log_level', type='str', default
="", runtime
=True,
828 enum_allowed
=['info', 'debug', 'critical', 'error',
830 cls
.MODULE_OPTIONS
.append(
831 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
832 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
833 cls
.MODULE_OPTIONS
.append(
834 Option(name
='log_to_cluster', type='bool', default
=False,
836 cls
.MODULE_OPTIONS
.append(
837 Option(name
='log_to_cluster_level', type='str', default
='info',
839 enum_allowed
=['info', 'debug', 'critical', 'error',
843 def log(self
) -> logging
.Logger
:
846 def serve(self
) -> None:
848 The serve method is mandatory for standby modules.
851 raise NotImplementedError()
def get_mgr_id(self) -> str:
    """Return the result of the underlying ``_ceph_get_mgr_id()`` call."""
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
856 def get_module_option(self
, key
: str, default
: OptionValue
= None) -> OptionValue
:
858 Retrieve the value of a persistent configuration setting
860 :param default: the default value of the config if it is not found
862 r
= self
._ceph
_get
_module
_option
(key
)
864 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_ceph_option(self, key: str) -> OptionValue:
    """Return the result of the underlying ``_ceph_get_option(key)`` call.

    :param key: option name, forwarded unchanged
    """
    value = self._ceph_get_option(key)
    return value
def get_store(self, key: str) -> Optional[str]:
    """
    Retrieve the value of a persistent KV store entry

    Looks *key* up through the underlying ``_ceph_get_store()`` call.

    :return: Byte string or None
    """
    stored = self._ceph_get_store(key)
    return stored
880 def get_localized_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
881 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
883 r
= self
._ceph
_get
_store
(key
)
def get_active_uri(self) -> str:
    """Return the result of the underlying ``_ceph_get_active_uri()`` call."""
    uri = self._ceph_get_active_uri()
    return uri
def get(self, data_name: str) -> Dict[str, Any]:
    """Return the result of the underlying ``_ceph_get(data_name)`` call.

    :param data_name: name of the data set to fetch, forwarded unchanged
    """
    data = self._ceph_get(data_name)
    return data
894 def get_mgr_ip(self
) -> str:
895 ips
= self
.get("mgr_ips").get('ips', [])
897 return socket
.gethostname()
def get_hostname(self) -> str:
    """Return the local host name as reported by ``socket.gethostname()``."""
    hostname = socket.gethostname()
    return hostname
903 def get_localized_module_option(self
, key
: str, default
: OptionValue
= None) -> OptionValue
:
904 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
906 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
911 HealthChecksT
= Mapping
[str, Mapping
[str, Union
[int, str, Sequence
[str]]]]
912 # {"type": service_type, "id": service_id}
913 ServiceInfoT
= Dict
[str, str]
914 # {"hostname": hostname,
915 # "ceph_version": version,
916 # "services": [service_info, ..]}
917 ServerInfoT
= Dict
[str, Union
[str, List
[ServiceInfoT
]]]
918 PerfCounterT
= Dict
[str, Any
]
922 def DecoratorFactory(attr
: str, default
: Any
): # type: ignore
923 class DecoratorClass
:
924 _ATTR_TOKEN
= f
'__ATTR_{attr.upper()}__'
926 def __init__(self
, value
: Any
=default
) -> None:
929 def __call__(self
, func
: Callable
) -> Any
:
930 setattr(func
, self
._ATTR
_TOKEN
, self
.value
)
934 def get(cls
, func
: Callable
) -> Any
:
935 return getattr(func
, cls
._ATTR
_TOKEN
, default
)
937 return DecoratorClass
939 perm
= DecoratorFactory('perm', default
='r')
940 expose
= DecoratorFactory('expose', default
=False)(True)
943 class MgrModule(ceph_module
.BaseMgrModule
, MgrModuleLoggingMixin
):
944 MGR_POOL_NAME
= ".mgr"
946 COMMANDS
= [] # type: List[Any]
947 MODULE_OPTIONS
: List
[Option
] = []
948 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
951 SCHEMA
= None # type: Optional[str]
952 SCHEMA_VERSIONED
= None # type: Optional[List[str]]
954 # Priority definitions for perf counters
958 PRIO_UNINTERESTING
= 2
961 # counter value types
966 PERFCOUNTER_LONGRUNAVG
= 4
967 PERFCOUNTER_COUNTER
= 8
968 PERFCOUNTER_HISTOGRAM
= 0x10
969 PERFCOUNTER_TYPE_MASK
= ~
3
975 # Cluster log priorities
976 class ClusterLogPrio(IntEnum
):
983 def __init__(self
, module_name
: str, py_modules_ptr
: object, this_ptr
: object):
984 self
.module_name
= module_name
985 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
987 for o
in self
.MODULE_OPTIONS
:
990 # we'll assume the declared type matches the
991 # supplied default value's type.
992 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
994 # module not declaring its type, so normalize the
995 # default value to be a string for consistent behavior
996 # with default and user-supplied option values.
997 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
999 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
1000 log_level
= cast(str, self
.get_module_option("log_level"))
1001 cluster_level
= cast(str, self
.get_module_option('log_to_cluster_level'))
1002 log_to_file
= self
.get_module_option("log_to_file")
1003 assert isinstance(log_to_file
, bool)
1004 log_to_cluster
= self
.get_module_option("log_to_cluster")
1005 assert isinstance(log_to_cluster
, bool)
1006 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
1007 log_to_file
, log_to_cluster
)
1009 # for backwards compatibility
1010 self
._logger
= self
.getLogger()
1012 self
._db
= None # type: Optional[sqlite3.Connection]
1014 self
._version
= self
._ceph
_get
_version
()
1016 self
._perf
_schema
_cache
= None
1018 # Keep a librados instance for those that need it.
1019 self
._rados
: Optional
[rados
.Rados
] = None
1021 # this does not change over the lifetime of an active mgr
1022 self
._mgr
_ips
: Optional
[str] = None
1024 self
._db
_lock
= threading
.Lock()
1026 def __del__(self
) -> None:
1027 self
._unconfigure
_logging
()
1030 def _register_options(cls
, module_name
: str) -> None:
1031 cls
.MODULE_OPTIONS
.append(
1032 Option(name
='log_level', type='str', default
="", runtime
=True,
1033 enum_allowed
=['info', 'debug', 'critical', 'error',
1035 cls
.MODULE_OPTIONS
.append(
1036 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
1037 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
1038 cls
.MODULE_OPTIONS
.append(
1039 Option(name
='log_to_cluster', type='bool', default
=False,
1041 cls
.MODULE_OPTIONS
.append(
1042 Option(name
='log_to_cluster_level', type='str', default
='info',
1044 enum_allowed
=['info', 'debug', 'critical', 'error',
1048 def _register_commands(cls
, module_name
: str) -> None:
1049 cls
.COMMANDS
.extend(CLICommand
.dump_cmd_list())
1052 def log(self
) -> logging
.Logger
:
def cluster_log(self, channel: str, priority: ClusterLogPrio, message: str) -> None:
    """
    Send *message* through the underlying ``_ceph_cluster_log()`` call.

    :param channel: The log channel. This can be 'cluster', 'audit', ...
    :param priority: The log message priority; its ``.value`` is what gets
        passed on.
    :param message: The message to log.
    """
    prio_value = priority.value
    self._ceph_cluster_log(channel, prio_value, message)
def version(self) -> str:
    """Return the value stored in ``self._version``."""
    current = self._version
    return current
def pool_exists(self, name: str) -> bool:
    """Report whether a pool called *name* is listed in the ``osd_map`` data.

    The names are collected from the ``'pool_name'`` field of every entry
    in ``self.get('osd_map')['pools']``.
    """
    osd_map = self.get('osd_map')
    known_names = []
    for pool in osd_map['pools']:
        known_names.append(pool['pool_name'])
    return name in known_names
1073 def have_enough_osds(self
) -> bool:
1074 # wait until we have enough OSDs to allow the pool to be healthy
1076 for osd
in self
.get("osd_map")["osds"]:
1077 if osd
["up"] and osd
["in"]:
1080 need
= cast(int, self
.get_ceph_option("osd_pool_default_size"))
1081 return ready
>= need
1085 def rename_pool(self
, srcpool
: str, destpool
: str) -> None:
1087 'prefix': 'osd pool rename',
1090 'destpool': destpool
,
1091 'yes_i_really_mean_it': True
1093 self
.check_mon_command(c
)
1097 def create_pool(self
, pool
: str) -> None:
1099 'prefix': 'osd pool create',
1105 'yes_i_really_mean_it': True
1107 self
.check_mon_command(c
)
1111 def appify_pool(self
, pool
: str, app
: str) -> None:
1113 'prefix': 'osd pool application enable',
1117 'yes_i_really_mean_it': True
1119 self
.check_mon_command(c
)
1123 def create_mgr_pool(self
) -> None:
1124 self
.log
.info("creating mgr pool")
1126 ov
= self
.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
1127 devhealth
= cast(str, ov
)
1128 if devhealth
is not None and self
.pool_exists(devhealth
):
1129 self
.log
.debug("reusing devicehealth pool")
1130 self
.rename_pool(devhealth
, self
.MGR_POOL_NAME
)
1131 self
.appify_pool(self
.MGR_POOL_NAME
, 'mgr')
1133 self
.log
.debug("creating new mgr pool")
1134 self
.create_pool(self
.MGR_POOL_NAME
)
1135 self
.appify_pool(self
.MGR_POOL_NAME
, 'mgr')
1137 def create_skeleton_schema(self
, db
: sqlite3
.Connection
) -> None:
1139 CREATE TABLE IF NOT EXISTS MgrModuleKV (
1140 key TEXT PRIMARY KEY,
1143 INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
1146 db
.executescript(SQL
)
1148 def update_schema_version(self
, db
: sqlite3
.Connection
, version
: int) -> None:
1149 SQL
= "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
1151 db
.execute(SQL
, (version
,))
1153 def set_kv(self
, key
: str, value
: Any
) -> None:
1154 SQL
= "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
1156 assert key
[:2] != "__"
1158 self
.log
.debug(f
"set_kv('{key}', '{value}')")
1160 with self
._db
_lock
, self
.db
:
1161 self
.db
.execute(SQL
, (key
, value
))
1164 def get_kv(self
, key
: str) -> Any
:
1165 SQL
= "SELECT value FROM MgrModuleKV WHERE key = ?;"
1167 assert key
[:2] != "__"
1169 self
.log
.debug(f
"get_kv('{key}')")
1171 with self
._db
_lock
, self
.db
:
1172 cur
= self
.db
.execute(SQL
, (key
,))
1173 row
= cur
.fetchone()
1178 self
.log
.debug(f
" = {v}")
1181 def maybe_upgrade(self
, db
: sqlite3
.Connection
, version
: int) -> None:
1183 self
.log
.info(f
"creating main.db for {self.module_name}")
1184 assert self
.SCHEMA
is not None
1185 cur
= db
.executescript(self
.SCHEMA
)
1186 self
.update_schema_version(db
, 1)
1188 assert self
.SCHEMA_VERSIONED
is not None
1189 latest
= len(self
.SCHEMA_VERSIONED
)
1190 if latest
< version
:
1191 raise RuntimeError(f
"main.db version is newer ({version}) than module ({latest})")
1192 for i
in range(version
, latest
):
1193 self
.log
.info(f
"upgrading main.db for {self.module_name} from {i-1}:{i}")
1194 SQL
= self
.SCHEMA_VERSIONED
[i
]
1195 db
.executescript(SQL
)
1196 if version
< latest
:
1197 self
.update_schema_version(db
, latest
)
1199 def load_schema(self
, db
: sqlite3
.Connection
) -> None:
1201 SELECT value FROM MgrModuleKV WHERE key = '__version';
1205 self
.create_skeleton_schema(db
)
1206 cur
= db
.execute(SQL
)
1207 row
= cur
.fetchone()
1208 self
.maybe_upgrade(db
, int(row
['value']))
1209 assert cur
.fetchone() is None
def configure_db(self, db: sqlite3.Connection) -> None:
    """
    Apply connection settings to ``db`` and then load the schema.

    Five ``PRAGMA`` statements are executed in a fixed order, the row
    factory is set to ``sqlite3.Row``, and finally ``self.load_schema(db)``
    is called.

    :param db: connection to configure
    """
    pragmas = (
        'PRAGMA FOREIGN_KEYS = 1',
        'PRAGMA JOURNAL_MODE = PERSIST',
        'PRAGMA PAGE_SIZE = 65536',
        'PRAGMA CACHE_SIZE = 64',
        'PRAGMA TEMP_STORE = memory',
    )
    for pragma in pragmas:
        db.execute(pragma)
    db.row_factory = sqlite3.Row
    self.load_schema(db)
1221 def open_db(self
) -> Optional
[sqlite3
.Connection
]:
1222 if not self
.pool_exists(self
.MGR_POOL_NAME
):
1223 if not self
.have_enough_osds():
1225 self
.create_mgr_pool()
1226 uri
= f
"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
1227 self
.log
.debug(f
"using uri {uri}")
1228 db
= sqlite3
.connect(uri
, check_same_thread
=False, uri
=True)
1229 self
.configure_db(db
)
1233 def db_ready(self
) -> bool:
1236 return self
.db
is not None
1237 except MgrDBNotReady
:
1241 def db(self
) -> sqlite3
.Connection
:
1242 assert self
._db
_lock
.locked()
1243 if self
._db
is not None:
1245 db_allowed
= self
.get_ceph_option("mgr_pool")
1247 raise MgrDBNotReady();
1248 self
._db
= self
.open_db()
1249 if self
._db
is None:
1250 raise MgrDBNotReady();
def release_name(self) -> str:
    """
    Get the release name of the Ceph version, e.g. 'nautilus' or 'octopus'.

    :return: the lower-case release name, as reported by
        ``_ceph_get_release_name``
    """
    name = self._ceph_get_release_name()
    return name
def lookup_release_name(self, major: int) -> str:
    """
    Look up a release name through the ``_ceph_lookup_release_name`` binding.

    :param major: forwarded unchanged to the binding
    :return: whatever the binding returns for ``major``
    """
    name = self._ceph_lookup_release_name(major)
    return name
def get_context(self) -> object:
    """
    Fetch the context object from the ``_ceph_get_context`` binding.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    capsule = self._ceph_get_context()
    return capsule
1272 def notify(self
, notify_type
: NotifyType
, notify_id
: str) -> None:
1274 Called by the ceph-mgr service to notify the Python plugin
1275 that new state is available. This method is *only* called for
1276 notify_types that are listed in the NOTIFY_TYPES string list
1277 member of the module class.
1279 :param notify_type: string indicating what kind of notification,
1280 such as osd_map, mon_map, fs_map, mon_status,
1281 health, pg_summary, command, service_map
1282 :param notify_id: string (may be empty) that optionally specifies
1283 which entity is being notified about. With
1284 "command" notifications this is set to the tag
1285 ``from send_command``.
1289 def _config_notify(self
) -> None:
1290 # check logging options for changes
1291 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
1292 module_level
= cast(str, self
.get_module_option("log_level"))
1293 cluster_level
= cast(str, self
.get_module_option("log_to_cluster_level"))
1294 assert isinstance(cluster_level
, str)
1295 log_to_file
= self
.get_module_option("log_to_file", False)
1296 assert isinstance(log_to_file
, bool)
1297 log_to_cluster
= self
.get_module_option("log_to_cluster", False)
1298 assert isinstance(log_to_cluster
, bool)
1299 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
1301 if log_to_file
!= self
.log_to_file
:
1303 self
._enable
_file
_log
()
1305 self
._disable
_file
_log
()
1306 if log_to_cluster
!= self
.log_to_cluster
:
1308 self
._enable
_cluster
_log
()
1310 self
._disable
_cluster
_log
()
1312 # call module subclass implementations
1313 self
.config_notify()
1315 def config_notify(self
) -> None:
1317 Called by the ceph-mgr service to notify the Python plugin
1318 that the configuration may have changed. Modules will want to
1319 refresh any configuration values stored in config variables.
1323 def serve(self
) -> None:
1325 Called by the ceph-mgr service to start any server that
1326 is provided by this Python plugin. The implementation
1327 of this function should block until ``shutdown`` is called.
1329 You *must* implement ``shutdown`` if you implement ``serve``
1333 def shutdown(self
) -> None:
1335 Called by the ceph-mgr service to request that this
1336 module drop out of its serve() function. You do not
1337 need to implement this if you do not implement serve()
1342 addrs
= self
._rados
.get_addrs()
1343 self
._rados
.shutdown()
1344 self
._ceph
_unregister
_client
(addrs
)
1348 def get(self
, data_name
: str) -> Any
:
1350 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
1352 :param str data_name: Valid things to fetch are osdmap_crush_map_text,
1353 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
1354 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
1355 health, mon_status, devices, device <devid>, pg_stats,
1356 pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
1357 modified_config_options, service_map, mds_metadata,
1358 have_local_config_map, osd_pool_stats, pg_status.
1361 All these structures have their own JSON representations: experiment
1362 or look at the C++ ``dump()`` methods to learn about them.
1364 obj
= self
._ceph
_get
(data_name
)
1365 if isinstance(obj
, bytes
):
1366 obj
= json
.loads(obj
)
1370 def _stattype_to_str(self
, stattype
: int) -> str:
1372 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
1375 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
1376 # this lie matches the DaemonState decoding: only val, no counts
1378 if typeonly
== self
.PERFCOUNTER_COUNTER
:
1380 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
1385 def _perfpath_to_path_labels(self
, daemon
: str,
1386 path
: str) -> Tuple
[str, Tuple
[str, ...], Tuple
[str, ...]]:
1387 if daemon
.startswith('rgw.'):
1388 label_name
= 'instance_id'
1389 daemon
= daemon
[len('rgw.'):]
1391 label_name
= 'ceph_daemon'
1393 label_names
= (label_name
,) # type: Tuple[str, ...]
1394 labels
= (daemon
,) # type: Tuple[str, ...]
1396 if daemon
.startswith('rbd-mirror.'):
1398 r
'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
1402 path
= 'rbd_mirror_image_' + match
.group(4)
1403 pool
= match
.group(1)
1404 namespace
= match
.group(2) or ''
1405 image
= match
.group(3)
1406 label_names
+= ('pool', 'namespace', 'image')
1407 labels
+= (pool
, namespace
, image
)
1409 return path
, label_names
, labels
,
1411 def _perfvalue_to_value(self
, stattype
: int, value
: Union
[int, float]) -> Union
[float, int]:
1412 if stattype
& self
.PERFCOUNTER_TIME
:
1413 # Convert from ns to seconds
1414 return value
/ 1000000000.0
1418 def _unit_to_str(self
, unit
: int) -> str:
1419 if unit
== self
.NONE
:
1421 elif unit
== self
.BYTES
:
1424 raise ValueError(f
'bad unit "{unit}"')
1427 def to_pretty_iec(n
: int) -> str:
1428 for bits
, suffix
in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
1429 (20, 'Mi'), (10, 'Ki')]:
1431 return str(n
>> bits
) + ' ' + suffix
1435 def get_pretty_row(elems
: Sequence
[str], width
: int) -> str:
1437 Takes an array of elements and returns a string with those elements
1438 formatted as a table row. Useful for polling modules.
1440 :param elems: the elements to be printed
1441 :param width: the width of the terminal
1444 column_width
= int(width
/ n
)
1448 ret
+= '{0:>{w}} |'.format(elem
, w
=column_width
- 2)
1452 def get_pretty_header(self
, elems
: Sequence
[str], width
: int) -> str:
1454 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
1456 :param elems: the elements to be printed
1457 :param width: the width of the terminal
1460 column_width
= int(width
/ n
)
1464 for i
in range(0, n
):
1465 ret
+= '-' * (column_width
- 1) + '+'
1469 ret
+= self
.get_pretty_row(elems
, width
)
1474 for i
in range(0, n
):
1475 ret
+= '-' * (column_width
- 1) + '+'
def get_server(self, hostname: str) -> ServerInfoT:
    """
    Called by the plugin to fetch metadata about a particular hostname.

    This is information that ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on a particular server.

    :param hostname: a hostname
    :return: the result of ``_ceph_get_server(hostname)``, typed as
        ``ServerInfoT``
    """
    server_info = self._ceph_get_server(hostname)
    return cast(ServerInfoT, server_info)
1494 def get_perf_schema(self
,
1496 svc_name
: str) -> Dict
[str,
1497 Dict
[str, Dict
[str, Union
[str, int]]]]:
1499 Called by the plugin to fetch perf counter schema info.
1500 svc_name can be nullptr, as can svc_type, in which case
1503 :param str svc_type:
1504 :param str svc_name:
1505 :return: list of dicts describing the counters requested
1507 return self
._ceph
_get
_perf
_schema
(svc_type
, svc_name
)
def get_rocksdb_version(self) -> str:
    """
    Called by the plugin to fetch the latest RocksDB version number.

    :return: str representing the major, minor, and patch RocksDB version
        numbers, as returned by ``_ceph_get_rocksdb_version``
    """
    version = self._ceph_get_rocksdb_version()
    return version
1518 def get_counter(self
,
1521 path
: str) -> Dict
[str, List
[Tuple
[float, int]]]:
1523 Called by the plugin to fetch the latest performance counter data for a
1524 particular counter on a particular service.
1526 :param str svc_type:
1527 :param str svc_name:
1528 :param str path: a period-separated concatenation of the subsystem and the
1529 counter name, for example "mds.inodes".
1530 :return: A dict of counter names to their values. each value is a list of
1531 two-tuples of (timestamp, value). This may be empty if no data is
1534 return self
._ceph
_get
_counter
(svc_type
, svc_name
, path
)
1537 def get_latest_counter(self
,
1540 path
: str) -> Dict
[str, Union
[Tuple
[float, int],
1541 Tuple
[float, int, int]]]:
1543 Called by the plugin to fetch only the newest performance counter data
1544 point for a particular counter on a particular service.
1546 :param str svc_type:
1547 :param str svc_name:
1548 :param str path: a period-separated concatenation of the subsystem and the
1549 counter name, for example "mds.inodes".
1550 :return: A list of two-tuples of (timestamp, value) or three-tuple of
1551 (timestamp, value, count) is returned. This may be empty if no
1554 return self
._ceph
_get
_latest
_counter
(svc_type
, svc_name
, path
)
def list_servers(self) -> List[ServerInfoT]:
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata).

    Implemented by calling ``_ceph_get_server`` with ``None``.

    :return: a list of information about all servers
    """
    all_servers = self._ceph_get_server(None)
    return cast(List[ServerInfoT], all_servers)
1567 def get_metadata(self
,
1570 default
: Optional
[Dict
[str, str]] = None) -> Optional
[Dict
[str, str]]:
1572 Fetch the daemon metadata for a particular service.
1574 ceph-mgr fetches metadata asynchronously, so there are windows of time during
1575 addition/removal of services where the metadata is not available to
1576 modules. ``None`` is returned if no metadata is available.
1578 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1579 :param str svc_id: service id. convert OSD integer IDs to strings when
1581 :rtype: dict, or None if no metadata found
1583 metadata
= self
._ceph
_get
_metadata
(svc_type
, svc_id
)
def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
    """
    Fetch the latest status for a particular service daemon.

    This method may return ``None`` if no status information is
    available, for example because the daemon hasn't fully started yet.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
1602 def check_mon_command(self
, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> HandleCommandResult
:
1604 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1608 r
= HandleCommandResult(*self
.mon_command(cmd_dict
, inbuf
))
1610 raise MonCommandFailed(f
'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1613 def mon_command(self
, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1615 Helper for modules that do simple, synchronous mon command
1618 See send_command for general case.
1620 :return: status int, out std, err str
1624 result
= CommandResult()
1625 self
.send_command(result
, "mon", "", json
.dumps(cmd_dict
), "", inbuf
)
1629 self
.log
.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1630 cmd_dict
['prefix'], r
[0], t2
- t1
1635 def osd_command(self
, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1637 Helper for osd command execution.
1639 See send_command for general case. Also, see osd/OSD.cc for available commands.
1641 :param dict cmd_dict: expects a prefix and an osd id, i.e.:
1643 'prefix': 'perf histogram dump',
1646 :return: status int, out std, err str
1649 result
= CommandResult()
1650 self
.send_command(result
, "osd", cmd_dict
['id'], json
.dumps(cmd_dict
), "", inbuf
)
1654 self
.log
.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
1655 cmd_dict
['prefix'], r
[0], t2
- t1
1660 def tell_command(self
, daemon_type
: str, daemon_id
: str, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1662 Helper for `ceph tell` command execution.
1664 See send_command for general case.
1666 :param dict cmd_dict: expects a prefix i.e.:
1671 :return: status int, out std, err str
1674 result
= CommandResult()
1675 self
.send_command(result
, daemon_type
, daemon_id
, json
.dumps(cmd_dict
), "", inbuf
)
1679 self
.log
.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
1680 daemon_type
, daemon_id
, cmd_dict
['prefix'], r
[0], t2
- t1
1687 result
: CommandResult
,
1692 inbuf
: Optional
[str] = None) -> None:
1694 Called by the plugin to send a command to the mon
1697 :param CommandResult result: an instance of the ``CommandResult``
1698 class, defined in the same module as MgrModule. This acts as a
1699 completion and stores the output of the command. Use
1700 ``CommandResult.wait()`` if you want to block on completion.
1701 :param str svc_type:
1703 :param str command: a JSON-serialized command. This uses the same
1704 format as the ceph command line, which is a dictionary of command
1705 arguments, with the extra ``prefix`` key containing the command
1706 name itself. Consult MonCommands.h for available commands and
1707 their expected arguments.
1708 :param str tag: used for nonblocking operation: when a command
1709 completes, the ``notify()`` callback on the MgrModule instance is
1710 triggered, with notify_type set to "command", and notify_id set to
1711 the tag of the command.
1712 :param str inbuf: input buffer for sending additional data.
1714 self
._ceph
_send
_command
(result
, svc_type
, svc_id
, command
, tag
, inbuf
)
1720 stdin
: Optional
[bytes
] = None
1721 ) -> Tuple
[int, str, str]:
1726 '-k', str(self
.get_ceph_option('keyring')),
1727 '-n', f
'mgr.{self.get_mgr_id()}',
1729 self
.log
.debug('exec: ' + ' '.join(cmd
))
1733 stdout
=subprocess
.PIPE
,
1734 stderr
=subprocess
.PIPE
,
1737 except subprocess
.TimeoutExpired
as ex
:
1739 return -errno
.ETIMEDOUT
, '', str(ex
)
1741 self
.log
.error(f
'Non-zero return from {cmd}: {p.stderr.decode()}')
1742 return p
.returncode
, p
.stdout
.decode(), p
.stderr
.decode()
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Set the module's current map of health checks.

    The argument is a dict of check names to info. Each info dict uses
    keys such as::

        {
            'severity': 'warning',          # or 'error'
            'summary': 'summary string',
            'count': 4,                     # quantify badness
            'detail': ['list', 'of', 'detail', 'strings'],
        }

    The map is handed unchanged to ``_ceph_set_health_checks``.

    :param checks: dict of health check dicts
    """
    publish = self._ceph_set_health_checks
    publish(checks)
1769 def _handle_command(self
,
1771 cmd
: Dict
[str, Any
]) -> Union
[HandleCommandResult
,
1772 Tuple
[int, str, str]]:
1773 if cmd
['prefix'] not in CLICommand
.COMMANDS
:
1774 return self
.handle_command(inbuf
, cmd
)
1776 return CLICommand
.COMMANDS
[cmd
['prefix']].call(self
, cmd
, inbuf
)
1778 def handle_command(self
,
1780 cmd
: Dict
[str, Any
]) -> Union
[HandleCommandResult
,
1781 Tuple
[int, str, str]]:
1783 Called by ceph-mgr to request the plugin to handle one
1784 of the commands that it declared in self.COMMANDS
1786 Return a status code, an output buffer, and an
1787 output string. The output buffer is for data results,
1788 the output string is for informative text.
1790 :param inbuf: content of any "-i <file>" supplied to ceph cli
1792 :param cmd: from Ceph's cmdmap_t
1795 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1798 # Should never get called if they didn't declare
1800 raise NotImplementedError()
def get_mgr_id(self) -> str:
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: the value returned by ``_ceph_get_mgr_id``
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
def get_ceph_conf_path(self) -> str:
    """
    Return the path reported by the ``_ceph_get_ceph_conf_path`` binding.

    :return: the binding's return value, unchanged
    """
    conf_path = self._ceph_get_ceph_conf_path()
    return conf_path
1816 def get_mgr_ip(self
) -> str:
1817 if not self
._mgr
_ips
:
1818 ips
= self
.get("mgr_ips").get('ips', [])
1820 return socket
.gethostname()
1821 self
._mgr
_ips
= ips
[0]
1822 assert self
._mgr
_ips
is not None
1823 return self
._mgr
_ips
def get_hostname(self) -> str:
    """
    Return the local host name as given by ``socket.gethostname()``.

    :return: host name string
    """
    hostname = socket.gethostname()
    return hostname
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Read one option through the ``_ceph_get_option`` binding.

    :param key: option name, forwarded unchanged
    :return: the binding's return value for ``key``
    """
    option_value = self._ceph_get_option(key)
    return option_value
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Read one option for ``entity`` through ``_ceph_get_foreign_option``.

    :param entity: forwarded unchanged as the first argument
    :param key: option name, forwarded unchanged
    :return: the binding's return value
    """
    option_value = self._ceph_get_foreign_option(entity, key)
    return option_value
def _validate_module_option(self, key: str) -> None:
    """
    Helper: don't allow get/set config callers to access config options
    that they didn't declare.

    :param key: option name to look for among the ``'name'`` entries of
        ``self.MODULE_OPTIONS``
    :raises RuntimeError: if ``key`` is not one of the declared names
    """
    declared_names = [option['name'] for option in self.MODULE_OPTIONS]
    if key in declared_names:
        return
    message = "Config option '{0}' is not in {1}.MODULE_OPTIONS".format(
        key, self.__class__.__name__)
    raise RuntimeError(message)
1847 def _get_module_option(self
,
1849 default
: OptionValue
,
1850 localized_prefix
: str = "") -> OptionValue
:
1851 r
= self
._ceph
_get
_module
_option
(self
.module_name
, key
,
1854 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting.

    The key is first checked with ``_validate_module_option``; the lookup
    itself is done by ``_get_module_option``.

    :param key: option name
    :param default: passed through to ``_get_module_option``
    :return: the result of ``_get_module_option(key, default)``
    """
    self._validate_module_option(key)
    option_value = self._get_module_option(key, default)
    return option_value
1865 def get_module_option_ex(self
, module
: str,
1867 default
: OptionValue
= None) -> OptionValue
:
1869 Retrieve the value of a persistent configuration setting
1870 for the specified module.
1872 :param module: The name of the module, e.g. 'dashboard'
1874 :param key: The configuration key, e.g. 'server_addr'.
1875 :param default: The default value to use when the
1876 returned value is ``None``. Defaults to ``None``.
1878 if module
== self
.module_name
:
1879 self
._validate
_module
_option
(key
)
1880 r
= self
._ceph
_get
_module
_option
(module
, key
)
1881 return default
if r
is None else r
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix.

    :param str key_prefix: prefix forwarded to ``_ceph_get_store_prefix``
    :return: the binding's return value
    """
    matches = self._ceph_get_store_prefix(key_prefix)
    return matches
1894 def _set_localized(self
,
1897 setter
: Callable
[[str, Optional
[str]], None]) -> None:
1898 return setter(_get_localized_key(self
.get_mgr_id(), key
), val
)
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve localized configuration for this ceph-mgr instance.

    The key is validated first; then ``_get_module_option`` is called with
    the result of ``get_mgr_id()`` as its third argument.

    :param key: option name
    :param default: passed through to ``_get_module_option``
    :return: the result of ``_get_module_option``
    """
    self._validate_module_option(key)
    mgr_id = self.get_mgr_id()
    return self._get_module_option(key, default, mgr_id)
def _set_module_option(self, key: str, val: Any) -> None:
    """
    Store ``val`` for ``key`` under this module's name.

    ``None`` is forwarded as ``None``; any other value is converted with
    ``str()`` before being handed to ``_ceph_set_module_option``.

    :param key: option name
    :param val: new value, or ``None``
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if val is None:
        encoded = None
    else:
        encoded = str(val)
    return self._ceph_set_module_option(self.module_name, key, encoded)
def set_module_option(self, key: str, val: Any) -> None:
    """
    Set the value of a persistent configuration setting.

    The key is checked with ``_validate_module_option`` before the value
    is stored through ``_set_module_option``.

    :param key: option name
    :type val: str | None
    :raises ValueError: if `val` cannot be parsed or it is out of the specified range
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting
    for the specified module.

    When ``module`` is this module's own name, ``key`` is first checked
    with ``_validate_module_option``. Keys of other modules are not
    validated here.

    :param module: name of the module that owns the option
    :param key: option name
    :param val: new value; ``None`` is forwarded as ``None``, anything else
        is converted with ``str()``
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Match _set_module_option: str(None) would store the literal string
    # "None" instead of passing None through to the binding.
    encoded = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, encoded)
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set localized configuration for this ceph-mgr instance.

    The key is validated, then ``_set_localized`` is invoked with
    ``_set_module_option`` as the setter.

    :param key: option name
    :param val: new value, or ``None``
    :return: whatever ``_set_localized`` returns
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    :param key: store key, forwarded to ``_ceph_set_store``
    :param val: value to store, or ``None``
    """
    store = self._ceph_set_store
    store(key, val)
1957 def get_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
1959 Get a value from this module's persistent key value store
1961 r
= self
._ceph
_get
_store
(key
)
1968 def get_localized_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
1969 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
1971 r
= self
._ceph
_get
_store
(key
)
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Store ``val`` under ``key`` via ``_set_localized``, using
    ``set_store`` as the setter.

    :param key: store key
    :param val: value to store, or ``None``
    :return: whatever ``_set_localized`` returns
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
1981 def self_test(self
) -> Optional
[str]:
1983 Run a self-test on the module. Override this function and implement
1984 a best-effort self-test for (automated) testing of the module
1986 Indicate any failures by raising an exception. This does not have
1987 to be pretty, it's mainly for picking up regressions during
1988 development, rather than use in the field.
1990 :return: None, or an advisory string for developer interest, such
1991 as a json dump of some state.
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to an OSDMap.

    :return: the object returned by ``_ceph_get_osdmap``, typed as
        ``OSDMap``
    """
    handle = self._ceph_get_osdmap()
    return cast(OSDMap, handle)
2004 def get_latest(self
, daemon_type
: str, daemon_name
: str, counter
: str) -> int:
2005 data
= self
.get_latest_counter(
2006 daemon_type
, daemon_name
, counter
)[counter
]
2013 def get_latest_avg(self
, daemon_type
: str, daemon_name
: str, counter
: str) -> Tuple
[int, int]:
2014 data
= self
.get_latest_counter(
2015 daemon_type
, daemon_name
, counter
)[counter
]
2017 # https://github.com/python/mypy/issues/1178
2018 _
, value
, count
= cast(Tuple
[float, int, int], data
)
2025 def get_all_perf_counters(self
, prio_limit
: int = PRIO_USEFUL
,
2026 services
: Sequence
[str] = ("mds", "mon", "osd",
2027 "rbd-mirror", "rgw",
2028 "tcmu-runner")) -> Dict
[str, dict]:
2030 Return the perf counters currently known to this ceph-mgr
2031 instance, filtered by priority equal to or greater than `prio_limit`.
2033 The result is a map of string to dict, associating services
2034 (like "osd.123") with their counters. The counter
2035 dict for each service maps counter paths to a counter
2036 info structure, which is the information from
2037 the schema, plus an additional "value" member with the latest
2041 result
= defaultdict(dict) # type: Dict[str, dict]
2043 for server
in self
.list_servers():
2044 for service
in cast(List
[ServiceInfoT
], server
['services']):
2045 if service
['type'] not in services
:
2048 schemas
= self
.get_perf_schema(service
['type'], service
['id'])
2050 self
.log
.warning("No perf counter schema for {0}.{1}".format(
2051 service
['type'], service
['id']
2055 # Value is returned in a potentially-multi-service format,
2056 # get just the service we're asking about
2057 svc_full_name
= "{0}.{1}".format(
2058 service
['type'], service
['id'])
2059 schema
= schemas
[svc_full_name
]
2061 # Populate latest values
2062 for counter_path
, counter_schema
in schema
.items():
2063 # self.log.debug("{0}: {1}".format(
2064 # counter_path, json.dumps(counter_schema)
2066 priority
= counter_schema
['priority']
2067 assert isinstance(priority
, int)
2068 if priority
< prio_limit
:
2071 tp
= counter_schema
['type']
2072 assert isinstance(tp
, int)
2073 counter_info
= dict(counter_schema
)
2074 # Also populate count for the long running avgs
2075 if tp
& self
.PERFCOUNTER_LONGRUNAVG
:
2076 v
, c
= self
.get_latest_avg(
2081 counter_info
['value'], counter_info
['count'] = v
, c
2082 result
[svc_full_name
][counter_path
] = counter_info
2084 counter_info
['value'] = self
.get_latest(
2090 result
[svc_full_name
][counter_path
] = counter_info
2092 self
.log
.debug("returning {0} counter".format(len(result
)))
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: address forwarded to ``_ceph_set_uri``
    :return: whatever ``_ceph_set_uri`` returns
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """
    Forward a device's wear level to ``_ceph_set_device_wear_level``.

    :param devid: device identifier, forwarded unchanged
    :param wear_level: wear level, forwarded unchanged
    :return: whatever the binding returns
    """
    outcome = self._ceph_set_device_wear_level(devid, wear_level)
    return outcome
def have_mon_connection(self) -> bool:
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor. If it doesn't, then it's likely that the
    information we have about the cluster is out of date,
    and/or the monitor cluster is down.

    :return: the value returned by ``_ceph_have_mon_connection``
    """
    connected = self._ceph_have_mon_connection()
    return connected
2122 def update_progress_event(self
,
2126 add_to_ceph_s
: bool) -> None:
2127 return self
._ceph
_update
_progress
_event
(evid
, desc
, progress
, add_to_ceph_s
)
def complete_progress_event(self, evid: str) -> None:
    """
    Forward ``evid`` to ``_ceph_complete_progress_event``.

    :param evid: event identifier, forwarded unchanged
    :return: whatever the binding returns
    """
    outcome = self._ceph_complete_progress_event(evid)
    return outcome
def clear_all_progress_events(self) -> None:
    """
    Invoke ``_ceph_clear_all_progress_events`` and return its result.

    :return: whatever the binding returns
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
2140 def rados(self
) -> rados
.Rados
:
2142 A librados instance to be shared by any classes within
2143 this mgr module that want one.
2148 ctx_capsule
= self
.get_context()
2149 self
._rados
= rados
.Rados(context
=ctx_capsule
)
2150 self
._rados
.connect()
2151 self
._ceph
_register
_client
(self
._rados
.get_addrs())
2155 def can_run() -> Tuple
[bool, str]:
2157 Implement this function to report whether the module's dependencies
2158 are met. For example, if the module needs to import a particular
2159 dependency to work, then use a try/except around the import at
2160 file scope, and then report here if the import failed.
2162 This will be called in a blocking way from the C++ code, so do not
2163 do any I/O that could block in this function.
2165 :return a 2-tuple consisting of a boolean and explanatory string
2171 def remote(self
, module_name
: str, method_name
: str, *args
: Any
, **kwargs
: Any
) -> Any
:
2173 Invoke a method on another module. All arguments, and the return
2174 value from the other module must be serializable.
2176 Limitation: Do not import any modules within the called method.
2177 Otherwise you will get an error in Python 2::
2179 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
2183 :param module_name: Name of other module. If module isn't loaded,
2184 an ImportError exception is raised.
2185 :param method_name: Method name. If it does not exist, a NameError
2186 exception is raised.
2187 :param args: Argument tuple
2188 :param kwargs: Keyword argument dict
2189 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
2190 :raises ImportError: No such module
2192 return self
._ceph
_dispatch
_remote
(module_name
, method_name
,
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query. Argument is a dict of the query
    parameters. Its entries include sub-key descriptors, performance
    counter descriptors and a limit::

        {'type': subkey_type, 'regex': regex_pattern},
        'performance_counter_descriptors': [
            list, of, descriptor, types
        ],
        'limit': {'order_by': performance_counter_type, 'max_count': n},

    Subkey types:
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_osd_perf_query`` returns
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: the value returned by ``_ceph_get_osd_perf_counters``
    """
    counters = self._ceph_get_osd_perf_counters(query_id)
    return counters
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query. Argument is a dict of the query
    parameters. Its entries include sub-key descriptors and performance
    counter descriptors::

        {'type': subkey_type, 'regex': regex_pattern},
        'performance_counter_descriptors': [
            list, of, descriptor, types
        ],

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Subkey types:
       'mds_rank', 'client_id'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_mds_perf_query(query)
    return query_id
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister an MDS perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_mds_perf_query`` returns
    """
    outcome = self._ceph_remove_mds_perf_query(query_id)
    return outcome
def reregister_mds_perf_queries(self) -> None:
    """
    Re-register MDS perf queries.

    :return: whatever ``_ceph_reregister_mds_perf_queries`` returns
    """
    outcome = self._ceph_reregister_mds_perf_queries()
    return outcome
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an MDS perf query.

    :param int query_id: query ID
    :return: the value returned by ``_ceph_get_mds_perf_counters``
    """
    counters = self._ceph_get_mds_perf_counters(query_id)
    return counters
def get_daemon_health_metrics(self) -> Dict[str, List[Dict[str, Any]]]:
    """
    Get the list of health metrics per daemon. This includes SLOW_OPS health metrics
    in MON and OSD daemons, and PENDING_CREATING_PGS health metrics for OSDs.

    :return: the value returned by ``_ceph_get_daemon_health_metrics``
    """
    metrics = self._ceph_get_daemon_health_metrics()
    return metrics
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Verifies that the current session caps permit executing the py service
    or current module with the provided arguments. This provides a generic
    way to allow modules to restrict by more fine-grained controls.

    :param arguments: dict of key/value arguments to test
    :return: the value returned by ``_ceph_is_authorized``
    """
    allowed = self._ceph_is_authorized(arguments)
    return allowed
2318 def send_rgwadmin_command(self
, args
: List
[str],
2319 stdout_as_json
: bool = True) -> Tuple
[int, Union
[str, dict], str]:
2323 '-c', str(self
.get_ceph_conf_path()),
2324 '-k', str(self
.get_ceph_option('keyring')),
2325 '-n', f
'mgr.{self.get_mgr_id()}',
2327 self
.log
.debug('Executing %s', str(cmd
))
2328 result
= subprocess
.run( # pylint: disable=subprocess-run-check
2330 stdout
=subprocess
.PIPE
,
2331 stderr
=subprocess
.PIPE
,
2334 stdout
= result
.stdout
.decode('utf-8')
2335 stderr
= result
.stderr
.decode('utf-8')
2336 if stdout
and stdout_as_json
:
2337 stdout
= json
.loads(stdout
)
2338 if result
.returncode
:
2339 self
.log
.debug('Error %s executing %s: %s', result
.returncode
, str(cmd
), stderr
)
2340 return result
.returncode
, stdout
, stderr
2341 except subprocess
.CalledProcessError
as ex
:
2342 self
.log
.exception('Error executing radosgw-admin %s: %s', str(ex
.cmd
), str(ex
.output
))
2344 except subprocess
.TimeoutExpired
as ex
:
2345 self
.log
.error('Timeout (10s) executing radosgw-admin %s', str(ex
.cmd
))