import errno
import json
import logging
import re
import socket
import time
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import (
    ArgumentList,
    CephExporterSpec,
    GeneralArgList,
    InitContainerSpec,
    MONSpec,
    RGWSpec,
    ServiceSpec,
)
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url, merge_dicts
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)

def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
    """
    Map the daemon id to a cephx keyring entity name
    """
    # Note: although this function maps daemon names to entity names, self.TYPE
    # within the CephService class refers to service types, not daemon types.
    if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']:
        return AuthEntity(f'client.{daemon_type}.{daemon_id}')
    elif daemon_type in ['crash', 'agent', 'node-proxy']:
        if host == "":
            raise OrchestratorError(
                f'Host not provided to generate <{daemon_type}> auth entity name')
        return AuthEntity(f'client.{daemon_type}.{host}')
    elif daemon_type == 'mon':
        return AuthEntity('mon.')
    elif daemon_type in ['mgr', 'osd', 'mds']:
        return AuthEntity(f'{daemon_type}.{daemon_id}')
    else:
        raise OrchestratorError(f"unknown daemon type {daemon_type}")

57 | ||
f51cf556 TL |
58 | def simplified_keyring(entity: str, contents: str) -> str: |
59 | # strip down keyring | |
60 | # - don't include caps (auth get includes them; get-or-create does not) | |
61 | # - use pending key if present | |
62 | key = None | |
63 | for line in contents.splitlines(): | |
64 | if ' = ' not in line: | |
65 | continue | |
66 | line = line.strip() | |
67 | (ls, rs) = line.split(' = ', 1) | |
68 | if ls == 'key' and not key: | |
69 | key = rs | |
70 | if ls == 'pending key': | |
71 | key = rs | |
72 | keyring = f'[{entity}]\nkey = {key}\n' | |
73 | return keyring | |
74 | ||
75 | ||
f67539c2 | 76 | class CephadmDaemonDeploySpec: |
f6b5b4d7 | 77 | # typing.NamedTuple + Generic is broken in py36 |
f91f0fd5 | 78 | def __init__(self, host: str, daemon_id: str, |
f67539c2 | 79 | service_name: str, |
f91f0fd5 TL |
80 | network: Optional[str] = None, |
81 | keyring: Optional[str] = None, | |
82 | extra_args: Optional[List[str]] = None, | |
83 | ceph_conf: str = '', | |
84 | extra_files: Optional[Dict[str, Any]] = None, | |
85 | daemon_type: Optional[str] = None, | |
f67539c2 | 86 | ip: Optional[str] = None, |
b3b6e05e | 87 | ports: Optional[List[int]] = None, |
aee94f69 | 88 | port_ips: Optional[Dict[str, str]] = None, |
b3b6e05e | 89 | rank: Optional[int] = None, |
20effc67 | 90 | rank_generation: Optional[int] = None, |
aee94f69 TL |
91 | extra_container_args: Optional[ArgumentList] = None, |
92 | extra_entrypoint_args: Optional[ArgumentList] = None, | |
f51cf556 | 93 | init_containers: Optional[List[InitContainerSpec]] = None, |
2a845540 | 94 | ): |
f6b5b4d7 | 95 | """ |
f67539c2 | 96 | A data struction to encapsulate `cephadm deploy ... |
f6b5b4d7 TL |
97 | """ |
98 | self.host: str = host | |
99 | self.daemon_id = daemon_id | |
f67539c2 TL |
100 | self.service_name = service_name |
101 | daemon_type = daemon_type or (service_name.split('.')[0]) | |
f6b5b4d7 TL |
102 | assert daemon_type is not None |
103 | self.daemon_type: str = daemon_type | |
104 | ||
f6b5b4d7 TL |
105 | # mons |
106 | self.network = network | |
107 | ||
108 | # for run_cephadm. | |
109 | self.keyring: Optional[str] = keyring | |
110 | ||
aee94f69 | 111 | # FIXME: finish removing this |
f6b5b4d7 | 112 | # For run_cephadm. Would be great to have more expressive names. |
aee94f69 TL |
113 | # self.extra_args: List[str] = extra_args or [] |
114 | assert not extra_args | |
f91f0fd5 TL |
115 | |
116 | self.ceph_conf = ceph_conf | |
117 | self.extra_files = extra_files or {} | |
f6b5b4d7 TL |
118 | |
119 | # TCP ports used by the daemon | |
f67539c2 | 120 | self.ports: List[int] = ports or [] |
aee94f69 TL |
121 | # mapping of ports to IP addresses for ports |
122 | # we know we will only bind to on a specific IP. | |
123 | # Useful for allowing multiple daemons to bind | |
124 | # to the same port on different IPs on the same node | |
125 | self.port_ips: Dict[str, str] = port_ips or {} | |
f67539c2 TL |
126 | self.ip: Optional[str] = ip |
127 | ||
128 | # values to be populated during generate_config calls | |
129 | # and then used in _run_cephadm | |
130 | self.final_config: Dict[str, Any] = {} | |
131 | self.deps: List[str] = [] | |
f6b5b4d7 | 132 | |
b3b6e05e TL |
133 | self.rank: Optional[int] = rank |
134 | self.rank_generation: Optional[int] = rank_generation | |
135 | ||
20effc67 | 136 | self.extra_container_args = extra_container_args |
39ae355f | 137 | self.extra_entrypoint_args = extra_entrypoint_args |
f51cf556 TL |
138 | self.init_containers = init_containers |
139 | ||
140 | def __setattr__(self, name: str, value: Any) -> None: | |
141 | if value is not None and name in ('extra_container_args', 'extra_entrypoint_args'): | |
142 | for v in value: | |
143 | tname = str(type(v)) | |
144 | if 'ArgumentSpec' not in tname: | |
145 | raise TypeError(f"{name} is not all ArgumentSpec values: {v!r}(is {type(v)} in {value!r}") | |
146 | ||
147 | super().__setattr__(name, value) | |
20effc67 | 148 | |
f6b5b4d7 TL |
149 | def name(self) -> str: |
150 | return '%s.%s' % (self.daemon_type, self.daemon_id) | |
151 | ||
39ae355f TL |
152 | def entity_name(self) -> str: |
153 | return get_auth_entity(self.daemon_type, self.daemon_id, host=self.host) | |
154 | ||
f91f0fd5 TL |
155 | def config_get_files(self) -> Dict[str, Any]: |
156 | files = self.extra_files | |
157 | if self.ceph_conf: | |
158 | files['config'] = self.ceph_conf | |
159 | ||
160 | return files | |
161 | ||
f67539c2 TL |
162 | @staticmethod |
163 | def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec': | |
164 | assert dd.hostname | |
165 | assert dd.daemon_id | |
166 | assert dd.daemon_type | |
167 | return CephadmDaemonDeploySpec( | |
168 | host=dd.hostname, | |
169 | daemon_id=dd.daemon_id, | |
170 | daemon_type=dd.daemon_type, | |
171 | service_name=dd.service_name(), | |
172 | ip=dd.ip, | |
173 | ports=dd.ports, | |
b3b6e05e TL |
174 | rank=dd.rank, |
175 | rank_generation=dd.rank_generation, | |
20effc67 | 176 | extra_container_args=dd.extra_container_args, |
39ae355f | 177 | extra_entrypoint_args=dd.extra_entrypoint_args, |
f67539c2 TL |
178 | ) |
179 | ||
180 | def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription: | |
181 | return DaemonDescription( | |
182 | daemon_type=self.daemon_type, | |
183 | daemon_id=self.daemon_id, | |
b3b6e05e | 184 | service_name=self.service_name, |
f67539c2 TL |
185 | hostname=self.host, |
186 | status=status, | |
187 | status_desc=status_desc, | |
188 | ip=self.ip, | |
189 | ports=self.ports, | |
b3b6e05e TL |
190 | rank=self.rank, |
191 | rank_generation=self.rank_generation, | |
aee94f69 TL |
192 | extra_container_args=cast(GeneralArgList, self.extra_container_args), |
193 | extra_entrypoint_args=cast(GeneralArgList, self.extra_entrypoint_args), | |
f67539c2 TL |
194 | ) |
195 | ||
aee94f69 TL |
196 | @property |
197 | def extra_args(self) -> List[str]: | |
198 | return [] | |
199 | ||

class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self, spec: Optional[ServiceSpec] = None) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self, spec: Optional[ServiceSpec] = None) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
        self,
        host: str,
        daemon_id: str,
        network: str,
        spec: ServiceSpecs,
        daemon_type: Optional[str] = None,
        ports: Optional[List[int]] = None,
        ip: Optional[str] = None,
        rank: Optional[int] = None,
        rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=spec.extra_container_args if hasattr(
                spec, 'extra_container_args') else None,
            extra_entrypoint_args=spec.extra_entrypoint_args if hasattr(
                spec, 'extra_entrypoint_args') else None,
            init_containers=getattr(spec, 'init_containers', None),
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")

        # get keyring anyway
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get',
            'entity': entity,
        })
        if err:
            raise OrchestratorError(f"Unable to fetch keyring for {entity}: {err}")
        return simplified_keyring(entity, keyring)

    def _inventory_get_fqdn(self, hostname: str) -> str:
        """Get a host's FQDN with its hostname.

        If the FQDN can't be resolved, the address from the inventory will
        be returned instead.
        """
        addr = self.mgr.inventory.get_addr(hostname)
        return socket.getfqdn(addr)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )

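    # Illustrative use only (the dashboard command names below are examples,
    # not guaranteed to match a particular service):
    #   self._set_service_url_on_dashboard(
    #       'Grafana',
    #       'dashboard get-grafana-api-url',
    #       'dashboard set-grafana-api-url',
    #       'https://grafana-host:3000')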
    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modifies the Ceph config, cephadm's config_notify() is called, which
          kicks the serve() loop and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
                                  e.g.
                                  [
                                      {
                                         'prefix': 'dashboard iscsi-gateway-add',
                                         'service_url': 'http://admin:admin@aaa:5000',
                                         'name': 'aaa'
                                      },
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@bbb:5000',
                                          'name': 'bbb'
                                      }
                                  ]
                                  The function should return an empty list if no commands need
                                  to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        return get_auth_entity(self.TYPE, daemon_id, host=host)

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })
        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': daemon_spec.entity_name(),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

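    # Examples of 'network' values handled by prepare_create() above (illustrative):
    #   '10.0.0.0/24'                          -> public network (CIDR)
    #   '[v2:10.0.0.5:3300,v1:10.0.0.5:6789]'  -> public addrv (addrvec)
    #   '2001:db8::5'                          -> public addr (IPv6)
    #   '10.0.0.5'                             -> public addr (plain IPv4)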
    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        self.set_crush_locations(self.mgr.cache.get_daemons_by_type('mon'), spec)

    def _get_quorum_status(self) -> Dict[Any, Any]:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception as e:
            raise OrchestratorError(f'failed to parse mon quorum status: {e}')
        return j

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        quorum_status = self._get_quorum_status()
        mons = [m['name'] for m in quorum_status['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in quorum_status['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        daemon_spec.final_config, daemon_spec.deps = super().generate_config(daemon_spec)

        # realistically, we expect there to always be a mon spec
        # in a real deployment, but the way teuthology deploys some daemons
        # it's possible there might not be. For that reason we need to
        # verify the service is present in the spec store.
        if daemon_spec.service_name in self.mgr.spec_store:
            mon_spec = cast(MONSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
            if mon_spec.crush_locations:
                if daemon_spec.host in mon_spec.crush_locations:
                    # the --crush-location flag only supports a single bucket=loc pair so
                    # others will have to be handled later. The idea is to set the flag
                    # for the first bucket=loc pair in the list in order to facilitate
                    # replacing a tiebreaker mon (https://docs.ceph.com/en/quincy/rados/operations/stretch-mode/#other-commands)
                    c_loc = mon_spec.crush_locations[daemon_spec.host][0]
                    daemon_spec.final_config['crush_location'] = c_loc

        return daemon_spec.final_config, daemon_spec.deps

    def set_crush_locations(self, daemon_descrs: List[DaemonDescription], spec: ServiceSpec) -> None:
        logger.debug('Setting mon crush locations from spec')
        if not daemon_descrs:
            return
        assert self.TYPE == spec.service_type
        mon_spec = cast(MONSpec, spec)

        if not mon_spec.crush_locations:
            return

        quorum_status = self._get_quorum_status()
        mons_in_monmap = [m['name'] for m in quorum_status['monmap']['mons']]
        for dd in daemon_descrs:
            assert dd.daemon_id is not None
            assert dd.hostname is not None
            if dd.hostname not in mon_spec.crush_locations:
                continue
            if dd.daemon_id not in mons_in_monmap:
                continue
            # expected format for crush_locations from the quorum status is
            # {bucket1=loc1,bucket2=loc2} etc. for the number of bucket=loc pairs
            try:
                current_crush_locs = [m['crush_location'] for m in quorum_status['monmap']['mons'] if m['name'] == dd.daemon_id][0]
            except (KeyError, IndexError) as e:
                logger.warning(f'Failed setting crush location for mon {dd.daemon_id}: {e}\n'
                               'Mon may not have a monmap entry yet. Try re-applying mon spec once mon is confirmed up.')
                # without a crush_location in the monmap there is nothing to compare
                # against; skip this mon rather than using a stale/undefined value
                continue
            desired_crush_locs = '{' + ','.join(mon_spec.crush_locations[dd.hostname]) + '}'
            logger.debug(f'Found spec defined crush locations for mon on {dd.hostname}: {desired_crush_locs}')
            logger.debug(f'Current crush locations for mon on {dd.hostname}: {current_crush_locs}')
            if current_crush_locs != desired_crush_locs:
                logger.info(f'Setting crush location for mon {dd.daemon_id} to {desired_crush_locs}')
                try:
                    ret, out, err = self.mgr.check_mon_command({
                        'prefix': 'mon set_location',
                        'name': dd.daemon_id,
                        'args': mon_spec.crush_locations[dd.hostname]
                    })
                except Exception as e:
                    logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')


class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules.
        # With several manager daemons running on different hosts, the user may
        # have configured a different dashboard port on each server. In that case,
        # only the ports currently reported by the active mgr's modules
        # (via 'mgr services') will be opened here.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.ports.append(self.mgr.service_discovery_port)
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()

    def fail_over(self) -> None:
        # this has been seen to sometimes transiently fail even when there are multiple
        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
        class NoStandbyError(OrchestratorError):
            pass
        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
            'daemon', 'mgr' + self.mgr.get_mgr_id()))
        for sleep_secs in [2, 8, 15]:
            try:
                if not self.mgr_map_has_standby():
                    raise no_standby_exc
                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                           'INFO', 'Failing over to other MGR')
                logger.info('Failing over to other MGR')

                # fail over
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            except NoStandbyError:
                logger.info(
                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
                time.sleep(sleep_secs)
        raise no_standby_exc

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster.
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon. Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')


class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return empty Daemon Desc
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm rgw_zonegroup and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zonegroup:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zonegroup',
                'value': spec.rgw_zonegroup,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        if spec.zonegroup_hostnames:
            zg_update_cmd = {
                'prefix': 'rgw zonegroup modify',
                'realm_name': spec.rgw_realm,
                'zonegroup_name': spec.rgw_zonegroup,
                'zone_name': spec.rgw_zone,
                'hostnames': spec.zonegroup_hostnames,
            }
            logger.debug(f'rgw cmd: {zg_update_cmd}')
            ret, out, err = self.mgr.check_mon_command(zg_update_cmd)

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

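    # For illustration only, a spec consumed by config()/prepare_create() might look
    # like the following YAML (names, counts and ports are made up):
    #
    #   service_type: rgw
    #   service_id: myrealm.myzone
    #   placement:
    #     count: 2
    #   spec:
    #     rgw_realm: myrealm
    #     rgw_zonegroup: myzonegroup
    #     rgw_zone: myzone
    #     rgw_frontend_port: 8080
    #     ssl: false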
    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        else:
            raise OrchestratorError(f'Invalid rgw_frontend_type parameter: {ftype}. Valid values are: beast, civetweb.')

        if spec.rgw_frontend_extra_args is not None:
            args.extend(spec.rgw_frontend_extra_args)

        frontend = f'{ftype} {" ".join(args)}'
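        # e.g. for a non-SSL beast frontend bound to a specific IP, this is expected to
        # look roughly like "beast endpoint=10.0.0.5:8080" (illustrative values)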
        daemon_name = utils.name_to_config_section(daemon_spec.name())

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': daemon_name,
            'name': 'rgw_frontends',
            'value': frontend
        })

        if spec.rgw_user_counters_cache:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_user_counters_cache',
                'value': 'true',
            })
        if spec.rgw_bucket_counters_cache:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_bucket_counters_cache',
                'value': 'true',
            })

        if spec.rgw_user_counters_cache_size:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_user_counters_cache_size',
                'value': str(spec.rgw_user_counters_cache_size),
            })

        if spec.rgw_bucket_counters_cache_size:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_bucket_counters_cache_size',
                'value': str(spec.rgw_bucket_counters_cache_size),
            })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present, block only if just 1 daemon is up; otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()


class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephExporterService(CephService):
    TYPE = 'ceph-exporter'
    DEFAULT_SERVICE_PORT = 9926

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id),
                                             ['mon', 'profile ceph-exporter',
                                              'mon', 'allow r',
                                              'mgr', 'allow r',
                                              'osd', 'allow r'])
        exporter_config = {}
        if spec.sock_dir:
            exporter_config.update({'sock-dir': spec.sock_dir})
        if spec.port:
            exporter_config.update({'port': f'{spec.port}'})
        if spec.prio_limit is not None:
            exporter_config.update({'prio-limit': f'{spec.prio_limit}'})
        if spec.stats_period:
            exporter_config.update({'stats-period': f'{spec.stats_period}'})

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        daemon_spec.final_config = merge_dicts(daemon_spec.final_config, exporter_config)
        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': daemon_spec.entity_name(),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec


class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.http_server.agent:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        agent = self.mgr.http_server.agent
        try:
            assert agent
            assert agent.ssl_certs.get_root_cert()
            assert agent.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': agent.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = agent.ssl_certs.generate_cert(daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': agent.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
                               agent.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])