# ceph/src/pybind/mgr/cephadm/services/cephadmservice.py (ceph quincy 17.2.1)
1 import errno
2 import json
3 import logging
4 import re
5 import socket
6 import time
7 from abc import ABCMeta, abstractmethod
8 from typing import TYPE_CHECKING, List, Callable, TypeVar, \
9 Optional, Dict, Any, Tuple, NewType, cast
10
11 from mgr_module import HandleCommandResult, MonCommandFailed
12
13 from ceph.deployment.service_spec import ServiceSpec, RGWSpec
14 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
15 from mgr_util import build_url
16 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
17 from orchestrator._interface import daemon_type_to_service
18 from cephadm import utils
19
20 if TYPE_CHECKING:
21 from cephadm.module import CephadmOrchestrator
22
23 logger = logging.getLogger(__name__)
24
25 ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
26 AuthEntity = NewType('AuthEntity', str)
27
28
29 class CephadmDaemonDeploySpec:
30 # typing.NamedTuple + Generic is broken in py36
31 def __init__(self, host: str, daemon_id: str,
32 service_name: str,
33 network: Optional[str] = None,
34 keyring: Optional[str] = None,
35 extra_args: Optional[List[str]] = None,
36 ceph_conf: str = '',
37 extra_files: Optional[Dict[str, Any]] = None,
38 daemon_type: Optional[str] = None,
39 ip: Optional[str] = None,
40 ports: Optional[List[int]] = None,
41 rank: Optional[int] = None,
42 rank_generation: Optional[int] = None,
43 extra_container_args: Optional[List[str]] = None):
44 """
45 A data structure to encapsulate `cephadm deploy ...`
46 """
47 self.host: str = host
48 self.daemon_id = daemon_id
49 self.service_name = service_name
50 daemon_type = daemon_type or (service_name.split('.')[0])
51 assert daemon_type is not None
52 self.daemon_type: str = daemon_type
53
54 # mons
55 self.network = network
56
57 # for run_cephadm.
58 self.keyring: Optional[str] = keyring
59
60 # For run_cephadm. Would be great to have more expressive names.
61 self.extra_args: List[str] = extra_args or []
62
63 self.ceph_conf = ceph_conf
64 self.extra_files = extra_files or {}
65
66 # TCP ports used by the daemon
67 self.ports: List[int] = ports or []
68 self.ip: Optional[str] = ip
69
70 # values to be populated during generate_config calls
71 # and then used in _run_cephadm
72 self.final_config: Dict[str, Any] = {}
73 self.deps: List[str] = []
74
75 self.rank: Optional[int] = rank
76 self.rank_generation: Optional[int] = rank_generation
77
78 self.extra_container_args = extra_container_args
79
80 def name(self) -> str:
81 return '%s.%s' % (self.daemon_type, self.daemon_id)
82
83 def config_get_files(self) -> Dict[str, Any]:
84 files = self.extra_files
85 if self.ceph_conf:
86 files['config'] = self.ceph_conf
87
88 return files
89
90 @staticmethod
91 def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
92 assert dd.hostname
93 assert dd.daemon_id
94 assert dd.daemon_type
95 return CephadmDaemonDeploySpec(
96 host=dd.hostname,
97 daemon_id=dd.daemon_id,
98 daemon_type=dd.daemon_type,
99 service_name=dd.service_name(),
100 ip=dd.ip,
101 ports=dd.ports,
102 rank=dd.rank,
103 rank_generation=dd.rank_generation,
104 extra_container_args=dd.extra_container_args,
105 )
106
107 def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
108 return DaemonDescription(
109 daemon_type=self.daemon_type,
110 daemon_id=self.daemon_id,
111 service_name=self.service_name,
112 hostname=self.host,
113 status=status,
114 status_desc=status_desc,
115 ip=self.ip,
116 ports=self.ports,
117 rank=self.rank,
118 rank_generation=self.rank_generation,
119 extra_container_args=self.extra_container_args,
120 )
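# Illustrative sketch (hypothetical values): the two helpers above round-trip between the
# orchestrator's DaemonDescription and this deploy spec, e.g.
#
#   dd = DaemonDescription(daemon_type='crash', daemon_id='host1', hostname='host1')
#   spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
#   spec.name()   # -> 'crash.host1'
#   dd2 = spec.to_daemon_description(DaemonDescriptionStatus.running, 'starting')
#
# Fields that DaemonDescription does not carry (keyring, ceph_conf, extra_files) start out
# empty and are filled in later, e.g. during prepare_create()/generate_config().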
121
122
123 class CephadmService(metaclass=ABCMeta):
124 """
125 Base class for service types. Often providing a create() and config() fn.
126 """
127
128 @property
129 @abstractmethod
130 def TYPE(self) -> str:
131 pass
132
133 def __init__(self, mgr: "CephadmOrchestrator"):
134 self.mgr: "CephadmOrchestrator" = mgr
135
136 def allow_colo(self) -> bool:
137 """
138 Return True if multiple daemons of the same type can colocate on
139 the same host.
140 """
141 return False
142
143 def primary_daemon_type(self) -> str:
144 """
145 This is the type of the primary (usually only) daemon to be deployed.
146 """
147 return self.TYPE
148
149 def per_host_daemon_type(self) -> Optional[str]:
150 """
151 If defined, this type of daemon will be deployed once for each host
152 containing one or more daemons of the primary type.
153 """
154 return None
155
156 def ranked(self) -> bool:
157 """
158 If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
159 generation (0, 1, ...) to each daemon we create/deploy.
160 """
161 return False
162
163 def fence_old_ranks(self,
164 spec: ServiceSpec,
165 rank_map: Dict[int, Dict[int, Optional[str]]],
166 num_ranks: int) -> None:
167 assert False
168
169 def make_daemon_spec(
170 self,
171 host: str,
172 daemon_id: str,
173 network: str,
174 spec: ServiceSpecs,
175 daemon_type: Optional[str] = None,
176 ports: Optional[List[int]] = None,
177 ip: Optional[str] = None,
178 rank: Optional[int] = None,
179 rank_generation: Optional[int] = None,
180 ) -> CephadmDaemonDeploySpec:
181 try:
182 eca = spec.extra_container_args
183 except AttributeError:
184 eca = None
185 return CephadmDaemonDeploySpec(
186 host=host,
187 daemon_id=daemon_id,
188 service_name=spec.service_name(),
189 network=network,
190 daemon_type=daemon_type,
191 ports=ports,
192 ip=ip,
193 rank=rank,
194 rank_generation=rank_generation,
195 extra_container_args=eca,
196 )
197
198 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
199 raise NotImplementedError()
200
201 def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
202 raise NotImplementedError()
203
204 def config(self, spec: ServiceSpec) -> None:
205 """
206 Configure the cluster for this service. Only called *once* per
207 service apply. Not for every daemon.
208 """
209 pass
210
211 def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
212 """The post actions needed to be done after daemons are checked"""
213 if self.mgr.config_dashboard:
214 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
215 self.config_dashboard(daemon_descrs)
216 else:
217 logger.debug('Dashboard is not enabled. Skip configuration.')
218
219 def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
220 """Config dashboard settings."""
221 raise NotImplementedError()
222
223 def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
224 # if this is called for a service type where it hasn't explicitly been
225 # defined, return an empty DaemonDescription
226 return DaemonDescription()
227
228 def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
229 ret, keyring, err = self.mgr.mon_command({
230 'prefix': 'auth get-or-create',
231 'entity': entity,
232 'caps': caps,
233 })
234 if err:
235 ret, out, err = self.mgr.mon_command({
236 'prefix': 'auth caps',
237 'entity': entity,
238 'caps': caps,
239 })
240 if err:
241 self.mgr.log.warning(f"Unable to update caps for {entity}")
242 return keyring
243
244 def _inventory_get_fqdn(self, hostname: str) -> str:
245 """Get a host's FQDN with its hostname.
246
247 If the FQDN can't be resolved, the address from the inventory will
248 be returned instead.
249 """
250 addr = self.mgr.inventory.get_addr(hostname)
251 return socket.getfqdn(addr)
252
253 def _set_service_url_on_dashboard(self,
254 service_name: str,
255 get_mon_cmd: str,
256 set_mon_cmd: str,
257 service_url: str) -> None:
258 """A helper to get and set service_url via Dashboard's MON command.
259
260 If the result of get_mon_cmd differs from service_url, set_mon_cmd will
261 be sent to update it.
262 """
263 def get_set_cmd_dicts(out: str) -> List[dict]:
264 cmd_dict = {
265 'prefix': set_mon_cmd,
266 'value': service_url
267 }
268 return [cmd_dict] if service_url != out else []
269
270 self._check_and_set_dashboard(
271 service_name=service_name,
272 get_cmd=get_mon_cmd,
273 get_set_cmd_dicts=get_set_cmd_dicts
274 )
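# Hedged usage sketch (the get command name comes from the docstring of
# _check_and_set_dashboard() below; the set command and URL are assumed placeholders):
#
#   self._set_service_url_on_dashboard(
#       'Grafana',
#       'dashboard get-grafana-api-url',
#       'dashboard set-grafana-api-url',
#       'https://grafana.example.com:3000')
#
# The set command is only issued when the value currently stored in the Dashboard differs
# from service_url.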
275
276 def _check_and_set_dashboard(self,
277 service_name: str,
278 get_cmd: str,
279 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
280 """A helper to set configs in the Dashboard.
281
282 The method is useful for the pattern:
283 - Getting a config from the Dashboard by using a Dashboard command. e.g. current iSCSI
284 gateways.
285 - Parsing or deserializing the previous output. e.g. the Dashboard command returns a JSON string.
286 - Determining if the config needs to be updated. NOTE: This step is important because if a
287 Dashboard command modifies the Ceph config, cephadm's config_notify() is called, which
288 kicks off the serve() loop and the logic using this method is likely to be called again.
289 A config should be updated only when needed.
290 - Updating a config in the Dashboard by using a Dashboard command.
291
292 :param service_name: the service name to be used for logging
293 :type service_name: str
294 :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
295 :type get_cmd: str
296 :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
297 e.g.
298 [
299 {
300 'prefix': 'dashboard iscsi-gateway-add',
301 'service_url': 'http://admin:admin@aaa:5000',
302 'name': 'aaa'
303 },
304 {
305 'prefix': 'dashboard iscsi-gateway-add',
306 'service_url': 'http://admin:admin@bbb:5000',
307 'name': 'bbb'
308 }
309 ]
310 The function should return an empty list if no commands need to be sent.
311 :type get_set_cmd_dicts: Callable[[str], List[dict]]
312 """
313
314 try:
315 _, out, _ = self.mgr.check_mon_command({
316 'prefix': get_cmd
317 })
318 except MonCommandFailed as e:
319 logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
320 return
321 cmd_dicts = get_set_cmd_dicts(out.strip())
322 for cmd_dict in list(cmd_dicts):
323 try:
324 inbuf = cmd_dict.pop('inbuf', None)
325 _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
326 except MonCommandFailed as e:
327 logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
328
329 def ok_to_stop_osd(
330 self,
331 osds: List[str],
332 known: Optional[List[str]] = None, # output argument
333 force: bool = False) -> HandleCommandResult:
334 r = HandleCommandResult(*self.mgr.mon_command({
335 'prefix': "osd ok-to-stop",
336 'ids': osds,
337 'max': 16,
338 }))
339 j = None
340 try:
341 j = json.loads(r.stdout)
342 except json.decoder.JSONDecodeError:
343 self.mgr.log.warning("osd ok-to-stop didn't return structured result")
344 raise
345 if r.retval:
346 return r
347 if known is not None and j and j.get('ok_to_stop'):
348 self.mgr.log.debug(f"got {j}")
349 known.extend([f'osd.{x}' for x in j.get('osds', [])])
350 return HandleCommandResult(
351 0,
352 f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
353 ''
354 )
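# Illustrative note (JSON shape assumed; only the two keys used above are relied on):
# 'osd ok-to-stop' is expected to return something like {"ok_to_stop": true, "osds": [1, 2], ...}.
# When ok_to_stop is true, the affected osds are appended to the `known` output argument as
# 'osd.1', 'osd.2', ... before the success result is built.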
355
356 def ok_to_stop(
357 self,
358 daemon_ids: List[str],
359 force: bool = False,
360 known: Optional[List[str]] = None # output argument
361 ) -> HandleCommandResult:
362 names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
363 out = f'It appears safe to stop {",".join(names)}'
364 err = f'It is NOT safe to stop {",".join(names)} at this time'
365
366 if self.TYPE not in ['mon', 'osd', 'mds']:
367 logger.debug(out)
368 return HandleCommandResult(0, out)
369
370 if self.TYPE == 'osd':
371 return self.ok_to_stop_osd(daemon_ids, known, force)
372
373 r = HandleCommandResult(*self.mgr.mon_command({
374 'prefix': f'{self.TYPE} ok-to-stop',
375 'ids': daemon_ids,
376 }))
377
378 if r.retval:
379 err = f'{err}: {r.stderr}' if r.stderr else err
380 logger.debug(err)
381 return HandleCommandResult(r.retval, r.stdout, err)
382
383 out = f'{out}: {r.stdout}' if r.stdout else out
384 logger.debug(out)
385 return HandleCommandResult(r.retval, out, r.stderr)
386
387 def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
388 # Provides a warning about whether it is possible to stop <n> daemons in a service
389 names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
390 number_of_running_daemons = len(
391 [daemon
392 for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
393 if daemon.status == DaemonDescriptionStatus.running])
394 if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
395 return False, f'It is presumed safe to stop {names}'
396
397 num_daemons_left = number_of_running_daemons - len(daemon_ids)
398
399 def plural(count: int) -> str:
400 return 'daemon' if count == 1 else 'daemons'
401
402 left_count = "no" if num_daemons_left == 0 else num_daemons_left
403
404 if alert:
405 out = (f'ALERT: Cannot stop {names} in {service} service. '
406 f'Not enough remaining {service} daemons. '
407 f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
408 else:
409 out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
410 f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
411 f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
412 return True, out
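# Worked example (hypothetical numbers): with 3 running mon daemons, daemon_ids=['a'] and
# low_limit=3, the check (3 - 1) >= 3 fails, so the method returns (True, <warning or alert text>);
# with 5 running mons the same request returns (False, "It is presumed safe to stop ['mon.a']").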
413
414 def pre_remove(self, daemon: DaemonDescription) -> None:
415 """
416 Called before the daemon is removed.
417 """
418 assert daemon.daemon_type is not None
419 assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
420 logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
421
422 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
423 """
424 Called after the daemon is removed.
425 """
426 assert daemon.daemon_type is not None
427 assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
428 logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
429
430 def purge(self, service_name: str) -> None:
431 """Called to carry out any purge tasks following service removal"""
432 logger.debug(f'Purge called for {self.TYPE} - no action taken')
433
434
435 class CephService(CephadmService):
436 def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
437 # Ceph.daemons (mon, mgr, mds, osd, etc)
438 cephadm_config = self.get_config_and_keyring(
439 daemon_spec.daemon_type,
440 daemon_spec.daemon_id,
441 host=daemon_spec.host,
442 keyring=daemon_spec.keyring,
443 extra_ceph_config=daemon_spec.ceph_conf)
444
445 if daemon_spec.config_get_files():
446 cephadm_config.update({'files': daemon_spec.config_get_files()})
447
448 return cephadm_config, []
449
450 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
451 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
452 self.remove_keyring(daemon)
453
454 def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
455 """
456 Map the daemon id to a cephx keyring entity name
457 """
458 # despite this method mapping daemon ids to entity names, self.TYPE within
459 # the CephService class refers to service types, not daemon types
460 if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
461 return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
462 elif self.TYPE in ['crash', 'agent']:
463 if host == "":
464 raise OrchestratorError(
465 f'Host not provided to generate <{self.TYPE}> auth entity name')
466 return AuthEntity(f'client.{self.TYPE}.{host}')
467 elif self.TYPE == 'mon':
468 return AuthEntity('mon.')
469 elif self.TYPE in ['mgr', 'osd', 'mds']:
470 return AuthEntity(f'{self.TYPE}.{daemon_id}')
471 else:
472 raise OrchestratorError("unknown daemon type")
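# Illustrative mapping (daemon ids are hypothetical):
#   rgw daemon 'foo.host1.abc'     -> AuthEntity('client.rgw.foo.host1.abc')
#   crash daemon on host 'host1'   -> AuthEntity('client.crash.host1')  (host= is required)
#   mon daemon 'a'                 -> AuthEntity('mon.')                (shared mon. key)
#   osd daemon '3'                 -> AuthEntity('osd.3')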
473
474 def get_config_and_keyring(self,
475 daemon_type: str,
476 daemon_id: str,
477 host: str,
478 keyring: Optional[str] = None,
479 extra_ceph_config: Optional[str] = None
480 ) -> Dict[str, Any]:
481 # keyring
482 if not keyring:
483 entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
484 ret, keyring, err = self.mgr.check_mon_command({
485 'prefix': 'auth get',
486 'entity': entity,
487 })
488
489 config = self.mgr.get_minimal_ceph_conf()
490
491 if extra_ceph_config:
492 config += extra_ceph_config
493
494 return {
495 'config': config,
496 'keyring': keyring,
497 }
498
499 def remove_keyring(self, daemon: DaemonDescription) -> None:
500 assert daemon.daemon_id is not None
501 assert daemon.hostname is not None
502 daemon_id: str = daemon.daemon_id
503 host: str = daemon.hostname
504
505 assert daemon.daemon_type != 'mon'
506
507 entity = self.get_auth_entity(daemon_id, host=host)
508
509 logger.info(f'Removing key for {entity}')
510 ret, out, err = self.mgr.mon_command({
511 'prefix': 'auth rm',
512 'entity': entity,
513 })
514
515
516 class MonService(CephService):
517 TYPE = 'mon'
518
519 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
520 """
521 Create a new monitor on the given host.
522 """
523 assert self.TYPE == daemon_spec.daemon_type
524 name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
525
526 # get mon. key
527 ret, keyring, err = self.mgr.check_mon_command({
528 'prefix': 'auth get',
529 'entity': self.get_auth_entity(name),
530 })
531
532 extra_config = '[mon.%s]\n' % name
533 if network:
534 # infer whether this is a CIDR network, addrvec, or plain IP
535 if '/' in network:
536 extra_config += 'public network = %s\n' % network
537 elif network.startswith('[v') and network.endswith(']'):
538 extra_config += 'public addrv = %s\n' % network
539 elif is_ipv6(network):
540 extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
541 elif ':' not in network:
542 extra_config += 'public addr = %s\n' % network
543 else:
544 raise OrchestratorError(
545 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
546 else:
547 # try to get the public_network from the config
548 ret, network, err = self.mgr.check_mon_command({
549 'prefix': 'config get',
550 'who': 'mon',
551 'key': 'public_network',
552 })
553 network = network.strip() if network else network
554 if not network:
555 raise OrchestratorError(
556 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
557 if '/' not in network:
558 raise OrchestratorError(
559 'public_network is set but does not look like a CIDR network: \'%s\'' % network)
560 extra_config += 'public network = %s\n' % network
561
562 daemon_spec.ceph_conf = extra_config
563 daemon_spec.keyring = keyring
564
565 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
566
567 return daemon_spec
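# Illustrative note (addresses are hypothetical): the `network` value above is interpreted
# by its form, e.g.
#   '10.1.0.0/24'                           -> 'public network = 10.1.0.0/24'
#   '[v2:10.1.0.5:3300,v1:10.1.0.5:6789]'   -> 'public addrv = ...'
#   '2001:db8::5'                           -> 'public addr = 2001:db8::5'
#   '10.1.0.5'                              -> 'public addr = 10.1.0.5'
# Anything else raises OrchestratorError; with no network given, the mon public_network
# config option must be set to a CIDR.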
568
569 def _check_safe_to_destroy(self, mon_id: str) -> None:
570 ret, out, err = self.mgr.check_mon_command({
571 'prefix': 'quorum_status',
572 })
573 try:
574 j = json.loads(out)
575 except Exception:
576 raise OrchestratorError('failed to parse quorum status')
577
578 mons = [m['name'] for m in j['monmap']['mons']]
579 if mon_id not in mons:
580 logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
581 mon_id, mons))
582 return
583 new_mons = [m for m in mons if m != mon_id]
584 new_quorum = [m for m in j['quorum_names'] if m != mon_id]
585 if len(new_quorum) > len(new_mons) / 2:
586 logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
587 (mon_id, new_quorum, new_mons))
588 return
589 raise OrchestratorError(
590 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
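# Worked example (mon names are hypothetical): with monmap mons ['a', 'b', 'c'] all in
# quorum, removing 'a' leaves new_mons=['b', 'c'] and new_quorum=['b', 'c']; 2 > 2/2 holds,
# so removal is considered safe. If 'c' were already out of quorum, new_quorum=['b'] and
# 1 > 1.0 fails, so OrchestratorError is raised.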
591
592 def pre_remove(self, daemon: DaemonDescription) -> None:
593 super().pre_remove(daemon)
594
595 assert daemon.daemon_id is not None
596 daemon_id: str = daemon.daemon_id
597 self._check_safe_to_destroy(daemon_id)
598
599 # remove mon from quorum before we destroy the daemon
600 logger.info('Removing monitor %s from monmap...' % daemon_id)
601 ret, out, err = self.mgr.check_mon_command({
602 'prefix': 'mon rm',
603 'name': daemon_id,
604 })
605
606 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
607 # Do not remove the mon keyring.
608 # super().post_remove(daemon)
609 pass
610
611
612 class MgrService(CephService):
613 TYPE = 'mgr'
614
615 def allow_colo(self) -> bool:
616 if self.mgr.get_ceph_option('mgr_standby_modules'):
617 # traditional mgr mode: standby daemons' modules listen on
618 # ports and redirect to the primary. we must not schedule
619 # multiple mgrs on the same host or else ports will
620 # conflict.
621 return False
622 else:
623 # standby daemons do nothing, and therefore port conflicts
624 # are not a concern.
625 return True
626
627 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
628 """
629 Create a new manager instance on a host.
630 """
631 assert self.TYPE == daemon_spec.daemon_type
632 mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
633
634 # get mgr. key
635 keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
636 ['mon', 'profile mgr',
637 'osd', 'allow *',
638 'mds', 'allow *'])
639
640 # Retrieve ports used by manager modules
641 # In the case of the dashboard port, with several manager daemons
642 # running on different hosts, there is the possibility that the
643 # user has decided to use a different dashboard port on each server.
644 # If this is the case, the only dashboard port opened will be the
645 # default one.
646 ports = []
647 ret, mgr_services, err = self.mgr.check_mon_command({
648 'prefix': 'mgr services',
649 })
650 if mgr_services:
651 mgr_endpoints = json.loads(mgr_services)
652 for end_point in mgr_endpoints.values():
653 port = re.search(r'\:\d+\/', end_point)
654 if port:
655 ports.append(int(port[0][1:-1]))
656
657 if ports:
658 daemon_spec.ports = ports
659
660 daemon_spec.keyring = keyring
661
662 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
663
664 return daemon_spec
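# Illustrative note (output is hypothetical): 'mgr services' returns a JSON map of module
# name to URL, e.g.
#   {"dashboard": "https://host1:8443/", "prometheus": "http://host1:9283/"}
# from which the regex above extracts ports [8443, 9283] to record on the new daemon.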
665
666 def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
667 for daemon in daemon_descrs:
668 assert daemon.daemon_type is not None
669 assert daemon.daemon_id is not None
670 if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
671 return daemon
672 # if no active mgr found, return empty Daemon Desc
673 return DaemonDescription()
674
675 def fail_over(self) -> None:
676 # this has been seen to sometimes transiently fail even when there are multiple
677 # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
678 class NoStandbyError(OrchestratorError):
679 pass
680 no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
681 'daemon', 'mgr' + self.mgr.get_mgr_id()))
682 for sleep_secs in [2, 8, 15]:
683 try:
684 if not self.mgr_map_has_standby():
685 raise no_standby_exc
686 self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
687 'INFO', 'Failing over to other MGR')
688 logger.info('Failing over to other MGR')
689
690 # fail over
691 ret, out, err = self.mgr.check_mon_command({
692 'prefix': 'mgr fail',
693 'who': self.mgr.get_mgr_id(),
694 })
695 return
696 except NoStandbyError:
697 logger.info(
698 f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
699 time.sleep(sleep_secs)
700 raise no_standby_exc
701
702 def mgr_map_has_standby(self) -> bool:
703 """
704 This is a bit safer than asking our inventory. If the mgr joined the mgr map,
705 we know it joined the cluster.
706 """
707 mgr_map = self.mgr.get('mgr_map')
708 num = len(mgr_map.get('standbys'))
709 return bool(num)
710
711 def ok_to_stop(
712 self,
713 daemon_ids: List[str],
714 force: bool = False,
715 known: Optional[List[str]] = None # output argument
716 ) -> HandleCommandResult:
717 # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
718
719 warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
720 if warn:
721 return HandleCommandResult(-errno.EBUSY, '', warn_message)
722
723 mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
724 active = self.get_active_daemon(mgr_daemons).daemon_id
725 if active in daemon_ids:
726 warn_message = 'ALERT: Cannot stop the active Mgr daemon. Please switch the active Mgr with \'ceph mgr fail %s\'' % active
727 return HandleCommandResult(-errno.EBUSY, '', warn_message)
728
729 return HandleCommandResult(0, warn_message, '')
730
731
732 class MdsService(CephService):
733 TYPE = 'mds'
734
735 def allow_colo(self) -> bool:
736 return True
737
738 def config(self, spec: ServiceSpec) -> None:
739 assert self.TYPE == spec.service_type
740 assert spec.service_id
741
742 # ensure mds_join_fs is set for these daemons
743 ret, out, err = self.mgr.check_mon_command({
744 'prefix': 'config set',
745 'who': 'mds.' + spec.service_id,
746 'name': 'mds_join_fs',
747 'value': spec.service_id,
748 })
749
750 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
751 assert self.TYPE == daemon_spec.daemon_type
752 mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
753
754 # get mds. key
755 keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
756 ['mon', 'profile mds',
757 'osd', 'allow rw tag cephfs *=*',
758 'mds', 'allow'])
759 daemon_spec.keyring = keyring
760
761 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
762
763 return daemon_spec
764
765 def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
766 active_mds_strs = list()
767 for fs in self.mgr.get('fs_map')['filesystems']:
768 mds_map = fs['mdsmap']
769 if mds_map is not None:
770 for mds_id, mds_status in mds_map['info'].items():
771 if mds_status['state'] == 'up:active':
772 active_mds_strs.append(mds_status['name'])
773 if len(active_mds_strs) != 0:
774 for daemon in daemon_descrs:
775 if daemon.daemon_id in active_mds_strs:
776 return daemon
777 # if no mds found, return empty Daemon Desc
778 return DaemonDescription()
779
780 def purge(self, service_name: str) -> None:
781 self.mgr.check_mon_command({
782 'prefix': 'config rm',
783 'who': service_name,
784 'name': 'mds_join_fs',
785 })
786
787
788 class RgwService(CephService):
789 TYPE = 'rgw'
790
791 def allow_colo(self) -> bool:
792 return True
793
794 def config(self, spec: RGWSpec) -> None: # type: ignore
795 assert self.TYPE == spec.service_type
796
797 # set rgw_realm and rgw_zone, if present
798 if spec.rgw_realm:
799 ret, out, err = self.mgr.check_mon_command({
800 'prefix': 'config set',
801 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
802 'name': 'rgw_realm',
803 'value': spec.rgw_realm,
804 })
805 if spec.rgw_zone:
806 ret, out, err = self.mgr.check_mon_command({
807 'prefix': 'config set',
808 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
809 'name': 'rgw_zone',
810 'value': spec.rgw_zone,
811 })
812
813 if spec.rgw_frontend_ssl_certificate:
814 if isinstance(spec.rgw_frontend_ssl_certificate, list):
815 cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
816 elif isinstance(spec.rgw_frontend_ssl_certificate, str):
817 cert_data = spec.rgw_frontend_ssl_certificate
818 else:
819 raise OrchestratorError(
820 'Invalid rgw_frontend_ssl_certificate: %s'
821 % spec.rgw_frontend_ssl_certificate)
822 ret, out, err = self.mgr.check_mon_command({
823 'prefix': 'config-key set',
824 'key': f'rgw/cert/{spec.service_name()}',
825 'val': cert_data,
826 })
827
828 # TODO: fail, if we don't have a spec
829 logger.info('Saving service %s spec with placement %s' % (
830 spec.service_name(), spec.placement.pretty_str()))
831 self.mgr.spec_store.save(spec)
832 self.mgr.trigger_connect_dashboard_rgw()
833
834 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
835 assert self.TYPE == daemon_spec.daemon_type
836 rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
837 spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
838
839 keyring = self.get_keyring(rgw_id)
840
841 if daemon_spec.ports:
842 port = daemon_spec.ports[0]
843 else:
844 # this is a redeploy of an older instance that doesn't have an explicitly
845 # assigned port, in which case we can assume there is only 1 per host
846 # and it matches the spec.
847 port = spec.get_port()
848
849 # configure frontend
850 args = []
851 ftype = spec.rgw_frontend_type or "beast"
852 if ftype == 'beast':
853 if spec.ssl:
854 if daemon_spec.ip:
855 args.append(
856 f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
857 else:
858 args.append(f"ssl_port={port}")
859 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
860 else:
861 if daemon_spec.ip:
862 args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
863 else:
864 args.append(f"port={port}")
865 elif ftype == 'civetweb':
866 if spec.ssl:
867 if daemon_spec.ip:
868 # note the 's' suffix on port
869 args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
870 else:
871 args.append(f"port={port}s") # note the 's' suffix on port
872 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
873 else:
874 if daemon_spec.ip:
875 args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
876 else:
877 args.append(f"port={port}")
878 frontend = f'{ftype} {" ".join(args)}'
879
880 ret, out, err = self.mgr.check_mon_command({
881 'prefix': 'config set',
882 'who': utils.name_to_config_section(daemon_spec.name()),
883 'name': 'rgw_frontends',
884 'value': frontend
885 })
886
887 daemon_spec.keyring = keyring
888 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
889
890 return daemon_spec
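# Illustrative note (values are hypothetical): the rgw_frontends string assembled above
# ends up looking like, for example,
#   'beast port=80'
#   'beast ssl_endpoint=10.1.0.5:443 ssl_certificate=config://rgw/cert/rgw.foo'
#   'civetweb port=443s ssl_certificate=config://rgw/cert/rgw.foo'
# depending on rgw_frontend_type, whether SSL is enabled, and whether an explicit IP was
# assigned to the daemon.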
891
892 def get_keyring(self, rgw_id: str) -> str:
893 keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
894 ['mon', 'allow *',
895 'mgr', 'allow rw',
896 'osd', 'allow rwx tag rgw *=*'])
897 return keyring
898
899 def purge(self, service_name: str) -> None:
900 self.mgr.check_mon_command({
901 'prefix': 'config rm',
902 'who': utils.name_to_config_section(service_name),
903 'name': 'rgw_realm',
904 })
905 self.mgr.check_mon_command({
906 'prefix': 'config rm',
907 'who': utils.name_to_config_section(service_name),
908 'name': 'rgw_zone',
909 })
910 self.mgr.check_mon_command({
911 'prefix': 'config-key rm',
912 'key': f'rgw/cert/{service_name}',
913 })
914 self.mgr.trigger_connect_dashboard_rgw()
915
916 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
917 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
918 self.mgr.check_mon_command({
919 'prefix': 'config rm',
920 'who': utils.name_to_config_section(daemon.name()),
921 'name': 'rgw_frontends',
922 })
923
924 def ok_to_stop(
925 self,
926 daemon_ids: List[str],
927 force: bool = False,
928 known: Optional[List[str]] = None # output argument
929 ) -> HandleCommandResult:
930 # if a load balancer (ingress) is present, block if only 1 daemon is up; otherwise ok
931 # if no load balancer, warn if > 1 daemon, block if only 1 daemon
932 def ingress_present() -> bool:
933 running_ingress_daemons = [
934 daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
935 running_haproxy_daemons = [
936 daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
937 running_keepalived_daemons = [
938 daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
939 # check that there is at least one haproxy and keepalived daemon running
940 if running_haproxy_daemons and running_keepalived_daemons:
941 return True
942 return False
943
944 # if only 1 rgw, alert user (this cannot be overridden with --force)
945 warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
946 if warn:
947 return HandleCommandResult(-errno.EBUSY, '', warn_message)
948
949 # if reached here, there is > 1 rgw daemon.
950 # Say okay if load balancer present or force flag set
951 if ingress_present() or force:
952 return HandleCommandResult(0, warn_message, '')
953
954 # if reached here, > 1 RGW daemon, no load balancer and no force flag.
955 # Provide warning
956 warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
957 return HandleCommandResult(-errno.EBUSY, '', warn_message)
958
959 def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
960 self.mgr.trigger_connect_dashboard_rgw()
961
962
963 class RbdMirrorService(CephService):
964 TYPE = 'rbd-mirror'
965
966 def allow_colo(self) -> bool:
967 return True
968
969 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
970 assert self.TYPE == daemon_spec.daemon_type
971 daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
972
973 keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
974 ['mon', 'profile rbd-mirror',
975 'osd', 'profile rbd'])
976
977 daemon_spec.keyring = keyring
978
979 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
980
981 return daemon_spec
982
983 def ok_to_stop(
984 self,
985 daemon_ids: List[str],
986 force: bool = False,
987 known: Optional[List[str]] = None # output argument
988 ) -> HandleCommandResult:
989 # if only 1 rbd-mirror, alert user (this cannot be overridden with --force)
990 warn, warn_message = self._enough_daemons_to_stop(
991 self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
992 if warn:
993 return HandleCommandResult(-errno.EBUSY, '', warn_message)
994 return HandleCommandResult(0, warn_message, '')
995
996
997 class CrashService(CephService):
998 TYPE = 'crash'
999
1000 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
1001 assert self.TYPE == daemon_spec.daemon_type
1002 daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
1003
1004 keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
1005 ['mon', 'profile crash',
1006 'mgr', 'profile crash'])
1007
1008 daemon_spec.keyring = keyring
1009
1010 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
1011
1012 return daemon_spec
1013
1014
1015 class CephfsMirrorService(CephService):
1016 TYPE = 'cephfs-mirror'
1017
1018 def config(self, spec: ServiceSpec) -> None:
1019 # make sure mirroring module is enabled
1020 mgr_map = self.mgr.get('mgr_map')
1021 mod_name = 'mirroring'
1022 if mod_name not in mgr_map.get('services', {}):
1023 self.mgr.check_mon_command({
1024 'prefix': 'mgr module enable',
1025 'module': mod_name
1026 })
1027 # we shouldn't get here (mon will tell the mgr to respawn), but no
1028 # harm done if we do.
1029
1030 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
1031 assert self.TYPE == daemon_spec.daemon_type
1032
1033 ret, keyring, err = self.mgr.check_mon_command({
1034 'prefix': 'auth get-or-create',
1035 'entity': self.get_auth_entity(daemon_spec.daemon_id),
1036 'caps': ['mon', 'profile cephfs-mirror',
1037 'mds', 'allow r',
1038 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
1039 'mgr', 'allow r'],
1040 })
1041
1042 daemon_spec.keyring = keyring
1043 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
1044 return daemon_spec
1045
1046
1047 class CephadmAgent(CephService):
1048 TYPE = 'agent'
1049
1050 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
1051 assert self.TYPE == daemon_spec.daemon_type
1052 daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
1053
1054 if not self.mgr.cherrypy_thread:
1055 raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
1056
1057 keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
1058 daemon_spec.keyring = keyring
1059 self.mgr.agent_cache.agent_keys[host] = keyring
1060
1061 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
1062
1063 return daemon_spec
1064
1065 def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
1066 try:
1067 assert self.mgr.cherrypy_thread
1068 assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
1069 assert self.mgr.cherrypy_thread.server_port
1070 except Exception:
1071 raise OrchestratorError(
1072 'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
1073
1074 cfg = {'target_ip': self.mgr.get_mgr_ip(),
1075 'target_port': self.mgr.cherrypy_thread.server_port,
1076 'refresh_period': self.mgr.agent_refresh_rate,
1077 'listener_port': self.mgr.agent_starting_port,
1078 'host': daemon_spec.host,
1079 'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
1080
1081 listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
1082 self.mgr.inventory.get_addr(daemon_spec.host))
1083 config = {
1084 'agent.json': json.dumps(cfg),
1085 'keyring': daemon_spec.keyring,
1086 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
1087 'listener.crt': listener_cert,
1088 'listener.key': listener_key,
1089 }
1090
1091 return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
1092 self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
1093 str(self.mgr.get_module_option('device_enhanced_scan'))])
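# Note (behavioral summary, stated broadly): the returned deps list (mgr IP, endpoint port,
# root certificate, device_enhanced_scan setting, sorted) acts as a change detector; when
# any of these values change, cephadm is expected to regenerate the agent's config and
# reconfigure/redeploy the daemon.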