1 import ceph_module
# noqa
3 from typing
import cast
, Tuple
, Any
, Dict
, Generic
, Optional
, Callable
, List
, \
4 Mapping
, NamedTuple
, Sequence
, Union
, TYPE_CHECKING
7 if sys
.version_info
>= (3, 8):
8 from typing
import Literal
10 from typing_extensions
import Literal
19 from collections
import defaultdict
20 from enum
import IntEnum
, Enum
27 from ceph_argparse
import CephArgtype
28 from mgr_util
import profile_method
30 if sys
.version_info
>= (3, 8):
31 from typing
import get_args
, get_origin
33 def get_args(tp
: Any
) -> Any
:
37 return getattr(tp
, '__args__', ())
def get_origin(tp: Any) -> Any:
    """Return ``tp.__origin__`` when that attribute exists, else ``None``."""
    try:
        return tp.__origin__
    except AttributeError:
        # Objects without an ``__origin__`` attribute yield None.
        return None
43 ERROR_MSG_EMPTY_INPUT_FILE
= 'Empty input file'
44 ERROR_MSG_NO_INPUT_FILE
= 'Input file not specified'
45 # Full list of strings in "osd_types.cc:pg_state_string()"
83 NFS_GANESHA_SUPPORTED_FSALS
= ['CEPH', 'RGW']
84 NFS_POOL_NAME
= '.nfs'
87 class NotifyType(str, Enum
):
89 pg_summary
= 'pg_summary'
96 # these are disabled because there are no users.
98 # service_map = 'service_map'
99 # mon_status = 'mon_status'
100 # see DaemonServer.cc:
101 # perf_schema_update = 'perf_schema_update'
104 class CommandResult(object):
106 Use with MgrModule.send_command
109 def __init__(self
, tag
: Optional
[str] = None):
110 self
.ev
= threading
.Event()
115 # This is just a convenience for notifications from
116 # C++ land, to avoid passing addresses around in messages.
117 self
.tag
= tag
if tag
else ""
119 def complete(self
, r
: int, outb
: str, outs
: str) -> None:
125 def wait(self
) -> Tuple
[int, str, str]:
127 return self
.r
, self
.outb
, self
.outs
130 class HandleCommandResult(NamedTuple
):
132 Tuple containing the result of `handle_command()`
134 Only write to stderr if there is an error, or in extraordinary circumstances
136 Avoid having `ceph foo bar` commands say "did foo bar" on success unless there
137 is critical information to include there.
139 Everything programmatically consumable should be put on stdout
141 retval
: int = 0 # return code. E.g. 0 or -errno.EINVAL
142 stdout
: str = "" # data of this result.
143 stderr
: str = "" # Typically used for error messages.
class MonCommandFailed(RuntimeError):
    """RuntimeError subclass; adds no behaviour of its own."""
class MgrDBNotReady(RuntimeError):
    """RuntimeError subclass; adds no behaviour of its own."""
150 class OSDMap(ceph_module
.BasePyOSDMap
):
def get_epoch(self) -> int:
    """Return whatever the underlying ``_get_epoch()`` call yields."""
    epoch = self._get_epoch()
    return epoch
def get_crush_version(self) -> int:
    """Return whatever the underlying ``_get_crush_version()`` call yields."""
    crush_version = self._get_crush_version()
    return crush_version
157 def dump(self
) -> Dict
[str, Any
]:
160 def get_pools(self
) -> Dict
[int, Dict
[str, Any
]]:
161 # FIXME: efficient implementation
163 return dict([(p
['pool'], p
) for p
in d
['pools']])
165 def get_pools_by_name(self
) -> Dict
[str, Dict
[str, Any
]]:
166 # FIXME: efficient implementation
168 return dict([(p
['pool_name'], p
) for p
in d
['pools']])
def new_incremental(self) -> 'OSDMapIncremental':
    """Return the object produced by the underlying ``_new_incremental()``."""
    incremental = self._new_incremental()
    return incremental
def apply_incremental(self, inc: 'OSDMapIncremental') -> 'OSDMap':
    """Pass ``inc`` to ``_apply_incremental()`` and return its result.

    :param inc: the incremental forwarded unchanged to the underlying call
    """
    updated = self._apply_incremental(inc)
    return updated
def get_crush(self) -> 'CRUSHMap':
    """Return the object produced by the underlying ``_get_crush()`` call."""
    crush = self._get_crush()
    return crush
def get_pools_by_take(self, take: int) -> List[int]:
    """Return the ``'pools'`` entry of ``_get_pools_by_take(take)``.

    :param take: value forwarded unchanged to the underlying call
    :return: the ``'pools'`` value, or an empty list when the key is absent
    """
    reply = self._get_pools_by_take(take)
    return reply.get('pools', [])
182 def calc_pg_upmaps(self
, inc
: 'OSDMapIncremental',
184 max_iterations
: int = 10,
185 pools
: Optional
[List
[str]] = None) -> int:
188 return self
._calc
_pg
_upmaps
(
190 max_deviation
, max_iterations
, pools
)
def map_pool_pgs_up(self, poolid: int) -> List[int]:
    """Return the result of ``_map_pool_pgs_up(poolid)`` unchanged.

    :param poolid: value forwarded unchanged to the underlying call
    """
    mapping = self._map_pool_pgs_up(poolid)
    return mapping
def pg_to_up_acting_osds(self, pool_id: int, ps: int) -> Dict[str, Any]:
    """Return the result of ``_pg_to_up_acting_osds(pool_id, ps)`` unchanged.

    :param pool_id: first positional argument of the underlying call
    :param ps: second positional argument of the underlying call
    """
    placement = self._pg_to_up_acting_osds(pool_id, ps)
    return placement
def pool_raw_used_rate(self, pool_id: int) -> float:
    """Return the result of ``_pool_raw_used_rate(pool_id)`` unchanged.

    :param pool_id: value forwarded unchanged to the underlying call
    """
    rate = self._pool_raw_used_rate(pool_id)
    return rate
def build_simple(cls, epoch: int = 1, uuid: Optional[str] = None, num_osd: int = -1) -> 'ceph_module.BasePyOSDMap':
    """Return ``cls._build_simple(epoch, uuid, num_osd)`` unchanged.

    NOTE(review): the ``cls`` parameter suggests a classmethod; the decorator
    line is not visible in this view -- confirm.

    :param epoch: first positional argument of the underlying call (default 1)
    :param uuid: second positional argument (default None)
    :param num_osd: third positional argument (default -1)
    """
    built = cls._build_simple(epoch, uuid, num_osd)
    return built
205 def get_ec_profile(self
, name
: str) -> Optional
[List
[Dict
[str, str]]]:
206 # FIXME: efficient implementation
208 return d
['erasure_code_profiles'].get(name
, None)
210 def get_require_osd_release(self
) -> str:
212 return d
['require_osd_release']
215 class OSDMapIncremental(ceph_module
.BasePyOSDMapIncremental
):
def get_epoch(self) -> int:
    """Return whatever the underlying ``_get_epoch()`` call yields."""
    epoch = self._get_epoch()
    return epoch
219 def dump(self
) -> Dict
[str, Any
]:
222 def set_osd_reweights(self
, weightmap
: Dict
[int, float]) -> None:
224 weightmap is a dict, int to float. e.g. { 0: .9, 1: 1.0, 3: .997 }
226 return self
._set
_osd
_reweights
(weightmap
)
228 def set_crush_compat_weight_set_weights(self
, weightmap
: Dict
[str, float]) -> None:
230 weightmap is a dict, int to float. devices only. e.g.,
231 { 0: 3.4, 1: 3.3, 2: 3.334 }
233 return self
._set
_crush
_compat
_weight
_set
_weights
(weightmap
)
236 class CRUSHMap(ceph_module
.BasePyCRUSH
):
237 ITEM_NONE
= 0x7fffffff
238 DEFAULT_CHOOSE_ARGS
= '-1'
240 def dump(self
) -> Dict
[str, Any
]:
def get_item_weight(self, item: int) -> Optional[int]:
    """Return the result of ``_get_item_weight(item)`` unchanged.

    :param item: value forwarded unchanged to the underlying call
    """
    weight = self._get_item_weight(item)
    return weight
def get_item_name(self, item: int) -> Optional[str]:
    """Return the result of ``_get_item_name(item)`` unchanged.

    :param item: value forwarded unchanged to the underlying call
    """
    label = self._get_item_name(item)
    return label
def find_takes(self) -> List[int]:
    """Return the ``'takes'`` entry of ``_find_takes()``.

    :return: the ``'takes'`` value, or an empty list when the key is absent
    """
    reply = self._find_takes()
    return reply.get('takes', [])
def find_roots(self) -> List[int]:
    """Return the ``'roots'`` entry of ``_find_roots()``.

    :return: the ``'roots'`` value, or an empty list when the key is absent
    """
    reply = self._find_roots()
    return reply.get('roots', [])
def get_take_weight_osd_map(self, root: int) -> Dict[int, float]:
    """Return the ``'weights'`` mapping for ``root`` with its keys cast to int.

    :param root: value forwarded unchanged to ``_get_take_weight_osd_map()``
    :return: a new dict; empty when the reply has no ``'weights'`` key
    """
    reply = self._get_take_weight_osd_map(root)
    weights = {}
    for osd, weight in reply.get('weights', {}).items():
        # keys of the raw reply are converted with int(); values are kept as-is
        weights[int(osd)] = weight
    return weights
def have_default_choose_args(dump: Dict[str, Any]) -> bool:
    """Tell whether ``dump['choose_args']`` has the default choose-args key.

    :param dump: mapping that may carry a ``'choose_args'`` entry
    :return: True when ``CRUSHMap.DEFAULT_CHOOSE_ARGS`` is a member of that
             entry; a missing entry is treated as empty
    """
    default_key = CRUSHMap.DEFAULT_CHOOSE_ARGS
    choose_args = dump.get('choose_args', {})
    return default_key in choose_args
def get_default_choose_args(dump: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the default choose-args entry of ``dump['choose_args']``.

    :param dump: mapping whose ``'choose_args'`` entry must be a dict
    :return: the value stored under ``CRUSHMap.DEFAULT_CHOOSE_ARGS``, or an
             empty list when that key is absent
    :raises AssertionError: when ``'choose_args'`` is missing or not a dict
    """
    table = dump.get('choose_args')
    assert isinstance(table, dict)
    default_key = CRUSHMap.DEFAULT_CHOOSE_ARGS
    return table.get(default_key, [])
269 def get_rule(self
, rule_name
: str) -> Optional
[Dict
[str, Any
]]:
270 # TODO efficient implementation
271 for rule
in self
.dump()['rules']:
272 if rule_name
== rule
['rule_name']:
277 def get_rule_by_id(self
, rule_id
: int) -> Optional
[Dict
[str, Any
]]:
278 for rule
in self
.dump()['rules']:
279 if rule
['rule_id'] == rule_id
:
284 def get_rule_root(self
, rule_name
: str) -> Optional
[int]:
285 rule
= self
.get_rule(rule_name
)
290 first_take
= next(s
for s
in rule
['steps'] if s
.get('op') == 'take')
291 except StopIteration:
292 logging
.warning("CRUSH rule '{0}' has no 'take' step".format(
296 return first_take
['item']
298 def get_osds_under(self
, root_id
: int) -> List
[int]:
299 # TODO don't abuse dump like this
301 buckets
= dict([(b
['id'], b
) for b
in d
['buckets']])
305 def accumulate(b
: Dict
[str, Any
]) -> None:
306 for item
in b
['items']:
308 osd_list
.append(item
['id'])
311 accumulate(buckets
[item
['id']])
315 accumulate(buckets
[root_id
])
319 def device_class_counts(self
) -> Dict
[str, int]:
320 result
= defaultdict(int) # type: Dict[str, int]
321 # TODO don't abuse dump like this
323 for device
in d
['devices']:
324 cls
= device
.get('class', None)
330 HandlerFuncType
= Callable
[..., Tuple
[int, str, str]]
332 def _extract_target_func(
334 ) -> Tuple
[HandlerFuncType
, Dict
[str, Any
]]:
335 """In order to interoperate with other decorated functions,
336 we need to find the original function which will provide
337 the main set of arguments. While we descend through the
338 stack of wrapped functions, gather additional arguments
339 the decorators may want to provide.
341 # use getattr to keep mypy happy
342 wrapped
= getattr(f
, "__wrapped__", None)
346 while wrapped
is not None:
347 extra_args
.update(getattr(f
, "extra_args", {}))
349 wrapped
= getattr(f
, "__wrapped__", None)
353 class CLICommand(object):
354 COMMANDS
= {} # type: Dict[str, CLICommand]
363 self
.func
= None # type: Optional[Callable]
364 self
.arg_spec
= {} # type: Dict[str, Any]
365 self
.first_default
= -1
367 KNOWN_ARGS
= '_', 'self', 'mgr', 'inbuf', 'return'
def _load_func_metadata(cls: Any, f: HandlerFuncType) -> Tuple[str, Dict[str, Any], int, str]:
    """Derive the description and argument descriptors for handler ``f``.

    ``f`` is first unwrapped with ``_extract_target_func``, which also yields
    the extra arguments contributed by decorators.

    :param f: the (possibly decorated) handler function
    :return: a 4-tuple ``(desc, arg_spec, first_default, args)`` where
             ``desc`` is the handler docstring with newlines replaced by
             spaces, ``arg_spec`` is the annotation mapping (extended with
             the extra arguments), ``first_default`` is
             ``len(annotations) - len(defaults)`` and ``args`` is the
             space-joined list of ``CephArgtype.to_argdesc`` descriptors
    :raises AssertionError: when an argument outside ``KNOWN_ARGS`` carries
             no annotation
    """
    f, extra_args = _extract_target_func(f)
    desc = (inspect.getdoc(f) or '').replace('\n', ' ')
    full_argspec = inspect.getfullargspec(f)
    arg_spec = full_argspec.annotations
    first_default = len(arg_spec)
    if full_argspec.defaults:
        first_default -= len(full_argspec.defaults)
    args = []
    positional = True
    for index, arg in enumerate(full_argspec.args):
        if arg in cls.KNOWN_ARGS:
            continue
        if arg == '_end_positional_':
            # marker argument: everything after it is non-positional
            positional = False
            continue
        # Check the annotation *before* indexing arg_spec, so that an
        # unannotated argument produces this message instead of a KeyError.
        assert arg in arg_spec, \
            f"'{arg}' is not annotated for {f}: {full_argspec}"
        if (
            arg == 'format'
            or arg_spec[arg] is Optional[bool]
            or arg_spec[arg] is bool
        ):
            # implicit switch to non-positional on any
            # Optional[bool] or the --format option
            positional = False
        has_default = index >= first_default
        args.append(CephArgtype.to_argdesc(arg_spec[arg],
                                           dict(name=arg),
                                           has_default,
                                           positional))
    for argname, argtype in extra_args.items():
        # avoid shadowing args from the function
        if argname in arg_spec:
            continue
        arg_spec[argname] = argtype
        # Name the descriptor after the extra argument itself; ``arg`` would
        # be the stale loop variable from the loop above.
        args.append(CephArgtype.to_argdesc(
            argtype, dict(name=argname), has_default=True, positional=False
        ))
    return desc, arg_spec, first_default, ' '.join(args)
def store_func_metadata(self, f: HandlerFuncType) -> None:
    """Store the metadata computed by ``_load_func_metadata(f)`` on ``self``.

    Sets ``desc``, ``arg_spec``, ``first_default`` and ``args`` from the
    returned 4-tuple, in that order.

    :param f: handler function forwarded unchanged to ``_load_func_metadata``
    """
    desc, arg_spec, first_default, args = self._load_func_metadata(f)
    self.desc = desc
    self.arg_spec = arg_spec
    self.first_default = first_default
    self.args = args
415 def __call__(self
, func
: HandlerFuncType
) -> HandlerFuncType
:
416 self
.store_func_metadata(func
)
418 self
.COMMANDS
[self
.prefix
] = self
421 def _get_arg_value(self
, kwargs_switch
: bool, key
: str, val
: Any
) -> Tuple
[bool, str, Any
]:
422 def start_kwargs() -> bool:
423 if isinstance(val
, str) and '=' in val
:
424 k
, v
= val
.split('=', 1)
425 if k
in self
.arg_spec
:
429 if not kwargs_switch
:
430 kwargs_switch
= start_kwargs()
433 k
, v
= val
.split('=', 1)
436 return kwargs_switch
, k
.replace('-', '_'), v
438 def _collect_args_by_argspec(self
, cmd_dict
: Dict
[str, Any
]) -> Dict
[str, Any
]:
440 kwargs_switch
= False
441 for index
, (name
, tp
) in enumerate(self
.arg_spec
.items()):
442 if name
in CLICommand
.KNOWN_ARGS
:
444 assert self
.first_default
>= 0
445 raw_v
= cmd_dict
.get(name
)
446 if index
>= self
.first_default
:
449 kwargs_switch
, k
, v
= self
._get
_arg
_value
(kwargs_switch
,
451 kwargs
[k
] = CephArgtype
.cast_to(tp
, v
)
456 cmd_dict
: Dict
[str, Any
],
457 inbuf
: Optional
[str] = None) -> HandleCommandResult
:
458 kwargs
= self
._collect
_args
_by
_argspec
(cmd_dict
)
460 kwargs
['inbuf'] = inbuf
462 return self
.func(mgr
, **kwargs
)
464 def dump_cmd(self
) -> Dict
[str, Union
[str, bool]]:
466 'cmd': '{} {}'.format(self
.prefix
, self
.args
),
def dump_cmd_list(cls) -> List[Dict[str, Union[str, bool]]]:
    """Return ``dump_cmd()`` of every value in ``cls.COMMANDS``.

    :return: a new list, in the iteration order of ``cls.COMMANDS``
    """
    dumped = []
    for command in cls.COMMANDS.values():
        dumped.append(command.dump_cmd())
    return dumped
def CLIReadCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a ``CLICommand`` for ``prefix`` with the permission string "r".

    :param prefix: first positional argument of ``CLICommand``
    :param poll: third positional argument of ``CLICommand`` (default False)
    """
    command = CLICommand(prefix, "r", poll)
    return command
def CLIWriteCommand(prefix: str, poll: bool = False) -> CLICommand:
    """Build a ``CLICommand`` for ``prefix`` with the permission string "w".

    :param prefix: first positional argument of ``CLICommand``
    :param poll: third positional argument of ``CLICommand`` (default False)
    """
    command = CLICommand(prefix, "w", poll)
    return command
485 def CLICheckNonemptyFileInput(desc
: str) -> Callable
[[HandlerFuncType
], HandlerFuncType
]:
486 def CheckFileInput(func
: HandlerFuncType
) -> HandlerFuncType
:
487 @functools.wraps(func
)
488 def check(*args
: Any
, **kwargs
: Any
) -> Tuple
[int, str, str]:
489 if 'inbuf' not in kwargs
:
490 return -errno
.EINVAL
, '', f
'{ERROR_MSG_NO_INPUT_FILE}: Please specify the file '\
491 f
'containing {desc} with "-i" option'
492 if isinstance(kwargs
['inbuf'], str):
493 # Delete new line separator at EOF (it may have been added by a text editor).
494 kwargs
['inbuf'] = kwargs
['inbuf'].rstrip('\r\n').rstrip('\n')
495 if not kwargs
['inbuf'] or not kwargs
['inbuf'].strip():
496 return -errno
.EINVAL
, '', f
'{ERROR_MSG_EMPTY_INPUT_FILE}: Please add {desc} to '\
498 return func(*args
, **kwargs
)
499 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
501 return CheckFileInput
503 def CLIRequiresDB(func
: HandlerFuncType
) -> HandlerFuncType
:
504 @functools.wraps(func
)
505 def check(self
: MgrModule
, *args
: Any
, **kwargs
: Any
) -> Tuple
[int, str, str]:
506 if not self
.db_ready():
507 return -errno
.EAGAIN
, "", "mgr db not yet available"
508 return func(self
, *args
, **kwargs
)
509 check
.__signature
__ = inspect
.signature(func
) # type: ignore[attr-defined]
def _get_localized_key(prefix: str, key: str) -> str:
    """Join ``prefix`` and ``key`` with a single ``/``.

    :return: the string ``"<prefix>/<key>"``
    """
    template = '{}/{}'
    return template.format(prefix, key)
517 MODULE_OPTIONS types and Option Class
520 OptionTypeLabel
= Literal
[
521 'uint', 'int', 'str', 'float', 'bool', 'addr', 'addrvec', 'uuid', 'size', 'secs']
524 # common/options.h: value_t
525 OptionValue
= Optional
[Union
[bool, int, float, str]]
530 Helper class to declare options for MODULE_OPTIONS list.
531 TODO: Replace with typing.TypedDict when in python_version >= 3.8
537 default
: OptionValue
= None,
538 type: 'OptionTypeLabel' = 'str',
539 desc
: Optional
[str] = None,
540 long_desc
: Optional
[str] = None,
541 min: OptionValue
= None,
542 max: OptionValue
= None,
543 enum_allowed
: Optional
[List
[str]] = None,
544 tags
: Optional
[List
[str]] = None,
545 see_also
: Optional
[List
[str]] = None,
546 runtime
: bool = False,
548 super(Option
, self
).__init
__(
549 (k
, v
) for k
, v
in vars().items()
550 if k
!= 'self' and v
is not None)
555 Helper class to declare options for COMMANDS list.
557 It also allows to specify prefix and args separately, as well as storing a
561 >>> def handler(): return 0, "", ""
562 >>> Command(prefix="example",
565 {'perm': 'w', 'poll': False}
571 handler
: HandlerFuncType
,
575 super().__init
__(perm
=perm
,
578 self
.handler
= handler
581 def returns_command_result(instance
: Any
,
582 f
: HandlerFuncType
) -> Callable
[..., HandleCommandResult
]:
584 def wrapper(mgr
: Any
, *args
: Any
, **kwargs
: Any
) -> HandleCommandResult
:
585 retval
, stdout
, stderr
= f(instance
or mgr
, *args
, **kwargs
)
586 return HandleCommandResult(retval
, stdout
, stderr
)
587 wrapper
.__signature
__ = inspect
.signature(f
) # type: ignore[attr-defined]
590 def register(self
, instance
: bool = False) -> HandlerFuncType
:
592 Register a CLICommand handler. It allows an instance to register bound
593 methods. In that case, the mgr instance is not passed, and it's expected
594 to be available in the class instance.
595 It also uses HandleCommandResult helper to return a wrapped tuple of 3
598 cmd
= CLICommand(prefix
=self
.prefix
, perm
=self
['perm'])
599 return cmd(self
.returns_command_result(instance
, self
.handler
))
class CPlusPlusHandler(logging.Handler):
    """Logging handler that forwards formatted records to ``_ceph_log``.

    Each record is formatted as
    ``[<module_name> <levelname> <logger name>] <message>`` and handed to the
    ``_ceph_log`` method of the module instance given at construction time.
    """

    def __init__(self, module_inst: Any):
        """Remember ``module_inst`` and install the module-specific formatter.

        :param module_inst: object providing ``module_name`` and ``_ceph_log``
        """
        super().__init__()
        self._module = module_inst
        # module_name is baked into the format string once, at construction
        line_format = "[{} %(levelname)-4s %(name)s] %(message)s".format(
            module_inst.module_name)
        self.setFormatter(logging.Formatter(line_format))

    def emit(self, record: logging.LogRecord) -> None:
        """Forward ``record`` unless its level is below this handler's level."""
        if record.levelno < self.level:
            return
        self._module._ceph_log(self.format(record))
614 class ClusterLogHandler(logging
.Handler
):
615 def __init__(self
, module_inst
: Any
):
617 self
._module
= module_inst
618 self
.setFormatter(logging
.Formatter("%(message)s"))
620 def emit(self
, record
: logging
.LogRecord
) -> None:
622 logging
.DEBUG
: MgrModule
.ClusterLogPrio
.DEBUG
,
623 logging
.INFO
: MgrModule
.ClusterLogPrio
.INFO
,
624 logging
.WARNING
: MgrModule
.ClusterLogPrio
.WARN
,
625 logging
.ERROR
: MgrModule
.ClusterLogPrio
.ERROR
,
626 logging
.CRITICAL
: MgrModule
.ClusterLogPrio
.ERROR
,
628 level
= levelmap
[record
.levelno
]
629 if record
.levelno
>= self
.level
:
630 self
._module
.cluster_log(self
._module
.module_name
,
635 class FileHandler(logging
.FileHandler
):
636 def __init__(self
, module_inst
: Any
):
637 path
= module_inst
.get_ceph_option("log_file")
638 idx
= path
.rfind(".log")
640 self
.path
= "{}.{}.log".format(path
[:idx
], module_inst
.module_name
)
642 self
.path
= "{}.{}".format(path
, module_inst
.module_name
)
643 super(FileHandler
, self
).__init
__(self
.path
, delay
=True)
644 self
.setFormatter(logging
.Formatter("%(asctime)s [%(threadName)s] [%(levelname)-4s] [%(name)s] %(message)s"))
647 class MgrModuleLoggingMixin(object):
648 def _configure_logging(self
,
653 log_to_cluster
: bool) -> None:
654 self
._mgr
_level
: Optional
[str] = None
655 self
._module
_level
: Optional
[str] = None
656 self
._root
_logger
= logging
.getLogger()
658 self
._unconfigure
_logging
()
660 # the ceph log handler is initialized only once
661 self
._mgr
_log
_handler
= CPlusPlusHandler(self
)
662 self
._cluster
_log
_handler
= ClusterLogHandler(self
)
663 self
._file
_log
_handler
= FileHandler(self
)
665 self
.log_to_file
= log_to_file
666 self
.log_to_cluster
= log_to_cluster
668 self
._root
_logger
.addHandler(self
._mgr
_log
_handler
)
670 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
672 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
674 self
._root
_logger
.setLevel(logging
.NOTSET
)
675 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
677 def _unconfigure_logging(self
) -> None:
678 # remove existing handlers:
680 h
for h
in self
._root
_logger
.handlers
681 if (isinstance(h
, CPlusPlusHandler
) or
682 isinstance(h
, FileHandler
) or
683 isinstance(h
, ClusterLogHandler
))]
684 for h
in rm_handlers
:
685 self
._root
_logger
.removeHandler(h
)
686 self
.log_to_file
= False
687 self
.log_to_cluster
= False
689 def _set_log_level(self
,
692 cluster_level
: str) -> None:
693 self
._cluster
_log
_handler
.setLevel(cluster_level
.upper())
695 module_level
= module_level
.upper() if module_level
else ''
696 if not self
._module
_level
:
697 # using debug_mgr level
698 if not module_level
and self
._mgr
_level
== mgr_level
:
699 # no change in module level neither in debug_mgr
702 if self
._module
_level
== module_level
:
703 # no change in module level
706 if not self
._module
_level
and not module_level
:
707 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
708 self
.getLogger().debug("setting log level based on debug_mgr: %s (%s)",
710 elif self
._module
_level
and not module_level
:
711 level
= self
._ceph
_log
_level
_to
_python
(mgr_level
)
712 self
.getLogger().warning("unsetting module log level, falling back to "
713 "debug_mgr level: %s (%s)", level
, mgr_level
)
716 self
.getLogger().debug("setting log level: %s", level
)
718 self
._module
_level
= module_level
719 self
._mgr
_level
= mgr_level
721 self
._mgr
_log
_handler
.setLevel(level
)
722 self
._file
_log
_handler
.setLevel(level
)
724 def _enable_file_log(self
) -> None:
726 self
.getLogger().warning("enabling logging to file")
727 self
.log_to_file
= True
728 self
._root
_logger
.addHandler(self
._file
_log
_handler
)
730 def _disable_file_log(self
) -> None:
732 self
.getLogger().warning("disabling logging to file")
733 self
.log_to_file
= False
734 self
._root
_logger
.removeHandler(self
._file
_log
_handler
)
736 def _enable_cluster_log(self
) -> None:
738 self
.getLogger().warning("enabling logging to cluster")
739 self
.log_to_cluster
= True
740 self
._root
_logger
.addHandler(self
._cluster
_log
_handler
)
def _disable_cluster_log(self) -> None:
    """Log a warning, clear ``log_to_cluster`` and detach the cluster handler."""
    # The warning is emitted first, while the cluster handler is still
    # attached to the root logger.
    logger = self.getLogger()
    logger.warning("disabling logging to cluster")
    self.log_to_cluster = False
    cluster_handler = self._cluster_log_handler
    self._root_logger.removeHandler(cluster_handler)
748 def _ceph_log_level_to_python(self
, log_level
: str) -> str:
751 ceph_log_level
= int(log_level
.split("/", 1)[0])
758 if ceph_log_level
<= 0:
759 log_level
= "CRITICAL"
760 elif ceph_log_level
<= 1:
761 log_level
= "WARNING"
762 elif ceph_log_level
<= 4:
def getLogger(self, name: Optional[str] = None) -> logging.Logger:
    """Return ``logging.getLogger(name)``.

    :param name: logger name passed straight through (default None)
    """
    logger = logging.getLogger(name)
    return logger
770 class MgrStandbyModule(ceph_module
.BaseMgrStandbyModule
, MgrModuleLoggingMixin
):
772 Standby modules only implement a serve and shutdown method, they
773 are not permitted to implement commands and they do not receive
776 They only have access to the mgrmap (for accessing service URI info
777 from their active peer), and to configuration settings (read only).
780 MODULE_OPTIONS
: List
[Option
] = []
781 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
783 def __init__(self
, module_name
: str, capsule
: Any
):
784 super(MgrStandbyModule
, self
).__init
__(capsule
)
785 self
.module_name
= module_name
787 # see also MgrModule.__init__()
788 for o
in self
.MODULE_OPTIONS
:
791 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
793 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
795 # mock does not return a str
796 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
797 log_level
= cast(str, self
.get_module_option("log_level"))
798 cluster_level
= cast(str, self
.get_module_option('log_to_cluster_level'))
799 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
802 # for backwards compatibility
803 self
._logger
= self
.getLogger()
805 def __del__(self
) -> None:
807 self
._unconfigure
_logging
()
809 def _cleanup(self
) -> None:
813 def _register_options(cls
, module_name
: str) -> None:
814 cls
.MODULE_OPTIONS
.append(
815 Option(name
='log_level', type='str', default
="", runtime
=True,
816 enum_allowed
=['info', 'debug', 'critical', 'error',
818 cls
.MODULE_OPTIONS
.append(
819 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
820 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
821 cls
.MODULE_OPTIONS
.append(
822 Option(name
='log_to_cluster', type='bool', default
=False,
824 cls
.MODULE_OPTIONS
.append(
825 Option(name
='log_to_cluster_level', type='str', default
='info',
827 enum_allowed
=['info', 'debug', 'critical', 'error',
831 def log(self
) -> logging
.Logger
:
834 def serve(self
) -> None:
836 The serve method is mandatory for standby modules.
839 raise NotImplementedError()
def get_mgr_id(self) -> str:
    """Return whatever the underlying ``_ceph_get_mgr_id()`` call yields."""
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
844 def get_module_option(self
, key
: str, default
: OptionValue
= None) -> OptionValue
:
846 Retrieve the value of a persistent configuration setting
848 :param default: the default value of the config if it is not found
850 r
= self
._ceph
_get
_module
_option
(key
)
852 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_ceph_option(self, key: str) -> OptionValue:
    """Return the result of ``_ceph_get_option(key)`` unchanged.

    :param key: option name forwarded unchanged to the underlying call
    """
    value = self._ceph_get_option(key)
    return value
859 def get_store(self
, key
: str) -> Optional
[str]:
861 Retrieve the value of a persistent KV store entry
864 :return: Byte string or None
866 return self
._ceph
_get
_store
(key
)
868 def get_localized_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
869 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
871 r
= self
._ceph
_get
_store
(key
)
def get_active_uri(self) -> str:
    """Return whatever the underlying ``_ceph_get_active_uri()`` call yields."""
    uri = self._ceph_get_active_uri()
    return uri
def get(self, data_name: str) -> Dict[str, Any]:
    """Return the result of ``_ceph_get(data_name)`` unchanged.

    :param data_name: name forwarded unchanged to the underlying call
    """
    data = self._ceph_get(data_name)
    return data
882 def get_mgr_ip(self
) -> str:
883 ips
= self
.get("mgr_ips").get('ips', [])
885 return socket
.gethostname()
888 def get_localized_module_option(self
, key
: str, default
: OptionValue
= None) -> OptionValue
:
889 r
= self
._ceph
_get
_module
_option
(key
, self
.get_mgr_id())
891 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
896 HealthChecksT
= Mapping
[str, Mapping
[str, Union
[int, str, Sequence
[str]]]]
897 # {"type": service_type, "id": service_id}
898 ServiceInfoT
= Dict
[str, str]
899 # {"hostname": hostname,
900 # "ceph_version": version,
901 # "services": [service_info, ..]}
902 ServerInfoT
= Dict
[str, Union
[str, List
[ServiceInfoT
]]]
903 PerfCounterT
= Dict
[str, Any
]
907 def DecoratorFactory(attr
: str, default
: Any
): # type: ignore
908 class DecoratorClass
:
909 _ATTR_TOKEN
= f
'__ATTR_{attr.upper()}__'
911 def __init__(self
, value
: Any
=default
) -> None:
914 def __call__(self
, func
: Callable
) -> Any
:
915 setattr(func
, self
._ATTR
_TOKEN
, self
.value
)
919 def get(cls
, func
: Callable
) -> Any
:
920 return getattr(func
, cls
._ATTR
_TOKEN
, default
)
922 return DecoratorClass
924 perm
= DecoratorFactory('perm', default
='r')
925 expose
= DecoratorFactory('expose', default
=False)(True)
928 class MgrModule(ceph_module
.BaseMgrModule
, MgrModuleLoggingMixin
):
929 MGR_POOL_NAME
= ".mgr"
931 COMMANDS
= [] # type: List[Any]
932 MODULE_OPTIONS
: List
[Option
] = []
933 MODULE_OPTION_DEFAULTS
= {} # type: Dict[str, Any]
936 SCHEMA
= None # type: Optional[str]
937 SCHEMA_VERSIONED
= None # type: Optional[List[str]]
939 # Priority definitions for perf counters
943 PRIO_UNINTERESTING
= 2
946 # counter value types
951 PERFCOUNTER_LONGRUNAVG
= 4
952 PERFCOUNTER_COUNTER
= 8
953 PERFCOUNTER_HISTOGRAM
= 0x10
954 PERFCOUNTER_TYPE_MASK
= ~
3
960 # Cluster log priorities
961 class ClusterLogPrio(IntEnum
):
968 def __init__(self
, module_name
: str, py_modules_ptr
: object, this_ptr
: object):
969 self
.module_name
= module_name
970 super(MgrModule
, self
).__init
__(py_modules_ptr
, this_ptr
)
972 for o
in self
.MODULE_OPTIONS
:
975 # we'll assume the declared type matches the
976 # supplied default value's type.
977 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = o
['default']
979 # module not declaring its type, so normalize the
980 # default value to be a string for consistent behavior
981 # with default and user-supplied option values.
982 self
.MODULE_OPTION_DEFAULTS
[o
['name']] = str(o
['default'])
984 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
985 log_level
= cast(str, self
.get_module_option("log_level"))
986 cluster_level
= cast(str, self
.get_module_option('log_to_cluster_level'))
987 log_to_file
= self
.get_module_option("log_to_file")
988 assert isinstance(log_to_file
, bool)
989 log_to_cluster
= self
.get_module_option("log_to_cluster")
990 assert isinstance(log_to_cluster
, bool)
991 self
._configure
_logging
(mgr_level
, log_level
, cluster_level
,
992 log_to_file
, log_to_cluster
)
994 # for backwards compatibility
995 self
._logger
= self
.getLogger()
997 self
._db
= None # type: Optional[sqlite3.Connection]
999 self
._version
= self
._ceph
_get
_version
()
1001 self
._perf
_schema
_cache
= None
1003 # Keep a librados instance for those that need it.
1004 self
._rados
: Optional
[rados
.Rados
] = None
1006 # this does not change over the lifetime of an active mgr
1007 self
._mgr
_ips
: Optional
[str] = None
1009 self
._db
_lock
= threading
.Lock()
def __del__(self) -> None:
    """Call ``_unconfigure_logging()`` when the instance is finalized."""
    unconfigure = self._unconfigure_logging
    unconfigure()
1015 def _register_options(cls
, module_name
: str) -> None:
1016 cls
.MODULE_OPTIONS
.append(
1017 Option(name
='log_level', type='str', default
="", runtime
=True,
1018 enum_allowed
=['info', 'debug', 'critical', 'error',
1020 cls
.MODULE_OPTIONS
.append(
1021 Option(name
='log_to_file', type='bool', default
=False, runtime
=True))
1022 if not [x
for x
in cls
.MODULE_OPTIONS
if x
['name'] == 'log_to_cluster']:
1023 cls
.MODULE_OPTIONS
.append(
1024 Option(name
='log_to_cluster', type='bool', default
=False,
1026 cls
.MODULE_OPTIONS
.append(
1027 Option(name
='log_to_cluster_level', type='str', default
='info',
1029 enum_allowed
=['info', 'debug', 'critical', 'error',
def _register_commands(cls, module_name: str) -> None:
    """Append every entry of ``CLICommand.dump_cmd_list()`` to ``cls.COMMANDS``.

    :param module_name: accepted but not used by this method
    """
    dumped = CLICommand.dump_cmd_list()
    cls.COMMANDS.extend(dumped)
1037 def log(self
) -> logging
.Logger
:
1040 def cluster_log(self
, channel
: str, priority
: ClusterLogPrio
, message
: str) -> None:
1042 :param channel: The log channel. This can be 'cluster', 'audit', ...
1043 :param priority: The log message priority.
1044 :param message: The message to log.
1046 self
._ceph
_cluster
_log
(channel
, priority
.value
, message
)
def version(self) -> str:
    """Return the value stored in ``self._version``."""
    stored = self._version
    return stored
def pool_exists(self, name: str) -> bool:
    """Tell whether ``name`` is a ``pool_name`` in ``self.get('osd_map')``.

    :param name: pool name to look for
    :return: True when an entry of ``osd_map['pools']`` has that ``pool_name``
    """
    osd_map = self.get('osd_map')
    known_names = []
    for entry in osd_map['pools']:
        known_names.append(entry['pool_name'])
    return name in known_names
1058 def have_enough_osds(self
) -> bool:
1059 # wait until we have enough OSDs to allow the pool to be healthy
1061 for osd
in self
.get("osd_map")["osds"]:
1062 if osd
["up"] and osd
["in"]:
1065 need
= cast(int, self
.get_ceph_option("osd_pool_default_size"))
1066 return ready
>= need
1070 def rename_pool(self
, srcpool
: str, destpool
: str) -> None:
1072 'prefix': 'osd pool rename',
1075 'destpool': destpool
,
1077 self
.check_mon_command(c
)
1081 def create_pool(self
, pool
: str) -> None:
1083 'prefix': 'osd pool create',
1090 self
.check_mon_command(c
)
1094 def appify_pool(self
, pool
: str, app
: str) -> None:
1096 'prefix': 'osd pool application enable',
1100 'yes_i_really_mean_it': True
1102 self
.check_mon_command(c
)
1106 def create_mgr_pool(self
) -> None:
1107 self
.log
.info("creating mgr pool")
1109 ov
= self
.get_module_option_ex('devicehealth', 'pool_name', 'device_health_metrics')
1110 devhealth
= cast(str, ov
)
1111 if devhealth
is not None and self
.pool_exists(devhealth
):
1112 self
.log
.debug("reusing devicehealth pool")
1113 self
.rename_pool(devhealth
, self
.MGR_POOL_NAME
)
1114 self
.appify_pool(self
.MGR_POOL_NAME
, 'mgr')
1116 self
.log
.debug("creating new mgr pool")
1117 self
.create_pool(self
.MGR_POOL_NAME
)
1118 self
.appify_pool(self
.MGR_POOL_NAME
, 'mgr')
1120 def create_skeleton_schema(self
, db
: sqlite3
.Connection
) -> None:
1122 CREATE TABLE IF NOT EXISTS MgrModuleKV (
1123 key TEXT PRIMARY KEY,
1126 INSERT OR IGNORE INTO MgrModuleKV (key, value) VALUES ('__version', 0);
1129 db
.executescript(SQL
)
def update_schema_version(self, db: sqlite3.Connection, version: int) -> None:
    """Write ``version`` into the ``__version`` row of ``MgrModuleKV``.

    :param db: connection on which the UPDATE statement is executed
    :param version: value bound to the statement's single placeholder
    """
    statement = "UPDATE OR ROLLBACK MgrModuleKV SET value = ? WHERE key = '__version';"
    bound = (version,)
    db.execute(statement, bound)
def set_kv(self, key: str, value: Any) -> None:
    """Insert or replace the ``MgrModuleKV`` row for ``key``.

    :param key: row key; must not start with two underscores
    :param value: value stored alongside ``key``
    :raises AssertionError: when ``key`` starts with ``"__"``
    """
    assert key[:2] != "__"

    self.log.debug(f"set_kv('{key}', '{value}')")

    statement = "INSERT OR REPLACE INTO MgrModuleKV (key, value) VALUES (?, ?);"
    # Hold the db lock for the whole statement; the connection itself is
    # used as the inner context manager.
    with self._db_lock:
        with self.db:
            self.db.execute(statement, (key, value))
1147 def get_kv(self
, key
: str) -> Any
:
1148 SQL
= "SELECT value FROM MgrModuleKV WHERE key = ?;"
1150 assert key
[:2] != "__"
1152 self
.log
.debug(f
"get_kv('{key}')")
1154 with self
._db
_lock
, self
.db
:
1155 cur
= self
.db
.execute(SQL
, (key
,))
1156 row
= cur
.fetchone()
1161 self
.log
.debug(f
" = {v}")
1164 def maybe_upgrade(self
, db
: sqlite3
.Connection
, version
: int) -> None:
1166 self
.log
.info(f
"creating main.db for {self.module_name}")
1167 assert self
.SCHEMA
is not None
1168 cur
= db
.executescript(self
.SCHEMA
)
1169 self
.update_schema_version(db
, 1)
1171 assert self
.SCHEMA_VERSIONED
is not None
1172 latest
= len(self
.SCHEMA_VERSIONED
)
1173 if latest
< version
:
1174 raise RuntimeError(f
"main.db version is newer ({version}) than module ({latest})")
1175 for i
in range(version
, latest
):
1176 self
.log
.info(f
"upgrading main.db for {self.module_name} from {i-1}:{i}")
1177 SQL
= self
.SCHEMA_VERSIONED
[i
]
1178 db
.executescript(SQL
)
1179 if version
< latest
:
1180 self
.update_schema_version(db
, latest
)
1182 def load_schema(self
, db
: sqlite3
.Connection
) -> None:
1184 SELECT value FROM MgrModuleKV WHERE key = '__version';
1188 self
.create_skeleton_schema(db
)
1189 cur
= db
.execute(SQL
)
1190 row
= cur
.fetchone()
1191 self
.maybe_upgrade(db
, int(row
['value']))
1192 assert cur
.fetchone() is None
def configure_db(self, db: sqlite3.Connection) -> None:
    """Apply the connection PRAGMAs, set the row factory and load the schema.

    :param db: connection to configure; afterwards passed to ``load_schema``
    """
    pragmas = (
        'PRAGMA FOREIGN_KEYS = 1',
        'PRAGMA JOURNAL_MODE = PERSIST',
        'PRAGMA PAGE_SIZE = 65536',
        'PRAGMA CACHE_SIZE = 64',
    )
    # executed one by one, in the order listed
    for pragma in pragmas:
        db.execute(pragma)
    db.row_factory = sqlite3.Row
    self.load_schema(db)
1203 def open_db(self
) -> Optional
[sqlite3
.Connection
]:
1204 if not self
.pool_exists(self
.MGR_POOL_NAME
):
1205 if not self
.have_enough_osds():
1207 self
.create_mgr_pool()
1208 uri
= f
"file:///{self.MGR_POOL_NAME}:{self.module_name}/main.db?vfs=ceph";
1209 self
.log
.debug(f
"using uri {uri}")
1210 db
= sqlite3
.connect(uri
, check_same_thread
=False, uri
=True)
1211 self
.configure_db(db
)
1215 def db_ready(self
) -> bool:
1218 return self
.db
is not None
1219 except MgrDBNotReady
:
1223 def db(self
) -> sqlite3
.Connection
:
1224 assert self
._db
_lock
.locked()
1225 if self
._db
is not None:
1227 db_allowed
= self
.get_ceph_option("mgr_pool")
1229 raise MgrDBNotReady();
1230 self
._db
= self
.open_db()
1231 if self
._db
is None:
1232 raise MgrDBNotReady();
def release_name(self) -> str:
    """
    Report the release name of the Ceph version, e.g. 'nautilus' or
    'octopus'.

    :return: the release name of the Ceph version in lower case, as
        provided by ``_ceph_get_release_name()``
    """
    name = self._ceph_get_release_name()
    return name
def lookup_release_name(self, major: int) -> str:
    """
    Look up the release name for a major version number.

    :param major: the number forwarded to ``_ceph_lookup_release_name``
    :return: the value ``_ceph_lookup_release_name`` yields for ``major``
    """
    release = self._ceph_lookup_release_name(major)
    return release
def get_context(self) -> object:
    """
    Fetch the context object exposed by ``_ceph_get_context()``.

    :return: a Python capsule containing a C++ CephContext pointer
    """
    capsule = self._ceph_get_context()
    return capsule
1254 def notify(self
, notify_type
: NotifyType
, notify_id
: str) -> None:
1256 Called by the ceph-mgr service to notify the Python plugin
1257 that new state is available. This method is *only* called for
1258 notify_types that are listed in the NOTIFY_TYPES string list
1259 member of the module class.
1261 :param notify_type: string indicating what kind of notification,
1262 such as osd_map, mon_map, fs_map, mon_status,
1263 health, pg_summary, command, service_map
1264 :param notify_id: string (may be empty) that optionally specifies
1265 which entity is being notified about. With
1266 "command" notifications this is set to the tag
1267 ``from send_command``.
1271 def _config_notify(self
) -> None:
1272 # check logging options for changes
1273 mgr_level
= cast(str, self
.get_ceph_option("debug_mgr"))
1274 module_level
= cast(str, self
.get_module_option("log_level"))
1275 cluster_level
= cast(str, self
.get_module_option("log_to_cluster_level"))
1276 assert isinstance(cluster_level
, str)
1277 log_to_file
= self
.get_module_option("log_to_file", False)
1278 assert isinstance(log_to_file
, bool)
1279 log_to_cluster
= self
.get_module_option("log_to_cluster", False)
1280 assert isinstance(log_to_cluster
, bool)
1281 self
._set
_log
_level
(mgr_level
, module_level
, cluster_level
)
1283 if log_to_file
!= self
.log_to_file
:
1285 self
._enable
_file
_log
()
1287 self
._disable
_file
_log
()
1288 if log_to_cluster
!= self
.log_to_cluster
:
1290 self
._enable
_cluster
_log
()
1292 self
._disable
_cluster
_log
()
1294 # call module subclass implementations
1295 self
.config_notify()
1297 def config_notify(self
) -> None:
1299 Called by the ceph-mgr service to notify the Python plugin
1300 that the configuration may have changed. Modules will want to
1301 refresh any configuration values stored in config variables.
1305 def serve(self
) -> None:
1307 Called by the ceph-mgr service to start any server that
1308 is provided by this Python plugin. The implementation
1309 of this function should block until ``shutdown`` is called.
1311 You *must* implement ``shutdown`` if you implement ``serve``
1315 def shutdown(self
) -> None:
1317 Called by the ceph-mgr service to request that this
1318 module drop out of its serve() function. You do not
1319 need to implement this if you do not implement serve()
1324 addrs
= self
._rados
.get_addrs()
1325 self
._rados
.shutdown()
1326 self
._ceph
_unregister
_client
(addrs
)
1329 def get(self
, data_name
: str) -> Any
:
1331 Called by the plugin to fetch named cluster-wide objects from ceph-mgr.
1333 :param str data_name: Valid things to fetch are osdmap_crush_map_text,
1334 osd_map, osd_map_tree, osd_map_crush, config, mon_map, fs_map,
1335 osd_metadata, pg_summary, io_rate, pg_dump, df, osd_stats,
1336 health, mon_status, devices, device <devid>, pg_stats,
1337 pool_stats, pg_ready, osd_ping_times, mgr_map, mgr_ips,
1338 modified_config_options, service_map, mds_metadata,
1339 have_local_config_map, osd_pool_stats, pg_status.
1342 All these structures have their own JSON representations: experiment
1343 or look at the C++ ``dump()`` methods to learn about them.
1345 obj
= self
._ceph
_get
(data_name
)
1346 if isinstance(obj
, bytes
):
1347 obj
= json
.loads(obj
)
1351 def _stattype_to_str(self
, stattype
: int) -> str:
1353 typeonly
= stattype
& self
.PERFCOUNTER_TYPE_MASK
1356 if typeonly
== self
.PERFCOUNTER_LONGRUNAVG
:
1357 # this lie matches the DaemonState decoding: only val, no counts
1359 if typeonly
== self
.PERFCOUNTER_COUNTER
:
1361 if typeonly
== self
.PERFCOUNTER_HISTOGRAM
:
1366 def _perfpath_to_path_labels(self
, daemon
: str,
1367 path
: str) -> Tuple
[str, Tuple
[str, ...], Tuple
[str, ...]]:
1368 if daemon
.startswith('rgw.'):
1369 label_name
= 'instance_id'
1370 daemon
= daemon
[len('rgw.'):]
1372 label_name
= 'ceph_daemon'
1374 label_names
= (label_name
,) # type: Tuple[str, ...]
1375 labels
= (daemon
,) # type: Tuple[str, ...]
1377 if daemon
.startswith('rbd-mirror.'):
1379 r
'^rbd_mirror_image_([^/]+)/(?:(?:([^/]+)/)?)(.*)\.(replay(?:_bytes|_latency)?)$',
1383 path
= 'rbd_mirror_image_' + match
.group(4)
1384 pool
= match
.group(1)
1385 namespace
= match
.group(2) or ''
1386 image
= match
.group(3)
1387 label_names
+= ('pool', 'namespace', 'image')
1388 labels
+= (pool
, namespace
, image
)
1390 return path
, label_names
, labels
,
1392 def _perfvalue_to_value(self
, stattype
: int, value
: Union
[int, float]) -> Union
[float, int]:
1393 if stattype
& self
.PERFCOUNTER_TIME
:
1394 # Convert from ns to seconds
1395 return value
/ 1000000000.0
1399 def _unit_to_str(self
, unit
: int) -> str:
1400 if unit
== self
.NONE
:
1402 elif unit
== self
.BYTES
:
1405 raise ValueError(f
'bad unit "{unit}"')
1408 def to_pretty_iec(n
: int) -> str:
1409 for bits
, suffix
in [(60, 'Ei'), (50, 'Pi'), (40, 'Ti'), (30, 'Gi'),
1410 (20, 'Mi'), (10, 'Ki')]:
1412 return str(n
>> bits
) + ' ' + suffix
1416 def get_pretty_row(elems
: Sequence
[str], width
: int) -> str:
1418 Takes an array of elements and returns a string with those elements
1419 formatted as a table row. Useful for polling modules.
1421 :param elems: the elements to be printed
1422 :param width: the width of the terminal
1425 column_width
= int(width
/ n
)
1429 ret
+= '{0:>{w}} |'.format(elem
, w
=column_width
- 2)
1433 def get_pretty_header(self
, elems
: Sequence
[str], width
: int) -> str:
1435 Like ``get_pretty_row`` but adds dashes, to be used as a table title.
1437 :param elems: the elements to be printed
1438 :param width: the width of the terminal
1441 column_width
= int(width
/ n
)
1445 for i
in range(0, n
):
1446 ret
+= '-' * (column_width
- 1) + '+'
1450 ret
+= self
.get_pretty_row(elems
, width
)
1455 for i
in range(0, n
):
1456 ret
+= '-' * (column_width
- 1) + '+'
def get_server(self, hostname: str) -> ServerInfoT:
    """
    Called by the plugin to fetch metadata about one particular hostname.

    The information is what ceph-mgr has gleaned from the daemon metadata
    reported by daemons running on that server.

    :param hostname: a hostname
    :return: the result of ``_ceph_get_server(hostname)``; the ``cast`` is
        for the type checker only and does not convert anything at runtime
    """
    server_info = self._ceph_get_server(hostname)
    return cast(ServerInfoT, server_info)
1475 def get_perf_schema(self
,
1477 svc_name
: str) -> Dict
[str,
1478 Dict
[str, Dict
[str, Union
[str, int]]]]:
1480 Called by the plugin to fetch perf counter schema info.
1481 svc_name can be nullptr, as can svc_type, in which case
1484 :param str svc_type:
1485 :param str svc_name:
1486 :return: list of dicts describing the counters requested
1488 return self
._ceph
_get
_perf
_schema
(svc_type
, svc_name
)
def get_rocksdb_version(self) -> str:
    """
    Called by the plugin to fetch the latest RocksDB version number.

    :return: str representing the major, minor, and patch RocksDB version
        numbers, as returned by ``_ceph_get_rocksdb_version()``
    """
    version = self._ceph_get_rocksdb_version()
    return version
1499 def get_counter(self
,
1502 path
: str) -> Dict
[str, List
[Tuple
[float, int]]]:
1504 Called by the plugin to fetch the latest performance counter data for a
1505 particular counter on a particular service.
1507 :param str svc_type:
1508 :param str svc_name:
1509 :param str path: a period-separated concatenation of the subsystem and the
1510 counter name, for example "mds.inodes".
1511 :return: A dict of counter names to their values. each value is a list of
1512 of two-tuples of (timestamp, value). This may be empty if no data is
1515 return self
._ceph
_get
_counter
(svc_type
, svc_name
, path
)
1518 def get_latest_counter(self
,
1521 path
: str) -> Dict
[str, Union
[Tuple
[float, int],
1522 Tuple
[float, int, int]]]:
1524 Called by the plugin to fetch only the newest performance counter data
1525 point for a particular counter on a particular service.
1527 :param str svc_type:
1528 :param str svc_name:
1529 :param str path: a period-separated concatenation of the subsystem and the
1530 counter name, for example "mds.inodes".
1531 :return: A list of two-tuples of (timestamp, value) or three-tuple of
1532 (timestamp, value, count) is returned. This may be empty if no
1535 return self
._ceph
_get
_latest
_counter
(svc_type
, svc_name
, path
)
def list_servers(self) -> List[ServerInfoT]:
    """
    Like ``get_server``, but gives information about all servers (i.e. all
    unique hostnames that have been mentioned in daemon metadata).

    Implemented by calling ``_ceph_get_server`` with ``None`` in place of
    a hostname.

    :return: a list of information about all servers
    """
    all_servers = self._ceph_get_server(None)
    return cast(List[ServerInfoT], all_servers)
1548 def get_metadata(self
,
1551 default
: Optional
[Dict
[str, str]] = None) -> Optional
[Dict
[str, str]]:
1553 Fetch the daemon metadata for a particular service.
1555 ceph-mgr fetches metadata asynchronously, so are windows of time during
1556 addition/removal of services where the metadata is not available to
1557 modules. ``None`` is returned if no metadata is available.
1559 :param str svc_type: service type (e.g., 'mds', 'osd', 'mon')
1560 :param str svc_id: service id. convert OSD integer IDs to strings when
1562 :rtype: dict, or None if no metadata found
1564 metadata
= self
._ceph
_get
_metadata
(svc_type
, svc_id
)
1565 if metadata
is None:
def get_daemon_status(self, svc_type: str, svc_id: str) -> Dict[str, str]:
    """
    Fetch the latest status for a particular service daemon.

    The pre-existing documentation states that ``None`` may come back when
    no status information is available, for example because the daemon
    hasn't fully started yet; callers should be prepared for that even
    though the annotation only names ``Dict[str, str]``.

    :param svc_type: string (e.g., 'rgw')
    :param svc_id: string
    :return: dict, or None if the service is not found
    """
    status = self._ceph_get_daemon_status(svc_type, svc_id)
    return status
1583 def check_mon_command(self
, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> HandleCommandResult
:
1585 Wrapper around :func:`~mgr_module.MgrModule.mon_command`, but raises,
1589 r
= HandleCommandResult(*self
.mon_command(cmd_dict
, inbuf
))
1591 raise MonCommandFailed(f
'{cmd_dict["prefix"]} failed: {r.stderr} retval: {r.retval}')
1594 def mon_command(self
, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1596 Helper for modules that do simple, synchronous mon command
1599 See send_command for general case.
1601 :return: status int, out std, err str
1605 result
= CommandResult()
1606 self
.send_command(result
, "mon", "", json
.dumps(cmd_dict
), "", inbuf
)
1610 self
.log
.debug("mon_command: '{0}' -> {1} in {2:.3f}s".format(
1611 cmd_dict
['prefix'], r
[0], t2
- t1
1616 def osd_command(self
, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1618 Helper for osd command execution.
1620 See send_command for general case. Also, see osd/OSD.cc for available commands.
1622 :param dict cmd_dict: expects a prefix and an osd id, i.e.:
1624 'prefix': 'perf histogram dump',
1627 :return: status int, out std, err str
1630 result
= CommandResult()
1631 self
.send_command(result
, "osd", cmd_dict
['id'], json
.dumps(cmd_dict
), "", inbuf
)
1635 self
.log
.debug("osd_command: '{0}' -> {1} in {2:.3f}s".format(
1636 cmd_dict
['prefix'], r
[0], t2
- t1
1641 def tell_command(self
, daemon_type
: str, daemon_id
: str, cmd_dict
: dict, inbuf
: Optional
[str] = None) -> Tuple
[int, str, str]:
1643 Helper for `ceph tell` command execution.
1645 See send_command for general case.
1647 :param dict cmd_dict: expects a prefix i.e.:
1652 :return: status int, out std, err str
1655 result
= CommandResult()
1656 self
.send_command(result
, daemon_type
, daemon_id
, json
.dumps(cmd_dict
), "", inbuf
)
1660 self
.log
.debug("tell_command on {0}.{1}: '{2}' -> {3} in {4:.5f}s".format(
1661 daemon_type
, daemon_id
, cmd_dict
['prefix'], r
[0], t2
- t1
1668 result
: CommandResult
,
1673 inbuf
: Optional
[str] = None) -> None:
1675 Called by the plugin to send a command to the mon
1678 :param CommandResult result: an instance of the ``CommandResult``
1679 class, defined in the same module as MgrModule. This acts as a
1680 completion and stores the output of the command. Use
1681 ``CommandResult.wait()`` if you want to block on completion.
1682 :param str svc_type:
1684 :param str command: a JSON-serialized command. This uses the same
1685 format as the ceph command line, which is a dictionary of command
1686 arguments, with the extra ``prefix`` key containing the command
1687 name itself. Consult MonCommands.h for available commands and
1688 their expected arguments.
1689 :param str tag: used for nonblocking operation: when a command
1690 completes, the ``notify()`` callback on the MgrModule instance is
1691 triggered, with notify_type set to "command", and notify_id set to
1692 the tag of the command.
1693 :param str inbuf: input buffer for sending additional data.
1695 self
._ceph
_send
_command
(result
, svc_type
, svc_id
, command
, tag
, inbuf
)
1701 stdin
: Optional
[bytes
] = None
1702 ) -> Tuple
[int, str, str]:
1707 '-k', str(self
.get_ceph_option('keyring')),
1708 '-n', f
'mgr.{self.get_mgr_id()}',
1710 self
.log
.debug('exec: ' + ' '.join(cmd
))
1714 stdout
=subprocess
.PIPE
,
1715 stderr
=subprocess
.PIPE
,
1718 except subprocess
.TimeoutExpired
as ex
:
1720 return -errno
.ETIMEDOUT
, '', str(ex
)
1722 self
.log
.error(f
'Non-zero return from {cmd}: {p.stderr.decode()}')
1723 return p
.returncode
, p
.stdout
.decode(), p
.stderr
.decode()
def set_health_checks(self, checks: HealthChecksT) -> None:
    """
    Set the module's current map of health checks.

    ``checks`` is a dict of check names to info, where each info dict has
    this form::

       {
         'severity': 'warning',         # or 'error'
         'summary': 'summary string',
         'count': 4,                    # quantify badness
         'detail': [ 'list', 'of', 'detail', 'strings' ],
       }

    :param checks: dict of health check dicts; passed unchanged to
        ``_ceph_set_health_checks``
    """
    publish = self._ceph_set_health_checks
    publish(checks)
1750 def _handle_command(self
,
1752 cmd
: Dict
[str, Any
]) -> Union
[HandleCommandResult
,
1753 Tuple
[int, str, str]]:
1754 if cmd
['prefix'] not in CLICommand
.COMMANDS
:
1755 return self
.handle_command(inbuf
, cmd
)
1757 return CLICommand
.COMMANDS
[cmd
['prefix']].call(self
, cmd
, inbuf
)
1759 def handle_command(self
,
1761 cmd
: Dict
[str, Any
]) -> Union
[HandleCommandResult
,
1762 Tuple
[int, str, str]]:
1764 Called by ceph-mgr to request the plugin to handle one
1765 of the commands that it declared in self.COMMANDS
1767 Return a status code, an output buffer, and an
1768 output string. The output buffer is for data results,
1769 the output string is for informative text.
1771 :param inbuf: content of any "-i <file>" supplied to ceph cli
1773 :param cmd: from Ceph's cmdmap_t
1776 :return: HandleCommandResult or a 3-tuple of (int, str, str)
1779 # Should never get called if they didn't declare
1781 raise NotImplementedError()
def get_mgr_id(self) -> str:
    """
    Retrieve the name of the manager daemon where this plugin
    is currently being executed (i.e. the active manager).

    :return: the value reported by ``_ceph_get_mgr_id()``
    """
    mgr_id = self._ceph_get_mgr_id()
    return mgr_id
def get_ceph_conf_path(self) -> str:
    """
    Return the Ceph configuration path.

    :return: the value reported by ``_ceph_get_ceph_conf_path()``
    """
    conf_path = self._ceph_get_ceph_conf_path()
    return conf_path
1797 def get_mgr_ip(self
) -> str:
1798 if not self
._mgr
_ips
:
1799 ips
= self
.get("mgr_ips").get('ips', [])
1801 return socket
.gethostname()
1802 self
._mgr
_ips
= ips
[0]
1803 assert self
._mgr
_ips
is not None
1804 return self
._mgr
_ips
def get_ceph_option(self, key: str) -> OptionValue:
    """
    Read a Ceph option by name.

    :param key: option name forwarded to ``_ceph_get_option``
    :return: the value ``_ceph_get_option`` yields for ``key``
    """
    value = self._ceph_get_option(key)
    return value
def get_foreign_ceph_option(self, entity: str, key: str) -> OptionValue:
    """
    Read a Ceph option by name on behalf of another entity.

    :param entity: entity name forwarded to ``_ceph_get_foreign_option``
    :param key: option name forwarded to ``_ceph_get_foreign_option``
    :return: the value ``_ceph_get_foreign_option`` yields
    """
    value = self._ceph_get_foreign_option(entity, key)
    return value
def _validate_module_option(self, key: str) -> None:
    """
    Helper: don't allow get/set config callers to access config options
    that they didn't declare.

    :param key: option name to look for among the ``'name'`` entries of
        ``self.MODULE_OPTIONS``
    :raises RuntimeError: if no declared option carries that name
    """
    declared_names = [option['name'] for option in self.MODULE_OPTIONS]
    if key in declared_names:
        return
    owner = self.__class__.__name__
    message = "Config option '{0}' is not in {1}.MODULE_OPTIONS".format(key, owner)
    raise RuntimeError(message)
1824 def _get_module_option(self
,
1826 default
: OptionValue
,
1827 localized_prefix
: str = "") -> OptionValue
:
1828 r
= self
._ceph
_get
_module
_option
(self
.module_name
, key
,
1831 return self
.MODULE_OPTION_DEFAULTS
.get(key
, default
)
def get_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve the value of a persistent configuration setting.

    The key is first checked with ``_validate_module_option``; the lookup
    itself is done by ``_get_module_option``.

    :param key: name of the option
    :param default: fallback handed on to ``_get_module_option``
    :return: whatever ``_get_module_option(key, default)`` returns
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default)
    return value
1842 def get_module_option_ex(self
, module
: str,
1844 default
: OptionValue
= None) -> OptionValue
:
1846 Retrieve the value of a persistent configuration setting
1847 for the specified module.
1849 :param module: The name of the module, e.g. 'dashboard'
1851 :param key: The configuration key, e.g. 'server_addr'.
1852 :param default: The default value to use when the
1853 returned value is ``None``. Defaults to ``None``.
1855 if module
== self
.module_name
:
1856 self
._validate
_module
_option
(key
)
1857 r
= self
._ceph
_get
_module
_option
(module
, key
)
1858 return default
if r
is None else r
def get_store_prefix(self, key_prefix: str) -> Dict[str, str]:
    """
    Retrieve a dict of KV store keys to values, where the keys
    have the given prefix.

    :param str key_prefix: prefix forwarded to ``_ceph_get_store_prefix``
    :return: the mapping produced by ``_ceph_get_store_prefix``
    """
    entries = self._ceph_get_store_prefix(key_prefix)
    return entries
1871 def _set_localized(self
,
1874 setter
: Callable
[[str, Optional
[str]], None]) -> None:
1875 return setter(_get_localized_key(self
.get_mgr_id(), key
), val
)
def get_localized_module_option(self, key: str, default: OptionValue = None) -> OptionValue:
    """
    Retrieve localized configuration for this ceph-mgr instance.

    The key is validated with ``_validate_module_option`` and then looked
    up through ``_get_module_option``, with the id from ``get_mgr_id()``
    passed as the third argument.

    :param key: name of the option
    :param default: fallback handed on to ``_get_module_option``
    :return: whatever ``_get_module_option`` returns
    """
    self._validate_module_option(key)
    value = self._get_module_option(key, default, self.get_mgr_id())
    return value
def _set_module_option(self, key: str, val: Any) -> None:
    """
    Store ``val`` under ``key`` for this module via
    ``_ceph_set_module_option``.

    ``None`` is forwarded unchanged; every other value is converted with
    ``str()`` first.

    :param key: option name
    :param val: new value, or ``None``
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if val is None:
        encoded = None
    else:
        encoded = str(val)
    return self._ceph_set_module_option(self.module_name, key, encoded)
def set_module_option(self, key: str, val: Any) -> None:
    """
    Set the value of a persistent configuration setting.

    The key is checked with ``_validate_module_option`` before the value
    is handed to ``_set_module_option``.

    :param key: name of the option
    :type val: str | None
    :raises ValueError: if `val` cannot be parsed or it is out of the specified range
    """
    self._validate_module_option(key)
    outcome = self._set_module_option(key, val)
    return outcome
def set_module_option_ex(self, module: str, key: str, val: OptionValue) -> None:
    """
    Set the value of a persistent configuration setting
    for the specified module.

    When ``module`` is this module's own name the key is first checked
    with ``_validate_module_option``; keys of other modules are not
    validated here.

    :param module: name of the module owning the option
    :param key: name of the option
    :param val: new value. ``None`` is forwarded as ``None`` (presumably
        unsetting the option -- confirm against the binding); anything
        else is converted with ``str()``.
    :return: whatever ``_ceph_set_module_option`` returns
    """
    if module == self.module_name:
        self._validate_module_option(key)
    # Same convention as _set_module_option: never persist the literal
    # string "None" when the caller passed None.
    serialized = None if val is None else str(val)
    return self._ceph_set_module_option(module, key, serialized)
def set_localized_module_option(self, key: str, val: Optional[str]) -> None:
    """
    Set localized configuration for this ceph-mgr instance.

    The key is validated with ``_validate_module_option``; the write is
    done by ``_set_localized`` using ``_set_module_option`` as the setter.

    :param key: name of the option
    :param val: new value, or ``None``
    :return: whatever ``_set_localized`` returns
    """
    self._validate_module_option(key)
    setter = self._set_module_option
    return self._set_localized(key, val, setter)
def set_store(self, key: str, val: Optional[str]) -> None:
    """
    Set a value in this module's persistent key value store.
    If val is None, remove key from store.

    :param key: store key
    :param val: value to store, or ``None``; passed unchanged to
        ``_ceph_set_store``
    """
    store = self._ceph_set_store
    store(key, val)
1934 def get_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
1936 Get a value from this module's persistent key value store
1938 r
= self
._ceph
_get
_store
(key
)
1945 def get_localized_store(self
, key
: str, default
: Optional
[str] = None) -> Optional
[str]:
1946 r
= self
._ceph
_get
_store
(_get_localized_key(self
.get_mgr_id(), key
))
1948 r
= self
._ceph
_get
_store
(key
)
def set_localized_store(self, key: str, val: Optional[str]) -> None:
    """
    Write a store value through ``_set_localized``, using ``set_store``
    as the setter.

    :param key: store key before localization
    :param val: value to store, or ``None``
    :return: whatever ``_set_localized`` returns
    """
    setter = self.set_store
    return self._set_localized(key, val, setter)
1958 def self_test(self
) -> Optional
[str]:
1960 Run a self-test on the module. Override this function and implement
1961 a best as possible self-test for (automated) testing of the module
1963 Indicate any failures by raising an exception. This does not have
1964 to be pretty, it's mainly for picking up regressions during
1965 development, rather than use in the field.
1967 :return: None, or an advisory string for developer interest, such
1968 as a json dump of some state.
def get_osdmap(self) -> OSDMap:
    """
    Get a handle to an OSDMap.

    The handle comes from ``_ceph_get_osdmap()``, which is called without
    arguments. NOTE(review): the earlier text spoke of ``epoch==0``
    selecting the latest map, but this method takes no epoch -- confirm
    which map is returned.

    :return: the object from ``_ceph_get_osdmap()``; the ``cast`` is for
        the type checker only
    """
    handle = self._ceph_get_osdmap()
    return cast(OSDMap, handle)
1981 def get_latest(self
, daemon_type
: str, daemon_name
: str, counter
: str) -> int:
1982 data
= self
.get_latest_counter(
1983 daemon_type
, daemon_name
, counter
)[counter
]
1990 def get_latest_avg(self
, daemon_type
: str, daemon_name
: str, counter
: str) -> Tuple
[int, int]:
1991 data
= self
.get_latest_counter(
1992 daemon_type
, daemon_name
, counter
)[counter
]
1994 # https://github.com/python/mypy/issues/1178
1995 _
, value
, count
= cast(Tuple
[float, int, int], data
)
2002 def get_all_perf_counters(self
, prio_limit
: int = PRIO_USEFUL
,
2003 services
: Sequence
[str] = ("mds", "mon", "osd",
2004 "rbd-mirror", "rgw",
2005 "tcmu-runner")) -> Dict
[str, dict]:
2007 Return the perf counters currently known to this ceph-mgr
2008 instance, filtered by priority equal to or greater than `prio_limit`.
2010 The result is a map of string to dict, associating services
2011 (like "osd.123") with their counters. The counter
2012 dict for each service maps counter paths to a counter
2013 info structure, which is the information from
2014 the schema, plus an additional "value" member with the latest
2018 result
= defaultdict(dict) # type: Dict[str, dict]
2020 for server
in self
.list_servers():
2021 for service
in cast(List
[ServiceInfoT
], server
['services']):
2022 if service
['type'] not in services
:
2025 schemas
= self
.get_perf_schema(service
['type'], service
['id'])
2027 self
.log
.warning("No perf counter schema for {0}.{1}".format(
2028 service
['type'], service
['id']
2032 # Value is returned in a potentially-multi-service format,
2033 # get just the service we're asking about
2034 svc_full_name
= "{0}.{1}".format(
2035 service
['type'], service
['id'])
2036 schema
= schemas
[svc_full_name
]
2038 # Populate latest values
2039 for counter_path
, counter_schema
in schema
.items():
2040 # self.log.debug("{0}: {1}".format(
2041 # counter_path, json.dumps(counter_schema)
2043 priority
= counter_schema
['priority']
2044 assert isinstance(priority
, int)
2045 if priority
< prio_limit
:
2048 tp
= counter_schema
['type']
2049 assert isinstance(tp
, int)
2050 counter_info
= dict(counter_schema
)
2051 # Also populate count for the long running avgs
2052 if tp
& self
.PERFCOUNTER_LONGRUNAVG
:
2053 v
, c
= self
.get_latest_avg(
2058 counter_info
['value'], counter_info
['count'] = v
, c
2059 result
[svc_full_name
][counter_path
] = counter_info
2061 counter_info
['value'] = self
.get_latest(
2067 result
[svc_full_name
][counter_path
] = counter_info
2069 self
.log
.debug("returning {0} counter".format(len(result
)))
def set_uri(self, uri: str) -> None:
    """
    If the module exposes a service, then call this to publish the
    address once it is available.

    :param uri: the address, passed unchanged to ``_ceph_set_uri``
    :return: whatever ``_ceph_set_uri`` returns
    """
    outcome = self._ceph_set_uri(uri)
    return outcome
def set_device_wear_level(self, devid: str, wear_level: float) -> None:
    """
    Record a wear level for a device.

    :param devid: device id forwarded to ``_ceph_set_device_wear_level``
    :param wear_level: wear level forwarded to
        ``_ceph_set_device_wear_level``
    :return: whatever ``_ceph_set_device_wear_level`` returns
    """
    outcome = self._ceph_set_device_wear_level(devid, wear_level)
    return outcome
def have_mon_connection(self) -> bool:
    """
    Check whether this ceph-mgr daemon has an open connection
    to a monitor.

    If it doesn't, then it's likely that the information we have about
    the cluster is out of date, and/or the monitor cluster is down.

    :return: the value reported by ``_ceph_have_mon_connection()``
    """
    connected = self._ceph_have_mon_connection()
    return connected
2099 def update_progress_event(self
,
2103 add_to_ceph_s
: bool) -> None:
2104 return self
._ceph
_update
_progress
_event
(evid
, desc
, progress
, add_to_ceph_s
)
def complete_progress_event(self, evid: str) -> None:
    """
    Mark a progress event as complete.

    :param evid: event id forwarded to ``_ceph_complete_progress_event``
    :return: whatever ``_ceph_complete_progress_event`` returns
    """
    outcome = self._ceph_complete_progress_event(evid)
    return outcome
def clear_all_progress_events(self) -> None:
    """
    Clear all progress events.

    :return: whatever ``_ceph_clear_all_progress_events()`` returns
    """
    outcome = self._ceph_clear_all_progress_events()
    return outcome
2117 def rados(self
) -> rados
.Rados
:
2119 A librados instance to be shared by any classes within
2120 this mgr module that want one.
2125 ctx_capsule
= self
.get_context()
2126 self
._rados
= rados
.Rados(context
=ctx_capsule
)
2127 self
._rados
.connect()
2128 self
._ceph
_register
_client
(self
._rados
.get_addrs())
2132 def can_run() -> Tuple
[bool, str]:
2134 Implement this function to report whether the module's dependencies
2135 are met. For example, if the module needs to import a particular
2136 dependency to work, then use a try/except around the import at
2137 file scope, and then report here if the import failed.
2139 This will be called in a blocking way from the C++ code, so do not
2140 do any I/O that could block in this function.
2142 :return a 2-tuple consisting of a boolean and explanatory string
2148 def remote(self
, module_name
: str, method_name
: str, *args
: Any
, **kwargs
: Any
) -> Any
:
2150 Invoke a method on another module. All arguments, and the return
2151 value from the other module must be serializable.
2153 Limitation: Do not import any modules within the called method.
2154 Otherwise you will get an error in Python 2::
2156 RuntimeError('cannot unmarshal code objects in restricted execution mode',)
2160 :param module_name: Name of other module. If module isn't loaded,
2161 an ImportError exception is raised.
2162 :param method_name: Method name. If it does not exist, a NameError
2163 exception is raised.
2164 :param args: Argument tuple
2165 :param kwargs: Keyword argument dict
2166 :raises RuntimeError: **Any** error raised within the method is converted to a RuntimeError
2167 :raises ImportError: No such module
2169 return self
._ceph
_dispatch
_remote
(module_name
, method_name
,
def add_osd_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an OSD perf query.

    ``query`` is a dict of the query parameters. Its entries include:

    - subkey descriptors of the form
      ``{'type': subkey_type, 'regex': regex_pattern}``
    - ``'performance_counter_descriptors': [list, of, descriptor, types]``
    - ``'limit': {'order_by': performance_counter_type, 'max_count': n}``

    Identifiers named by the earlier documentation (presumably the valid
    subkey types -- confirm):
       'client_id', 'client_address', 'pool_id', 'namespace', 'osd_id',
       'pg_id', 'object_name', 'snap_id'
    Valid performance counter types:
       'ops', 'write_ops', 'read_ops', 'bytes', 'write_bytes', 'read_bytes',
       'latency', 'write_latency', 'read_latency'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_osd_perf_query(query)
    return query_id
def remove_osd_perf_query(self, query_id: int) -> None:
    """
    Unregister an OSD perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_osd_perf_query`` returns
    """
    outcome = self._ceph_remove_osd_perf_query(query_id)
    return outcome
def get_osd_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an OSD perf query.

    :param int query_id: query ID
    :return: the result of ``_ceph_get_osd_perf_counters(query_id)``
    """
    counters = self._ceph_get_osd_perf_counters(query_id)
    return counters
def add_mds_perf_query(self, query: Dict[str, Any]) -> Optional[int]:
    """
    Register an MDS perf query.

    ``query`` is a dict of the query parameters. Its entries include:

    - subkey descriptors of the form
      ``{'type': subkey_type, 'regex': regex_pattern}``
    - ``'performance_counter_descriptors': [list, of, descriptor, types]``

    NOTE: 'limit' and 'order_by' are not supported (yet).

    Identifiers named by the earlier documentation (presumably the valid
    subkey types -- confirm):
       'mds_rank', 'client_id'

    :param object query: query
    :rtype: int (query id)
    """
    query_id = self._ceph_add_mds_perf_query(query)
    return query_id
def remove_mds_perf_query(self, query_id: int) -> None:
    """
    Unregister an MDS perf query.

    :param int query_id: query ID
    :return: whatever ``_ceph_remove_mds_perf_query`` returns
    """
    outcome = self._ceph_remove_mds_perf_query(query_id)
    return outcome
def reregister_mds_perf_queries(self) -> None:
    """
    Re-register MDS perf queries.

    :return: whatever ``_ceph_reregister_mds_perf_queries()`` returns
    """
    outcome = self._ceph_reregister_mds_perf_queries()
    return outcome
def get_mds_perf_counters(self, query_id: int) -> Optional[Dict[str, List[PerfCounterT]]]:
    """
    Get stats collected for an MDS perf query.

    :param int query_id: query ID
    :return: the result of ``_ceph_get_mds_perf_counters(query_id)``
    """
    counters = self._ceph_get_mds_perf_counters(query_id)
    return counters
def is_authorized(self, arguments: Dict[str, str]) -> bool:
    """
    Verifies that the current session caps permit executing the py service
    or current module with the provided arguments.

    This provides a generic way to allow modules to restrict by more
    fine-grained controls.

    :param arguments: dict of key/value arguments to test
    :return: the verdict of ``_ceph_is_authorized(arguments)``
    """
    permitted = self._ceph_is_authorized(arguments)
    return permitted
2288 def send_rgwadmin_command(self
, args
: List
[str],
2289 stdout_as_json
: bool = True) -> Tuple
[int, Union
[str, dict], str]:
2293 '-c', str(self
.get_ceph_conf_path()),
2294 '-k', str(self
.get_ceph_option('keyring')),
2295 '-n', f
'mgr.{self.get_mgr_id()}',
2297 self
.log
.debug('Executing %s', str(cmd
))
2298 result
= subprocess
.run( # pylint: disable=subprocess-run-check
2300 stdout
=subprocess
.PIPE
,
2301 stderr
=subprocess
.PIPE
,
2304 stdout
= result
.stdout
.decode('utf-8')
2305 stderr
= result
.stderr
.decode('utf-8')
2306 if stdout
and stdout_as_json
:
2307 stdout
= json
.loads(stdout
)
2308 if result
.returncode
:
2309 self
.log
.debug('Error %s executing %s: %s', result
.returncode
, str(cmd
), stderr
)
2310 return result
.returncode
, stdout
, stderr
2311 except subprocess
.CalledProcessError
as ex
:
2312 self
.log
.exception('Error executing radosgw-admin %s: %s', str(ex
.cmd
), str(ex
.output
))
2314 except subprocess
.TimeoutExpired
as ex
:
2315 self
.log
.error('Timeout (10s) executing radosgw-admin %s', str(ex
.cmd
))