5 from functools
import wraps
6 from typing
import Optional
, Callable
, TypeVar
, List
, NewType
, TYPE_CHECKING
, Any
, NamedTuple
7 from orchestrator
import OrchestratorError
10 from cephadm
import CephadmOrchestrator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Distinct static type for ceph config entity names; at runtime a ConfEntity
# is a plain str (NewType adds no wrapper object).
ConfEntity = NewType('ConfEntity', str)
# Enum type whose `token` member is read by the module-level `cephadmNoImage`
# alias defined further down in this file.
# NOTE(review): only the class header is visible in this view; the member
# definitions (at least `token`) are not shown -- confirm against the full file.
18 class CephadmNoImage(Enum
):
# ceph daemon types that use the ceph container image.
# NOTE: order important here as these are used for upgrade order
CEPH_TYPES = [
    'mgr', 'mon', 'crash', 'osd', 'mds', 'rgw',
    'rbd-mirror', 'cephfs-mirror', 'ceph-exporter',
]
GATEWAY_TYPES = ['iscsi', 'nfs']
MONITORING_STACK_TYPES = [
    'node-exporter', 'prometheus',
    'alertmanager', 'grafana', 'loki', 'promtail',
]
RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = ['haproxy', 'nfs']

# Concatenation of the three groups above, in that order: ceph daemons first,
# then gateways, then the monitoring stack.
CEPH_UPGRADE_ORDER = CEPH_TYPES + GATEWAY_TYPES + MONITORING_STACK_TYPES

# these daemon types use the ceph container image
CEPH_IMAGE_TYPES = CEPH_TYPES + ['iscsi', 'nfs']
# Used with _run_cephadm for commands such as check-host that don't require
# an --image parameter.
cephadmNoImage = CephadmNoImage.token
# NamedTuple record; the fields visible in this view are:
#   ceph_version -- Optional[str]
#   repo_digests -- Optional[List[str]]
# NOTE(review): the embedded line numbers jump from 40 to 42, so a field or
# docstring may be missing from this view -- confirm the full field list
# before relying on positional construction or unpacking.
40 class ContainerInspectInfo(NamedTuple
):
42 ceph_version
: Optional
[str]
43 repo_digests
: Optional
[List
[str]]
def name_to_config_section(name: str) -> ConfEntity:
    """
    Map from daemon names to ceph entity names (as seen in config)

    The daemon type is the part of ``name`` before the first '.'.

    :param name: daemon name, e.g. ``rgw.foo`` or ``osd.3``
    :return: ``client.<name>`` for rgw, rbd-mirror, nfs, crash, iscsi and
        ceph-exporter daemons; ``name`` unchanged for mon, osd, mds, mgr and
        client; ``mon`` for every other daemon type
    """
    daemon_type = name.split('.', 1)[0]
    if daemon_type in ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi', 'ceph-exporter'):
        return ConfEntity('client.' + name)
    if daemon_type in ('mon', 'osd', 'mds', 'mgr', 'client'):
        return ConfEntity(name)
    # Anything else falls back to the mon section.
    return ConfEntity('mon')
# Decorator: takes a callable `f` and returns `forall_hosts_wrapper`.
# From the lines visible here, the wrapper imports CephadmOrchestrator lazily
# from cephadm.module, defines an inner `do_work(arg)` that tests whether `arg`
# is a tuple and logs via logger.exception on failure, asserts that
# CephadmOrchestrator.instance is set, and returns the result of
# `instance._worker_pool.map(do_work, vals)`.
# NOTE(review): this view is incomplete -- the embedded line numbers skip
# 65-70 and 75-80, so the argument unpacking that binds `vals` and most of
# `do_work`'s body are not shown. Confirm against the full file.
59 def forall_hosts(f
: Callable
[..., T
]) -> Callable
[..., List
[T
]]:
61 def forall_hosts_wrapper(*args
: Any
) -> List
[T
]:
62 from cephadm
.module
import CephadmOrchestrator
64 # Some weird logic to make calling functions with multiple arguments work.
# NOTE(review): the assert below tests a non-empty string literal, which is
# always truthy, so it can never fail; if it is meant to reject an unsupported
# call shape it should raise explicitly instead.
71 assert 'either f([...]) or self.f([...])'
73 def do_work(arg
: Any
) -> T
:
74 if not isinstance(arg
, tuple):
81 logger
.exception(f
'executing {f.__name__}({args}) failed.')
84 assert CephadmOrchestrator
.instance
is not None
85 return CephadmOrchestrator
.instance
._worker
_pool
.map(do_work
, vals
)
87 return forall_hosts_wrapper
# Query cluster health through `mgr.check_mon_command(...)`, which yields a
# (ret, out, err) triple. On the failure path visible here, a message
# containing `out` is logged with logger.exception and OrchestratorError
# ('failed to parse health status') is raised.
# NOTE(review): this view is incomplete -- the command payload, the parsing
# step guarded by the error path (presumably JSON decoding, going by the log
# message) and the final return are not shown. Confirm against the full file.
90 def get_cluster_health(mgr
: 'CephadmOrchestrator') -> str:
91 # check cluster health
92 ret
, out
, err
= mgr
.check_mon_command({
99 msg
= 'Failed to parse health status: Cannot decode JSON'
100 logger
.exception('%s: \'%s\'' % (msg
, out
))
101 raise OrchestratorError('failed to parse health status')
def is_repo_digest(image_name: str) -> bool:
    """
    repo digest are something like "ceph/ceph@sha256:blablabla"

    :param image_name: container image reference
    :return: True when ``image_name`` contains an '@', False otherwise
    """
    return '@' in image_name
# Resolve `hostname` with socket.getaddrinfo (flags=AI_CANONNAME,
# type=SOCK_STREAM). Per the inline comment, the first IPv4 entry
# (family == socket.AF_INET) is preferred when present. A socket.gaierror is
# converted into an OrchestratorError naming the host.
# NOTE(review): this view is incomplete -- the `try:` line, the loop header
# over the results and the return statements are not shown, so which tuple
# element is returned and the non-IPv4 fallback cannot be confirmed from here.
113 def resolve_ip(hostname
: str) -> str:
115 r
= socket
.getaddrinfo(hostname
, None, flags
=socket
.AI_CANONNAME
,
116 type=socket
.SOCK_STREAM
)
117 # pick first v4 IP, if present
119 if a
[0] == socket
.AF_INET
:
122 except socket
.gaierror
as e
:
123 raise OrchestratorError(f
"Cannot resolve ip for host {hostname}: {e}")
def ceph_release_to_major(release: str) -> int:
    """
    Convert a release name to a number by the alphabetical position of its
    first letter: 'a...' -> 1, 'b...' -> 2, and so on.

    The first letter is lower-cased before the lookup, so a capitalised name
    gives the same result as the lower-case one.

    :param release: release name, e.g. ``quincy``
    :return: 1-based position of the name's first letter in the alphabet
    :raises IndexError: if ``release`` is empty
    """
    return ord(release[0].lower()) - ord('a') + 1
130 def file_mode_to_str(mode
: int) -> str:
132 for shift
in range(0, 9, 3):
134 f
'{"r" if (mode >> shift) & 4 else "-"}'
135 f
'{"w" if (mode >> shift) & 2 else "-"}'
136 f
'{"x" if (mode >> shift) & 1 else "-"}'