import errno
import json
import logging
import re
import socket
import time
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec, CephExporterSpec, MONSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url, merge_dicts
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)
def get_auth_entity(daemon_type: str, daemon_id: str, host: str = "") -> AuthEntity:
    """
    Map the daemon id to a cephx keyring entity name
    """
    # despite this mapping entity names to daemons, self.TYPE within
    # the CephService class refers to service types, not daemon types
    if daemon_type in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress', 'ceph-exporter']:
        return AuthEntity(f'client.{daemon_type}.{daemon_id}')
    elif daemon_type in ['crash', 'agent']:
        if host == "":
            raise OrchestratorError(
                f'Host not provided to generate <{daemon_type}> auth entity name')
        return AuthEntity(f'client.{daemon_type}.{host}')
    elif daemon_type == 'mon':
        return AuthEntity('mon.')
    elif daemon_type in ['mgr', 'osd', 'mds']:
        return AuthEntity(f'{daemon_type}.{daemon_id}')
    else:
        raise OrchestratorError(f"unknown daemon type {daemon_type}")
class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None,
                 extra_entrypoint_args: Optional[List[str]] = None,
                 ):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self.extra_container_args = extra_container_args
        self.extra_entrypoint_args = extra_entrypoint_args
    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def entity_name(self) -> str:
        return get_auth_entity(self.daemon_type, self.daemon_id, host=self.host)
    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf
        return files
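
    # Illustrative construction (hypothetical values): when daemon_type is
    # omitted it is inferred from the service name, so this yields a 'mon'
    # deploy spec whose name() is 'mon.host1':
    #   CephadmDaemonDeploySpec(host='host1', daemon_id='host1', service_name='mon')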
    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
            extra_container_args=dd.extra_container_args,
            extra_entrypoint_args=dd.extra_entrypoint_args,
        )
    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
            extra_container_args=self.extra_container_args,
            extra_entrypoint_args=self.extra_entrypoint_args,
        )
class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        ...

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self, spec: Optional[ServiceSpec] = None) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self, spec: Optional[ServiceSpec] = None) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False
    def make_daemon_spec(
            self,
            host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=spec.extra_container_args if hasattr(
                spec, 'extra_container_args') else None,
            extra_entrypoint_args=spec.extra_entrypoint_args if hasattr(
                spec, 'extra_entrypoint_args') else None,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()
    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")

        # get keyring anyway
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get',
            'entity': entity,
        })
        if err:
            raise OrchestratorError(f"Unable to fetch keyring for {entity}: {err}")

        # strip down keyring
        #  - don't include caps (auth get includes them; get-or-create does not)
        #  - use pending key if present
        key = None
        for line in keyring.splitlines():
            if ' = ' not in line:
                continue
            line = line.strip()
            (ls, rs) = line.split(' = ', 1)
            if ls == 'key' and not key:
                key = rs
            if ls == 'pending key':
                key = rs
        keyring = f'[{entity}]\nkey = {key}\n'
        return keyring
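
    # Sketch of the stripping above on a hypothetical 'auth get' payload:
    #   [client.rgw.foo]
    #       key = AQD...
    #       caps mon = "allow *"
    # collapses to '[client.rgw.foo]\nkey = AQD...\n'; a 'pending key' line,
    # if present, wins over 'key'.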
    def _inventory_get_fqdn(self, hostname: str) -> str:
        """Get a host's FQDN with its hostname.

        If the FQDN can't be resolved, the address from the inventory will
        be returned instead.
        """
        addr = self.mgr.inventory.get_addr(hostname)
        return socket.getfqdn(addr)
    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )
    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from the Dashboard by using a Dashboard command, e.g. the
          current iSCSI gateways.
        - Parsing or deserializing the previous output, e.g. a Dashboard command that
          returns a JSON string.
        - Determining if the config needs to be updated. NOTE: This step is important
          because if a Dashboard command modified the Ceph config, cephadm's
          config_notify() is called, which kicks the serve() loop, and the logic using
          this method is likely to be called again. A config should be updated only
          when needed.
        - Updating a config in the Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
            e.g.
            [
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@aaa:5000',
                },
                {
                    'prefix': 'dashboard iscsi-gateway-add',
                    'service_url': 'http://admin:admin@bbb:5000',
                }
            ]
            The function should return an empty list if no commands need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """
        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)
    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            r.retval,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            '')
    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))
        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)
    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out
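
    # Worked example (hypothetical numbers): with 3 running daemons, low_limit=1
    # and a request to stop 2 of them, 3 - 2 = 1 >= 1, so stopping is presumed
    # safe; with low_limit=2 the same request returns the ALERT/WARNING text.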
    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')
class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        return get_auth_entity(self.TYPE, daemon_id, host=host)
    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })
        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }
    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })
class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': daemon_spec.entity_name(),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
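
    # Examples of the network inference above (hypothetical values):
    #   network='10.0.0.0/24'         -> 'public network = 10.0.0.0/24'
    #   network='[v2:10.0.0.1:3300]'  -> 'public addrv = [v2:10.0.0.1:3300]'
    #   network='2001:db8::1'         -> 'public addr = 2001:db8::1'
    #   network='10.0.0.1'            -> 'public addr = 10.0.0.1'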
    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        self.set_crush_locations(self.mgr.cache.get_daemons_by_type('mon'), spec)

    def _get_quorum_status(self) -> Dict[Any, Any]:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            return json.loads(out)
        except Exception as e:
            raise OrchestratorError(f'failed to parse mon quorum status: {e}')
    def _check_safe_to_destroy(self, mon_id: str) -> None:
        quorum_status = self._get_quorum_status()
        mons = [m['name'] for m in quorum_status['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in quorum_status['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
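
    # Quorum arithmetic sketch (hypothetical monmap): with mons a/b/c all in
    # quorum, removing c leaves len(new_quorum)=2 > len(new_mons)/2 = 1.0, so it
    # is safe; with only a/b in quorum, removing b leaves 1 > 1.0 false, and an
    # OrchestratorError is raised.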
    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        daemon_spec.final_config, daemon_spec.deps = super().generate_config(daemon_spec)

        # realistically, we expect there to always be a mon spec
        # in a real deployment, but the way teuthology deploys some daemons
        # it's possible there might not be. For that reason we need to
        # verify the service is present in the spec store.
        if daemon_spec.service_name in self.mgr.spec_store:
            mon_spec = cast(MONSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
            if mon_spec.crush_locations:
                if daemon_spec.host in mon_spec.crush_locations:
                    # the --crush-location flag only supports a single bucket=loc pair so
                    # others will have to be handled later. The idea is to set the flag
                    # for the first bucket=loc pair in the list in order to facilitate
                    # replacing a tiebreaker mon (https://docs.ceph.com/en/quincy/rados/operations/stretch-mode/#other-commands)
                    c_loc = mon_spec.crush_locations[daemon_spec.host][0]
                    daemon_spec.final_config['crush_location'] = c_loc

        return daemon_spec.final_config, daemon_spec.deps
    def set_crush_locations(self, daemon_descrs: List[DaemonDescription], spec: ServiceSpec) -> None:
        logger.debug('Setting mon crush locations from spec')
        if not daemon_descrs:
            return
        assert self.TYPE == spec.service_type
        mon_spec = cast(MONSpec, spec)

        if not mon_spec.crush_locations:
            return

        quorum_status = self._get_quorum_status()
        mons_in_monmap = [m['name'] for m in quorum_status['monmap']['mons']]
        for dd in daemon_descrs:
            assert dd.daemon_id is not None
            assert dd.hostname is not None
            if dd.hostname not in mon_spec.crush_locations:
                continue
            if dd.daemon_id not in mons_in_monmap:
                continue
            # expected format for crush_locations from the quorum status is
            # {bucket1=loc1,bucket2=loc2} etc. for the number of bucket=loc pairs
            try:
                current_crush_locs = [m['crush_location'] for m in quorum_status['monmap']['mons'] if m['name'] == dd.daemon_id][0]
            except (KeyError, IndexError) as e:
                logger.warning(f'Failed setting crush location for mon {dd.daemon_id}: {e}\n'
                               'Mon may not have a monmap entry yet. Try re-applying mon spec once mon is confirmed up.')
                # without a current location to compare against there is nothing to do
                continue
            desired_crush_locs = '{' + ','.join(mon_spec.crush_locations[dd.hostname]) + '}'
            logger.debug(f'Found spec defined crush locations for mon on {dd.hostname}: {desired_crush_locs}')
            logger.debug(f'Current crush locations for mon on {dd.hostname}: {current_crush_locs}')
            if current_crush_locs != desired_crush_locs:
                logger.info(f'Setting crush location for mon {dd.daemon_id} to {desired_crush_locs}')
                try:
                    ret, out, err = self.mgr.check_mon_command({
                        'prefix': 'mon set_location',
                        'name': dd.daemon_id,
                        'args': mon_spec.crush_locations[dd.hostname]
                    })
                except Exception as e:
                    logger.error(f'Failed setting crush location for mon {dd.daemon_id}: {e}')
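
    # Format sketch: a spec with crush_locations {'host1': ['datacenter=dc1',
    # 'rack=r1']} yields the desired string '{datacenter=dc1,rack=r1}', the same
    # shape 'quorum_status' reports in each mon's 'crush_location' field.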
class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a problem.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules.
        # In the case of the dashboard port, with several manager daemons
        # running on different hosts, the user may have decided to use a
        # different dashboard port on each server. If this is the case then
        # only the port used as the default will be opened.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.ports.append(self.mgr.service_discovery_port)
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()
    def fail_over(self) -> None:
        # this has been seen to sometimes transiently fail even when there are multiple
        # mgr daemons. As long as there are multiple known mgr daemons, we should retry.
        class NoStandbyError(OrchestratorError):
            pass
        no_standby_exc = NoStandbyError('Need standby mgr daemon', event_kind_subject=(
            'daemon', 'mgr' + self.mgr.get_mgr_id()))
        for sleep_secs in [2, 8, 15]:
            try:
                if not self.mgr_map_has_standby():
                    raise no_standby_exc
                self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                           'INFO', 'Failing over to other MGR')
                logger.info('Failing over to other MGR')

                # fail over
                ret, out, err = self.mgr.check_mon_command({
                    'prefix': 'mgr fail',
                    'who': self.mgr.get_mgr_id(),
                })
                return
            except NoStandbyError:
                logger.info(
                    f'Failed to find standby mgr for failover. Retrying in {sleep_secs} seconds')
                time.sleep(sleep_secs)
        raise no_standby_exc

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)
    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')
class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })
    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
, daemon_descrs
: List
[DaemonDescription
]) -> DaemonDescription
:
867 active_mds_strs
= list()
868 for fs
in self
.mgr
.get('fs_map')['filesystems']:
869 mds_map
= fs
['mdsmap']
870 if mds_map
is not None:
871 for mds_id
, mds_status
in mds_map
['info'].items():
872 if mds_status
['state'] == 'up:active':
873 active_mds_strs
.append(mds_status
['name'])
874 if len(active_mds_strs
) != 0:
875 for daemon
in daemon_descrs
:
876 if daemon
.daemon_id
in active_mds_strs
:
878 # if no mds found, return empty Daemon Desc
879 return DaemonDescription()
    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })
class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm rgw_zonegroup and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zonegroup:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zonegroup',
                'value': spec.rgw_zonegroup,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()
    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        else:
            raise OrchestratorError(f'Invalid rgw_frontend_type parameter: {ftype}. Valid values are: beast, civetweb.')

        if spec.rgw_frontend_extra_args is not None:
            args.extend(spec.rgw_frontend_extra_args)

        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
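
    # Example of the resulting option value (hypothetical host and service
    # name): a beast frontend with ssl on 10.0.0.1:443 produces roughly
    #   rgw_frontends = 'beast ssl_endpoint=10.0.0.1:443 ssl_certificate=config://rgw/cert/rgw.foo'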
    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring
    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()
    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })
    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide a warning about service availability.
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()
class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')
class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
class CephExporterService(CephService):
    TYPE = 'ceph-exporter'
    DEFAULT_SERVICE_PORT = 9926

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        spec = cast(CephExporterSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_spec.daemon_id),
                                             ['mon', 'profile ceph-exporter',
                                              'mon', 'allow r',
                                              'mgr', 'allow r',
                                              'osd', 'allow r'])
        exporter_config = {}
        if spec.sock_dir:
            exporter_config.update({'sock-dir': spec.sock_dir})
        if spec.port:
            exporter_config.update({'port': f'{spec.port}'})
        if spec.prio_limit is not None:
            exporter_config.update({'prio-limit': f'{spec.prio_limit}'})
        if spec.stats_period:
            exporter_config.update({'stats-period': f'{spec.stats_period}'})

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        daemon_spec.final_config = merge_dicts(daemon_spec.final_config, exporter_config)

        return daemon_spec
class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': daemon_spec.entity_name(),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.http_server.agent:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        agent = self.mgr.http_server.agent
        try:
            assert agent
            assert agent.ssl_certs.get_root_cert()
            assert agent.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': agent.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = agent.ssl_certs.generate_cert(
            daemon_spec.host, self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': agent.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        return config, sorted([str(self.mgr.get_mgr_ip()), str(agent.server_port),
                               agent.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])
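
    # The returned deps list (mgr IP, agent port, root cert, enhanced-scan
    # setting) acts as a change detector: cephadm redeploys or reconfigures
    # agents when any of these values drifts from what a daemon was deployed
    # with.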