import errno
import json
import logging
import re
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)


class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
        )
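
    # A minimal, illustrative example (hypothetical values): the deploy spec for
    # a crash daemon on host "host1" might look like
    #   CephadmDaemonDeploySpec(host='host1', daemon_id='host1',
    #                           service_name='crash', daemon_type='crash')
    # in which case name() returns 'crash.host1'.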


class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often provides a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
        self,
        host: str,
        daemon_id: str,
        network: str,
        spec: ServiceSpecs,
        daemon_type: Optional[str] = None,
        ports: Optional[List[int]] = None,
        ip: Optional[str] = None,
        rank: Optional[int] = None,
        rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return an empty DaemonDescription
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring
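
    # Note on get_keyring_with_caps() above: the caps list interleaves entity and
    # capability strings, mirroring `ceph auth get-or-create`; e.g. (illustrative)
    #   get_keyring_with_caps(AuthEntity('client.crash.host1'),
    #                         ['mon', 'profile crash', 'mgr', 'profile crash'])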

    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address with its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )
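
    # Illustrative use of _set_service_url_on_dashboard() (command names are
    # assumed from the Dashboard module, not defined in this file):
    #   self._set_service_url_on_dashboard('Grafana',
    #                                      'dashboard get-grafana-api-url',
    #                                      'dashboard set-grafana-api-url',
    #                                      'https://host1:3000')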

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modified the Ceph config, cephadm's config_notify() is called, which
          kicks the serve() loop, and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
                                  e.g.
                                  [
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@aaa:5000',
                                          'name': 'aaa'
                                      },
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@bbb:5000',
                                          'name': 'bbb'
                                      }
                                  ]
                                  The function should return an empty list if no commands need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )
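
    # `osd ok-to-stop` returns JSON roughly of the form (shape assumed):
    #   {"ok_to_stop": true, "osds": [1, 2], ...}
    # only the 'ok_to_stop' flag and the 'osds' list are consumed above.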

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out
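
    # Worked example for _enough_daemons_to_stop() above: with 2 running mgr
    # daemons, low_limit=1 and one daemon to stop, 2 - 1 >= 1 holds and stopping
    # is presumed safe; with a single running mgr the same request takes the
    # ALERT/WARNING path instead.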

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # despite this method mapping daemon ids to entity names, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE == 'crash':
            if host == "":
                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")
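
    # Examples of the resulting entity names:
    #   rgw daemon 'foo.host1.abc'   -> client.rgw.foo.host1.abc
    #   crash daemon on host 'host1' -> client.crash.host1
    #   mon daemon                   -> mon.
    #   osd daemon '3'               -> osd.3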

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
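            # e.g. '10.1.0.0/16' (CIDR), '[v2:10.1.2.3:3300,v1:10.1.2.3:6789]'
            # (addrvec), '2001:db8::42' or '10.1.2.3' (plain IP); values illustrative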
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
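
    # Quorum math used above, e.g.: with 3 mons all in quorum, removing one leaves
    # new_quorum=2 and new_mons=2, and 2 > 2/2 holds, so removal is allowed; if
    # only 2 of 3 were in quorum, removing a quorum member would leave 1 > 2/2
    # false and the removal is refused.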

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass


class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules
        # In the case of the dashboard port, with several manager daemons running
        # on different hosts, it is possible that the user has decided to use a
        # different dashboard port on each server. If this is the case, then only
        # the port used by default will be opened.
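        # Illustrative only: `ceph mgr services` might return
        # {"dashboard": "https://host1:8443/", "prometheus": "http://host1:9283/"},
        # in which case ports 8443 and 9283 would be extracted below.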
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return an empty DaemonDescription
        return DaemonDescription()

    def fail_over(self) -> None:
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')

        # fail over
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
        })

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster.
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')


class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return an empty DaemonDescription
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'
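        # The resulting rgw_frontends value looks like, e.g. (illustrative),
        # 'beast port=80' or
        # 'beast ssl_port=443 ssl_certificate=config://rgw/cert/rgw.foo'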

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present, block if only 1 daemon is up; otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this cannot be overridden with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()


class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this cannot be overridden with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec