5 from abc
import ABCMeta
, abstractmethod
6 from typing
import TYPE_CHECKING
, List
, Callable
, TypeVar
, \
7 Optional
, Dict
, Any
, Tuple
, NewType
, cast
9 from mgr_module
import HandleCommandResult
, MonCommandFailed
11 from ceph
.deployment
.service_spec
import ServiceSpec
, RGWSpec
12 from ceph
.deployment
.utils
import is_ipv6
, unwrap_ipv6
13 from orchestrator
import OrchestratorError
, DaemonDescription
, DaemonDescriptionStatus
14 from orchestrator
._interface
import daemon_type_to_service
15 from cephadm
import utils
18 from cephadm
.module
import CephadmOrchestrator
20 logger
= logging
.getLogger(__name__
)
22 ServiceSpecs
= TypeVar('ServiceSpecs', bound
=ServiceSpec
)
23 AuthEntity
= NewType('AuthEntity', str)
26 class CephadmDaemonDeploySpec
:
27 # typing.NamedTuple + Generic is broken in py36
28 def __init__(self
, host
: str, daemon_id
: str,
30 network
: Optional
[str] = None,
31 keyring
: Optional
[str] = None,
32 extra_args
: Optional
[List
[str]] = None,
34 extra_files
: Optional
[Dict
[str, Any
]] = None,
35 daemon_type
: Optional
[str] = None,
36 ip
: Optional
[str] = None,
37 ports
: Optional
[List
[int]] = None,
38 rank
: Optional
[int] = None,
39 rank_generation
: Optional
[int] = None):
41 A data struction to encapsulate `cephadm deploy ...
44 self
.daemon_id
= daemon_id
45 self
.service_name
= service_name
46 daemon_type
= daemon_type
or (service_name
.split('.')[0])
47 assert daemon_type
is not None
48 self
.daemon_type
: str = daemon_type
51 self
.network
= network
54 self
.keyring
: Optional
[str] = keyring
56 # For run_cephadm. Would be great to have more expressive names.
57 self
.extra_args
: List
[str] = extra_args
or []
59 self
.ceph_conf
= ceph_conf
60 self
.extra_files
= extra_files
or {}
62 # TCP ports used by the daemon
63 self
.ports
: List
[int] = ports
or []
64 self
.ip
: Optional
[str] = ip
66 # values to be populated during generate_config calls
67 # and then used in _run_cephadm
68 self
.final_config
: Dict
[str, Any
] = {}
69 self
.deps
: List
[str] = []
71 self
.rank
: Optional
[int] = rank
72 self
.rank_generation
: Optional
[int] = rank_generation
74 def name(self
) -> str:
75 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
77 def config_get_files(self
) -> Dict
[str, Any
]:
78 files
= self
.extra_files
80 files
['config'] = self
.ceph_conf
85 def from_daemon_description(dd
: DaemonDescription
) -> 'CephadmDaemonDeploySpec':
89 return CephadmDaemonDeploySpec(
91 daemon_id
=dd
.daemon_id
,
92 daemon_type
=dd
.daemon_type
,
93 service_name
=dd
.service_name(),
97 rank_generation
=dd
.rank_generation
,
100 def to_daemon_description(self
, status
: DaemonDescriptionStatus
, status_desc
: str) -> DaemonDescription
:
101 return DaemonDescription(
102 daemon_type
=self
.daemon_type
,
103 daemon_id
=self
.daemon_id
,
104 service_name
=self
.service_name
,
107 status_desc
=status_desc
,
111 rank_generation
=self
.rank_generation
,
115 class CephadmService(metaclass
=ABCMeta
):
117 Base class for service types. Often providing a create() and config() fn.
122 def TYPE(self
) -> str:
    def __init__(self, mgr: "CephadmOrchestrator"):
        # Handle to the orchestrator mgr module; used by subclasses for
        # mon commands, inventory lookups and daemon-cache access.
        self.mgr: "CephadmOrchestrator" = mgr
128 def allow_colo(self
) -> bool:
130 Return True if multiple daemons of the same type can colocate on
135 def primary_daemon_type(self
) -> str:
137 This is the type of the primary (usually only) daemon to be deployed.
141 def per_host_daemon_type(self
) -> Optional
[str]:
143 If defined, this type of daemon will be deployed once for each host
144 containing one or more daemons of the primary type.
148 def ranked(self
) -> bool:
150 If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
151 generation (0, 1, ...) to each daemon we create/deploy.
155 def fence_old_ranks(self
,
157 rank_map
: Dict
[int, Dict
[int, Optional
[str]]],
158 num_ranks
: int) -> None:
161 def make_daemon_spec(
167 daemon_type
: Optional
[str] = None,
168 ports
: Optional
[List
[int]] = None,
169 ip
: Optional
[str] = None,
170 rank
: Optional
[int] = None,
171 rank_generation
: Optional
[int] = None,
172 ) -> CephadmDaemonDeploySpec
:
173 return CephadmDaemonDeploySpec(
176 service_name
=spec
.service_name(),
178 daemon_type
=daemon_type
,
182 rank_generation
=rank_generation
,
    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Prepare (keyring, config, ports...) a daemon spec before deployment.

        Base implementation is abstract; concrete services must override.
        """
        raise NotImplementedError()
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        """Return the cephadm config payload and dependency list for a daemon.

        Base implementation is abstract; concrete services must override.
        """
        raise NotImplementedError()
    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
198 def daemon_check_post(self
, daemon_descrs
: List
[DaemonDescription
]) -> None:
199 """The post actions needed to be done after daemons are checked"""
200 if self
.mgr
.config_dashboard
:
201 if 'dashboard' in self
.mgr
.get('mgr_map')['modules']:
202 self
.config_dashboard(daemon_descrs
)
204 logger
.debug('Dashboard is not enabled. Skip configuration.')
    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        # Base implementation: only services that integrate with the
        # dashboard override this.
        raise NotImplementedError()
    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        """Return the active daemon among `daemon_descrs`, if the service
        type has a notion of an 'active' instance."""
        # if this is called for a service type where it hasn't explicitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()
215 def get_keyring_with_caps(self
, entity
: AuthEntity
, caps
: List
[str]) -> str:
216 ret
, keyring
, err
= self
.mgr
.mon_command({
217 'prefix': 'auth get-or-create',
222 ret
, out
, err
= self
.mgr
.mon_command({
223 'prefix': 'auth caps',
228 self
.mgr
.log
.warning(f
"Unable to update caps for {entity}")
231 def _inventory_get_addr(self
, hostname
: str) -> str:
232 """Get a host's address with its hostname."""
233 return self
.mgr
.inventory
.get_addr(hostname
)
235 def _set_service_url_on_dashboard(self
,
239 service_url
: str) -> None:
240 """A helper to get and set service_url via Dashboard's MON command.
242 If result of get_mon_cmd differs from service_url, set_mon_cmd will
243 be sent to set the service_url.
245 def get_set_cmd_dicts(out
: str) -> List
[dict]:
247 'prefix': set_mon_cmd
,
250 return [cmd_dict
] if service_url
!= out
else []
252 self
._check
_and
_set
_dashboard
(
253 service_name
=service_name
,
255 get_set_cmd_dicts
=get_set_cmd_dicts
258 def _check_and_set_dashboard(self
,
261 get_set_cmd_dicts
: Callable
[[str], List
[dict]]) -> None:
262 """A helper to set configs in the Dashboard.
264 The method is useful for the pattern:
265 - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
267 - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
268 - Determine if the config need to be update. NOTE: This step is important because if a
269 Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
270 kicks the serve() loop and the logic using this method is likely to be called again.
271 A config should be updated only when needed.
272 - Update a config in Dashboard by using a Dashboard command.
274 :param service_name: the service name to be used for logging
275 :type service_name: str
276 :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
278 :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
282 'prefix': 'dashboard iscsi-gateway-add',
283 'service_url': 'http://admin:admin@aaa:5000',
287 'prefix': 'dashboard iscsi-gateway-add',
288 'service_url': 'http://admin:admin@bbb:5000',
292 The function should return empty list if no command need to be sent.
293 :type get_set_cmd_dicts: Callable[[str], List[dict]]
297 _
, out
, _
= self
.mgr
.check_mon_command({
300 except MonCommandFailed
as e
:
301 logger
.warning('Failed to get Dashboard config for %s: %s', service_name
, e
)
303 cmd_dicts
= get_set_cmd_dicts(out
.strip())
304 for cmd_dict
in list(cmd_dicts
):
306 inbuf
= cmd_dict
.pop('inbuf', None)
307 _
, out
, _
= self
.mgr
.check_mon_command(cmd_dict
, inbuf
)
308 except MonCommandFailed
as e
:
309 logger
.warning('Failed to set Dashboard config for %s: %s', service_name
, e
)
314 known
: Optional
[List
[str]] = None, # output argument
315 force
: bool = False) -> HandleCommandResult
:
316 r
= HandleCommandResult(*self
.mgr
.mon_command({
317 'prefix': "osd ok-to-stop",
323 j
= json
.loads(r
.stdout
)
324 except json
.decoder
.JSONDecodeError
:
325 self
.mgr
.log
.warning("osd ok-to-stop didn't return structured result")
329 if known
is not None and j
and j
.get('ok_to_stop'):
330 self
.mgr
.log
.debug(f
"got {j}")
331 known
.extend([f
'osd.{x}' for x
in j
.get('osds', [])])
332 return HandleCommandResult(
334 f
'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
340 daemon_ids
: List
[str],
342 known
: Optional
[List
[str]] = None # output argument
343 ) -> HandleCommandResult
:
344 names
= [f
'{self.TYPE}.{d_id}' for d_id
in daemon_ids
]
345 out
= f
'It appears safe to stop {",".join(names)}'
346 err
= f
'It is NOT safe to stop {",".join(names)} at this time'
348 if self
.TYPE
not in ['mon', 'osd', 'mds']:
350 return HandleCommandResult(0, out
)
352 if self
.TYPE
== 'osd':
353 return self
.ok_to_stop_osd(daemon_ids
, known
, force
)
355 r
= HandleCommandResult(*self
.mgr
.mon_command({
356 'prefix': f
'{self.TYPE} ok-to-stop',
361 err
= f
'{err}: {r.stderr}' if r
.stderr
else err
363 return HandleCommandResult(r
.retval
, r
.stdout
, err
)
365 out
= f
'{out}: {r.stdout}' if r
.stdout
else out
367 return HandleCommandResult(r
.retval
, out
, r
.stderr
)
369 def _enough_daemons_to_stop(self
, daemon_type
: str, daemon_ids
: List
[str], service
: str, low_limit
: int, alert
: bool = False) -> Tuple
[bool, str]:
370 # Provides a warning about if it possible or not to stop <n> daemons in a service
371 names
= [f
'{daemon_type}.{d_id}' for d_id
in daemon_ids
]
372 number_of_running_daemons
= len(
374 for daemon
in self
.mgr
.cache
.get_daemons_by_type(daemon_type
)
375 if daemon
.status
== DaemonDescriptionStatus
.running
])
376 if (number_of_running_daemons
- len(daemon_ids
)) >= low_limit
:
377 return False, f
'It is presumed safe to stop {names}'
379 num_daemons_left
= number_of_running_daemons
- len(daemon_ids
)
381 def plural(count
: int) -> str:
382 return 'daemon' if count
== 1 else 'daemons'
384 left_count
= "no" if num_daemons_left
== 0 else num_daemons_left
387 out
= (f
'ALERT: Cannot stop {names} in {service} service. '
388 f
'Not enough remaining {service} daemons. '
389 f
'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
391 out
= (f
'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
392 f
'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
393 f
'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        # sanity check: this service object must own the daemon's type
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
    def post_remove(self, daemon: DaemonDescription) -> None:
        """
        Called after the daemon is removed.
        """
        # sanity check: this service object must own the daemon's type
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        # Base implementation is a no-op; subclasses that persist cluster
        # config (e.g. mds/rgw) override this to clean it up.
        logger.debug(f'Purge called for {self.TYPE} - no action taken')
417 class CephService(CephadmService
):
418 def generate_config(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> Tuple
[Dict
[str, Any
], List
[str]]:
419 # Ceph.daemons (mon, mgr, mds, osd, etc)
420 cephadm_config
= self
.get_config_and_keyring(
421 daemon_spec
.daemon_type
,
422 daemon_spec
.daemon_id
,
423 host
=daemon_spec
.host
,
424 keyring
=daemon_spec
.keyring
,
425 extra_ceph_config
=daemon_spec
.ceph_conf
)
427 if daemon_spec
.config_get_files():
428 cephadm_config
.update({'files': daemon_spec
.config_get_files()})
430 return cephadm_config
, []
    def post_remove(self, daemon: DaemonDescription) -> None:
        """Run base post-remove actions, then drop the daemon's cephx key."""
        super().post_remove(daemon)
        self.remove_keyring(daemon)
436 def get_auth_entity(self
, daemon_id
: str, host
: str = "") -> AuthEntity
:
438 Map the daemon id to a cephx keyring entity name
440 # despite this mapping entity names to daemons, self.TYPE within
441 # the CephService class refers to service types, not daemon types
442 if self
.TYPE
in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
443 return AuthEntity(f
'client.{self.TYPE}.{daemon_id}')
444 elif self
.TYPE
== 'crash':
446 raise OrchestratorError("Host not provided to generate <crash> auth entity name")
447 return AuthEntity(f
'client.{self.TYPE}.{host}')
448 elif self
.TYPE
== 'mon':
449 return AuthEntity('mon.')
450 elif self
.TYPE
in ['mgr', 'osd', 'mds']:
451 return AuthEntity(f
'{self.TYPE}.{daemon_id}')
453 raise OrchestratorError("unknown daemon type")
455 def get_config_and_keyring(self
,
459 keyring
: Optional
[str] = None,
460 extra_ceph_config
: Optional
[str] = None
464 entity
: AuthEntity
= self
.get_auth_entity(daemon_id
, host
=host
)
465 ret
, keyring
, err
= self
.mgr
.check_mon_command({
466 'prefix': 'auth get',
470 config
= self
.mgr
.get_minimal_ceph_conf()
472 if extra_ceph_config
:
473 config
+= extra_ceph_config
480 def remove_keyring(self
, daemon
: DaemonDescription
) -> None:
481 assert daemon
.daemon_id
is not None
482 assert daemon
.hostname
is not None
483 daemon_id
: str = daemon
.daemon_id
484 host
: str = daemon
.hostname
486 if daemon_id
== 'mon':
487 # do not remove the mon keyring
490 entity
= self
.get_auth_entity(daemon_id
, host
=host
)
492 logger
.info(f
'Removing key for {entity}')
493 ret
, out
, err
= self
.mgr
.mon_command({
499 class MonService(CephService
):
502 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
504 Create a new monitor on the given host.
506 assert self
.TYPE
== daemon_spec
.daemon_type
507 name
, _
, network
= daemon_spec
.daemon_id
, daemon_spec
.host
, daemon_spec
.network
510 ret
, keyring
, err
= self
.mgr
.check_mon_command({
511 'prefix': 'auth get',
512 'entity': self
.get_auth_entity(name
),
515 extra_config
= '[mon.%s]\n' % name
517 # infer whether this is a CIDR network, addrvec, or plain IP
519 extra_config
+= 'public network = %s\n' % network
520 elif network
.startswith('[v') and network
.endswith(']'):
521 extra_config
+= 'public addrv = %s\n' % network
522 elif is_ipv6(network
):
523 extra_config
+= 'public addr = %s\n' % unwrap_ipv6(network
)
524 elif ':' not in network
:
525 extra_config
+= 'public addr = %s\n' % network
527 raise OrchestratorError(
528 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network
)
530 # try to get the public_network from the config
531 ret
, network
, err
= self
.mgr
.check_mon_command({
532 'prefix': 'config get',
534 'key': 'public_network',
536 network
= network
.strip() if network
else network
538 raise OrchestratorError(
539 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
540 if '/' not in network
:
541 raise OrchestratorError(
542 'public_network is set but does not look like a CIDR network: \'%s\'' % network
)
543 extra_config
+= 'public network = %s\n' % network
545 daemon_spec
.ceph_conf
= extra_config
546 daemon_spec
.keyring
= keyring
548 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
552 def _check_safe_to_destroy(self
, mon_id
: str) -> None:
553 ret
, out
, err
= self
.mgr
.check_mon_command({
554 'prefix': 'quorum_status',
559 raise OrchestratorError('failed to parse quorum status')
561 mons
= [m
['name'] for m
in j
['monmap']['mons']]
562 if mon_id
not in mons
:
563 logger
.info('Safe to remove mon.%s: not in monmap (%s)' % (
566 new_mons
= [m
for m
in mons
if m
!= mon_id
]
567 new_quorum
= [m
for m
in j
['quorum_names'] if m
!= mon_id
]
568 if len(new_quorum
) > len(new_mons
) / 2:
569 logger
.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
570 (mon_id
, new_quorum
, new_mons
))
572 raise OrchestratorError(
573 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id
, new_quorum
, new_mons
))
575 def pre_remove(self
, daemon
: DaemonDescription
) -> None:
576 super().pre_remove(daemon
)
578 assert daemon
.daemon_id
is not None
579 daemon_id
: str = daemon
.daemon_id
580 self
._check
_safe
_to
_destroy
(daemon_id
)
582 # remove mon from quorum before we destroy the daemon
583 logger
.info('Removing monitor %s from monmap...' % daemon_id
)
584 ret
, out
, err
= self
.mgr
.check_mon_command({
590 class MgrService(CephService
):
593 def allow_colo(self
) -> bool:
594 if self
.mgr
.get_ceph_option('mgr_standby_modules'):
595 # traditional mgr mode: standby daemons' modules listen on
596 # ports and redirect to the primary. we must not schedule
597 # multiple mgrs on the same host or else ports will
601 # standby daemons do nothing, and therefore port conflicts
605 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
607 Create a new manager instance on a host.
609 assert self
.TYPE
== daemon_spec
.daemon_type
610 mgr_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
613 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(mgr_id
),
614 ['mon', 'profile mgr',
618 # Retrieve ports used by manager modules
619 # In the case of the dashboard port and with several manager daemons
620 # running in different hosts, it exists the possibility that the
621 # user has decided to use different dashboard ports in each server
622 # If this is the case then the dashboard port opened will be only the used
625 ret
, mgr_services
, err
= self
.mgr
.check_mon_command({
626 'prefix': 'mgr services',
629 mgr_endpoints
= json
.loads(mgr_services
)
630 for end_point
in mgr_endpoints
.values():
631 port
= re
.search(r
'\:\d+\/', end_point
)
633 ports
.append(int(port
[0][1:-1]))
636 daemon_spec
.ports
= ports
638 daemon_spec
.keyring
= keyring
640 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
644 def get_active_daemon(self
, daemon_descrs
: List
[DaemonDescription
]) -> DaemonDescription
:
645 for daemon
in daemon_descrs
:
646 assert daemon
.daemon_type
is not None
647 assert daemon
.daemon_id
is not None
648 if self
.mgr
.daemon_is_self(daemon
.daemon_type
, daemon
.daemon_id
):
650 # if no active mgr found, return empty Daemon Desc
651 return DaemonDescription()
653 def fail_over(self
) -> None:
654 if not self
.mgr_map_has_standby():
655 raise OrchestratorError('Need standby mgr daemon', event_kind_subject
=(
656 'daemon', 'mgr' + self
.mgr
.get_mgr_id()))
658 self
.mgr
.events
.for_daemon('mgr' + self
.mgr
.get_mgr_id(),
659 'INFO', 'Failing over to other MGR')
660 logger
.info('Failing over to other MGR')
663 ret
, out
, err
= self
.mgr
.check_mon_command({
664 'prefix': 'mgr fail',
665 'who': self
.mgr
.get_mgr_id(),
668 def mgr_map_has_standby(self
) -> bool:
670 This is a bit safer than asking our inventory. If the mgr joined the mgr map,
671 we know it joined the cluster
673 mgr_map
= self
.mgr
.get('mgr_map')
674 num
= len(mgr_map
.get('standbys'))
679 daemon_ids
: List
[str],
681 known
: Optional
[List
[str]] = None # output argument
682 ) -> HandleCommandResult
:
683 # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
685 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(self
.TYPE
, daemon_ids
, 'Mgr', 1, True)
687 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
689 mgr_daemons
= self
.mgr
.cache
.get_daemons_by_type(self
.TYPE
)
690 active
= self
.get_active_daemon(mgr_daemons
).daemon_id
691 if active
in daemon_ids
:
692 warn_message
= 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
693 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
695 return HandleCommandResult(0, warn_message
, '')
698 class MdsService(CephService
):
701 def allow_colo(self
) -> bool:
704 def config(self
, spec
: ServiceSpec
) -> None:
705 assert self
.TYPE
== spec
.service_type
706 assert spec
.service_id
708 # ensure mds_join_fs is set for these daemons
709 ret
, out
, err
= self
.mgr
.check_mon_command({
710 'prefix': 'config set',
711 'who': 'mds.' + spec
.service_id
,
712 'name': 'mds_join_fs',
713 'value': spec
.service_id
,
716 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
717 assert self
.TYPE
== daemon_spec
.daemon_type
718 mds_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
721 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(mds_id
),
722 ['mon', 'profile mds',
723 'osd', 'allow rw tag cephfs *=*',
725 daemon_spec
.keyring
= keyring
727 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
731 def get_active_daemon(self
, daemon_descrs
: List
[DaemonDescription
]) -> DaemonDescription
:
732 active_mds_strs
= list()
733 for fs
in self
.mgr
.get('fs_map')['filesystems']:
734 mds_map
= fs
['mdsmap']
735 if mds_map
is not None:
736 for mds_id
, mds_status
in mds_map
['info'].items():
737 if mds_status
['state'] == 'up:active':
738 active_mds_strs
.append(mds_status
['name'])
739 if len(active_mds_strs
) != 0:
740 for daemon
in daemon_descrs
:
741 if daemon
.daemon_id
in active_mds_strs
:
743 # if no mds found, return empty Daemon Desc
744 return DaemonDescription()
746 def purge(self
, service_name
: str) -> None:
747 self
.mgr
.check_mon_command({
748 'prefix': 'config rm',
750 'name': 'mds_join_fs',
754 class RgwService(CephService
):
757 def allow_colo(self
) -> bool:
760 def config(self
, spec
: RGWSpec
) -> None: # type: ignore
761 assert self
.TYPE
== spec
.service_type
763 # set rgw_realm and rgw_zone, if present
765 ret
, out
, err
= self
.mgr
.check_mon_command({
766 'prefix': 'config set',
767 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
769 'value': spec
.rgw_realm
,
772 ret
, out
, err
= self
.mgr
.check_mon_command({
773 'prefix': 'config set',
774 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
776 'value': spec
.rgw_zone
,
779 if spec
.rgw_frontend_ssl_certificate
:
780 if isinstance(spec
.rgw_frontend_ssl_certificate
, list):
781 cert_data
= '\n'.join(spec
.rgw_frontend_ssl_certificate
)
782 elif isinstance(spec
.rgw_frontend_ssl_certificate
, str):
783 cert_data
= spec
.rgw_frontend_ssl_certificate
785 raise OrchestratorError(
786 'Invalid rgw_frontend_ssl_certificate: %s'
787 % spec
.rgw_frontend_ssl_certificate
)
788 ret
, out
, err
= self
.mgr
.check_mon_command({
789 'prefix': 'config-key set',
790 'key': f
'rgw/cert/{spec.service_name()}',
794 # TODO: fail, if we don't have a spec
795 logger
.info('Saving service %s spec with placement %s' % (
796 spec
.service_name(), spec
.placement
.pretty_str()))
797 self
.mgr
.spec_store
.save(spec
)
798 self
.mgr
.trigger_connect_dashboard_rgw()
800 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
801 assert self
.TYPE
== daemon_spec
.daemon_type
802 rgw_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
803 spec
= cast(RGWSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
805 keyring
= self
.get_keyring(rgw_id
)
807 if daemon_spec
.ports
:
808 port
= daemon_spec
.ports
[0]
810 # this is a redeploy of older instance that doesn't have an explicitly
811 # assigned port, in which case we can assume there is only 1 per host
812 # and it matches the spec.
813 port
= spec
.get_port()
817 ftype
= spec
.rgw_frontend_type
or "beast"
821 args
.append(f
"ssl_endpoint={daemon_spec.ip}:{port}")
823 args
.append(f
"ssl_port={port}")
824 args
.append(f
"ssl_certificate=config://rgw/cert/{spec.service_name()}")
827 args
.append(f
"endpoint={daemon_spec.ip}:{port}")
829 args
.append(f
"port={port}")
830 elif ftype
== 'civetweb':
833 args
.append(f
"port={daemon_spec.ip}:{port}s") # note the 's' suffix on port
835 args
.append(f
"port={port}s") # note the 's' suffix on port
836 args
.append(f
"ssl_certificate=config://rgw/cert/{spec.service_name()}")
839 args
.append(f
"port={daemon_spec.ip}:{port}")
841 args
.append(f
"port={port}")
842 frontend
= f
'{ftype} {" ".join(args)}'
844 ret
, out
, err
= self
.mgr
.check_mon_command({
845 'prefix': 'config set',
846 'who': utils
.name_to_config_section(daemon_spec
.name()),
847 'name': 'rgw_frontends',
851 daemon_spec
.keyring
= keyring
852 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
856 def get_keyring(self
, rgw_id
: str) -> str:
857 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(rgw_id
),
860 'osd', 'allow rwx tag rgw *=*'])
863 def purge(self
, service_name
: str) -> None:
864 self
.mgr
.check_mon_command({
865 'prefix': 'config rm',
866 'who': utils
.name_to_config_section(service_name
),
869 self
.mgr
.check_mon_command({
870 'prefix': 'config rm',
871 'who': utils
.name_to_config_section(service_name
),
874 self
.mgr
.check_mon_command({
875 'prefix': 'config-key rm',
876 'key': f
'rgw/cert/{service_name}',
878 self
.mgr
.trigger_connect_dashboard_rgw()
880 def post_remove(self
, daemon
: DaemonDescription
) -> None:
881 super().post_remove(daemon
)
882 self
.mgr
.check_mon_command({
883 'prefix': 'config rm',
884 'who': utils
.name_to_config_section(daemon
.name()),
885 'name': 'rgw_frontends',
890 daemon_ids
: List
[str],
892 known
: Optional
[List
[str]] = None # output argument
893 ) -> HandleCommandResult
:
894 # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
895 # if no load balancer, warn if > 1 daemon, block if only 1 daemon
896 def ingress_present() -> bool:
897 running_ingress_daemons
= [
898 daemon
for daemon
in self
.mgr
.cache
.get_daemons_by_type('ingress') if daemon
.status
== 1]
899 running_haproxy_daemons
= [
900 daemon
for daemon
in running_ingress_daemons
if daemon
.daemon_type
== 'haproxy']
901 running_keepalived_daemons
= [
902 daemon
for daemon
in running_ingress_daemons
if daemon
.daemon_type
== 'keepalived']
903 # check that there is at least one haproxy and keepalived daemon running
904 if running_haproxy_daemons
and running_keepalived_daemons
:
908 # if only 1 rgw, alert user (this is not passable with --force)
909 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(self
.TYPE
, daemon_ids
, 'RGW', 1, True)
911 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
913 # if reached here, there is > 1 rgw daemon.
914 # Say okay if load balancer present or force flag set
915 if ingress_present() or force
:
916 return HandleCommandResult(0, warn_message
, '')
918 # if reached here, > 1 RGW daemon, no load balancer and no force flag.
920 warn_message
= "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
921 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Ask the mgr to (re)connect the dashboard to the RGW endpoints."""
        self.mgr.trigger_connect_dashboard_rgw()
927 class RbdMirrorService(CephService
):
930 def allow_colo(self
) -> bool:
933 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
934 assert self
.TYPE
== daemon_spec
.daemon_type
935 daemon_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
937 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(daemon_id
),
938 ['mon', 'profile rbd-mirror',
939 'osd', 'profile rbd'])
941 daemon_spec
.keyring
= keyring
943 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
949 daemon_ids
: List
[str],
951 known
: Optional
[List
[str]] = None # output argument
952 ) -> HandleCommandResult
:
953 # if only 1 rbd-mirror, alert user (this is not passable with --force)
954 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(
955 self
.TYPE
, daemon_ids
, 'Rbdmirror', 1, True)
957 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
958 return HandleCommandResult(0, warn_message
, '')
961 class CrashService(CephService
):
964 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
965 assert self
.TYPE
== daemon_spec
.daemon_type
966 daemon_id
, host
= daemon_spec
.daemon_id
, daemon_spec
.host
968 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(daemon_id
, host
=host
),
969 ['mon', 'profile crash',
970 'mgr', 'profile crash'])
972 daemon_spec
.keyring
= keyring
974 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
979 class CephfsMirrorService(CephService
):
980 TYPE
= 'cephfs-mirror'
982 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
983 assert self
.TYPE
== daemon_spec
.daemon_type
985 ret
, keyring
, err
= self
.mgr
.check_mon_command({
986 'prefix': 'auth get-or-create',
987 'entity': self
.get_auth_entity(daemon_spec
.daemon_id
),
988 'caps': ['mon', 'profile cephfs-mirror',
990 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
994 daemon_spec
.keyring
= keyring
995 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)