5 from abc
import ABCMeta
, abstractmethod
6 from typing
import TYPE_CHECKING
, List
, Callable
, TypeVar
, \
7 Optional
, Dict
, Any
, Tuple
, NewType
, cast
9 from mgr_module
import HandleCommandResult
, MonCommandFailed
11 from ceph
.deployment
.service_spec
import ServiceSpec
, RGWSpec
12 from ceph
.deployment
.utils
import is_ipv6
, unwrap_ipv6
13 from mgr_util
import build_url
14 from orchestrator
import OrchestratorError
, DaemonDescription
, DaemonDescriptionStatus
15 from orchestrator
._interface
import daemon_type_to_service
16 from cephadm
import utils
19 from cephadm
.module
import CephadmOrchestrator
21 logger
= logging
.getLogger(__name__
)
# Type variable for helpers generic over a service-spec subclass.
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
# Distinct string type for cephx entity names (e.g. 'client.rgw.foo').
AuthEntity = NewType('AuthEntity', str)
27 class CephadmDaemonDeploySpec
:
28 # typing.NamedTuple + Generic is broken in py36
29 def __init__(self
, host
: str, daemon_id
: str,
31 network
: Optional
[str] = None,
32 keyring
: Optional
[str] = None,
33 extra_args
: Optional
[List
[str]] = None,
35 extra_files
: Optional
[Dict
[str, Any
]] = None,
36 daemon_type
: Optional
[str] = None,
37 ip
: Optional
[str] = None,
38 ports
: Optional
[List
[int]] = None,
39 rank
: Optional
[int] = None,
40 rank_generation
: Optional
[int] = None):
42 A data struction to encapsulate `cephadm deploy ...
45 self
.daemon_id
= daemon_id
46 self
.service_name
= service_name
47 daemon_type
= daemon_type
or (service_name
.split('.')[0])
48 assert daemon_type
is not None
49 self
.daemon_type
: str = daemon_type
52 self
.network
= network
55 self
.keyring
: Optional
[str] = keyring
57 # For run_cephadm. Would be great to have more expressive names.
58 self
.extra_args
: List
[str] = extra_args
or []
60 self
.ceph_conf
= ceph_conf
61 self
.extra_files
= extra_files
or {}
63 # TCP ports used by the daemon
64 self
.ports
: List
[int] = ports
or []
65 self
.ip
: Optional
[str] = ip
67 # values to be populated during generate_config calls
68 # and then used in _run_cephadm
69 self
.final_config
: Dict
[str, Any
] = {}
70 self
.deps
: List
[str] = []
72 self
.rank
: Optional
[int] = rank
73 self
.rank_generation
: Optional
[int] = rank_generation
75 def name(self
) -> str:
76 return '%s.%s' % (self
.daemon_type
, self
.daemon_id
)
78 def config_get_files(self
) -> Dict
[str, Any
]:
79 files
= self
.extra_files
81 files
['config'] = self
.ceph_conf
86 def from_daemon_description(dd
: DaemonDescription
) -> 'CephadmDaemonDeploySpec':
90 return CephadmDaemonDeploySpec(
92 daemon_id
=dd
.daemon_id
,
93 daemon_type
=dd
.daemon_type
,
94 service_name
=dd
.service_name(),
98 rank_generation
=dd
.rank_generation
,
101 def to_daemon_description(self
, status
: DaemonDescriptionStatus
, status_desc
: str) -> DaemonDescription
:
102 return DaemonDescription(
103 daemon_type
=self
.daemon_type
,
104 daemon_id
=self
.daemon_id
,
105 service_name
=self
.service_name
,
108 status_desc
=status_desc
,
112 rank_generation
=self
.rank_generation
,
116 class CephadmService(metaclass
=ABCMeta
):
118 Base class for service types. Often providing a create() and config() fn.
123 def TYPE(self
) -> str:
def __init__(self, mgr: "CephadmOrchestrator"):
    # Handle on the orchestrator module; service logic uses it to run
    # mon commands, query the daemon cache/inventory, etc.
    self.mgr: "CephadmOrchestrator" = mgr
129 def allow_colo(self
) -> bool:
131 Return True if multiple daemons of the same type can colocate on
136 def primary_daemon_type(self
) -> str:
138 This is the type of the primary (usually only) daemon to be deployed.
142 def per_host_daemon_type(self
) -> Optional
[str]:
144 If defined, this type of daemon will be deployed once for each host
145 containing one or more daemons of the primary type.
149 def ranked(self
) -> bool:
151 If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
152 generation (0, 1, ...) to each daemon we create/deploy.
156 def fence_old_ranks(self
,
158 rank_map
: Dict
[int, Dict
[int, Optional
[str]]],
159 num_ranks
: int) -> None:
162 def make_daemon_spec(
168 daemon_type
: Optional
[str] = None,
169 ports
: Optional
[List
[int]] = None,
170 ip
: Optional
[str] = None,
171 rank
: Optional
[int] = None,
172 rank_generation
: Optional
[int] = None,
173 ) -> CephadmDaemonDeploySpec
:
174 return CephadmDaemonDeploySpec(
177 service_name
=spec
.service_name(),
179 daemon_type
=daemon_type
,
183 rank_generation
=rank_generation
,
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
    """Populate *daemon_spec* (keyring, config, deps, ...) prior to deployment.

    Base class provides no implementation; concrete services must override.
    """
    raise NotImplementedError()
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
    """Return the (config payload, dependency list) for deploying the daemon.

    Base class provides no implementation; concrete services must override.
    """
    raise NotImplementedError()
def config(self, spec: ServiceSpec) -> None:
    """
    Configure the cluster for this service. Only called *once* per
    service apply. Not for every daemon.
    """
def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
    """The post actions needed to be done after daemons are checked"""
    if not self.mgr.config_dashboard:
        return
    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
        self.config_dashboard(daemon_descrs)
    else:
        logger.debug('Dashboard is not enabled. Skip configuration.')
def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
    """Config dashboard settings."""
    # Base implementation: services that integrate with the dashboard
    # override this hook.
    raise NotImplementedError()
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
    """Return the 'active' daemon among *daemon_descrs*.

    If this is called for a service type where it hasn't explicitly been
    defined, return an empty DaemonDescription.
    """
    return DaemonDescription()
216 def get_keyring_with_caps(self
, entity
: AuthEntity
, caps
: List
[str]) -> str:
217 ret
, keyring
, err
= self
.mgr
.mon_command({
218 'prefix': 'auth get-or-create',
223 ret
, out
, err
= self
.mgr
.mon_command({
224 'prefix': 'auth caps',
229 self
.mgr
.log
.warning(f
"Unable to update caps for {entity}")
def _inventory_get_addr(self, hostname: str) -> str:
    """Get a host's address with its hostname."""
    inventory = self.mgr.inventory
    return inventory.get_addr(hostname)
236 def _set_service_url_on_dashboard(self
,
240 service_url
: str) -> None:
241 """A helper to get and set service_url via Dashboard's MON command.
243 If result of get_mon_cmd differs from service_url, set_mon_cmd will
244 be sent to set the service_url.
246 def get_set_cmd_dicts(out
: str) -> List
[dict]:
248 'prefix': set_mon_cmd
,
251 return [cmd_dict
] if service_url
!= out
else []
253 self
._check
_and
_set
_dashboard
(
254 service_name
=service_name
,
256 get_set_cmd_dicts
=get_set_cmd_dicts
259 def _check_and_set_dashboard(self
,
262 get_set_cmd_dicts
: Callable
[[str], List
[dict]]) -> None:
263 """A helper to set configs in the Dashboard.
265 The method is useful for the pattern:
266 - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
268 - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
269 - Determine if the config need to be update. NOTE: This step is important because if a
270 Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
271 kicks the serve() loop and the logic using this method is likely to be called again.
272 A config should be updated only when needed.
273 - Update a config in Dashboard by using a Dashboard command.
275 :param service_name: the service name to be used for logging
276 :type service_name: str
277 :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
279 :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
283 'prefix': 'dashboard iscsi-gateway-add',
284 'service_url': 'http://admin:admin@aaa:5000',
288 'prefix': 'dashboard iscsi-gateway-add',
289 'service_url': 'http://admin:admin@bbb:5000',
293 The function should return empty list if no command need to be sent.
294 :type get_set_cmd_dicts: Callable[[str], List[dict]]
298 _
, out
, _
= self
.mgr
.check_mon_command({
301 except MonCommandFailed
as e
:
302 logger
.warning('Failed to get Dashboard config for %s: %s', service_name
, e
)
304 cmd_dicts
= get_set_cmd_dicts(out
.strip())
305 for cmd_dict
in list(cmd_dicts
):
307 inbuf
= cmd_dict
.pop('inbuf', None)
308 _
, out
, _
= self
.mgr
.check_mon_command(cmd_dict
, inbuf
)
309 except MonCommandFailed
as e
:
310 logger
.warning('Failed to set Dashboard config for %s: %s', service_name
, e
)
315 known
: Optional
[List
[str]] = None, # output argument
316 force
: bool = False) -> HandleCommandResult
:
317 r
= HandleCommandResult(*self
.mgr
.mon_command({
318 'prefix': "osd ok-to-stop",
324 j
= json
.loads(r
.stdout
)
325 except json
.decoder
.JSONDecodeError
:
326 self
.mgr
.log
.warning("osd ok-to-stop didn't return structured result")
330 if known
is not None and j
and j
.get('ok_to_stop'):
331 self
.mgr
.log
.debug(f
"got {j}")
332 known
.extend([f
'osd.{x}' for x
in j
.get('osds', [])])
333 return HandleCommandResult(
335 f
'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
341 daemon_ids
: List
[str],
343 known
: Optional
[List
[str]] = None # output argument
344 ) -> HandleCommandResult
:
345 names
= [f
'{self.TYPE}.{d_id}' for d_id
in daemon_ids
]
346 out
= f
'It appears safe to stop {",".join(names)}'
347 err
= f
'It is NOT safe to stop {",".join(names)} at this time'
349 if self
.TYPE
not in ['mon', 'osd', 'mds']:
351 return HandleCommandResult(0, out
)
353 if self
.TYPE
== 'osd':
354 return self
.ok_to_stop_osd(daemon_ids
, known
, force
)
356 r
= HandleCommandResult(*self
.mgr
.mon_command({
357 'prefix': f
'{self.TYPE} ok-to-stop',
362 err
= f
'{err}: {r.stderr}' if r
.stderr
else err
364 return HandleCommandResult(r
.retval
, r
.stdout
, err
)
366 out
= f
'{out}: {r.stdout}' if r
.stdout
else out
368 return HandleCommandResult(r
.retval
, out
, r
.stderr
)
370 def _enough_daemons_to_stop(self
, daemon_type
: str, daemon_ids
: List
[str], service
: str, low_limit
: int, alert
: bool = False) -> Tuple
[bool, str]:
371 # Provides a warning about if it possible or not to stop <n> daemons in a service
372 names
= [f
'{daemon_type}.{d_id}' for d_id
in daemon_ids
]
373 number_of_running_daemons
= len(
375 for daemon
in self
.mgr
.cache
.get_daemons_by_type(daemon_type
)
376 if daemon
.status
== DaemonDescriptionStatus
.running
])
377 if (number_of_running_daemons
- len(daemon_ids
)) >= low_limit
:
378 return False, f
'It is presumed safe to stop {names}'
380 num_daemons_left
= number_of_running_daemons
- len(daemon_ids
)
382 def plural(count
: int) -> str:
383 return 'daemon' if count
== 1 else 'daemons'
385 left_count
= "no" if num_daemons_left
== 0 else num_daemons_left
388 out
= (f
'ALERT: Cannot stop {names} in {service} service. '
389 f
'Not enough remaining {service} daemons. '
390 f
'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
392 out
= (f
'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
393 f
'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
394 f
'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
def pre_remove(self, daemon: DaemonDescription) -> None:
    """
    Called before the daemon is removed.
    """
    dtype = daemon.daemon_type
    assert dtype is not None
    assert self.TYPE == daemon_type_to_service(dtype)
    logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    """
    Called after the daemon is removed.
    """
    dtype = daemon.daemon_type
    assert dtype is not None
    assert self.TYPE == daemon_type_to_service(dtype)
    logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')
def purge(self, service_name: str) -> None:
    """Called to carry out any purge tasks following service removal"""
    # Default: nothing to purge; subclasses override when they keep
    # per-service config or keys. Lazy %-args keep formatting off the
    # hot path when debug logging is disabled.
    logger.debug('Purge called for %s - no action taken', self.TYPE)
418 class CephService(CephadmService
):
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
    """Build the deploy-time payload for a Ceph daemon (mon, mgr, mds, osd, etc).

    Returns a tuple of (cephadm config dict, dependency list). The dict
    carries the minimal ceph.conf plus the daemon's keyring; any extra
    files from the spec are attached under 'files'.
    """
    # Ceph.daemons (mon, mgr, mds, osd, etc)
    cephadm_config = self.get_config_and_keyring(
        daemon_spec.daemon_type,
        daemon_spec.daemon_id,
        host=daemon_spec.host,
        keyring=daemon_spec.keyring,
        extra_ceph_config=daemon_spec.ceph_conf)

    # Call config_get_files() once: it is not a pure accessor (it folds
    # ceph_conf into the extra-files dict), so don't invoke it both in
    # the truthiness test and in the update as the original did.
    files = daemon_spec.config_get_files()
    if files:
        cephadm_config.update({'files': files})

    return cephadm_config, []
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    """Run base post-remove handling, then drop the daemon's cephx key."""
    super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
    self.remove_keyring(daemon)
def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
    """
    Map the daemon id to a cephx keyring entity name
    """
    # despite this mapping entity names to daemons, self.TYPE within
    # the CephService class refers to service types, not daemon types
    service_type = self.TYPE
    if service_type in ('rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', 'iscsi', 'ingress'):
        return AuthEntity(f'client.{service_type}.{daemon_id}')
    if service_type == 'crash':
        # crash keys are per-host, so a hostname is mandatory here
        if not host:
            raise OrchestratorError("Host not provided to generate <crash> auth entity name")
        return AuthEntity(f'client.{service_type}.{host}')
    if service_type == 'mon':
        return AuthEntity('mon.')
    if service_type in ('mgr', 'osd', 'mds'):
        return AuthEntity(f'{service_type}.{daemon_id}')
    raise OrchestratorError("unknown daemon type")
456 def get_config_and_keyring(self
,
460 keyring
: Optional
[str] = None,
461 extra_ceph_config
: Optional
[str] = None
465 entity
: AuthEntity
= self
.get_auth_entity(daemon_id
, host
=host
)
466 ret
, keyring
, err
= self
.mgr
.check_mon_command({
467 'prefix': 'auth get',
471 config
= self
.mgr
.get_minimal_ceph_conf()
473 if extra_ceph_config
:
474 config
+= extra_ceph_config
481 def remove_keyring(self
, daemon
: DaemonDescription
) -> None:
482 assert daemon
.daemon_id
is not None
483 assert daemon
.hostname
is not None
484 daemon_id
: str = daemon
.daemon_id
485 host
: str = daemon
.hostname
487 assert daemon
.daemon_type
!= 'mon'
489 entity
= self
.get_auth_entity(daemon_id
, host
=host
)
491 logger
.info(f
'Removing key for {entity}')
492 ret
, out
, err
= self
.mgr
.mon_command({
498 class MonService(CephService
):
501 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
503 Create a new monitor on the given host.
505 assert self
.TYPE
== daemon_spec
.daemon_type
506 name
, _
, network
= daemon_spec
.daemon_id
, daemon_spec
.host
, daemon_spec
.network
509 ret
, keyring
, err
= self
.mgr
.check_mon_command({
510 'prefix': 'auth get',
511 'entity': self
.get_auth_entity(name
),
514 extra_config
= '[mon.%s]\n' % name
516 # infer whether this is a CIDR network, addrvec, or plain IP
518 extra_config
+= 'public network = %s\n' % network
519 elif network
.startswith('[v') and network
.endswith(']'):
520 extra_config
+= 'public addrv = %s\n' % network
521 elif is_ipv6(network
):
522 extra_config
+= 'public addr = %s\n' % unwrap_ipv6(network
)
523 elif ':' not in network
:
524 extra_config
+= 'public addr = %s\n' % network
526 raise OrchestratorError(
527 'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network
)
529 # try to get the public_network from the config
530 ret
, network
, err
= self
.mgr
.check_mon_command({
531 'prefix': 'config get',
533 'key': 'public_network',
535 network
= network
.strip() if network
else network
537 raise OrchestratorError(
538 'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
539 if '/' not in network
:
540 raise OrchestratorError(
541 'public_network is set but does not look like a CIDR network: \'%s\'' % network
)
542 extra_config
+= 'public network = %s\n' % network
544 daemon_spec
.ceph_conf
= extra_config
545 daemon_spec
.keyring
= keyring
547 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
551 def _check_safe_to_destroy(self
, mon_id
: str) -> None:
552 ret
, out
, err
= self
.mgr
.check_mon_command({
553 'prefix': 'quorum_status',
558 raise OrchestratorError('failed to parse quorum status')
560 mons
= [m
['name'] for m
in j
['monmap']['mons']]
561 if mon_id
not in mons
:
562 logger
.info('Safe to remove mon.%s: not in monmap (%s)' % (
565 new_mons
= [m
for m
in mons
if m
!= mon_id
]
566 new_quorum
= [m
for m
in j
['quorum_names'] if m
!= mon_id
]
567 if len(new_quorum
) > len(new_mons
) / 2:
568 logger
.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
569 (mon_id
, new_quorum
, new_mons
))
571 raise OrchestratorError(
572 'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id
, new_quorum
, new_mons
))
574 def pre_remove(self
, daemon
: DaemonDescription
) -> None:
575 super().pre_remove(daemon
)
577 assert daemon
.daemon_id
is not None
578 daemon_id
: str = daemon
.daemon_id
579 self
._check
_safe
_to
_destroy
(daemon_id
)
581 # remove mon from quorum before we destroy the daemon
582 logger
.info('Removing monitor %s from monmap...' % daemon_id
)
583 ret
, out
, err
= self
.mgr
.check_mon_command({
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    # Intentionally do NOT call super().post_remove(): that would delete
    # the mon keyring, which must be preserved.
    pass
594 class MgrService(CephService
):
597 def allow_colo(self
) -> bool:
598 if self
.mgr
.get_ceph_option('mgr_standby_modules'):
599 # traditional mgr mode: standby daemons' modules listen on
600 # ports and redirect to the primary. we must not schedule
601 # multiple mgrs on the same host or else ports will
605 # standby daemons do nothing, and therefore port conflicts
609 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
611 Create a new manager instance on a host.
613 assert self
.TYPE
== daemon_spec
.daemon_type
614 mgr_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
617 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(mgr_id
),
618 ['mon', 'profile mgr',
622 # Retrieve ports used by manager modules
623 # In the case of the dashboard port and with several manager daemons
624 # running in different hosts, it exists the possibility that the
625 # user has decided to use different dashboard ports in each server
626 # If this is the case then the dashboard port opened will be only the used
629 ret
, mgr_services
, err
= self
.mgr
.check_mon_command({
630 'prefix': 'mgr services',
633 mgr_endpoints
= json
.loads(mgr_services
)
634 for end_point
in mgr_endpoints
.values():
635 port
= re
.search(r
'\:\d+\/', end_point
)
637 ports
.append(int(port
[0][1:-1]))
640 daemon_spec
.ports
= ports
642 daemon_spec
.keyring
= keyring
644 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
    """Return the descriptor matching the currently-active mgr (ourselves)."""
    for dd in daemon_descrs:
        assert dd.daemon_type is not None
        assert dd.daemon_id is not None
        if self.mgr.daemon_is_self(dd.daemon_type, dd.daemon_id):
            return dd
    # if no active mgr found, return empty Daemon Desc
    return DaemonDescription()
def fail_over(self) -> None:
    """Hand the active mgr role to a standby via 'ceph mgr fail'."""
    if not self.mgr_map_has_standby():
        raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
            'daemon', 'mgr' + self.mgr.get_mgr_id()))

    self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                               'INFO', 'Failing over to other MGR')
    logger.info('Failing over to other MGR')

    # fail over to a standby mgr
    ret, out, err = self.mgr.check_mon_command({
        'prefix': 'mgr fail',
        'who': self.mgr.get_mgr_id(),
    })
def mgr_map_has_standby(self) -> bool:
    """
    This is a bit safer than asking our inventory. If the mgr joined the mgr map,
    we know it joined the cluster
    """
    mgr_map = self.mgr.get('mgr_map')
    num = len(mgr_map.get('standbys'))
    return bool(num)
683 daemon_ids
: List
[str],
685 known
: Optional
[List
[str]] = None # output argument
686 ) -> HandleCommandResult
:
687 # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
689 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(self
.TYPE
, daemon_ids
, 'Mgr', 1, True)
691 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
693 mgr_daemons
= self
.mgr
.cache
.get_daemons_by_type(self
.TYPE
)
694 active
= self
.get_active_daemon(mgr_daemons
).daemon_id
695 if active
in daemon_ids
:
696 warn_message
= 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
697 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
699 return HandleCommandResult(0, warn_message
, '')
702 class MdsService(CephService
):
705 def allow_colo(self
) -> bool:
def config(self, spec: ServiceSpec) -> None:
    """Set per-service MDS options; runs once per service apply."""
    assert self.TYPE == spec.service_type
    assert spec.service_id

    # ensure mds_join_fs is set for these daemons
    ret, out, err = self.mgr.check_mon_command({
        'prefix': 'config set',
        'who': 'mds.' + spec.service_id,
        'name': 'mds_join_fs',
        'value': spec.service_id,
    })
720 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
721 assert self
.TYPE
== daemon_spec
.daemon_type
722 mds_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
725 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(mds_id
),
726 ['mon', 'profile mds',
727 'osd', 'allow rw tag cephfs *=*',
729 daemon_spec
.keyring
= keyring
731 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
    """Return a descriptor whose daemon is in state 'up:active', if any."""
    # collect the names of all mds daemons currently up:active
    active_mds_strs = []
    for fs in self.mgr.get('fs_map')['filesystems']:
        mds_map = fs['mdsmap']
        if mds_map is not None:
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    active_mds_strs.append(mds_status['name'])
    if active_mds_strs:
        for dd in daemon_descrs:
            if dd.daemon_id in active_mds_strs:
                return dd
    # if no mds found, return empty Daemon Desc
    return DaemonDescription()
750 def purge(self
, service_name
: str) -> None:
751 self
.mgr
.check_mon_command({
752 'prefix': 'config rm',
754 'name': 'mds_join_fs',
758 class RgwService(CephService
):
761 def allow_colo(self
) -> bool:
764 def config(self
, spec
: RGWSpec
) -> None: # type: ignore
765 assert self
.TYPE
== spec
.service_type
767 # set rgw_realm and rgw_zone, if present
769 ret
, out
, err
= self
.mgr
.check_mon_command({
770 'prefix': 'config set',
771 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
773 'value': spec
.rgw_realm
,
776 ret
, out
, err
= self
.mgr
.check_mon_command({
777 'prefix': 'config set',
778 'who': f
"{utils.name_to_config_section('rgw')}.{spec.service_id}",
780 'value': spec
.rgw_zone
,
783 if spec
.rgw_frontend_ssl_certificate
:
784 if isinstance(spec
.rgw_frontend_ssl_certificate
, list):
785 cert_data
= '\n'.join(spec
.rgw_frontend_ssl_certificate
)
786 elif isinstance(spec
.rgw_frontend_ssl_certificate
, str):
787 cert_data
= spec
.rgw_frontend_ssl_certificate
789 raise OrchestratorError(
790 'Invalid rgw_frontend_ssl_certificate: %s'
791 % spec
.rgw_frontend_ssl_certificate
)
792 ret
, out
, err
= self
.mgr
.check_mon_command({
793 'prefix': 'config-key set',
794 'key': f
'rgw/cert/{spec.service_name()}',
798 # TODO: fail, if we don't have a spec
799 logger
.info('Saving service %s spec with placement %s' % (
800 spec
.service_name(), spec
.placement
.pretty_str()))
801 self
.mgr
.spec_store
.save(spec
)
802 self
.mgr
.trigger_connect_dashboard_rgw()
804 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
805 assert self
.TYPE
== daemon_spec
.daemon_type
806 rgw_id
, _
= daemon_spec
.daemon_id
, daemon_spec
.host
807 spec
= cast(RGWSpec
, self
.mgr
.spec_store
[daemon_spec
.service_name
].spec
)
809 keyring
= self
.get_keyring(rgw_id
)
811 if daemon_spec
.ports
:
812 port
= daemon_spec
.ports
[0]
814 # this is a redeploy of older instance that doesn't have an explicitly
815 # assigned port, in which case we can assume there is only 1 per host
816 # and it matches the spec.
817 port
= spec
.get_port()
821 ftype
= spec
.rgw_frontend_type
or "beast"
826 f
"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
828 args
.append(f
"ssl_port={port}")
829 args
.append(f
"ssl_certificate=config://rgw/cert/{spec.service_name()}")
832 args
.append(f
"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
834 args
.append(f
"port={port}")
835 elif ftype
== 'civetweb':
838 # note the 's' suffix on port
839 args
.append(f
"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
841 args
.append(f
"port={port}s") # note the 's' suffix on port
842 args
.append(f
"ssl_certificate=config://rgw/cert/{spec.service_name()}")
845 args
.append(f
"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
847 args
.append(f
"port={port}")
848 frontend
= f
'{ftype} {" ".join(args)}'
850 ret
, out
, err
= self
.mgr
.check_mon_command({
851 'prefix': 'config set',
852 'who': utils
.name_to_config_section(daemon_spec
.name()),
853 'name': 'rgw_frontends',
857 daemon_spec
.keyring
= keyring
858 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)
862 def get_keyring(self
, rgw_id
: str) -> str:
863 keyring
= self
.get_keyring_with_caps(self
.get_auth_entity(rgw_id
),
866 'osd', 'allow rwx tag rgw *=*'])
869 def purge(self
, service_name
: str) -> None:
870 self
.mgr
.check_mon_command({
871 'prefix': 'config rm',
872 'who': utils
.name_to_config_section(service_name
),
875 self
.mgr
.check_mon_command({
876 'prefix': 'config rm',
877 'who': utils
.name_to_config_section(service_name
),
880 self
.mgr
.check_mon_command({
881 'prefix': 'config-key rm',
882 'key': f
'rgw/cert/{service_name}',
884 self
.mgr
.trigger_connect_dashboard_rgw()
def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
    """After base post-remove handling, drop the daemon's rgw_frontends config."""
    super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
    self.mgr.check_mon_command({
        'prefix': 'config rm',
        'who': utils.name_to_config_section(daemon.name()),
        'name': 'rgw_frontends',
    })
896 daemon_ids
: List
[str],
898 known
: Optional
[List
[str]] = None # output argument
899 ) -> HandleCommandResult
:
900 # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
901 # if no load balancer, warn if > 1 daemon, block if only 1 daemon
902 def ingress_present() -> bool:
903 running_ingress_daemons
= [
904 daemon
for daemon
in self
.mgr
.cache
.get_daemons_by_type('ingress') if daemon
.status
== 1]
905 running_haproxy_daemons
= [
906 daemon
for daemon
in running_ingress_daemons
if daemon
.daemon_type
== 'haproxy']
907 running_keepalived_daemons
= [
908 daemon
for daemon
in running_ingress_daemons
if daemon
.daemon_type
== 'keepalived']
909 # check that there is at least one haproxy and keepalived daemon running
910 if running_haproxy_daemons
and running_keepalived_daemons
:
914 # if only 1 rgw, alert user (this is not passable with --force)
915 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(self
.TYPE
, daemon_ids
, 'RGW', 1, True)
917 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
919 # if reached here, there is > 1 rgw daemon.
920 # Say okay if load balancer present or force flag set
921 if ingress_present() or force
:
922 return HandleCommandResult(0, warn_message
, '')
924 # if reached here, > 1 RGW daemon, no load balancer and no force flag.
926 warn_message
= "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
927 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
    """Ask the dashboard module to (re)connect to the RGW endpoints."""
    self.mgr.trigger_connect_dashboard_rgw()
933 class RbdMirrorService(CephService
):
936 def allow_colo(self
) -> bool:
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
    """Create the rbd-mirror keyring and attach config/deps to the spec."""
    assert self.TYPE == daemon_spec.daemon_type
    daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

    keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                         ['mon', 'profile rbd-mirror',
                                          'osd', 'profile rbd'])

    daemon_spec.keyring = keyring

    daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

    return daemon_spec
955 daemon_ids
: List
[str],
957 known
: Optional
[List
[str]] = None # output argument
958 ) -> HandleCommandResult
:
959 # if only 1 rbd-mirror, alert user (this is not passable with --force)
960 warn
, warn_message
= self
._enough
_daemons
_to
_stop
(
961 self
.TYPE
, daemon_ids
, 'Rbdmirror', 1, True)
963 return HandleCommandResult(-errno
.EBUSY
, '', warn_message
)
964 return HandleCommandResult(0, warn_message
, '')
967 class CrashService(CephService
):
def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
    """Create the per-host crash keyring and attach config/deps to the spec."""
    assert self.TYPE == daemon_spec.daemon_type
    daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

    keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                         ['mon', 'profile crash',
                                          'mgr', 'profile crash'])

    daemon_spec.keyring = keyring

    daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

    return daemon_spec
985 class CephfsMirrorService(CephService
):
986 TYPE
= 'cephfs-mirror'
988 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
989 assert self
.TYPE
== daemon_spec
.daemon_type
991 ret
, keyring
, err
= self
.mgr
.check_mon_command({
992 'prefix': 'auth get-or-create',
993 'entity': self
.get_auth_entity(daemon_spec
.daemon_id
),
994 'caps': ['mon', 'profile cephfs-mirror',
996 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
1000 daemon_spec
.keyring
= keyring
1001 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)