]>
Commit | Line | Data |
---|---|---|
f6b5b4d7 | 1 | import logging |
f6b5b4d7 | 2 | import json |
adb31ebb | 3 | import socket |
f6b5b4d7 TL |
4 | from enum import Enum |
5 | from functools import wraps | |
f67539c2 | 6 | from typing import Optional, Callable, TypeVar, List, NewType, TYPE_CHECKING, Any, NamedTuple |
801d1391 TL |
7 | from orchestrator import OrchestratorError |
8 | ||
f6b5b4d7 TL |
9 | if TYPE_CHECKING: |
10 | from cephadm import CephadmOrchestrator | |
11 | ||
# Generic type variable for the result type of callables wrapped in this module.
T = TypeVar('T')
# Logger scoped to this module's import path.
logger = logging.getLogger(__name__)

# Distinct str subtype for ceph entity names as they appear in the config.
ConfEntity = NewType('ConfEntity', str)
f91f0fd5 | 16 | |
f6b5b4d7 TL |
17 | |
class CephadmNoImage(Enum):
    """Enum with a single member, ``token``, usable as a unique marker value."""

    token = 1
20 | ||
21 | ||
# Daemon types that run from the ceph container image.
# NOTE: the order of the entries matters, because it is used as the upgrade order.
CEPH_TYPES = [
    'mgr',
    'mon',
    'crash',
    'osd',
    'mds',
    'rgw',
    'rbd-mirror',
    'cephfs-mirror',
]
GATEWAY_TYPES = [
    'iscsi',
    'nfs',
]
MONITORING_STACK_TYPES = [
    'node-exporter',
    'prometheus',
    'alertmanager',
    'grafana',
]
# ceph daemons first, then gateways, then the monitoring stack.
CEPH_UPGRADE_ORDER = [*CEPH_TYPES, *GATEWAY_TYPES, *MONITORING_STACK_TYPES]
28 | ||
29 | ||
# Marker value for _run_cephadm calls (check-host etc.) that don't
# require an --image parameter.
cephadmNoImage = CephadmNoImage.token
32 | ||
33 | ||
class ContainerInspectInfo(NamedTuple):
    """Immutable record holding an image id plus optional version and digest data."""

    # identifier of the image; always present
    image_id: str
    # ceph version string; may be None
    ceph_version: Optional[str]
    # list of repo digest strings; may be None
    repo_digests: Optional[List[str]]
38 | ||
39 | ||
def name_to_config_section(name: str) -> ConfEntity:
    """
    Translate a daemon name into the ceph entity name seen in the config.

    The daemon type is the part of ``name`` before the first '.':

    * 'mon', 'osd', 'mds', 'mgr' and 'client' names are used unchanged,
    * 'rgw', 'rbd-mirror', 'nfs', 'crash' and 'iscsi' names get a
      'client.' prefix,
    * every other daemon type maps to 'mon'.
    """
    kind, _sep, _rest = name.partition('.')
    if kind in ('mon', 'osd', 'mds', 'mgr', 'client'):
        return ConfEntity(name)
    if kind in ('rgw', 'rbd-mirror', 'nfs', 'crash', 'iscsi'):
        return ConfEntity('client.' + name)
    return ConfEntity('mon')
e306af50 TL |
51 | |
52 | ||
def forall_hosts(f: Callable[..., T]) -> Callable[..., List[T]]:
    """
    Decorator: run ``f`` once per element of a list on the orchestrator's
    worker pool.

    The wrapped callable is invoked either as ``f([...])`` or as
    ``self.f([...])``. Each element of the list is the argument set for
    one call: a tuple is unpacked into positional arguments, any other
    value is passed as the single argument. The wrapper returns whatever
    the worker pool's ``map`` returns (annotated as a list of results).

    Exceptions raised by ``f`` are logged and re-raised.

    :raises TypeError: if the wrapper is called with anything other than
        one or two positional arguments.
    """
    @wraps(f)
    def forall_hosts_wrapper(*args: Any) -> List[T]:
        # Deferred import; at module level this name is only imported
        # under TYPE_CHECKING (presumably to avoid a circular import).
        from cephadm.module import CephadmOrchestrator

        # Work out whether we were called as f([...]) or self.f([...]).
        if len(args) == 1:
            vals = args[0]
            self = None
        elif len(args) == 2:
            self, vals = args
        else:
            # Fail loudly here: asserting a non-empty string is always
            # true and would let the call continue with `vals` unbound.
            raise TypeError('either f([...]) or self.f([...])')

        def do_work(arg: Any) -> T:
            # Normalise a single value into a one-element argument tuple.
            if not isinstance(arg, tuple):
                arg = (arg, )
            try:
                # Identity check, so an instance that happens to be
                # falsy is still passed on as `self`.
                if self is not None:
                    return f(self, *arg)
                return f(*arg)
            except Exception:
                logger.exception(f'executing {f.__name__}({args}) failed.')
                raise

        assert CephadmOrchestrator.instance is not None
        return CephadmOrchestrator.instance._worker_pool.map(do_work, vals)

    return forall_hosts_wrapper
82 | ||
83 | ||
def get_cluster_health(mgr: 'CephadmOrchestrator') -> str:
    """
    Run the 'health' mon command in JSON format and return the 'status'
    entry of the decoded output.

    :raises OrchestratorError: if the command output cannot be decoded
        as JSON.
    """
    health_cmd = {
        'prefix': 'health',
        'format': 'json',
    }
    # only the command output is needed; return code and stderr are unused
    _ret, raw_output, _err = mgr.check_mon_command(health_cmd)

    try:
        health = json.loads(raw_output)
    except ValueError:
        msg = 'Failed to parse health status: Cannot decode JSON'
        logger.exception('%s: \'%s\'' % (msg, raw_output))
        raise OrchestratorError('failed to parse health status')

    return health['status']
f91f0fd5 TL |
98 | |
99 | ||
def is_repo_digest(image_name: str) -> bool:
    """
    Tell whether ``image_name`` looks like a repo digest, i.e. contains
    an '@' as in "ceph/ceph@sha256:blablabla".
    """
    digest_marker = '@'
    return digest_marker in image_name
105 | ||
106 | ||
def resolve_ip(hostname: str) -> str:
    """
    Resolve ``hostname`` to an IP address string.

    The first IPv4 result is preferred; if there is none, the first
    result of whatever family was returned is used instead.

    :raises OrchestratorError: if the address lookup fails.
    """
    try:
        candidates = socket.getaddrinfo(
            hostname,
            None,
            flags=socket.AI_CANONNAME,
            type=socket.SOCK_STREAM,
        )
    except socket.gaierror as e:
        raise OrchestratorError(f"Cannot resolve ip for host {hostname}: {e}")

    # Each entry is (family, type, proto, canonname, sockaddr) and the
    # address string is the first item of sockaddr.
    chosen = next(
        (entry for entry in candidates if entry[0] == socket.AF_INET),
        candidates[0],
    )
    return chosen[4][0]
f67539c2 TL |
118 | |
119 | ||
def ceph_release_to_major(release: str) -> int:
    """
    Convert a release name into a number derived from its first
    character: 'a...' -> 1, 'b...' -> 2, and so on.
    """
    initial = release[0]
    offset_from_a = ord(initial) - ord('a')
    return offset_from_a + 1
b3b6e05e TL |
122 | |
123 | ||
def file_mode_to_str(mode: int) -> str:
    """
    Render the lowest nine bits of ``mode`` as an 'rwxrwxrwx'-style
    string, using '-' for every bit that is not set.
    """
    flags = ((4, 'r'), (2, 'w'), (1, 'x'))
    # Highest triplet first, so the output reads left to right.
    return ''.join(
        letter if (mode >> shift) & bit else '-'
        for shift in (6, 3, 0)
        for bit, letter in flags
    )