import errno
import json
import logging
import re
import socket
import time

from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)


class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        self.network = network

        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self.extra_container_args = extra_container_args

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
            extra_container_args=dd.extra_container_args,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
            extra_container_args=self.extra_container_args,
        )
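
# Illustrative sketch (not part of the original module): constructing a deploy spec
# by hand for a hypothetical crash daemon on host1, using only the fields defined above:
#
#   spec = CephadmDaemonDeploySpec(host='host1', daemon_id='host1',
#                                  service_name='crash')
#   assert spec.daemon_type == 'crash'   # inferred from service_name
#   assert spec.name() == 'crash.host1'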


class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        ...

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
            self,
            host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        try:
            eca = spec.extra_container_args
        except AttributeError:
            eca = None
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=eca,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring

    def _inventory_get_fqdn(self, hostname: str) -> str:
        """Get a host's FQDN with its hostname.

           If the FQDN can't be resolved, the address from the inventory will
           be returned instead.
        """
        addr = self.mgr.inventory.get_addr(hostname)
        return socket.getfqdn(addr)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

           If result of get_mon_cmd differs from service_url, set_mon_cmd will
           be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )
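
    # Illustrative usage (not part of the original module), assuming the Dashboard's
    # get/set command pair for the Grafana API URL:
    #
    #   self._set_service_url_on_dashboard(
    #       'grafana',
    #       'dashboard get-grafana-api-url',
    #       'dashboard set-grafana-api-url',
    #       'https://host1:3000')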

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modified Ceph config, cephadm's config_notify() is called, which
          kicks the serve() loop and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
            e.g.
            [
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@aaa:5000',
                    'name': 'aaa'
                },
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@bbb:5000',
                    'name': 'bbb'
                }
            ]
            The function should return an empty list if no command needs to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """
        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")

        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            r.retval,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            r.stderr)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out
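
    # Worked example (added for clarity, not in the original): MgrService below calls this
    # with low_limit=1 and alert=True. With 2 running mgr daemons and a request to stop 1,
    # 2 - 1 >= 1, so the result is (False, 'It is presumed safe to stop ...'); with only
    # 1 running mgr the result is (True, 'ALERT: Cannot stop ...').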

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # despite this mapping entity names to daemons, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE in ['crash', 'agent']:
            if host == "":
                raise OrchestratorError(
                    f'Host not provided to generate <{self.TYPE}> auth entity name')
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")
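
    # Examples (added for clarity, not in the original) of the mapping above:
    #   rgw daemon 'foo.host1.abcdef'  -> 'client.rgw.foo.host1.abcdef'
    #   crash daemon on host 'host1'   -> 'client.crash.host1'
    #   mgr daemon 'x'                 -> 'mgr.x'
    #   any mon daemon                 -> 'mon.'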

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
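
    # Examples (added for clarity, not in the original) of how the network forms above are
    # rendered into the mon's extra config:
    #   '10.1.0.0/16'                          -> 'public network = 10.1.0.0/16'
    #   '[v2:10.1.2.3:3300,v1:10.1.2.3:6789]'  -> 'public addrv = [v2:...,v1:...]'
    #   '2001:db8::42'                         -> 'public addr = 2001:db8::42' (IPv6, brackets removed by unwrap_ipv6 if present)
    #   '10.1.2.3'                             -> 'public addr = 10.1.2.3'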

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
        else:
            raise OrchestratorError(
                'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
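
    # Worked example (added for clarity, not in the original): with mons a/b/c/d/e all in
    # quorum, removing 'e' gives new_mons = [a, b, c, d] and new_quorum = [a, b, c, d];
    # 4 > 4/2, so the removal is considered safe. With 2 mons both in quorum, removing one
    # gives 1 > 1/2 as well, so a 2 -> 1 shrink also passes this check.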

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass


class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules
        # In the case of the dashboard port and with several manager daemons
        # running on different hosts, there is the possibility that the
        # user has decided to use different dashboard ports in each server.
        # If this is the case then only the dashboard port currently in use
        # will be opened by default.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
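
    # Example (added for clarity, not in the original) of the port extraction above:
    # an 'mgr services' endpoint such as 'https://host1:8443/' matches r'\:\d+\/' as
    # ':8443/', and int(':8443/'[1:-1]) == 8443.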

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()

    def fail_over(self) -> None:
        # this has been seen to sometimes transiently fail even when there are multiple
        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
        class NoStandbyError(OrchestratorError):
            pass
        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
            'daemon', 'mgr' + self.mgr.get_mgr_id()))
        for sleep_secs in [2, 8, 15]:
            try:
                if not self.mgr_map_has_standby():
                    raise no_standby_exc
                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                           'INFO', 'Failing over to other MGR')
                logger.info('Failing over to other MGR')
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            except NoStandbyError:
                logger.info(
                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
                time.sleep(sleep_secs)
        raise no_standby_exc

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')


class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return empty Daemon Desc
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend,
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
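
    # Example (added for clarity, not in the original): for a beast frontend with SSL on
    # port 443 and a daemon pinned to 10.0.0.5, the assembled value is roughly
    #   'beast ssl_endpoint=10.0.0.5:443 ssl_certificate=config://rgw/cert/rgw.foo'
    # (with 'rgw.foo' standing in for the service name), stored under the daemon's
    # rgw_frontends config option.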

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present, block if only 1 daemon is up, otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()


class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name,
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.cherrypy_thread:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            assert self.mgr.cherrypy_thread.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': self.mgr.cherrypy_thread.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
            self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])
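
    # Note (added for clarity, not in the original): the second element of the returned
    # tuple is the daemon's dependency list (see CephadmDaemonDeploySpec.deps above); here
    # it is built from the mgr IP, endpoint port, root cert and scan option, presumably so
    # that a change in any of them shows up as a change in the agent's recorded deps.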