import errno
import json
import logging
import re
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)


class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
        )

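# A minimal usage sketch with hypothetical values: build a deploy spec, then
# convert it to and from the orchestrator's DaemonDescription representation.
#
#   spec = CephadmDaemonDeploySpec(host='host1', daemon_id='host1',
#                                  service_name='crash')
#   dd = spec.to_daemon_description(DaemonDescriptionStatus.running, 'running')
#   dd2 = CephadmDaemonDeploySpec.from_daemon_description(dd)  # round trip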

class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        return False

    def per_host_daemon_type(self) -> Optional[str]:
        return None

    def primary_daemon_type(self) -> str:
        return self.TYPE

    def make_daemon_spec(
            self, host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Post actions to be taken after daemons are checked."""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Configure dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return an empty DaemonDescription
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring

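    # Illustrative call (entity and caps borrowed from CrashService below):
    # callers pass caps as a flat [entity-type, capspec, ...] list, e.g.
    #
    #   self.get_keyring_with_caps(AuthEntity('client.crash.host1'),
    #                              ['mon', 'profile crash',
    #                               'mgr', 'profile crash'])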
    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address with its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )

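    # A hedged usage sketch (the Grafana command names come from the
    # _check_and_set_dashboard docstring below; the service pairing and URL
    # are hypothetical):
    #
    #   self._set_service_url_on_dashboard(
    #       'grafana',
    #       'dashboard get-grafana-api-url',
    #       'dashboard set-grafana-api-url',
    #       'https://host1:3000')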
    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Get a config from the Dashboard by using a Dashboard command, e.g. the
          current iSCSI gateways.
        - Parse or deserialize the previous output, e.g. a Dashboard command
          returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important
          because if a Dashboard command modifies the Ceph config, cephadm's
          config_notify() is called, which kicks the serve() loop, and the logic
          using this method is likely to be called again. A config should be
          updated only when needed.
        - Update a config in the Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, where each item is a command dictionary.
            e.g.
            [
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@aaa:5000',
                    'name': 'aaa'
                },
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@bbb:5000',
                    'name': 'bbb'
                }
            ]
            The function should return an empty list if no commands need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )

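    # For reference, the parsing above assumes a payload shaped roughly like
    # (illustrative, not an exhaustive schema):
    #
    #   {"ok_to_stop": true, "osds": [1, 2], ...}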
    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Warn about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out

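    # Worked example with hypothetical counts (low_limit=2, 3 running daemons):
    # stopping 1 leaves 2 >= low_limit, so the method returns (False, <safe
    # message>); stopping 2 leaves 1 < low_limit and returns (True, <warning
    # or alert message>).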
    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph daemons (mon, mgr, mds, osd, etc.)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription) -> None:
        super().post_remove(daemon)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # although this maps entity names to daemons, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE == 'crash':
            if host == "":
                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

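    # Per generate_config() above, the dict returned by get_config_and_keyring()
    # becomes the daemon's cephadm config payload, e.g. (hypothetical content):
    #
    #   {'config': '[global]\n...\n', 'keyring': '[mon.]\n\tkey = ...\n'}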
    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        if daemon_id == 'mon':
            # do not remove the mon keyring
            return

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

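    # Accepted network forms, matching the branches above (example values are
    # illustrative):
    #   CIDR:    '10.0.0.0/24'                           -> public network
    #   addrvec: '[v2:10.0.0.1:3300,v1:10.0.0.1:6789]'   -> public addrv
    #   IPv6:    '2001:db8::1'                           -> public addr (via unwrap_ipv6)
    #   IPv4:    '10.0.0.1'                              -> public addr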
    def _check_safe_to_destroy(self, mon_id: str) -> None:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

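    # Quorum arithmetic, worked with hypothetical numbers: removing one of five
    # in-quorum mons leaves new_quorum=4 of new_mons=4, and 4 > 4/2, so removal
    # is safe. With only two of three mons in quorum, removing an in-quorum mon
    # leaves 1 > 2/2 false, so an OrchestratorError is raised.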
    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })


class MgrService(CephService):
    TYPE = 'mgr'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve the ports used by manager modules.
        # For the dashboard port, with several manager daemons running on
        # different hosts, the user may have chosen a different dashboard port
        # on each server. In that case, only the default dashboard port will
        # be opened.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

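    # 'mgr services' returns a JSON map of module name to endpoint URL, e.g.
    # (hypothetical): {"dashboard": "https://host1:8443/"}. The regex above
    # extracts 8443 from the ':8443/' substring.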
    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return an empty DaemonDescription
        return DaemonDescription()

    def fail_over(self) -> None:
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')

        # fail over
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
        })

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the
        mgr map, we know it joined the cluster.
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop the active Mgr daemon. Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')


class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return an empty DaemonDescription
        return DaemonDescription()

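    # The fs_map traversal above assumes a structure like this (abridged,
    # illustrative; the gid key is hypothetical):
    #
    #   {"filesystems": [{"mdsmap": {"info": {
    #       "gid_4242": {"name": "myfs-a", "state": "up:active"}}}}]}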
    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec, rgw_id: str) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an
            # explicitly assigned port, in which case we can assume there is
            # only 1 per host and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(f"ssl_endpoint={daemon_spec.ip}:{port}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={daemon_spec.ip}:{port}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(f"port={daemon_spec.ip}:{port}s")  # note the 's' suffix on port
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={daemon_spec.ip}:{port}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

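    # Resulting rgw_frontends values, for illustration (hypothetical service
    # 'rgw.foo' with ssl and port 443):
    #   'beast ssl_port=443 ssl_certificate=config://rgw/cert/rgw.foo'
    # or, without ssl and with a bound IP:
    #   'beast endpoint=10.0.0.1:80'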
    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })

    def post_remove(self, daemon: DaemonDescription) -> None:
        super().post_remove(daemon)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present: block if only 1 daemon is up, otherwise ok
        # if no load balancer: warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this cannot be overridden with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if we have reached here, there is > 1 rgw daemon.
        # Say okay if a load balancer is present or the force flag is set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if we have reached here: > 1 RGW daemon, no load balancer, and no
        # force flag. Provide a warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

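# Decision summary for RgwService.ok_to_stop above:
#   1 running rgw                       -> EBUSY (alert; --force does not help)
#   >1 rgw, ingress present or --force  -> OK
#   >1 rgw, no ingress, no --force      -> EBUSY (connectivity warning)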

class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this cannot be overridden with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'allow r',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec