5 from functools
import wraps
6 from typing
import Optional
, Callable
, TypeVar
, List
, NewType
, TYPE_CHECKING
, Any
, NamedTuple
7 from orchestrator
import OrchestratorError
10 from cephadm
import CephadmOrchestrator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Distinct static type for ceph config entity names; a plain str at runtime.
ConfEntity = NewType('ConfEntity', str)
# Enum type; its `token` member is what the module-level `cephadmNoImage` name is bound to.
# NOTE(review): the member definitions are not visible in this listing - confirm against the full file.
18 class CephadmNoImage(Enum
):
# Ceph daemon types that use the ceph container image.
# NOTE: order is important here, as these are used for upgrade order.
CEPH_TYPES = ['mgr', 'mon', 'crash', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror']
GATEWAY_TYPES = ['iscsi', 'nfs']
MONITORING_STACK_TYPES = ['node-exporter', 'prometheus',
                          'alertmanager', 'grafana', 'loki', 'promtail']
RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['nfs']

# Concatenation order is significant: ceph daemons, then gateways, then monitoring.
CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES

# These daemon types use the ceph container image.
CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs']
# Used with _run_cephadm for commands (check-host etc.) that don't require an --image parameter.
cephadmNoImage = CephadmNoImage.token
# NamedTuple record; the visible fields are an optional ceph version string and
# an optional list of repo digest strings.
# NOTE(review): source line 40 is not visible in this listing, so a further field may exist - confirm.
39 class ContainerInspectInfo(NamedTuple
):
41 ceph_version
: Optional
[str]
42 repo_digests
: Optional
[List
[str]]
def name_to_config_section(name: str) -> ConfEntity:
    """
    Map from daemon names to ceph entity names (as seen in config).

    The daemon type is the part of ``name`` before the first ``'.'``:

    * rgw, rbd-mirror, nfs, crash and iscsi daemons map to ``'client.' + name``;
    * mon, osd, mds, mgr and client names are returned unchanged;
    * every other daemon type maps to ``'mon'``.
    """
    daemon_type = name.split('.', 1)[0]
    if daemon_type in ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi'):
        return ConfEntity('client.' + name)
    if daemon_type in ('mon', 'osd', 'mds', 'mgr', 'client'):
        return ConfEntity(name)
    # Unrecognised daemon types fall back to the 'mon' section.
    return ConfEntity('mon')
# Decorator: returns `forall_hosts_wrapper`, which passes the inner `do_work`
# callable and `vals` to CephadmOrchestrator.instance._worker_pool.map and
# returns that call's result. A logger.exception call reports
# 'executing <f.__name__>(<args>) failed.' - presumably from an except handler.
# NOTE(review): several source lines (how `vals` is derived from *args and most
# of do_work's body) are not visible in this listing - confirm against the full file.
58 def forall_hosts(f
: Callable
[..., T
]) -> Callable
[..., List
[T
]]:
60 def forall_hosts_wrapper(*args
: Any
) -> List
[T
]:
61 from cephadm
.module
import CephadmOrchestrator
63 # Some weird logic to make calling functions with multiple arguments work.
70 assert 'either f([...]) or self.f([...])'
72 def do_work(arg
: Any
) -> T
:
73 if not isinstance(arg
, tuple):
80 logger
.exception(f
'executing {f.__name__}({args}) failed.')
83 assert CephadmOrchestrator
.instance
is not None
84 return CephadmOrchestrator
.instance
._worker
_pool
.map(do_work
, vals
)
86 return forall_hosts_wrapper
# Checks cluster health by issuing a command through mgr.check_mon_command;
# annotated to return a str. The visible error path logs
# 'Failed to parse health status: Cannot decode JSON' together with the raw
# command output and then raises OrchestratorError.
# NOTE(review): the command arguments, the JSON parsing and the success-path
# return are not visible in this listing - confirm against the full file.
89 def get_cluster_health(mgr
: 'CephadmOrchestrator') -> str:
90 # check cluster health
91 ret
, out
, err
= mgr
.check_mon_command({
98 msg
= 'Failed to parse health status: Cannot decode JSON'
99 logger
.exception('%s: \'%s\'' % (msg
, out
))
100 raise OrchestratorError('failed to parse health status')
def is_repo_digest(image_name: str) -> bool:
    """
    Return True if ``image_name`` contains an ``'@'``.

    Repo digests look something like "ceph/ceph@sha256:blablabla".
    """
    return '@' in image_name
# Resolves `hostname` with socket.getaddrinfo (AI_CANONNAME flag, SOCK_STREAM
# type); annotated to return a str. Per the inline comment, the first IPv4
# (AF_INET) result is preferred when present. socket.gaierror is converted
# into OrchestratorError("Cannot resolve ip for host <hostname>: <error>").
# NOTE(review): the try header, the loop over results and the return
# statements are not visible in this listing - confirm against the full file.
112 def resolve_ip(hostname
: str) -> str:
114 r
= socket
.getaddrinfo(hostname
, None, flags
=socket
.AI_CANONNAME
,
115 type=socket
.SOCK_STREAM
)
116 # pick first v4 IP, if present
118 if a
[0] == socket
.AF_INET
:
121 except socket
.gaierror
as e
:
122 raise OrchestratorError(f
"Cannot resolve ip for host {hostname}: {e}")
def ceph_release_to_major(release: str) -> int:
    """
    Convert a ceph release name to its major version number.

    The number is the alphabet position of the name's first letter
    ('a' -> 1, ..., 'p' -> 16), compared case-insensitively so that
    'Pacific' and 'pacific' give the same result.

    :param release: release name, e.g. ``'pacific'``
    :return: 1-based alphabet index of the first character
    :raises IndexError: if ``release`` is empty
    """
    # Lower-case first: an uppercase initial would otherwise give a negative number.
    return ord(release[0].lower()) - ord('a') + 1
129 def file_mode_to_str(mode
: int) -> str:
131 for shift
in range(0, 9, 3):
133 f
'{"r" if (mode >> shift) & 4 else "-"}'
134 f
'{"w" if (mode >> shift) & 2 else "-"}'
135 f
'{"x" if (mode >> shift) & 1 else "-"}'