# ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import errno
import json
import logging
import re
import socket
import time
from abc import ABCMeta, abstractmethod

from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import (
    ArgumentList,
    CephExporterSpec,
    GeneralArgList,
    InitContainerSpec,
    MONSpec,
    RGWSpec,
    ServiceSpec,
)
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url, merge_dicts
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)


def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
    """
    Map the daemon id to a cephx keyring entity name
    """
    # despite this function mapping daemons to entity names, self.TYPE within
    # the CephService class refers to service types, not daemon types
    if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'nvmeof', 'ingress', 'ceph-exporter']:
        return AuthEntity(f'client.{daemon_type}.{daemon_id}')
    elif daemon_type in ['crash', 'agent', 'node-proxy']:
        if host == "":
            raise OrchestratorError(
                f'Host not provided to generate <{daemon_type}> auth entity name')
        return AuthEntity(f'client.{daemon_type}.{host}')
    elif daemon_type == 'mon':
        return AuthEntity('mon.')
    elif daemon_type in ['mgr', 'osd', 'mds']:
        return AuthEntity(f'{daemon_type}.{daemon_id}')
    else:
        raise OrchestratorError(f"unknown daemon type {daemon_type}")
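
# Doctest-style sketch of the mapping above (illustrative ids, not executed):
#
#   >>> get_auth_entity('rgw', 'myrgw.host1.abcxyz')
#   'client.rgw.myrgw.host1.abcxyz'
#   >>> get_auth_entity('crash', 'host1', host='host1')
#   'client.crash.host1'
#   >>> get_auth_entity('mon', 'a')
#   'mon.'
#   >>> get_auth_entity('osd', '0')
#   'osd.0'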


def simplified_keyring(entity: str, contents: str) -> str:
    # strip down keyring
    # - don't include caps (auth get includes them; get-or-create does not)
    # - use pending key if present
    key = None
    for line in contents.splitlines():
        if ' = ' not in line:
            continue
        line = line.strip()
        (ls, rs) = line.split(' = ', 1)
        if ls == 'key' and not key:
            key = rs
        if ls == 'pending key':
            key = rs
    keyring = f'[{entity}]\nkey = {key}\n'
    return keyring
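
# A minimal sketch of the stripping above (hypothetical key material):
#
#   >>> contents = '[client.rgw.foo]\n\tkey = AQC3RMxiAAAAABAA\n\tcaps mon = "allow rw"\n'
#   >>> simplified_keyring('client.rgw.foo', contents)
#   '[client.rgw.foo]\nkey = AQC3RMxiAAAAABAA\n'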


class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 port_ips: Optional[Dict[str, str]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[ArgumentList] = None,
                 extra_entrypoint_args: Optional[ArgumentList] = None,
                 init_containers: Optional[List[InitContainerSpec]] = None,
                 ):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # FIXME: finish removing this
        # For run_cephadm. Would be great to have more expressive names.
        # self.extra_args: List[str] = extra_args or []
        assert not extra_args

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        # mapping of ports to IP addresses for ports
        # we know we will only bind to on a specific IP.
        # Useful for allowing multiple daemons to bind
        # to the same port on different IPs on the same node
        self.port_ips: Dict[str, str] = port_ips or {}
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self.extra_container_args = extra_container_args
        self.extra_entrypoint_args = extra_entrypoint_args
        self.init_containers = init_containers

    def __setattr__(self, name: str, value: Any) -> None:
        if value is not None and name in ('extra_container_args', 'extra_entrypoint_args'):
            for v in value:
                tname = str(type(v))
                if 'ArgumentSpec' not in tname:
                    raise TypeError(
                        f"{name} is not all ArgumentSpec values: {v!r} (is {type(v)}) in {value!r}")

        super().__setattr__(name, value)

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def entity_name(self) -> str:
        return get_auth_entity(self.daemon_type, self.daemon_id, host=self.host)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
            extra_container_args=dd.extra_container_args,
            extra_entrypoint_args=dd.extra_entrypoint_args,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
            extra_container_args=cast(GeneralArgList, self.extra_container_args),
            extra_entrypoint_args=cast(GeneralArgList, self.extra_entrypoint_args),
        )

    @property
    def extra_args(self) -> List[str]:
        return []
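
    # A minimal construction sketch (hypothetical values; daemon_type is
    # inferred from service_name when not given explicitly):
    #
    #   >>> spec = CephadmDaemonDeploySpec(host='host1', daemon_id='a',
    #   ...                                service_name='mon')
    #   >>> spec.name()
    #   'mon.a'
    #   >>> spec.entity_name()
    #   'mon.'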


class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self, spec: Optional[ServiceSpec] = None) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self, spec: Optional[ServiceSpec] = None) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
            self,
            host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=spec.extra_container_args if hasattr(
                spec, 'extra_container_args') else None,
            extra_entrypoint_args=spec.extra_entrypoint_args if hasattr(
                spec, 'extra_entrypoint_args') else None,
            init_containers=getattr(spec, 'init_containers', None),
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Post actions to be performed after daemons are checked."""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Configure dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return an empty DaemonDescription
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")

            # get keyring anyway
            ret, keyring, err = self.mgr.mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })
            if err:
                raise OrchestratorError(f"Unable to fetch keyring for {entity}: {err}")
        return simplified_keyring(entity, keyring)
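
    # Caps are passed as a flat [entity-type, cap, entity-type, cap, ...] list;
    # e.g. the crash service (CrashService below) requests
    #   ['mon', 'profile crash', 'mgr', 'profile crash']
    # and the stripped keyring returned looks like (hypothetical key material):
    #   [client.crash.host1]
    #   key = AQC3RMxiAAAAABAA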

    def _inventory_get_fqdn(self, hostname: str) -> str:
        """Get a host's FQDN with its hostname.

        If the FQDN can't be resolved, the address from the inventory will
        be returned instead.
        """
        addr = self.mgr.inventory.get_addr(hostname)
        return socket.getfqdn(addr)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )
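
    # Usage sketch (the Grafana command pair here follows the pattern named in
    # the docstring below; the command names and URL are assumptions):
    #
    #   self._set_service_url_on_dashboard(
    #       'Grafana',
    #       'dashboard get-grafana-api-url',
    #       'dashboard set-grafana-api-url',
    #       'https://host1:3000')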

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modified the Ceph config, cephadm's config_notify() is called, which
          kicks the serve() loop, and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
                                  e.g.
                                  [
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@aaa:5000',
                                          'name': 'aaa'
                                      },
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@bbb:5000',
                                          'name': 'bbb'
                                      }
                                  ]
                                  The function should return an empty list if no commands need
                                  to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
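            # illustrative shape of the structured result (an assumed sketch;
            # only 'ok_to_stop' and 'osds' are read below):
            #   {"ok_to_stop": true, "osds": [0, 1]}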
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out
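
    # Worked example (hypothetical counts): with 3 running daemons, stopping 2
    # with low_limit=3 leaves 1, and 1 >= 3 is false, so this returns
    # (True, 'WARNING: ...'); stopping 1 of 5 with low_limit=3 leaves 4, and
    # 4 >= 3 is true, so it returns (False, 'It is presumed safe ...').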

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        return get_auth_entity(self.TYPE, daemon_id, host=host)

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })
        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': daemon_spec.entity_name(),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

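        # At this point extra_config is a mon config snippet, e.g. for a mon
        # named 'a' given a CIDR network of '10.0.0.0/24' (illustrative values):
        #   [mon.a]
        #   public network = 10.0.0.0/24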
        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        self.set_crush_locations(self.mgr.cache.get_daemons_by_type('mon'), spec)

    def _get_quorum_status(self) -> Dict[Any, Any]:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception as e:
            raise OrchestratorError(f'failed to parse mon quorum status: {e}')
        return j

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        quorum_status = self._get_quorum_status()
        mons = [m['name'] for m in quorum_status['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in quorum_status['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
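
    # Quorum arithmetic sketch: with 5 mons all in quorum, removing one leaves
    # len(new_quorum) == 4 and len(new_mons) == 4; since 4 > 4/2, removal is
    # deemed safe. The check blocks removals that would leave fewer than a
    # majority of the remaining mons in quorum.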

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        daemon_spec.final_config, daemon_spec.deps = super().generate_config(daemon_spec)

        # realistically, we expect there to always be a mon spec
        # in a real deployment, but the way teuthology deploys some daemons
        # it's possible there might not be. For that reason we need to
        # verify the service is present in the spec store.
        if daemon_spec.service_name in self.mgr.spec_store:
            mon_spec = cast(MONSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
            if mon_spec.crush_locations:
                if daemon_spec.host in mon_spec.crush_locations:
                    # the --crush-location flag only supports a single bucket=loc pair so
                    # others will have to be handled later. The idea is to set the flag
                    # for the first bucket=loc pair in the list in order to facilitate
                    # replacing a tiebreaker mon (https://docs.ceph.com/en/quincy/rados/operations/stretch-mode/#other-commands)
                    c_loc = mon_spec.crush_locations[daemon_spec.host][0]
                    daemon_spec.final_config['crush_location'] = c_loc

        return daemon_spec.final_config, daemon_spec.deps

    def set_crush_locations(self, daemon_descrs: List[DaemonDescription], spec: ServiceSpec) -> None:
        logger.debug('Setting mon crush locations from spec')
        if not daemon_descrs:
            return
        assert self.TYPE == spec.service_type
        mon_spec = cast(MONSpec, spec)

        if not mon_spec.crush_locations:
            return

        quorum_status = self._get_quorum_status()
        mons_in_monmap = [m['name'] for m in quorum_status['monmap']['mons']]
        for dd in daemon_descrs:
            assert dd.daemon_id is not None
            assert dd.hostname is not None
            if dd.hostname not in mon_spec.crush_locations:
                continue
            if dd.daemon_id not in mons_in_monmap:
                continue
            # expected format for crush_locations from the quorum status is
            # {bucket1=loc1,bucket2=loc2} etc. for the number of bucket=loc pairs
            try:
                current_crush_locs = [m['crush_location'] for m in quorum_status['monmap']['mons'] if m['name'] == dd.daemon_id][0]
            except (KeyError, IndexError) as e:
                logger.warning(f'Failed setting crush location for mon {dd.daemon_id}: {e}\n'
                               'Mon may not have a monmap entry yet. Try re-applying mon spec once mon is confirmed up.')
                # without a monmap entry there is no current location to compare
                # against, so skip this mon rather than reading an unset value
                continue
            desired_crush_locs = '{' + ','.join(mon_spec.crush_locations[dd.hostname]) + '}'
            logger.debug(f'Found spec defined crush locations for mon on {dd.hostname}: {desired_crush_locs}')
            logger.debug(f'Current crush locations for mon on {dd.hostname}: {current_crush_locs}')
            if current_crush_locs != desired_crush_locs:
                logger.info(f'Setting crush location for mon {dd.daemon_id} to {desired_crush_locs}')
                try:
                    ret, out, err = self.mgr.check_mon_command({
                        'prefix': 'mon set_location',
                        'name': dd.daemon_id,
                        'args': mon_spec.crush_locations[dd.hostname]
                    })
                except Exception as e:
                    logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')


class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules.
        # With several manager daemons running on different hosts, the user may
        # have chosen a different dashboard port on each server; if so, only
        # the port used as the default will be opened.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))
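            # e.g. an endpoint of 'https://host1:8443/' matches ':8443/', and
            # port[0][1:-1] strips the ':' and '/' to yield 8443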

        if ports:
            daemon_spec.ports = ports

        daemon_spec.ports.append(self.mgr.service_discovery_port)
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return an empty DaemonDescription
        return DaemonDescription()

    def fail_over(self) -> None:
        # this has been seen to sometimes transiently fail even when there are multiple
        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
        class NoStandbyError(OrchestratorError):
            pass
        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
            'daemon', 'mgr' + self.mgr.get_mgr_id()))
        for sleep_secs in [2, 8, 15]:
            try:
                if not self.mgr_map_has_standby():
                    raise no_standby_exc
                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                           'INFO', 'Failing over to other MGR')
                logger.info('Failing over to other MGR')

                # fail over
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            except NoStandbyError:
                logger.info(
                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
                time.sleep(sleep_secs)
        raise no_standby_exc

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster.
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon. Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')

class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no active mds found, return an empty DaemonDescription
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm, rgw_zonegroup and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zonegroup:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zonegroup',
                'value': spec.rgw_zonegroup,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        if spec.zonegroup_hostnames:
            zg_update_cmd = {
                'prefix': 'rgw zonegroup modify',
                'realm_name': spec.rgw_realm,
                'zonegroup_name': spec.rgw_zonegroup,
                'zone_name': spec.rgw_zone,
                'hostnames': spec.zonegroup_hostnames,
            }
            logger.debug(f'rgw cmd: {zg_update_cmd}')
            ret, out, err = self.mgr.check_mon_command(zg_update_cmd)

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        else:
            raise OrchestratorError(f'Invalid rgw_frontend_type parameter: {ftype}. Valid values are: beast, civetweb.')

        if spec.rgw_frontend_extra_args is not None:
            args.extend(spec.rgw_frontend_extra_args)

        frontend = f'{ftype} {" ".join(args)}'
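        # e.g. for a beast frontend with SSL on port 443 and no pinned IP,
        # frontend is (service name illustrative):
        #   'beast ssl_port=443 ssl_certificate=config://rgw/cert/rgw.foo'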
        daemon_name = utils.name_to_config_section(daemon_spec.name())

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': daemon_name,
            'name': 'rgw_frontends',
            'value': frontend
        })

        if spec.rgw_user_counters_cache:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_user_counters_cache',
                'value': 'true',
            })
        if spec.rgw_bucket_counters_cache:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_bucket_counters_cache',
                'value': 'true',
            })

        if spec.rgw_user_counters_cache_size:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_user_counters_cache_size',
                'value': str(spec.rgw_user_counters_cache_size),
            })

        if spec.rgw_bucket_counters_cache_size:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': daemon_name,
                'name': 'rgw_bucket_counters_cache_size',
                'value': str(spec.rgw_bucket_counters_cache_size),
            })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present, block if only 1 daemon is up, otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()


class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephExporterService(CephService):
    TYPE = 'ceph-exporter'
    DEFAULT_SERVICE_PORT = 9926

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id),
                                             ['mon', 'profile ceph-exporter',
                                              'mon', 'allow r',
                                              'mgr', 'allow r',
                                              'osd', 'allow r'])
        exporter_config = {}
        if spec.sock_dir:
            exporter_config.update({'sock-dir': spec.sock_dir})
        if spec.port:
            exporter_config.update({'port': f'{spec.port}'})
        if spec.prio_limit is not None:
            exporter_config.update({'prio-limit': f'{spec.prio_limit}'})
        if spec.stats_period:
            exporter_config.update({'stats-period': f'{spec.stats_period}'})

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        daemon_spec.final_config = merge_dicts(daemon_spec.final_config, exporter_config)
        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': daemon_spec.entity_name(),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec


class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.http_server.agent:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        agent = self.mgr.http_server.agent
        try:
            assert agent
            assert agent.ssl_certs.get_root_cert()
            assert agent.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': agent.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = agent.ssl_certs.generate_cert(
            daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': agent.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
                               agent.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])