ceph.git: ceph/src/pybind/mgr/cephadm/services/cephadmservice.py (quincy beta 17.1.0)
1 import errno
2 import json
3 import logging
4 import re
5 from abc import ABCMeta, abstractmethod
6 from typing import TYPE_CHECKING, List, Callable, TypeVar, \
7 Optional, Dict, Any, Tuple, NewType, cast
8
9 from mgr_module import HandleCommandResult, MonCommandFailed
10
11 from ceph.deployment.service_spec import ServiceSpec, RGWSpec
12 from ceph.deployment.utils import is_ipv6, unwrap_ipv6
13 from mgr_util import build_url
14 from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
15 from orchestrator._interface import daemon_type_to_service
16 from cephadm import utils
17
18 if TYPE_CHECKING:
19 from cephadm.module import CephadmOrchestrator
20
21 logger = logging.getLogger(__name__)
22
23 ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
24 AuthEntity = NewType('AuthEntity', str)
25
26
27 class CephadmDaemonDeploySpec:
28 # typing.NamedTuple + Generic is broken in py36
29 def __init__(self, host: str, daemon_id: str,
30 service_name: str,
31 network: Optional[str] = None,
32 keyring: Optional[str] = None,
33 extra_args: Optional[List[str]] = None,
34 ceph_conf: str = '',
35 extra_files: Optional[Dict[str, Any]] = None,
36 daemon_type: Optional[str] = None,
37 ip: Optional[str] = None,
38 ports: Optional[List[int]] = None,
39 rank: Optional[int] = None,
40 rank_generation: Optional[int] = None,
41 extra_container_args: Optional[List[str]] = None):
42 """
43 A data structure to encapsulate the arguments to a `cephadm deploy ...` call.
44 """
45 self.host: str = host
46 self.daemon_id = daemon_id
47 self.service_name = service_name
48 daemon_type = daemon_type or (service_name.split('.')[0])
49 assert daemon_type is not None
50 self.daemon_type: str = daemon_type
51
52 # mons
53 self.network = network
54
55 # for run_cephadm.
56 self.keyring: Optional[str] = keyring
57
58 # For run_cephadm. Would be great to have more expressive names.
59 self.extra_args: List[str] = extra_args or []
60
61 self.ceph_conf = ceph_conf
62 self.extra_files = extra_files or {}
63
64 # TCP ports used by the daemon
65 self.ports: List[int] = ports or []
66 self.ip: Optional[str] = ip
67
68 # values to be populated during generate_config calls
69 # and then used in _run_cephadm
70 self.final_config: Dict[str, Any] = {}
71 self.deps: List[str] = []
72
73 self.rank: Optional[int] = rank
74 self.rank_generation: Optional[int] = rank_generation
75
76 self.extra_container_args = extra_container_args
77
78 def name(self) -> str:
79 return '%s.%s' % (self.daemon_type, self.daemon_id)
80
81 def config_get_files(self) -> Dict[str, Any]:
82 files = self.extra_files
83 if self.ceph_conf:
84 files['config'] = self.ceph_conf
85
86 return files
87
88 @staticmethod
89 def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
90 assert dd.hostname
91 assert dd.daemon_id
92 assert dd.daemon_type
93 return CephadmDaemonDeploySpec(
94 host=dd.hostname,
95 daemon_id=dd.daemon_id,
96 daemon_type=dd.daemon_type,
97 service_name=dd.service_name(),
98 ip=dd.ip,
99 ports=dd.ports,
100 rank=dd.rank,
101 rank_generation=dd.rank_generation,
102 extra_container_args=dd.extra_container_args,
103 )
104
105 def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
106 return DaemonDescription(
107 daemon_type=self.daemon_type,
108 daemon_id=self.daemon_id,
109 service_name=self.service_name,
110 hostname=self.host,
111 status=status,
112 status_desc=status_desc,
113 ip=self.ip,
114 ports=self.ports,
115 rank=self.rank,
116 rank_generation=self.rank_generation,
117 extra_container_args=self.extra_container_args,
118 )
119
120
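# Editor's note (not part of the original file): a minimal sketch of constructing a
# deploy spec by hand, for a hypothetical crash daemon on host 'node1'; the field
# values are assumptions for illustration only. Within the orchestrator, specs are
# normally built via CephadmService.make_daemon_spec() or from_daemon_description().
#
#     spec = CephadmDaemonDeploySpec(
#         host='node1',
#         daemon_id='node1',
#         service_name='crash',      # daemon_type defaults to the first component, 'crash'
#     )
#     assert spec.name() == 'crash.node1'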
121 class CephadmService(metaclass=ABCMeta):
122 """
123 Base class for service types. Often providing a create() and config() fn.
124 """
125
126 @property
127 @abstractmethod
128 def TYPE(self) -> str:
129 pass
130
131 def __init__(self, mgr: "CephadmOrchestrator"):
132 self.mgr: "CephadmOrchestrator" = mgr
133
134 def allow_colo(self) -> bool:
135 """
136 Return True if multiple daemons of the same type can colocate on
137 the same host.
138 """
139 return False
140
141 def primary_daemon_type(self) -> str:
142 """
143 This is the type of the primary (usually only) daemon to be deployed.
144 """
145 return self.TYPE
146
147 def per_host_daemon_type(self) -> Optional[str]:
148 """
149 If defined, this type of daemon will be deployed once for each host
150 containing one or more daemons of the primary type.
151 """
152 return None
153
154 def ranked(self) -> bool:
155 """
156 If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
157 generation (0, 1, ...) to each daemon we create/deploy.
158 """
159 return False
160
161 def fence_old_ranks(self,
162 spec: ServiceSpec,
163 rank_map: Dict[int, Dict[int, Optional[str]]],
164 num_ranks: int) -> None:
165 assert False
166
167 def make_daemon_spec(
168 self,
169 host: str,
170 daemon_id: str,
171 network: str,
172 spec: ServiceSpecs,
173 daemon_type: Optional[str] = None,
174 ports: Optional[List[int]] = None,
175 ip: Optional[str] = None,
176 rank: Optional[int] = None,
177 rank_generation: Optional[int] = None,
178 ) -> CephadmDaemonDeploySpec:
179 try:
180 eca = spec.extra_container_args
181 except AttributeError:
182 eca = None
183 return CephadmDaemonDeploySpec(
184 host=host,
185 daemon_id=daemon_id,
186 service_name=spec.service_name(),
187 network=network,
188 daemon_type=daemon_type,
189 ports=ports,
190 ip=ip,
191 rank=rank,
192 rank_generation=rank_generation,
193 extra_container_args=eca,
194 )
195
196 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
197 raise NotImplementedError()
198
199 def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
200 raise NotImplementedError()
201
202 def config(self, spec: ServiceSpec) -> None:
203 """
204 Configure the cluster for this service. Only called *once* per
205 service apply. Not for every daemon.
206 """
207 pass
208
209 def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
210 """The post actions needed to be done after daemons are checked"""
211 if self.mgr.config_dashboard:
212 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
213 self.config_dashboard(daemon_descrs)
214 else:
215 logger.debug('Dashboard is not enabled. Skip configuration.')
216
217 def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
218 """Config dashboard settings."""
219 raise NotImplementedError()
220
221 def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
222 # if this is called for a service type where it hasn't explicitly been
223 # defined, return an empty DaemonDescription
224 return DaemonDescription()
225
226 def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
227 ret, keyring, err = self.mgr.mon_command({
228 'prefix': 'auth get-or-create',
229 'entity': entity,
230 'caps': caps,
231 })
232 if err:
233 ret, out, err = self.mgr.mon_command({
234 'prefix': 'auth caps',
235 'entity': entity,
236 'caps': caps,
237 })
238 if err:
239 self.mgr.log.warning(f"Unable to update caps for {entity}")
240 return keyring
241
242 def _inventory_get_addr(self, hostname: str) -> str:
243 """Get a host's address with its hostname."""
244 return self.mgr.inventory.get_addr(hostname)
245
246 def _set_service_url_on_dashboard(self,
247 service_name: str,
248 get_mon_cmd: str,
249 set_mon_cmd: str,
250 service_url: str) -> None:
251 """A helper to get and set service_url via Dashboard's MON command.
252
253 If the result of get_mon_cmd differs from service_url, set_mon_cmd will
254 be issued to update the service_url.
255 """
256 def get_set_cmd_dicts(out: str) -> List[dict]:
257 cmd_dict = {
258 'prefix': set_mon_cmd,
259 'value': service_url
260 }
261 return [cmd_dict] if service_url != out else []
262
263 self._check_and_set_dashboard(
264 service_name=service_name,
265 get_cmd=get_mon_cmd,
266 get_set_cmd_dicts=get_set_cmd_dicts
267 )
268
269 def _check_and_set_dashboard(self,
270 service_name: str,
271 get_cmd: str,
272 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
273 """A helper to set configs in the Dashboard.
274
275 The method is useful for the following pattern:
276 - Get a config from the Dashboard by using a Dashboard command, e.g. the current
277 iSCSI gateways.
278 - Parse or deserialize the previous output, e.g. a Dashboard command returning a JSON string.
279 - Determine whether the config needs to be updated. NOTE: This step is important because if a
280 Dashboard command modifies the Ceph config, cephadm's config_notify() is called, which
281 kicks the serve() loop, so the logic using this method is likely to be called again.
282 A config should be updated only when needed.
283 - Update the config in the Dashboard by using a Dashboard command.
284
285 :param service_name: the service name to be used for logging
286 :type service_name: str
287 :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
288 :type get_cmd: str
289 :param get_set_cmd_dicts: function that creates a list in which each item is a command dictionary.
290 e.g.
291 [
292 {
293 'prefix': 'dashboard iscsi-gateway-add',
294 'service_url': 'http://admin:admin@aaa:5000',
295 'name': 'aaa'
296 },
297 {
298 'prefix': 'dashboard iscsi-gateway-add',
299 'service_url': 'http://admin:admin@bbb:5000',
300 'name': 'bbb'
301 }
302 ]
303 The function should return an empty list if no commands need to be sent.
304 :type get_set_cmd_dicts: Callable[[str], List[dict]]
305 """
306
307 try:
308 _, out, _ = self.mgr.check_mon_command({
309 'prefix': get_cmd
310 })
311 except MonCommandFailed as e:
312 logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
313 return
314 cmd_dicts = get_set_cmd_dicts(out.strip())
315 for cmd_dict in list(cmd_dicts):
316 try:
317 inbuf = cmd_dict.pop('inbuf', None)
318 _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
319 except MonCommandFailed as e:
320 logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
321
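# Editor's note (not part of the original file): a subclass would typically drive the
# helper above from config_dashboard(). A minimal sketch, reusing the
# 'dashboard get-grafana-api-url' prefix mentioned in the docstring, with an assumed
# matching set command and a placeholder URL:
#
#     self._set_service_url_on_dashboard(
#         'grafana',
#         'dashboard get-grafana-api-url',
#         'dashboard set-grafana-api-url',
#         'https://mgr-host:3000',
#     )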
322 def ok_to_stop_osd(
323 self,
324 osds: List[str],
325 known: Optional[List[str]] = None, # output argument
326 force: bool = False) -> HandleCommandResult:
327 r = HandleCommandResult(*self.mgr.mon_command({
328 'prefix': "osd ok-to-stop",
329 'ids': osds,
330 'max': 16,
331 }))
332 j = None
333 try:
334 j = json.loads(r.stdout)
335 except json.decoder.JSONDecodeError:
336 self.mgr.log.warning("osd ok-to-stop didn't return structured result")
337 raise
338 if r.retval:
339 return r
340 if known is not None and j and j.get('ok_to_stop'):
341 self.mgr.log.debug(f"got {j}")
342 known.extend([f'osd.{x}' for x in j.get('osds', [])])
343 return HandleCommandResult(
344 0,
345 f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
346 ''
347 )
348
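# Editor's note (not part of the original file): 'osd ok-to-stop' returns JSON from
# which the method above only reads the 'ok_to_stop' and 'osds' keys; a minimal shape,
# assumed here for illustration, would be:
#
#     {"ok_to_stop": true, "osds": [1, 2]}
#
# in which case the 'known' output argument is extended with ['osd.1', 'osd.2'].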
349 def ok_to_stop(
350 self,
351 daemon_ids: List[str],
352 force: bool = False,
353 known: Optional[List[str]] = None # output argument
354 ) -> HandleCommandResult:
355 names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
356 out = f'It appears safe to stop {",".join(names)}'
357 err = f'It is NOT safe to stop {",".join(names)} at this time'
358
359 if self.TYPE not in ['mon', 'osd', 'mds']:
360 logger.debug(out)
361 return HandleCommandResult(0, out)
362
363 if self.TYPE == 'osd':
364 return self.ok_to_stop_osd(daemon_ids, known, force)
365
366 r = HandleCommandResult(*self.mgr.mon_command({
367 'prefix': f'{self.TYPE} ok-to-stop',
368 'ids': daemon_ids,
369 }))
370
371 if r.retval:
372 err = f'{err}: {r.stderr}' if r.stderr else err
373 logger.debug(err)
374 return HandleCommandResult(r.retval, r.stdout, err)
375
376 out = f'{out}: {r.stdout}' if r.stdout else out
377 logger.debug(out)
378 return HandleCommandResult(r.retval, out, r.stderr)
379
380 def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
381 # Provides a warning about whether it is possible or not to stop <n> daemons in a service
382 names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
383 number_of_running_daemons = len(
384 [daemon
385 for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
386 if daemon.status == DaemonDescriptionStatus.running])
387 if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
388 return False, f'It is presumed safe to stop {names}'
389
390 num_daemons_left = number_of_running_daemons - len(daemon_ids)
391
392 def plural(count: int) -> str:
393 return 'daemon' if count == 1 else 'daemons'
394
395 left_count = "no" if num_daemons_left == 0 else num_daemons_left
396
397 if alert:
398 out = (f'ALERT: Cannot stop {names} in {service} service. '
399 f'Not enough remaining {service} daemons. '
400 f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
401 else:
402 out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
403 f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
404 f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
405 return True, out
406
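# Editor's note (not part of the original file): worked example of the check above,
# with assumed numbers. With 5 running daemons, daemon_ids of length 2 and
# low_limit=3: 5 - 2 = 3 >= 3, so (False, ...) is returned and stopping is presumed
# safe; asking to stop 3 instead would leave 2 < 3 and return the warning/alert text.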
407 def pre_remove(self, daemon: DaemonDescription) -> None:
408 """
409 Called before the daemon is removed.
410 """
411 assert daemon.daemon_type is not None
412 assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
413 logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
414
415 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
416 """
417 Called after the daemon is removed.
418 """
419 assert daemon.daemon_type is not None
420 assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
421 logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
422
423 def purge(self, service_name: str) -> None:
424 """Called to carry out any purge tasks following service removal"""
425 logger.debug(f'Purge called for {self.TYPE} - no action taken')
426
427
428 class CephService(CephadmService):
429 def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
430 # Ceph daemons (mon, mgr, mds, osd, etc.)
431 cephadm_config = self.get_config_and_keyring(
432 daemon_spec.daemon_type,
433 daemon_spec.daemon_id,
434 host=daemon_spec.host,
435 keyring=daemon_spec.keyring,
436 extra_ceph_config=daemon_spec.ceph_conf)
437
438 if daemon_spec.config_get_files():
439 cephadm_config.update({'files': daemon_spec.config_get_files()})
440
441 return cephadm_config, []
442
443 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
444 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
445 self.remove_keyring(daemon)
446
447 def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
448 """
449 Map the daemon id to a cephx keyring entity name
450 """
451 # despite this method mapping daemon ids to entity names, self.TYPE within
452 # the CephService class refers to service types, not daemon types
453 if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
454 return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
455 elif self.TYPE in ['crash', 'agent']:
456 if host == "":
457 raise OrchestratorError(
458 f'Host not provided to generate <{self.TYPE}> auth entity name')
459 return AuthEntity(f'client.{self.TYPE}.{host}')
460 elif self.TYPE == 'mon':
461 return AuthEntity('mon.')
462 elif self.TYPE in ['mgr', 'osd', 'mds']:
463 return AuthEntity(f'{self.TYPE}.{daemon_id}')
464 else:
465 raise OrchestratorError("unknown daemon type")
466
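# Editor's note (not part of the original file): examples of the mapping above, with
# hypothetical daemon ids:
#
#     rgw daemon 'myrgw.host1.abcdef'  -> AuthEntity('client.rgw.myrgw.host1.abcdef')
#     crash daemon on host 'node1'     -> AuthEntity('client.crash.node1')
#     any mon daemon                   -> AuthEntity('mon.')
#     osd daemon '3'                   -> AuthEntity('osd.3')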
467 def get_config_and_keyring(self,
468 daemon_type: str,
469 daemon_id: str,
470 host: str,
471 keyring: Optional[str] = None,
472 extra_ceph_config: Optional[str] = None
473 ) -> Dict[str, Any]:
474 # keyring
475 if not keyring:
476 entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
477 ret, keyring, err = self.mgr.check_mon_command({
478 'prefix': 'auth get',
479 'entity': entity,
480 })
481
482 config = self.mgr.get_minimal_ceph_conf()
483
484 if extra_ceph_config:
485 config += extra_ceph_config
486
487 return {
488 'config': config,
489 'keyring': keyring,
490 }
491
492 def remove_keyring(self, daemon: DaemonDescription) -> None:
493 assert daemon.daemon_id is not None
494 assert daemon.hostname is not None
495 daemon_id: str = daemon.daemon_id
496 host: str = daemon.hostname
497
498 assert daemon.daemon_type != 'mon'
499
500 entity = self.get_auth_entity(daemon_id, host=host)
501
502 logger.info(f'Removing key for {entity}')
503 ret, out, err = self.mgr.mon_command({
504 'prefix': 'auth rm',
505 'entity': entity,
506 })
507
508
509 class MonService(CephService):
510 TYPE = 'mon'
511
512 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
513 """
514 Create a new monitor on the given host.
515 """
516 assert self.TYPE == daemon_spec.daemon_type
517 name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network
518
519 # get mon. key
520 ret, keyring, err = self.mgr.check_mon_command({
521 'prefix': 'auth get',
522 'entity': self.get_auth_entity(name),
523 })
524
525 extra_config = '[mon.%s]\n' % name
526 if network:
527 # infer whether this is a CIDR network, addrvec, or plain IP
528 if '/' in network:
529 extra_config += 'public network = %s\n' % network
530 elif network.startswith('[v') and network.endswith(']'):
531 extra_config += 'public addrv = %s\n' % network
532 elif is_ipv6(network):
533 extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
534 elif ':' not in network:
535 extra_config += 'public addr = %s\n' % network
536 else:
537 raise OrchestratorError(
538 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
539 else:
540 # try to get the public_network from the config
541 ret, network, err = self.mgr.check_mon_command({
542 'prefix': 'config get',
543 'who': 'mon',
544 'key': 'public_network',
545 })
546 network = network.strip() if network else network
547 if not network:
548 raise OrchestratorError(
549 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
550 if '/' not in network:
551 raise OrchestratorError(
552 'public_network is set but does not look like a CIDR network: \'%s\'' % network)
553 extra_config += 'public network = %s\n' % network
554
555 daemon_spec.ceph_conf = extra_config
556 daemon_spec.keyring = keyring
557
558 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
559
560 return daemon_spec
561
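# Editor's note (not part of the original file): examples of 'network' values the
# inference above accepts, using made-up addresses:
#
#     '10.1.2.0/24'                             -> public network = 10.1.2.0/24
#     '[v2:10.1.2.3:3300,v1:10.1.2.3:6789]'     -> public addrv = [v2:10.1.2.3:3300,v1:10.1.2.3:6789]
#     '2001:db8::3'                             -> public addr = 2001:db8::3 (IPv6, unwrapped if bracketed)
#     '10.1.2.3'                                -> public addr = 10.1.2.3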
562 def _check_safe_to_destroy(self, mon_id: str) -> None:
563 ret, out, err = self.mgr.check_mon_command({
564 'prefix': 'quorum_status',
565 })
566 try:
567 j = json.loads(out)
568 except Exception:
569 raise OrchestratorError('failed to parse quorum status')
570
571 mons = [m['name'] for m in j['monmap']['mons']]
572 if mon_id not in mons:
573 logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
574 mon_id, mons))
575 return
576 new_mons = [m for m in mons if m != mon_id]
577 new_quorum = [m for m in j['quorum_names'] if m != mon_id]
578 if len(new_quorum) > len(new_mons) / 2:
579 logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
580 (mon_id, new_quorum, new_mons))
581 return
582 raise OrchestratorError(
583 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
584
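# Editor's note (not part of the original file): worked example of the quorum check
# above, with assumed monitor names. With mons ['a', 'b', 'c'] all in quorum,
# removing 'c' gives new_mons = ['a', 'b'] and new_quorum = ['a', 'b'];
# 2 > 2 / 2, so the removal is considered safe. With mons ['a', 'b'] and 'b' already
# out of quorum, removing 'b' gives new_quorum = ['a'] and new_mons = ['a'];
# 1 > 1 / 2 still holds, so it is also allowed.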
585 def pre_remove(self, daemon: DaemonDescription) -> None:
586 super().pre_remove(daemon)
587
588 assert daemon.daemon_id is not None
589 daemon_id: str = daemon.daemon_id
590 self._check_safe_to_destroy(daemon_id)
591
592 # remove mon from quorum before we destroy the daemon
593 logger.info('Removing monitor %s from monmap...' % daemon_id)
594 ret, out, err = self.mgr.check_mon_command({
595 'prefix': 'mon rm',
596 'name': daemon_id,
597 })
598
599 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
600 # Do not remove the mon keyring.
601 # super().post_remove(daemon)
602 pass
603
604
605 class MgrService(CephService):
606 TYPE = 'mgr'
607
608 def allow_colo(self) -> bool:
609 if self.mgr.get_ceph_option('mgr_standby_modules'):
610 # traditional mgr mode: standby daemons' modules listen on
611 # ports and redirect to the primary. we must not schedule
612 # multiple mgrs on the same host or else ports will
613 # conflict.
614 return False
615 else:
616 # standby daemons do nothing, and therefore port conflicts
617 # are not a concern.
618 return True
619
620 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
621 """
622 Create a new manager instance on a host.
623 """
624 assert self.TYPE == daemon_spec.daemon_type
625 mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host
626
627 # get mgr. key
628 keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
629 ['mon', 'profile mgr',
630 'osd', 'allow *',
631 'mds', 'allow *'])
632
633 # Retrieve ports used by manager modules.
634 # In the case of the dashboard port, with several manager daemons
635 # running on different hosts, it is possible that the user has decided
636 # to use a different dashboard port on each server. If this is the
637 # case, then only the port used as the default will be opened for
638 # the dashboard.
639 ports = []
640 ret, mgr_services, err = self.mgr.check_mon_command({
641 'prefix': 'mgr services',
642 })
643 if mgr_services:
644 mgr_endpoints = json.loads(mgr_services)
645 for end_point in mgr_endpoints.values():
646 port = re.search(r'\:\d+\/', end_point)
647 if port:
648 ports.append(int(port[0][1:-1]))
649
650 if ports:
651 daemon_spec.ports = ports
652
653 daemon_spec.keyring = keyring
654
655 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
656
657 return daemon_spec
658
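# Editor's note (not part of the original file): example of the port extraction
# above, with an assumed 'mgr services' output:
#
#     {"dashboard": "https://host1:8443/", "prometheus": "http://host1:9283/"}
#
# The regex r'\:\d+\/' matches ':8443/' and ':9283/', so ports becomes [8443, 9283].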
659 def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
660 for daemon in daemon_descrs:
661 assert daemon.daemon_type is not None
662 assert daemon.daemon_id is not None
663 if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
664 return daemon
665 # if no active mgr found, return empty Daemon Desc
666 return DaemonDescription()
667
668 def fail_over(self) -> None:
669 if not self.mgr_map_has_standby():
670 raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
671 'daemon', 'mgr' + self.mgr.get_mgr_id()))
672
673 self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
674 'INFO', 'Failing over to other MGR')
675 logger.info('Failing over to other MGR')
676
677 # fail over
678 ret, out, err = self.mgr.check_mon_command({
679 'prefix': 'mgr fail',
680 'who': self.mgr.get_mgr_id(),
681 })
682
683 def mgr_map_has_standby(self) -> bool:
684 """
685 This is a bit safer than asking our inventory. If the mgr joined the mgr map,
686 we know it joined the cluster.
687 """
688 mgr_map = self.mgr.get('mgr_map')
689 num = len(mgr_map.get('standbys'))
690 return bool(num)
691
692 def ok_to_stop(
693 self,
694 daemon_ids: List[str],
695 force: bool = False,
696 known: Optional[List[str]] = None # output argument
697 ) -> HandleCommandResult:
698 # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
699
700 warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
701 if warn:
702 return HandleCommandResult(-errno.EBUSY, '', warn_message)
703
704 mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
705 active = self.get_active_daemon(mgr_daemons).daemon_id
706 if active in daemon_ids:
707 warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
708 return HandleCommandResult(-errno.EBUSY, '', warn_message)
709
710 return HandleCommandResult(0, warn_message, '')
711
712
713 class MdsService(CephService):
714 TYPE = 'mds'
715
716 def allow_colo(self) -> bool:
717 return True
718
719 def config(self, spec: ServiceSpec) -> None:
720 assert self.TYPE == spec.service_type
721 assert spec.service_id
722
723 # ensure mds_join_fs is set for these daemons
724 ret, out, err = self.mgr.check_mon_command({
725 'prefix': 'config set',
726 'who': 'mds.' + spec.service_id,
727 'name': 'mds_join_fs',
728 'value': spec.service_id,
729 })
730
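# Editor's note (not part of the original file): for a hypothetical MDS service id
# 'cephfs', the mon command above is equivalent to running:
#
#     ceph config set mds.cephfs mds_join_fs cephfs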
731 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
732 assert self.TYPE == daemon_spec.daemon_type
733 mds_id, _ = daemon_spec.daemon_id, daemon_spec.host
734
735 # get mds. key
736 keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
737 ['mon', 'profile mds',
738 'osd', 'allow rw tag cephfs *=*',
739 'mds', 'allow'])
740 daemon_spec.keyring = keyring
741
742 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
743
744 return daemon_spec
745
746 def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
747 active_mds_strs = list()
748 for fs in self.mgr.get('fs_map')['filesystems']:
749 mds_map = fs['mdsmap']
750 if mds_map is not None:
751 for mds_id, mds_status in mds_map['info'].items():
752 if mds_status['state'] == 'up:active':
753 active_mds_strs.append(mds_status['name'])
754 if len(active_mds_strs) != 0:
755 for daemon in daemon_descrs:
756 if daemon.daemon_id in active_mds_strs:
757 return daemon
758 # if no mds found, return empty Daemon Desc
759 return DaemonDescription()
760
761 def purge(self, service_name: str) -> None:
762 self.mgr.check_mon_command({
763 'prefix': 'config rm',
764 'who': service_name,
765 'name': 'mds_join_fs',
766 })
767
768
769 class RgwService(CephService):
770 TYPE = 'rgw'
771
772 def allow_colo(self) -> bool:
773 return True
774
775 def config(self, spec: RGWSpec) -> None: # type: ignore
776 assert self.TYPE == spec.service_type
777
778 # set rgw_realm and rgw_zone, if present
779 if spec.rgw_realm:
780 ret, out, err = self.mgr.check_mon_command({
781 'prefix': 'config set',
782 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
783 'name': 'rgw_realm',
784 'value': spec.rgw_realm,
785 })
786 if spec.rgw_zone:
787 ret, out, err = self.mgr.check_mon_command({
788 'prefix': 'config set',
789 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
790 'name': 'rgw_zone',
791 'value': spec.rgw_zone,
792 })
793
794 if spec.rgw_frontend_ssl_certificate:
795 if isinstance(spec.rgw_frontend_ssl_certificate, list):
796 cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
797 elif isinstance(spec.rgw_frontend_ssl_certificate, str):
798 cert_data = spec.rgw_frontend_ssl_certificate
799 else:
800 raise OrchestratorError(
801 'Invalid rgw_frontend_ssl_certificate: %s'
802 % spec.rgw_frontend_ssl_certificate)
803 ret, out, err = self.mgr.check_mon_command({
804 'prefix': 'config-key set',
805 'key': f'rgw/cert/{spec.service_name()}',
806 'val': cert_data,
807 })
808
809 # TODO: fail, if we don't have a spec
810 logger.info('Saving service %s spec with placement %s' % (
811 spec.service_name(), spec.placement.pretty_str()))
812 self.mgr.spec_store.save(spec)
813 self.mgr.trigger_connect_dashboard_rgw()
814
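# Editor's note (not part of the original file): for a hypothetical service named
# 'rgw.foo', the certificate above is stored under the config-key 'rgw/cert/rgw.foo'
# and is later referenced by the frontend configuration built in prepare_create() as
# ssl_certificate=config://rgw/cert/rgw.foo.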
815 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
816 assert self.TYPE == daemon_spec.daemon_type
817 rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
818 spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
819
820 keyring = self.get_keyring(rgw_id)
821
822 if daemon_spec.ports:
823 port = daemon_spec.ports[0]
824 else:
825 # this is a redeploy of an older instance that doesn't have an explicitly
826 # assigned port, in which case we can assume there is only 1 per host
827 # and it matches the spec.
828 port = spec.get_port()
829
830 # configure frontend
831 args = []
832 ftype = spec.rgw_frontend_type or "beast"
833 if ftype == 'beast':
834 if spec.ssl:
835 if daemon_spec.ip:
836 args.append(
837 f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
838 else:
839 args.append(f"ssl_port={port}")
840 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
841 else:
842 if daemon_spec.ip:
843 args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
844 else:
845 args.append(f"port={port}")
846 elif ftype == 'civetweb':
847 if spec.ssl:
848 if daemon_spec.ip:
849 # note the 's' suffix on port
850 args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
851 else:
852 args.append(f"port={port}s") # note the 's' suffix on port
853 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
854 else:
855 if daemon_spec.ip:
856 args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
857 else:
858 args.append(f"port={port}")
859 frontend = f'{ftype} {" ".join(args)}'
860
861 ret, out, err = self.mgr.check_mon_command({
862 'prefix': 'config set',
863 'who': utils.name_to_config_section(daemon_spec.name()),
864 'name': 'rgw_frontends',
865 'value': frontend
866 })
867
868 daemon_spec.keyring = keyring
869 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
870
871 return daemon_spec
872
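# Editor's note (not part of the original file): examples of the rgw_frontends value
# built above, assuming ports 80/443 and a service named 'rgw.foo':
#
#     beast, no SSL, no explicit IP:   'beast port=80'
#     beast, SSL, no explicit IP:      'beast ssl_port=443 ssl_certificate=config://rgw/cert/rgw.foo'
#     civetweb, SSL, no explicit IP:   'civetweb port=443s ssl_certificate=config://rgw/cert/rgw.foo'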
873 def get_keyring(self, rgw_id: str) -> str:
874 keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
875 ['mon', 'allow *',
876 'mgr', 'allow rw',
877 'osd', 'allow rwx tag rgw *=*'])
878 return keyring
879
880 def purge(self, service_name: str) -> None:
881 self.mgr.check_mon_command({
882 'prefix': 'config rm',
883 'who': utils.name_to_config_section(service_name),
884 'name': 'rgw_realm',
885 })
886 self.mgr.check_mon_command({
887 'prefix': 'config rm',
888 'who': utils.name_to_config_section(service_name),
889 'name': 'rgw_zone',
890 })
891 self.mgr.check_mon_command({
892 'prefix': 'config-key rm',
893 'key': f'rgw/cert/{service_name}',
894 })
895 self.mgr.trigger_connect_dashboard_rgw()
896
897 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
898 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
899 self.mgr.check_mon_command({
900 'prefix': 'config rm',
901 'who': utils.name_to_config_section(daemon.name()),
902 'name': 'rgw_frontends',
903 })
904
905 def ok_to_stop(
906 self,
907 daemon_ids: List[str],
908 force: bool = False,
909 known: Optional[List[str]] = None # output argument
910 ) -> HandleCommandResult:
911 # if a load balancer (ingress) is present: block if only 1 daemon is up, otherwise ok
912 # if no load balancer: warn if > 1 daemon, block if only 1 daemon
913 def ingress_present() -> bool:
914 running_ingress_daemons = [
915 daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
916 running_haproxy_daemons = [
917 daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
918 running_keepalived_daemons = [
919 daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
920 # check that there is at least one haproxy and keepalived daemon running
921 if running_haproxy_daemons and running_keepalived_daemons:
922 return True
923 return False
924
925 # if only 1 rgw, alert user (this cannot be overridden with --force)
926 warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
927 if warn:
928 return HandleCommandResult(-errno.EBUSY, '', warn_message)
929
930 # if reached here, there is > 1 rgw daemon.
931 # Say okay if load balancer present or force flag set
932 if ingress_present() or force:
933 return HandleCommandResult(0, warn_message, '')
934
935 # if reached here, > 1 RGW daemon, no load balancer and no force flag.
936 # Provide warning
937 warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
938 return HandleCommandResult(-errno.EBUSY, '', warn_message)
939
940 def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
941 self.mgr.trigger_connect_dashboard_rgw()
942
943
944 class RbdMirrorService(CephService):
945 TYPE = 'rbd-mirror'
946
947 def allow_colo(self) -> bool:
948 return True
949
950 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
951 assert self.TYPE == daemon_spec.daemon_type
952 daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host
953
954 keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
955 ['mon', 'profile rbd-mirror',
956 'osd', 'profile rbd'])
957
958 daemon_spec.keyring = keyring
959
960 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
961
962 return daemon_spec
963
964 def ok_to_stop(
965 self,
966 daemon_ids: List[str],
967 force: bool = False,
968 known: Optional[List[str]] = None # output argument
969 ) -> HandleCommandResult:
970 # if only 1 rbd-mirror, alert user (this cannot be overridden with --force)
971 warn, warn_message = self._enough_daemons_to_stop(
972 self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
973 if warn:
974 return HandleCommandResult(-errno.EBUSY, '', warn_message)
975 return HandleCommandResult(0, warn_message, '')
976
977
978 class CrashService(CephService):
979 TYPE = 'crash'
980
981 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
982 assert self.TYPE == daemon_spec.daemon_type
983 daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
984
985 keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
986 ['mon', 'profile crash',
987 'mgr', 'profile crash'])
988
989 daemon_spec.keyring = keyring
990
991 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
992
993 return daemon_spec
994
995
996 class CephfsMirrorService(CephService):
997 TYPE = 'cephfs-mirror'
998
999 def config(self, spec: ServiceSpec) -> None:
1000 # make sure mirroring module is enabled
1001 mgr_map = self.mgr.get('mgr_map')
1002 mod_name = 'mirroring'
1003 if mod_name not in mgr_map.get('services', {}):
1004 self.mgr.check_mon_command({
1005 'prefix': 'mgr module enable',
1006 'module': mod_name
1007 })
1008 # we shouldn't get here (mon will tell the mgr to respawn), but no
1009 # harm done if we do.
1010
1011 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
1012 assert self.TYPE == daemon_spec.daemon_type
1013
1014 ret, keyring, err = self.mgr.check_mon_command({
1015 'prefix': 'auth get-or-create',
1016 'entity': self.get_auth_entity(daemon_spec.daemon_id),
1017 'caps': ['mon', 'profile cephfs-mirror',
1018 'mds', 'allow r',
1019 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
1020 'mgr', 'allow r'],
1021 })
1022
1023 daemon_spec.keyring = keyring
1024 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
1025 return daemon_spec
1026
1027
1028 class CephadmAgent(CephService):
1029 TYPE = 'agent'
1030
1031 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
1032 assert self.TYPE == daemon_spec.daemon_type
1033 daemon_id, host = daemon_spec.daemon_id, daemon_spec.host
1034
1035 if not self.mgr.cherrypy_thread:
1036 raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
1037
1038 keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
1039 daemon_spec.keyring = keyring
1040 self.mgr.agent_cache.agent_keys[host] = keyring
1041
1042 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
1043
1044 return daemon_spec
1045
1046 def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
1047 try:
1048 assert self.mgr.cherrypy_thread
1049 assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
1050 assert self.mgr.cherrypy_thread.server_port
1051 except Exception:
1052 raise OrchestratorError(
1053 'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
1054
1055 cfg = {'target_ip': self.mgr.get_mgr_ip(),
1056 'target_port': self.mgr.cherrypy_thread.server_port,
1057 'refresh_period': self.mgr.agent_refresh_rate,
1058 'listener_port': self.mgr.agent_starting_port,
1059 'host': daemon_spec.host,
1060 'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}
1061
1062 listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
1063 self.mgr.inventory.get_addr(daemon_spec.host))
1064 config = {
1065 'agent.json': json.dumps(cfg),
1066 'keyring': daemon_spec.keyring,
1067 'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
1068 'listener.crt': listener_cert,
1069 'listener.key': listener_key,
1070 }
1071
1072 return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
1073 self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
1074 str(self.mgr.get_module_option('device_enhanced_scan'))])
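# Editor's note (not part of the original file): the files written for the agent by
# the config above are agent.json, keyring, root_cert.pem, listener.crt and
# listener.key; the sorted list returned alongside them (mgr IP, cephadm endpoint
# port, root certificate and the device_enhanced_scan option) is the daemon's
# dependency list, used to decide when the agent needs to be redeployed/reconfigured.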