import errno
import json
import logging
import re

from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)
# Type variable covering ServiceSpec and all of its subclasses, used so that
# helpers accepting a spec preserve the concrete spec subtype.
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
# Distinct string type for cephx entity names (e.g. 'client.rgw.foo'),
# so they are not confused with arbitrary strings.
AuthEntity = NewType('AuthEntity', str)
class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None):
        """
        A data struction to encapsulate `cephadm deploy ...` arguments.
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        # derive the daemon type from the service name when not given
        # explicitly (service names look like '<daemon_type>.<id>')
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mon network (CIDR / addrvec / plain IP)
        self.network = network

        # for run_cephadm
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self.extra_container_args = extra_container_args

    def name(self) -> str:
        """Full daemon name, '<daemon_type>.<daemon_id>'."""
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        """Extra files to ship to the daemon; includes the generated ceph.conf
        under the 'config' key when one was set."""
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf
        return files

    @staticmethod
    def from_daemon_description(dd: 'DaemonDescription') -> 'CephadmDaemonDeploySpec':
        """Build a deploy spec from an existing daemon description (redeploys)."""
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
            extra_container_args=dd.extra_container_args,
        )

    def to_daemon_description(self, status: 'DaemonDescriptionStatus',
                              status_desc: str) -> 'DaemonDescription':
        """Convert back to an orchestrator DaemonDescription with the given status."""
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
            extra_container_args=self.extra_container_args,
        )
class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        ...

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        # only meaningful for ranked services; they must override this
        assert False

    def make_daemon_spec(
            self,
            host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        """Assemble a CephadmDaemonDeploySpec from a service spec."""
        # not all spec types carry extra_container_args
        try:
            eca = spec.extra_container_args
        except AttributeError:
            eca = None
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=eca,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explcitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        """Get (or create) a cephx keyring for *entity*, ensuring *caps* are set."""
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            # entity already exists with different caps; update them in place
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring

    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address with its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url,
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config need to be update. NOTE: This step is important because if a
          Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
          kicks the serve() loop and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
            e.g.
            [
                {
                   'prefix': 'dashboard iscsi-gateway-add',
                   'service_url': 'http://admin:admin@aaa:5000',
                   'name': 'aaa'
                },
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@bbb:5000',
                    'name': 'bbb'
                }
            ]
            The function should return empty list if no command need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """
        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            # record which osds were deemed ok to stop (output argument)
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            '')

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        # only mon, osd and mds have a mon-side ok-to-stop check
        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str],
                                service: str, low_limit: int,
                                alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about if it possible or not to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')
class CephService(CephadmService):
    """Base for Ceph-native daemons that need a minimal ceph.conf + cephx keyring."""

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # despite this mapping entity names to daemons, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE in ['crash', 'agent']:
            if host == "":
                raise OrchestratorError(
                    f'Host not provided to generate <{self.TYPE}> auth entity name')
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        """Build the {'config': ..., 'keyring': ...} payload for cephadm deploy."""
        # fetch the keyring from the mon if the caller didn't supply one
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })
        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        """Delete the daemon's cephx key after the daemon itself is gone."""
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        # the mon. key is shared; it must never be removed here
        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })
class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        """Raise OrchestratorError if removing mon_id would break quorum."""
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass
class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary.  we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules
        # In the case of the dashboard port and with several manager daemons
        # running in different hosts, it exists the possibility that the
        # user has decided to use different dashboard ports in each server
        # If this is the case then the dashboard port opened will be only the used
        # as default.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()

    def fail_over(self) -> None:
        """Fail over to a standby mgr; raises when no standby exists."""
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')

        # fail over
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
        })

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')
class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return empty Daemon Desc
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        # drop the mds_join_fs setting made in config()
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })
class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()
class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')
class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        # crash keys are per-host (see get_auth_entity), hence host= here
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec
class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.cherrypy_thread:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        # agent keys carry no caps; per-host entity (see get_auth_entity)
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            assert self.mgr.cherrypy_thread.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': self.mgr.cherrypy_thread.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
            self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        # deps: redeploy the agent when any of these change
        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])