]>
Commit | Line | Data |
---|---|---|
f6b5b4d7 | 1 | import logging |
f6b5b4d7 | 2 | import json |
adb31ebb | 3 | import socket |
f6b5b4d7 TL |
4 | from enum import Enum |
5 | from functools import wraps | |
f67539c2 | 6 | from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple |
801d1391 TL |
7 | from orchestrator import OrchestratorError |
8 | ||
f6b5b4d7 TL |
9 | if TYPE_CHECKING: |
10 | from cephadm import CephadmOrchestrator | |
11 | ||
# Generic result type of the functions wrapped by forall_hosts.
T = TypeVar('T')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# A ceph entity name as seen in config; produced by name_to_config_section.
ConfEntity = NewType('ConfEntity', str)
f91f0fd5 | 16 | |
f6b5b4d7 TL |
17 | |
class CephadmNoImage(Enum):
    """
    Enum with a single member, ``token``.

    The member is exposed at module level as ``cephadmNoImage``.
    """

    token = 1
20 | ||
21 | ||
f67539c2 TL |
# Ceph daemon types that use the ceph container image.
# NOTE: the order is significant, it is reused for the upgrade order below.
CEPH_TYPES = [
    'mgr',
    'mon',
    'crash',
    'osd',
    'mds',
    'rgw',
    'rbd-mirror',
    'cephfs-mirror',
]
GATEWAY_TYPES = [
    'iscsi',
    'nfs',
]
MONITORING_STACK_TYPES = [
    'node-exporter',
    'prometheus',
    'alertmanager',
    'grafana',
    'loki',
    'promtail',
]
RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES = [
    'nfs',
]

# Upgrade order: ceph daemons first, then gateways, then the monitoring stack.
CEPH_UPGRADE_ORDER = [*CEPH_TYPES, *GATEWAY_TYPES, *MONITORING_STACK_TYPES]

# All daemon types that use the ceph container image.
CEPH_IMAGE_TYPES = [*CEPH_TYPES, 'iscsi', 'nfs']
f67539c2 | 34 | |
f6b5b4d7 TL |
# Used with _run_cephadm for commands (check-host etc.) that don't require an --image parameter
cephadmNoImage = CephadmNoImage.token
37 | ||
38 | ||
f67539c2 TL |
class ContainerInspectInfo(NamedTuple):
    """
    Named tuple bundling an image id with an optional ceph version
    and an optional list of repo digests.
    """

    # id of the image (always present)
    image_id: str
    # ceph version string, or None
    ceph_version: Optional[str]
    # list of repo digest strings, or None
    repo_digests: Optional[List[str]]
43 | ||
44 | ||
def name_to_config_section(name: str) -> ConfEntity:
    """
    Translate a daemon name into the ceph entity name seen in config.

    The daemon type is the part of ``name`` before the first '.':

    - rgw, rbd-mirror, nfs, crash, iscsi: ``'client.' + name``
    - mon, osd, mds, mgr, client: ``name`` unchanged
    - anything else: ``'mon'``
    """
    client_prefixed_types = ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi')
    verbatim_types = ('mon', 'osd', 'mds', 'mgr', 'client')

    kind, _, _ = name.partition('.')
    if kind in client_prefixed_types:
        section = 'client.' + name
    elif kind in verbatim_types:
        section = name
    else:
        section = 'mon'
    return ConfEntity(section)
e306af50 TL |
56 | |
57 | ||
f6b5b4d7 TL |
def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
    """
    Decorator that runs ``f`` once per item of a collection on the
    orchestrator's worker pool.

    The decorated function must be called either as ``f(vals)`` or as
    ``self.f(vals)``. Every item of ``vals`` becomes the positional
    arguments of one call to ``f``: tuples are unpacked, any other value is
    passed as a single argument. The results are whatever
    ``CephadmOrchestrator.instance._worker_pool.map`` returns.

    Exceptions raised by ``f`` are logged and re-raised.

    :raises TypeError: if the wrapper is called with anything other than
        one or two positional arguments
    """
    @wraps(f)
    def forall_hosts_wrapper(*args: Any) -> List[T]:
        # Imported here rather than at module level (the module-level import
        # is only done under TYPE_CHECKING).
        from cephadm.module import CephadmOrchestrator

        # Some weird logic to make calling functions with multiple arguments work:
        # accept both the plain form f([...]) and the method form self.f([...]).
        if len(args) == 1:
            vals = args[0]
            self = None
        elif len(args) == 2:
            self, vals = args
        else:
            # This used to be `assert '<message>'`, which can never fail
            # because a non-empty string is truthy; execution then continued
            # with `vals` and `self` unbound. Fail explicitly instead.
            raise TypeError(
                f'{f.__name__}: either f([...]) or self.f([...])')

        def do_work(arg: Any) -> T:
            if not isinstance(arg, tuple):
                arg = (arg, )
            try:
                # Identity check, so that a falsy-but-present `self` is
                # still passed through to `f`.
                if self is not None:
                    return f(self, *arg)
                return f(*arg)
            except Exception:
                logger.exception(f'executing {f.__name__}({args}) failed.')
                raise

        assert CephadmOrchestrator.instance is not None
        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)

    return forall_hosts_wrapper
87 | ||
88 | ||
def get_cluster_health(mgr: 'CephadmOrchestrator') -> str:
    """
    Return the ``status`` field of the cluster health report.

    Sends the ``health`` mon command (JSON format) through
    ``mgr.check_mon_command`` and decodes its output.

    :raises OrchestratorError: if the command output is not valid JSON
    """
    health_cmd = {
        'prefix': 'health',
        'format': 'json',
    }
    _ret, out, _err = mgr.check_mon_command(health_cmd)

    try:
        health = json.loads(out)
    except ValueError:
        msg = 'Failed to parse health status: Cannot decode JSON'
        logger.exception(f"{msg}: '{out}'")
        raise OrchestratorError('failed to parse health status')

    return health['status']
f91f0fd5 TL |
103 | |
104 | ||
def is_repo_digest(image_name: str) -> bool:
    """
    Tell whether ``image_name`` is a repo digest, i.e. contains an '@'.

    Repo digests look like "ceph/ceph@sha256:blablabla".
    """
    return image_name.find('@') != -1
110 | ||
111 | ||
adb31ebb TL |
def resolve_ip(hostname: str) -> str:
    """
    Resolve ``hostname`` to an IP address string, preferring IPv4.

    The name is looked up with ``socket.getaddrinfo`` (stream sockets only).
    The address of the first AF_INET result is returned; when there is no
    such result, the address of the very first result is returned instead.

    :raises OrchestratorError: if the lookup fails with ``socket.gaierror``
    """
    try:
        results = socket.getaddrinfo(hostname, None, flags=socket.AI_CANONNAME,
                                     type=socket.SOCK_STREAM)
        # each result is (family, type, proto, canonname, sockaddr)
        v4_results = [res for res in results if res[0] == socket.AF_INET]
        chosen = v4_results[0] if v4_results else results[0]
        return chosen[4][0]
    except socket.gaierror as e:
        raise OrchestratorError(f"Cannot resolve ip for host {hostname}: {e}")
f67539c2 TL |
123 | |
124 | ||
def ceph_release_to_major(release: str) -> int:
    """
    Map a ceph release name to its major version number.

    Only the first letter of ``release`` is used: 'a' maps to 1, 'b' to 2,
    and so on (e.g. 'octopus' -> 15, 'pacific' -> 16, 'quincy' -> 17).
    ASCII upper-case initials are treated like their lower-case form, so
    'Quincy' also gives 17 instead of a negative number.

    :raises IndexError: if ``release`` is empty
    """
    initial = release[0]
    # Fold only ASCII capitals; every other character keeps its code point.
    if 'A' <= initial <= 'Z':
        initial = initial.lower()
    return ord(initial) - ord('a') + 1
b3b6e05e TL |
127 | |
128 | ||
def file_mode_to_str(mode: int) -> str:
    """
    Render the low nine permission bits of ``mode`` as an
    ``rwxrwxrwx``-style string, with '-' for every bit that is not set.

    Bits above the lowest nine are ignored.
    """
    triplets = []
    # Walk from the most significant triplet (shift 6) to the least (shift 0)
    # so the pieces can simply be joined in order.
    for shift in (6, 3, 0):
        bits = mode >> shift
        triplets.append(
            ('r' if bits & 4 else '-')
            + ('w' if bits & 2 else '-')
            + ('x' if bits & 1 else '-')
        )
    return ''.join(triplets)
137 | return r |