7 from abc
import ABCMeta
, abstractmethod
8 from typing
import TYPE_CHECKING
, List
, Callable
, TypeVar
, \
9 Optional
, Dict
, Any
, Tuple
, NewType
, cast
11 from mgr_module
import HandleCommandResult
, MonCommandFailed
13 from ceph
.deployment
.service_spec
import ServiceSpec
, RGWSpec
14 from ceph
.deployment
.utils
import is_ipv6
, unwrap_ipv6
15 from mgr_util
import build_url
16 from orchestrator
import OrchestratorError
, DaemonDescription
, DaemonDescriptionStatus
17 from orchestrator
._interface
import daemon_type_to_service
18 from cephadm
import utils
21 from cephadm
.module
import CephadmOrchestrator
# Module-level logger for the cephadm service helpers in this file.
logger = logging.getLogger(__name__)

# Type variable bound to ServiceSpec so generic helpers can accept any spec subclass.
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
# Distinct string type for cephx entity names (e.g. 'client.rgw.<id>', 'mon.').
AuthEntity = NewType('AuthEntity', str)
29 class CephadmDaemonDeploySpec
:
30 # typing.NamedTuple + Generic is broken in py36
31 def __init__(self
, host
: str, daemon_id
: str,
33 network
: Optional
[str] = None,
34 keyring
: Optional
[str] = None,
35 extra_args
: Optional
[List
[str]] = None,
37 extra_files
: Optional
[Dict
[str, Any
]] = None,
38 daemon_type
: Optional
[str] = None,
39 ip
: Optional
[str] = None,
40 ports
: Optional
[List
[int]] = None,
41 rank
: Optional
[int] = None,
42 rank_generation
: Optional
[int] = None,
43 extra_container_args
: Optional
[List
[str]] = None,
46 A data struction to encapsulate `cephadm deploy ...
49 self
.daemon_id
= daemon_id
50 self
.service_name
= service_name
51 daemon_type
= daemon_type
or (service_name
.split('.')[0])
52 assert daemon_type
is not None
53 self
.daemon_type
: str = daemon_type
56 self
.network
= network
59 self
.keyring
: Optional
[str] = keyring
61 # For run_cephadm. Would be great to have more expressive names.
62 self
.extra_args
: List
[str] = extra_args
or []
64 self
.ceph_conf
= ceph_conf
65 self
.extra_files
= extra_files
or {}
67 # TCP ports used by the daemon
68 self
.ports
: List
[int] = ports
or []
69 self
.ip
: Optional
[str] = ip
71 # values to be populated during generate_config calls
72 # and then used in _run_cephadm
73 self
.final_config
: Dict
[str, Any
] = {}
74 self
.deps
: List
[str] = []
76 self
.rank
: Optional
[int] = rank
77 self
.rank_generation
: Optional
[int] = rank_generation
79 self
.extra_container_args
= extra_container_args
81 def name(self
) -> str:
82 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
84 def config_get_files(self
) -> Dict
[str, Any
]:
85 files
= self
.extra_files
87 files
['config'] = self
.ceph_conf
92 def from_daemon_description(dd
: DaemonDescription
) -> 'CephadmDaemonDeploySpec':
96 return CephadmDaemonDeploySpec(
98 daemon_id
=dd
.daemon_id
,
99 daemon_type
=dd
.daemon_type
,
100 service_name
=dd
.service_name(),
104 rank_generation
=dd
.rank_generation
,
105 extra_container_args
=dd
.extra_container_args
,
108 def to_daemon_description(self
, status
: DaemonDescriptionStatus
, status_desc
: str) -> DaemonDescription
:
109 return DaemonDescription(
110 daemon_type
=self
.daemon_type
,
111 daemon_id
=self
.daemon_id
,
112 service_name
=self
.service_name
,
115 status_desc
=status_desc
,
119 rank_generation
=self
.rank_generation
,
120 extra_container_args
=self
.extra_container_args
,
124 class CephadmService(metaclass
=ABCMeta
):
126 Base class for service types. Often providing a create() and config() fn.
131 def TYPE(self
) -> str:
134 def __init__(self
, mgr
: "CephadmOrchestrator"):
135 self
.mgr
: "CephadmOrchestrator" = mgr
137 def allow_colo(self
) -> bool:
139 Return True if multiple daemons of the same type can colocate on
144 def primary_daemon_type(self
) -> str:
146 This is the type of the primary (usually only) daemon to be deployed.
150 def per_host_daemon_type(self
) -> Optional
[str]:
152 If defined, this type of daemon will be deployed once for each host
153 containing one or more daemons of the primary type.
157 def ranked(self
) -> bool:
159 If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
160 generation (0, 1, ...) to each daemon we create/deploy.
164 def fence_old_ranks(self
,
166 rank_map
: Dict
[int, Dict
[int, Optional
[str]]],
167 num_ranks
: int) -> None:
170 def make_daemon_spec(
176 daemon_type
: Optional
[str] = None,
177 ports
: Optional
[List
[int]] = None,
178 ip
: Optional
[str] = None,
179 rank
: Optional
[int] = None,
180 rank_generation
: Optional
[int] = None,
181 ) -> CephadmDaemonDeploySpec
:
182 return CephadmDaemonDeploySpec(
185 service_name
=spec
.service_name(),
187 daemon_type
=daemon_type
,
191 rank_generation
=rank_generation
,
192 extra_container_args
=spec
.extra_container_args
if hasattr(
193 spec
, 'extra_container_args') else None,
196 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
197 raise NotImplementedError()
199 def generate_config(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> Tuple
[Dict
[str, Any
], List
[str]]:
200 raise NotImplementedError()
202 def config(self
, spec
: ServiceSpec
) -> None:
204 Configure the cluster for this service. Only called *once* per
205 service apply. Not for every daemon.
209 def daemon_check_post(self
, daemon_descrs
: List
[DaemonDescription
]) -> None:
210 """The post actions needed to be done after daemons are checked"""
211 if self
.mgr
.config_dashboard
:
212 if 'dashboard' in self
.mgr
.get('mgr_map')['modules']:
213 self
.config_dashboard(daemon_descrs
)
215 logger
.debug('Dashboard is not enabled. Skip configuration.')
217 def config_dashboard(self
, daemon_descrs
: List
[DaemonDescription
]) -> None:
218 """Config dashboard settings."""
219 raise NotImplementedError()
221 def get_active_daemon(self
, daemon_descrs
: List
[DaemonDescription
]) -> DaemonDescription
:
222 # if this is called for a service type where it hasn't explcitly been
223 # defined, return empty Daemon Desc
224 return DaemonDescription()
226 def get_keyring_with_caps(self
, entity
: AuthEntity
, caps
: List
[str]) -> str:
227 ret
, keyring
, err
= self
.mgr
.mon_command({
228 'prefix': 'auth get-or-create',
233 ret
, out
, err
= self
.mgr
.mon_command({
234 'prefix': 'auth caps',
239 self
.mgr
.log
.warning(f
"Unable to update caps for {entity}")
242 def _inventory_get_fqdn(self
, hostname
: str) -> str:
243 """Get a host's FQDN with its hostname.
245 If the FQDN can't be resolved, the address from the inventory will
248 addr
= self
.mgr
.inventory
.get_addr(hostname
)
249 return socket
.getfqdn(addr
)
251 def _set_service_url_on_dashboard(self
,
255 service_url
: str) -> None:
256 """A helper to get and set service_url via Dashboard's MON command.
258 If result of get_mon_cmd differs from service_url, set_mon_cmd will
259 be sent to set the service_url.
261 def get_set_cmd_dicts(out
: str) -> List
[dict]:
263 'prefix': set_mon_cmd
,
266 return [cmd_dict
] if service_url
!= out
else []
268 self
._check
_and
_set
_dashboard
(
269 service_name
=service_name
,
271 get_set_cmd_dicts
=get_set_cmd_dicts
274 def _check_and_set_dashboard(self
,
277 get_set_cmd_dicts
: Callable
[[str], List
[dict]]) -> None:
278 """A helper to set configs in the Dashboard.
280 The method is useful for the pattern:
281 - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
283 - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
284 - Determine if the config need to be update. NOTE: This step is important because if a
285 Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
286 kicks the serve() loop and the logic using this method is likely to be called again.
287 A config should be updated only when needed.
288 - Update a config in Dashboard by using a Dashboard command.
290 :param service_name: the service name to be used for logging
291 :type service_name: str
292 :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
294 :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
298 'prefix': 'dashboard iscsi-gateway-add',
299 'service_url': 'http://admin:admin@aaa:5000',
303 'prefix': 'dashboard iscsi-gateway-add',
304 'service_url': 'http://admin:admin@bbb:5000',
308 The function should return empty list if no command need to be sent.
309 :type get_set_cmd_dicts: Callable[[str], List[dict]]
313 _
, out
, _
= self
.mgr
.check_mon_command({
316 except MonCommandFailed
as e
:
317 logger
.warning('Failed to get Dashboard config for %s: %s', service_name
, e
)
319 cmd_dicts
= get_set_cmd_dicts(out
.strip())
320 for cmd_dict
in list(cmd_dicts
):
322 inbuf
= cmd_dict
.pop('inbuf', None)
323 _
, out
, _
= self
.mgr
.check_mon_command(cmd_dict
, inbuf
)
324 except MonCommandFailed
as e
:
325 logger
.warning('Failed to set Dashboard config for %s: %s', service_name
, e
)
330 known
: Optional
[List
[str]] = None, # output argument
331 force
: bool = False) -> HandleCommandResult
:
332 r
= HandleCommandResult(*self
.mgr
.mon_command({
333 'prefix': "osd ok-to-stop",
339 j
= json
.loads(r
.stdout
)
340 except json
.decoder
.JSONDecodeError
:
341 self
.mgr
.log
.warning("osd ok-to-stop didn't return structured result")
345 if known
is not None and j
and j
.get('ok_to_stop'):
346 self
.mgr
.log
.debug(f
"got {j}")
347 known
.extend([f
'osd.{x}' for x
in j
.get('osds', [])])
348 return HandleCommandResult(
350 f
'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
356 daemon_ids
: List
[str],
358 known
: Optional
[List
[str]] = None # output argument
359 ) -> HandleCommandResult
:
360 names
= [f
'{self.TYPE}.{d_id}' for d_id
in daemon_ids
]
361 out
= f
'It appears safe to stop {",".join(names)}'
362 err
= f
'It is NOT safe to stop {",".join(names)} at this time'
364 if self
.TYPE
not in ['mon', 'osd', 'mds']:
366 return HandleCommandResult(0, out
)
368 if self
.TYPE
== 'osd':
369 return self
.ok_to_stop_osd(daemon_ids
, known
, force
)
371 r
= HandleCommandResult(*self
.mgr
.mon_command({
372 'prefix': f
'{self.TYPE} ok-to-stop',
377 err
= f
'{err}: {r.stderr}' if r
.stderr
else err
379 return HandleCommandResult(r
.retval
, r
.stdout
, err
)
381 out
= f
'{out}: {r.stdout}' if r
.stdout
else out
383 return HandleCommandResult(r
.retval
, out
, r
.stderr
)
385 def _enough_daemons_to_stop(self
, daemon_type
: str, daemon_ids
: List
[str], service
: str, low_limit
: int, alert
: bool = False) -> Tuple
[bool, str]:
386 # Provides a warning about if it possible or not to stop <n> daemons in a service
387 names
= [f
'{daemon_type}.{d_id}' for d_id
in daemon_ids
]
388 number_of_running_daemons
= len(
390 for daemon
in self
.mgr
.cache
.get_daemons_by_type(daemon_type
)
391 if daemon
.status
== DaemonDescriptionStatus
.running
])
392 if (number_of_running_daemons
- len(daemon_ids
)) >= low_limit
:
393 return False, f
'It is presumed safe to stop {names}'
395 num_daemons_left
= number_of_running_daemons
- len(daemon_ids
)
397 def plural(count
: int) -> str:
398 return 'daemon' if count
== 1 else 'daemons'
400 left_count
= "no" if num_daemons_left
== 0 else num_daemons_left
403 out
= (f
'ALERT: Cannot stop {names} in {service} service. '
404 f
'Not enough remaining {service} daemons. '
405 f
'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
407 out
= (f
'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
408 f
'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
409 f
'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
412 def pre_remove(self
, daemon
: DaemonDescription
) -> None:
414 Called before the daemon is removed.
416 assert daemon
.daemon_type
is not None
417 assert self
.TYPE
== daemon_type_to_service(daemon
.daemon_type
)
418 logger
.debug(f
'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
420 def post_remove(self
, daemon
: DaemonDescription
, is_failed_deploy
: bool) -> None:
422 Called after the daemon is removed.
424 assert daemon
.daemon_type
is not None
425 assert self
.TYPE
== daemon_type_to_service(daemon
.daemon_type
)
426 logger
.debug(f
'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        # Base implementation is a deliberate no-op; services with extra
        # cleanup (e.g. RGW, MDS config options) override this.
        logger.debug(f'Purge called for {self.TYPE} - no action taken')
433 class CephService(CephadmService
):
434 def generate_config(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> Tuple
[Dict
[str, Any
], List
[str]]:
435 # Ceph.daemons (mon, mgr, mds, osd, etc)
436 cephadm_config
= self
.get_config_and_keyring(
437 daemon_spec
.daemon_type
,
438 daemon_spec
.daemon_id
,
439 host
=daemon_spec
.host
,
440 keyring
=daemon_spec
.keyring
,
441 extra_ceph_config
=daemon_spec
.ceph_conf
)
443 if daemon_spec
.config_get_files():
444 cephadm_config
.update({'files': daemon_spec
.config_get_files()})
446 return cephadm_config
, []
    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Run the generic post-remove steps first, then drop the daemon's
        # cephx key (each CephService daemon owns its own auth entity).
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)
452 def get_auth_entity(self
, daemon_id
: str, host
: str = "") -> AuthEntity
:
454 Map the daemon id to a cephx keyring entity name
456 # despite this mapping entity names to daemons, self.TYPE within
457 # the CephService class refers to service types, not daemon types
458 if self
.TYPE
in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
459 return AuthEntity(f
'client.{self.TYPE}.{daemon_id}')
460 elif self
.TYPE
in ['crash', 'agent']:
462 raise OrchestratorError(
463 f
'Host not provided to generate <{self.TYPE}> auth entity name')
464 return AuthEntity(f
'client.{self.TYPE}.{host}')
465 elif self
.TYPE
== 'mon':
466 return AuthEntity('mon.')
467 elif self
.TYPE
in ['mgr', 'osd', 'mds']:
468 return AuthEntity(f
'{self.TYPE}.{daemon_id}')
470 raise OrchestratorError("unknown daemon type")
472 def get_config_and_keyring(self
,
476 keyring
: Optional
[str] = None,
477 extra_ceph_config
: Optional
[str] = None
481 entity
: AuthEntity
= self
.get_auth_entity(daemon_id
, host
=host
)
482 ret
, keyring
, err
= self
.mgr
.check_mon_command({
483 'prefix': 'auth get',
487 config
= self
.mgr
.get_minimal_ceph_conf()
489 if extra_ceph_config
:
490 config
+= extra_ceph_config
497 def remove_keyring(self
, daemon
: DaemonDescription
) -> None:
498 assert daemon
.daemon_id
is not None
499 assert daemon
.hostname
is not None
500 daemon_id
: str = daemon
.daemon_id
501 host
: str = daemon
.hostname
503 assert daemon
.daemon_type
!= 'mon'
505 entity
= self
.get_auth_entity(daemon_id
, host
=host
)
507 logger
.info(f
'Removing key for {entity}')
508 ret
, out
, err
= self
.mgr
.mon_command({
514 class MonService(CephService
):
517 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
519 Create a new monitor on the given host.
521 assert self
.TYPE
== daemon_spec
.daemon_type
522 name
, _
, network
= daemon_spec
.daemon_id
, daemon_spec
.host
, daemon_spec
.network
525 ret
, keyring
, err
= self
.mgr
.check_mon_command({
526 'prefix': 'auth get',
527 'entity': self
.get_auth_entity(name
),
530 extra_config
= '[mon.%s]\n' % name
532 # infer whether this is a CIDR network, addrvec, or plain IP
534 extra_config
+= 'public network = %s\n' % network
535 elif network
.startswith('[v') and network
.endswith(']'):
536 extra_config
+= 'public addrv = %s\n' % network
537 elif is_ipv6(network
):
538 extra_config
+= 'public addr = %s\n' % unwrap_ipv6(network
)
539 elif ':' not in network
:
540 extra_config
+= 'public addr = %s\n' % network
542 raise OrchestratorError(
543 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network
)
545 # try to get the public_network from the config
546 ret
, network
, err
= self
.mgr
.check_mon_command({
547 'prefix': 'config get',
549 'key': 'public_network',
551 network
= network
.strip() if network
else network
553 raise OrchestratorError(
554 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
555 if '/' not in network
:
556 raise OrchestratorError(
557 'public_network is set but does not look like a CIDR network: \'%s\'' % network
)
558 extra_config
+= 'public network = %s\n' % network
560 daemon_spec
.ceph_conf
= extra_config
561 daemon_spec
.keyring
= keyring
563 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
567 def _check_safe_to_destroy(self
, mon_id
: str) -> None:
568 ret
, out
, err
= self
.mgr
.check_mon_command({
569 'prefix': 'quorum_status',
574 raise OrchestratorError('failed to parse quorum status')
576 mons
= [m
['name'] for m
in j
['monmap']['mons']]
577 if mon_id
not in mons
:
578 logger
.info('Safe to remove mon.%s: not in monmap (%s)' % (
581 new_mons
= [m
for m
in mons
if m
!= mon_id
]
582 new_quorum
= [m
for m
in j
['quorum_names'] if m
!= mon_id
]
583 if len(new_quorum
) > len(new_mons
) / 2:
584 logger
.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
585 (mon_id
, new_quorum
, new_mons
))
587 raise OrchestratorError(
588 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id
, new_quorum
, new_mons
))
590 def pre_remove(self
, daemon
: DaemonDescription
) -> None:
591 super().pre_remove(daemon
)
593 assert daemon
.daemon_id
is not None
594 daemon_id
: str = daemon
.daemon_id
595 self
._check
_safe
_to
_destroy
(daemon_id
)
597 # remove mon from quorum before we destroy the daemon
598 logger
.info('Removing monitor %s from monmap...' % daemon_id
)
599 ret
, out
, err
= self
.mgr
.check_mon_command({
604 def post_remove(self
, daemon
: DaemonDescription
, is_failed_deploy
: bool) -> None:
605 # Do not remove the mon keyring.
606 # super().post_remove(daemon)
610 class MgrService(CephService
):
613 def allow_colo(self
) -> bool:
614 if self
.mgr
.get_ceph_option('mgr_standby_modules'):
615 # traditional mgr mode: standby daemons' modules listen on
616 # ports and redirect to the primary. we must not schedule
617 # multiple mgrs on the same host or else ports will
621 # standby daemons do nothing, and therefore port conflicts
625 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
627 Create a new manager instance on a host.
629 assert self
.TYPE
== daemon_spec
.daemon_type
630 mgr_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
633 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(mgr_id
),
634 ['mon', 'profile mgr',
638 # Retrieve ports used by manager modules
639 # In the case of the dashboard port and with several manager daemons
640 # running in different hosts, it exists the possibility that the
641 # user has decided to use different dashboard ports in each server
642 # If this is the case then the dashboard port opened will be only the used
645 ret
, mgr_services
, err
= self
.mgr
.check_mon_command({
646 'prefix': 'mgr services',
649 mgr_endpoints
= json
.loads(mgr_services
)
650 for end_point
in mgr_endpoints
.values():
651 port
= re
.search(r
'\:\d+\/', end_point
)
653 ports
.append(int(port
[0][1:-1]))
656 daemon_spec
.ports
= ports
658 daemon_spec
.keyring
= keyring
660 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
664 def get_active_daemon(self
, daemon_descrs
: List
[DaemonDescription
]) -> DaemonDescription
:
665 for daemon
in daemon_descrs
:
666 assert daemon
.daemon_type
is not None
667 assert daemon
.daemon_id
is not None
668 if self
.mgr
.daemon_is_self(daemon
.daemon_type
, daemon
.daemon_id
):
670 # if no active mgr found, return empty Daemon Desc
671 return DaemonDescription()
673 def fail_over(self
) -> None:
674 # this has been seen to sometimes transiently fail even when there are multiple
675 # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
676 class NoStandbyError(OrchestratorError
):
678 no_standby_exc
= NoStandbyError('Need standby mgr daemon', event_kind_subject
=(
679 'daemon', 'mgr' + self
.mgr
.get_mgr_id()))
680 for sleep_secs
in [2, 8, 15]:
682 if not self
.mgr_map_has_standby():
684 self
.mgr
.events
.for_daemon('mgr' + self
.mgr
.get_mgr_id(),
685 'INFO', 'Failing over to other MGR')
686 logger
.info('Failing over to other MGR')
689 ret
, out
, err
= self
.mgr
.check_mon_command({
690 'prefix': 'mgr fail',
691 'who': self
.mgr
.get_mgr_id(),
694 except NoStandbyError
:
696 f
'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
697 time
.sleep(sleep_secs
)
700 def mgr_map_has_standby(self
) -> bool:
702 This is a bit safer than asking our inventory. If the mgr joined the mgr map,
703 we know it joined the cluster
705 mgr_map
= self
.mgr
.get('mgr_map')
706 num
= len(mgr_map
.get('standbys'))
711 daemon_ids
: List
[str],
713 known
: Optional
[List
[str]] = None # output argument
714 ) -> HandleCommandResult
:
715 # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
717 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(self
.TYPE
, daemon_ids
, 'Mgr', 1, True)
719 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
721 mgr_daemons
= self
.mgr
.cache
.get_daemons_by_type(self
.TYPE
)
722 active
= self
.get_active_daemon(mgr_daemons
).daemon_id
723 if active
in daemon_ids
:
724 warn_message
= 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
725 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
727 return HandleCommandResult(0, warn_message
, '')
730 class MdsService(CephService
):
733 def allow_colo(self
) -> bool:
736 def config(self
, spec
: ServiceSpec
) -> None:
737 assert self
.TYPE
== spec
.service_type
738 assert spec
.service_id
740 # ensure mds_join_fs is set for these daemons
741 ret
, out
, err
= self
.mgr
.check_mon_command({
742 'prefix': 'config set',
743 'who': 'mds.' + spec
.service_id
,
744 'name': 'mds_join_fs',
745 'value': spec
.service_id
,
748 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
749 assert self
.TYPE
== daemon_spec
.daemon_type
750 mds_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
753 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(mds_id
),
754 ['mon', 'profile mds',
755 'osd', 'allow rw tag cephfs *=*',
757 daemon_spec
.keyring
= keyring
759 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
763 def get_active_daemon(self
, daemon_descrs
: List
[DaemonDescription
]) -> DaemonDescription
:
764 active_mds_strs
= list()
765 for fs
in self
.mgr
.get('fs_map')['filesystems']:
766 mds_map
= fs
['mdsmap']
767 if mds_map
is not None:
768 for mds_id
, mds_status
in mds_map
['info'].items():
769 if mds_status
['state'] == 'up:active':
770 active_mds_strs
.append(mds_status
['name'])
771 if len(active_mds_strs
) != 0:
772 for daemon
in daemon_descrs
:
773 if daemon
.daemon_id
in active_mds_strs
:
775 # if no mds found, return empty Daemon Desc
776 return DaemonDescription()
778 def purge(self
, service_name
: str) -> None:
779 self
.mgr
.check_mon_command({
780 'prefix': 'config rm',
782 'name': 'mds_join_fs',
786 class RgwService(CephService
):
789 def allow_colo(self
) -> bool:
792 def config(self
, spec
: RGWSpec
) -> None: # type: ignore
793 assert self
.TYPE
== spec
.service_type
795 # set rgw_realm and rgw_zone, if present
797 ret
, out
, err
= self
.mgr
.check_mon_command({
798 'prefix': 'config set',
799 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
801 'value': spec
.rgw_realm
,
804 ret
, out
, err
= self
.mgr
.check_mon_command({
805 'prefix': 'config set',
806 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
808 'value': spec
.rgw_zone
,
811 if spec
.rgw_frontend_ssl_certificate
:
812 if isinstance(spec
.rgw_frontend_ssl_certificate
, list):
813 cert_data
= '\n'.join(spec
.rgw_frontend_ssl_certificate
)
814 elif isinstance(spec
.rgw_frontend_ssl_certificate
, str):
815 cert_data
= spec
.rgw_frontend_ssl_certificate
817 raise OrchestratorError(
818 'Invalid rgw_frontend_ssl_certificate: %s'
819 % spec
.rgw_frontend_ssl_certificate
)
820 ret
, out
, err
= self
.mgr
.check_mon_command({
821 'prefix': 'config-key set',
822 'key': f
'rgw/cert/{spec.service_name()}',
826 # TODO: fail, if we don't have a spec
827 logger
.info('Saving service %s spec with placement %s' % (
828 spec
.service_name(), spec
.placement
.pretty_str()))
829 self
.mgr
.spec_store
.save(spec
)
830 self
.mgr
.trigger_connect_dashboard_rgw()
832 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
833 assert self
.TYPE
== daemon_spec
.daemon_type
834 rgw_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
835 spec
= cast(RGWSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
837 keyring
= self
.get_keyring(rgw_id
)
839 if daemon_spec
.ports
:
840 port
= daemon_spec
.ports
[0]
842 # this is a redeploy of older instance that doesn't have an explicitly
843 # assigned port, in which case we can assume there is only 1 per host
844 # and it matches the spec.
845 port
= spec
.get_port()
849 ftype
= spec
.rgw_frontend_type
or "beast"
854 f
"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
856 args
.append(f
"ssl_port={port}")
857 args
.append(f
"ssl_certificate=config://rgw/cert/{spec.service_name()}")
860 args
.append(f
"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
862 args
.append(f
"port={port}")
863 elif ftype
== 'civetweb':
866 # note the 's' suffix on port
867 args
.append(f
"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
869 args
.append(f
"port={port}s") # note the 's' suffix on port
870 args
.append(f
"ssl_certificate=config://rgw/cert/{spec.service_name()}")
873 args
.append(f
"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
875 args
.append(f
"port={port}")
876 frontend
= f
'{ftype} {" ".join(args)}'
878 ret
, out
, err
= self
.mgr
.check_mon_command({
879 'prefix': 'config set',
880 'who': utils
.name_to_config_section(daemon_spec
.name()),
881 'name': 'rgw_frontends',
885 daemon_spec
.keyring
= keyring
886 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
890 def get_keyring(self
, rgw_id
: str) -> str:
891 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(rgw_id
),
894 'osd', 'allow rwx tag rgw *=*'])
897 def purge(self
, service_name
: str) -> None:
898 self
.mgr
.check_mon_command({
899 'prefix': 'config rm',
900 'who': utils
.name_to_config_section(service_name
),
903 self
.mgr
.check_mon_command({
904 'prefix': 'config rm',
905 'who': utils
.name_to_config_section(service_name
),
908 self
.mgr
.check_mon_command({
909 'prefix': 'config-key rm',
910 'key': f
'rgw/cert/{service_name}',
912 self
.mgr
.trigger_connect_dashboard_rgw()
914 def post_remove(self
, daemon
: DaemonDescription
, is_failed_deploy
: bool) -> None:
915 super().post_remove(daemon
, is_failed_deploy
=is_failed_deploy
)
916 self
.mgr
.check_mon_command({
917 'prefix': 'config rm',
918 'who': utils
.name_to_config_section(daemon
.name()),
919 'name': 'rgw_frontends',
924 daemon_ids
: List
[str],
926 known
: Optional
[List
[str]] = None # output argument
927 ) -> HandleCommandResult
:
928 # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
929 # if no load balancer, warn if > 1 daemon, block if only 1 daemon
930 def ingress_present() -> bool:
931 running_ingress_daemons
= [
932 daemon
for daemon
in self
.mgr
.cache
.get_daemons_by_type('ingress') if daemon
.status
== 1]
933 running_haproxy_daemons
= [
934 daemon
for daemon
in running_ingress_daemons
if daemon
.daemon_type
== 'haproxy']
935 running_keepalived_daemons
= [
936 daemon
for daemon
in running_ingress_daemons
if daemon
.daemon_type
== 'keepalived']
937 # check that there is at least one haproxy and keepalived daemon running
938 if running_haproxy_daemons
and running_keepalived_daemons
:
942 # if only 1 rgw, alert user (this is not passable with --force)
943 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(self
.TYPE
, daemon_ids
, 'RGW', 1, True)
945 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
947 # if reached here, there is > 1 rgw daemon.
948 # Say okay if load balancer present or force flag set
949 if ingress_present() or force
:
950 return HandleCommandResult(0, warn_message
, '')
952 # if reached here, > 1 RGW daemon, no load balancer and no force flag.
954 warn_message
= "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
955 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
957 def config_dashboard(self
, daemon_descrs
: List
[DaemonDescription
]) -> None:
958 self
.mgr
.trigger_connect_dashboard_rgw()
961 class RbdMirrorService(CephService
):
964 def allow_colo(self
) -> bool:
967 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
968 assert self
.TYPE
== daemon_spec
.daemon_type
969 daemon_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
971 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(daemon_id
),
972 ['mon', 'profile rbd-mirror',
973 'osd', 'profile rbd'])
975 daemon_spec
.keyring
= keyring
977 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
983 daemon_ids
: List
[str],
985 known
: Optional
[List
[str]] = None # output argument
986 ) -> HandleCommandResult
:
987 # if only 1 rbd-mirror, alert user (this is not passable with --force)
988 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(
989 self
.TYPE
, daemon_ids
, 'Rbdmirror', 1, True)
991 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
992 return HandleCommandResult(0, warn_message
, '')
995 class CrashService(CephService
):
998 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
999 assert self
.TYPE
== daemon_spec
.daemon_type
1000 daemon_id
, host
= daemon_spec
.daemon_id
, daemon_spec
.host
1002 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(daemon_id
, host
=host
),
1003 ['mon', 'profile crash',
1004 'mgr', 'profile crash'])
1006 daemon_spec
.keyring
= keyring
1008 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
1013 class CephfsMirrorService(CephService
):
1014 TYPE
= 'cephfs-mirror'
1016 def config(self
, spec
: ServiceSpec
) -> None:
1017 # make sure mirroring module is enabled
1018 mgr_map
= self
.mgr
.get('mgr_map')
1019 mod_name
= 'mirroring'
1020 if mod_name
not in mgr_map
.get('services', {}):
1021 self
.mgr
.check_mon_command({
1022 'prefix': 'mgr module enable',
1025 # we shouldn't get here (mon will tell the mgr to respawn), but no
1026 # harm done if we do.
1028 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
1029 assert self
.TYPE
== daemon_spec
.daemon_type
1031 ret
, keyring
, err
= self
.mgr
.check_mon_command({
1032 'prefix': 'auth get-or-create',
1033 'entity': self
.get_auth_entity(daemon_spec
.daemon_id
),
1034 'caps': ['mon', 'profile cephfs-mirror',
1036 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
1040 daemon_spec
.keyring
= keyring
1041 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
1045 class CephadmAgent(CephService
):
1048 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
1049 assert self
.TYPE
== daemon_spec
.daemon_type
1050 daemon_id
, host
= daemon_spec
.daemon_id
, daemon_spec
.host
1052 if not self
.mgr
.cherrypy_thread
:
1053 raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')
1055 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(daemon_id
, host
=host
), [])
1056 daemon_spec
.keyring
= keyring
1057 self
.mgr
.agent_cache
.agent_keys
[host
] = keyring
1059 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
1063 def generate_config(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> Tuple
[Dict
[str, Any
], List
[str]]:
1065 assert self
.mgr
.cherrypy_thread
1066 assert self
.mgr
.cherrypy_thread
.ssl_certs
.get_root_cert()
1067 assert self
.mgr
.cherrypy_thread
.server_port
1069 raise OrchestratorError(
1070 'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')
1072 cfg
= {'target_ip': self
.mgr
.get_mgr_ip(),
1073 'target_port': self
.mgr
.cherrypy_thread
.server_port
,
1074 'refresh_period': self
.mgr
.agent_refresh_rate
,
1075 'listener_port': self
.mgr
.agent_starting_port
,
1076 'host': daemon_spec
.host
,
1077 'device_enhanced_scan': str(self
.mgr
.device_enhanced_scan
)}
1079 listener_cert
, listener_key
= self
.mgr
.cherrypy_thread
.ssl_certs
.generate_cert(
1080 self
.mgr
.inventory
.get_addr(daemon_spec
.host
))
1082 'agent.json': json
.dumps(cfg
),
1083 'keyring': daemon_spec
.keyring
,
1084 'root_cert.pem': self
.mgr
.cherrypy_thread
.ssl_certs
.get_root_cert(),
1085 'listener.crt': listener_cert
,
1086 'listener.key': listener_key
,
1089 return config
, sorted([str(self
.mgr
.get_mgr_ip()), str(self
.mgr
.cherrypy_thread
.server_port
),
1090 self
.mgr
.cherrypy_thread
.ssl_certs
.get_root_cert(),
1091 str(self
.mgr
.get_module_option('device_enhanced_scan'))])