ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import errno
import json
import logging
import re
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)


class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None):
        """
        A data structure to encapsulate the arguments to `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
        )

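# Rough deployment flow (the callers live elsewhere in the cephadm module, e.g. the
# serve loop, so this is an orientation note rather than a strict contract): the
# orchestrator builds a CephadmDaemonDeploySpec via make_daemon_spec(), the service's
# prepare_create() fills in the keyring and extra config and calls generate_config(),
# and the resulting final_config/deps are then handed to `cephadm deploy` on the host.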
class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often provides a create() and config() function.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
        self,
        host: str,
        daemon_id: str,
        network: str,
        spec: ServiceSpecs,
        daemon_type: Optional[str] = None,
        ports: Optional[List[int]] = None,
        ip: Optional[str] = None,
        rank: Optional[int] = None,
        rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Post actions to be carried out after daemons are checked."""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skipping configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Configure dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return an empty DaemonDescription
        return DaemonDescription()

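    # Ensure a cephx keyring exists for `entity` with the requested caps: if the entity
    # already exists with different caps, 'auth get-or-create' fails and we fall back to
    # 'auth caps' to update them in place.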
    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring

    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address from its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modifies the Ceph config, cephadm's config_notify() is called, which
          kicks off the serve() loop, and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, where each item is a command dictionary.
                                  e.g.
                                  [
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@aaa:5000',
                                          'name': 'aaa'
                                      },
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@bbb:5000',
                                          'name': 'bbb'
                                      }
                                  ]
                                  The function should return an empty list if no commands need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

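    # 'osd ok-to-stop' with max=16 may report more OSDs than the ones requested that can
    # safely be stopped in the same batch; the returned ids are appended to `known`,
    # which acts as an output argument for the caller.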
    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

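    # Returns (True, message) when stopping the given daemons would leave fewer than
    # `low_limit` running daemons of that type (i.e. the caller should warn or refuse),
    # and (False, message) when it is presumed safe.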
    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether or not it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription) -> None:
        super().post_remove(daemon)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
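        # e.g. (hypothetical ids) an rgw daemon 'foo.host1.abc' maps to 'client.rgw.foo.host1.abc',
        # crash on host 'host1' to 'client.crash.host1', osd '3' to 'osd.3', and any mon to 'mon.'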
        # despite this method mapping daemon ids to entity names, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE == 'crash':
            if host == "":
                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        if daemon_id == 'mon':
            # do not remove the mon keyring
            return

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
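            # e.g. (illustrative values) '10.0.0.0/24' is treated as a CIDR network,
            # '[v2:10.0.0.5:3300,v1:10.0.0.5:6789]' as an addrvec, and '10.0.0.5' or a
            # bare IPv6 address as a plain IP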
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

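    # Removal is considered safe when the mon is not in the monmap at all, or when the
    # remaining quorum members would still form a strict majority of the shrunken monmap
    # (len(new_quorum) > len(new_mons) / 2); otherwise an OrchestratorError is raised.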
    def _check_safe_to_destroy(self, mon_id: str) -> None:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules.
        # In the case of the dashboard port, with several manager daemons
        # running on different hosts, there is a possibility that the user
        # has decided to use a different dashboard port on each server.
        # If this is the case, then only the port used as the default will
        # be opened.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return an empty DaemonDescription
        return DaemonDescription()

    def fail_over(self) -> None:
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')

        # fail over
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
        })

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster.
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and we are not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop the active Mgr daemon. Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')

class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

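    # "Active" here means the first entry in daemon_descrs whose id matches an MDS that is
    # up:active in the FSMap; if none match, an empty DaemonDescription is returned.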
    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return an empty DaemonDescription
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec, rgw_id: str) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
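        # e.g. (illustrative result) this ends up as 'beast port=80' or, with SSL and an
        # explicit IP, 'beast ssl_endpoint=10.0.0.5:443 ssl_certificate=config://rgw/cert/rgw.foo'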
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(f"ssl_endpoint={daemon_spec.ip}:{port}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={daemon_spec.ip}:{port}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(f"port={daemon_spec.ip}:{port}s")  # note the 's' suffix on port
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={daemon_spec.ip}:{port}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })

    def post_remove(self, daemon: DaemonDescription) -> None:
        super().post_remove(daemon)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present, block if only 1 daemon is up, otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this cannot be bypassed with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if we have reached here, there is > 1 rgw daemon.
        # Say okay if a load balancer is present or the force flag is set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if we have reached here, there is > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this cannot be bypassed with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec