import errno
import json
import logging
import re
import socket
import time
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)


class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None,
                 ):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self.extra_container_args = extra_container_args

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
            extra_container_args=dd.extra_container_args,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
            extra_container_args=self.extra_container_args,
        )
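    # Illustrative sketch (not executed): a deploy spec is typically built from a
    # service spec by the orchestrator and can be converted back to a
    # DaemonDescription, e.g.
    #
    #   spec = CephadmDaemonDeploySpec(host='host1', daemon_id='host1.abcdef',
    #                                  service_name='mgr')
    #   spec.name()   # -> 'mgr.host1.abcdef'
    #   dd = spec.to_daemon_description(DaemonDescriptionStatus.running, 'running')
    #
    # the host name and daemon id above are made-up placeholders.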


class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
            self,
            host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=spec.extra_container_args if hasattr(
                spec, 'extra_container_args') else None,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Post actions to be taken after daemons have been checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Configure dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring
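    # Illustrative sketch (not executed): caps are passed as a flat list of
    # alternating daemon-type / capability strings, e.g. for a crash collector:
    #
    #   keyring = self.get_keyring_with_caps(
    #       AuthEntity('client.crash.host1'),
    #       ['mon', 'profile crash', 'mgr', 'profile crash'])
    #
    # 'host1' is a made-up host name; real callers build the entity via
    # get_auth_entity().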

    def _inventory_get_fqdn(self, hostname: str) -> str:
        """Get a host's FQDN from its hostname.

        If the FQDN can't be resolved, the address from the inventory will
        be returned instead.
        """
        addr = self.mgr.inventory.get_addr(hostname)
        return socket.getfqdn(addr)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )
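    # Illustrative sketch (not executed): a monitoring service might keep the
    # Grafana URL in sync with something like
    #
    #   self._set_service_url_on_dashboard(
    #       'Grafana',
    #       'dashboard get-grafana-api-url',
    #       'dashboard set-grafana-api-url',
    #       'https://host1:3000')
    #
    # the URL and host are made-up; the get/set command names follow the pattern
    # described for _check_and_set_dashboard() below.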

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modified Ceph config, cephadm's config_notify() is called, which
          kicks off the serve() loop, and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
                                  e.g.
                                  [
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@aaa:5000',
                                          'name': 'aaa'
                                      },
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@bbb:5000',
                                          'name': 'bbb'
                                      }
                                  ]
                                  The function should return an empty list if no commands need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )
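    # Illustrative sketch (not executed): the structured output consumed above is
    # assumed to contain at least the keys used here, roughly
    #
    #   {"ok_to_stop": true, "osds": [1, 2], ...}
    #
    # other fields may be present; only 'ok_to_stop' and 'osds' are read.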

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out
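    # Worked example (illustrative): with 3 running mgr daemons, stopping 1 with a
    # low_limit of 1 leaves 2 >= 1 running, so the method returns
    # (False, 'It is presumed safe to stop ...'); stopping all 3 would leave
    # 0 < 1 and produce (True, <warning or alert text>) instead.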

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # although this maps daemon ids to entity names, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE in ['crash', 'agent']:
            if host == "":
                raise OrchestratorError(
                    f'Host not provided to generate <{self.TYPE}> auth entity name')
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")
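    # Illustrative examples of the mapping above (daemon ids are made up):
    #   rgw daemon 'myzone.host1.abc'  -> 'client.rgw.myzone.host1.abc'
    #   crash daemon on host 'host1'   -> 'client.crash.host1'
    #   any mon daemon                 -> 'mon.'
    #   mgr daemon 'host1.abcdef'      -> 'mgr.host1.abcdef'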

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
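    # Illustrative sketch (not executed): for a mon named 'a' deployed with the
    # CIDR network '10.1.0.0/24', the extra_config assembled above is
    #
    #   [mon.a]
    #   public network = 10.1.0.0/24
    #
    # (mon name and network are made-up values).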

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
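    # Worked example (illustrative): with 5 mons in the monmap, all in quorum,
    # removing one leaves new_mons = new_quorum = 4 and 4 > 4 / 2, so removal is
    # considered safe; with a 3-mon map where one mon is already out of quorum,
    # removing a second leaves 1 > 2 / 2 == False and the OrchestratorError above
    # is raised.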

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass


class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules.
        # In the case of the dashboard port, with several manager daemons
        # running on different hosts, there is the possibility that the
        # user has decided to use a different dashboard port on each server.
        # If this is the case then the dashboard port opened will be only the
        # one used as the default.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
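    # Illustrative sketch (not executed): 'mgr services' returns a JSON map of
    # module name to URL, e.g. {"dashboard": "https://host1:8443/"} (made-up
    # host); the regex above picks out ':8443/' and 8443 is added to the ports
    # the new mgr daemon will open.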

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()

    def fail_over(self) -> None:
        # this has been seen to sometimes transiently fail even when there are multiple
        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
        class NoStandbyError(OrchestratorError):
            pass
        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
            'daemon', 'mgr' + self.mgr.get_mgr_id()))
        for sleep_secs in [2, 8, 15]:
            try:
                if not self.mgr_map_has_standby():
                    raise no_standby_exc
                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                           'INFO', 'Failing over to other MGR')
                logger.info('Failing over to other MGR')

                # fail over
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            except NoStandbyError:
                logger.info(
                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
                time.sleep(sleep_secs)
        raise no_standby_exc

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')


class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })
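    # Illustrative sketch (not executed): for a service id 'cephfs1' (made-up),
    # the mon command above is equivalent to running
    #   ceph config set mds.cephfs1 mds_join_fs cephfs1
    # so these daemons prefer to join the matching file system.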

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return empty Daemon Desc
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
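    # Illustrative sketch (not executed): for a non-SSL beast frontend on port 80
    # bound to a specific IP, rgw_frontends ends up as something like
    #   beast endpoint=10.0.0.5:80
    # and for SSL without an explicit IP as
    #   beast ssl_port=443 ssl_certificate=config://rgw/cert/rgw.foo
    # (IP, ports and the 'rgw.foo' service name are made-up values).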

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present, block if only 1 daemon is up, otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()


class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec


class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.cherrypy_thread:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            assert self.mgr.cherrypy_thread.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': self.mgr.cherrypy_thread.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
            self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])
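    # Illustrative sketch (not executed): the agent.json written above looks
    # roughly like
    #   {"target_ip": "10.0.0.1", "target_port": 1234, "refresh_period": 20,
    #    "listener_port": 2345, "host": "host1", "device_enhanced_scan": "False"}
    # the IP, ports, host and refresh period shown are made-up placeholders; the
    # real values come from the mgr's configuration and the cephadm endpoint.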