5 from abc
import ABCMeta
, abstractmethod
6 from typing
import TYPE_CHECKING
, List
, Callable
, TypeVar
, \
7 Optional
, Dict
, Any
, Tuple
, NewType
, cast
9 from mgr_module
import HandleCommandResult
, MonCommandFailed
11 from ceph
.deployment
.service_spec
import ServiceSpec
, RGWSpec
12 from ceph
.deployment
.utils
import is_ipv6
, unwrap_ipv6
13 from orchestrator
import OrchestratorError
, DaemonDescription
, DaemonDescriptionStatus
14 from orchestrator
._interface
import daemon_type_to_service
15 from cephadm
import utils
18 from cephadm
.module
import CephadmOrchestrator
# Module-level logger.
# NOTE(review): the `import logging` statement is in original lines 1-4,
# which are outside this chunk — confirm against the full file.
logger = logging.getLogger(__name__)

# Type variable bounding generic service helpers to ServiceSpec subclasses.
ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)

# Distinct string type for cephx keyring entity names
# (e.g. 'client.rgw.<id>' as built by CephService.get_auth_entity below).
AuthEntity = NewType('AuthEntity', str)
class CephadmDaemonDeploySpec:
    # Data holder describing one concrete daemon deployment (host, id, type,
    # keyring, config, ports, ...) that is later fed to `cephadm deploy`.
    #
    # NOTE(review): several original source lines are missing from this chunk;
    # each gap is marked below and must be restored from the full file.

    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 # [gap: original line 29 missing — apparently including a
                 #  `service_name` parameter, which is read below]
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 # [gap: original line 33 missing — apparently a `ceph_conf`
                 #  parameter, which is read below]
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None):
        # [gap: original lines 38, 40-41 missing — the docstring delimiters
        #  around the following text were lost]
        A data struction to encapsulate `cephadm deploy ...
        self.daemon_id = daemon_id
        # `service_name` comes from the parameter lost in the gap above —
        # TODO confirm against the full file.
        self.service_name = service_name
        # Derive the daemon type from the service name when not given
        # explicitly (service names look like '<type>' or '<type>.<id>').
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type
        # [gap: original lines 47-48 missing — presumably the `host`
        #  assignment and/or related comment]
        self.network = network
        # [gap: original lines 50-51 missing]
        self.keyring: Optional[str] = keyring
        # [gap: original line 53 missing]
        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []
        # [gap: original line 56 missing]
        # `ceph_conf` comes from the parameter lost in the gap above.
        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}
        # [gap: original line 59 missing]
        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip
        # [gap: original line 63 missing]
        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

    def name(self) -> str:
        # '<daemon_type>.<daemon_id>', e.g. 'mon.a'.
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        # Extra files to deploy alongside the daemon; the ceph.conf content is
        # added under the 'config' key (apparently conditionally — the guard
        # line is missing from this chunk).
        files = self.extra_files
        # [gap: original line 74 missing — likely an `if self.ceph_conf:` guard]
        files['config'] = self.ceph_conf
        # [gap: original lines 76-79 missing, including the return statement]

    # NOTE(review): the decorator line for this factory (presumably
    # @staticmethod, since it takes `dd` rather than `self`) is missing.
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        # [gap: original lines 81-83 missing]
        return CephadmDaemonDeploySpec(
            # [gap: original line 85 missing]
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            # [gap: original lines 89-92 missing — remaining keyword
            #  arguments and the closing parenthesis]

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        # Inverse of from_daemon_description: project this deploy spec back
        # into an orchestrator DaemonDescription with the given status.
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            # [gap: original lines 97-98 missing]
            status_desc=status_desc,
            # [gap: original lines 100-104 missing — remaining keyword
            #  arguments and the closing parenthesis]
class CephadmService(metaclass=ABCMeta):
    # [gap: original lines 106, 108-109 missing — docstring delimiters
    #  around the following text]
    Base class for service types. Often providing a create() and config() fn.

    # [gap: original lines 110-111 missing — presumably @property /
    #  @abstractmethod decorators; line 113-114 (the body) also missing]
    def TYPE(self) -> str:

    def __init__(self, mgr: "CephadmOrchestrator"):
        # Back-reference to the orchestrator module; used throughout for
        # mon commands, inventory and cache access.
        self.mgr: "CephadmOrchestrator" = mgr

    # [gap: bodies of the next three hooks (original lines 119-120, 122-123,
    #  125-126) are missing from this chunk]
    def allow_colo(self) -> bool:

    def per_host_daemon_type(self) -> Optional[str]:

    def primary_daemon_type(self) -> str:

    def make_daemon_spec(
            # [gap: original lines 128-131 missing — leading parameters
            #  (self, host, daemon_id, netw..., spec, ...)]
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
    ) -> CephadmDaemonDeploySpec:
        # Build a CephadmDaemonDeploySpec from a ServiceSpec plus placement info.
        return CephadmDaemonDeploySpec(
            # [gap: original lines 137-138 missing]
            service_name=spec.service_name(),
            # [gap: original line 140 missing]
            daemon_type=daemon_type,
            # [gap: original lines 142-145 missing — remaining keyword
            #  arguments and the closing parenthesis]

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        # Subclasses fill in keyring/config on the spec and return it.
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Subclasses return (cephadm config dict, list of dependency strings).
        raise NotImplementedError()

    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
        # [gap: original lines 153, 156-158 missing — docstring delimiters
        #  around the following text, and any body]
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            # [gap: original line 164 missing — presumably `else:`]
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explcitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        # Create the entity if needed ('auth get-or-create'), then force its
        # caps with 'auth caps'; failure to update caps is only logged.
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            # [gap: original lines 179-182 missing — remaining command args,
            #  closing braces and intervening statements]
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth caps',
            # [gap: original lines 185-188 missing — remaining command args
            #  and the guard (likely `if ret:`) for the warning below]
            self.mgr.log.warning(f"Unable to update caps for {entity}")
        # [gap: original lines 190-191 missing, including the return statement]

    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address with its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      # [gap: original lines 197-199 missing —
                                      #  presumably service_name / get_mon_cmd /
                                      #  set_mon_cmd parameters, all read below]
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.
        # [gap: original line 202 missing]
        If result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        # [gap: original line 205 missing — presumably the closing docstring]
        def get_set_cmd_dicts(out: str) -> List[dict]:
            # [gap: original line 207 missing — the dict literal opening]
            'prefix': set_mon_cmd,
            # [gap: original lines 209-210 missing]
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            # [gap: original line 215 missing — likely get_cmd=...]
            get_set_cmd_dicts=get_set_cmd_dicts
        # [gap: original lines 217-218 missing — closing parenthesis]

    def _check_and_set_dashboard(self,
                                 # [gap: original lines 220-221 missing —
                                 #  presumably service_name / get_cmd params]
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.
        # [gap: original line 224 missing]
        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
        # [gap: original line 227 missing]
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config need to be update. NOTE: This step is important because if a
          Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
          kicks the serve() loop and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        # [gap: original line 238 missing]
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
        # [gap: original lines 240-242 missing — example preamble]
            'prefix': 'dashboard iscsi-gateway-add',
            'service_url': 'http://admin:admin@aaa:5000',
        # [gap: original lines 245-247 missing]
            'prefix': 'dashboard iscsi-gateway-add',
            'service_url': 'http://admin:admin@bbb:5000',
        # [gap: original lines 250-252 missing]
        The function should return empty list if no command need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        # [gap: original lines 255-257 missing — closing docstring and a
        #  `try:` for the except clause below]
        _, out, _ = self.mgr.check_mon_command({
            # [gap: original lines 259-260 missing — command args / closing]
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            # [gap: original line 263 missing — likely `return`]
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            # [gap: original line 266 missing — likely `try:`]
            inbuf = cmd_dict.pop('inbuf', None)
            _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    # [gap: original lines 271-274 missing — the `def ok_to_stop_osd(...)`
    #  header and its leading parameters (presumably self, osds, ...)]
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        # Ask the cluster whether the given OSDs can be stopped safely.
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            # [gap: original lines 279-283 missing — remaining command args,
            #  closing, and a `try:` for the except clause below]
        j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            # [gap: original lines 287-289 missing — likely `raise` / follow-up]
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            # Report additional OSDs that become safe to stop via the `known`
            # output argument.
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
            return HandleCommandResult(
                # [gap: original line 294 missing — likely the retval argument]
                f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
                # [gap: original lines 296-298 missing — closing and the
                #  fallthrough return]

    # [gap: original lines 299-300 missing — the `def ok_to_stop(...)` header]
            daemon_ids: List[str],
            # [gap: original line 302 missing — presumably `force: bool = False,`
            #  since `force` is used below]
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # Generic ok-to-stop check; delegates to the mon for mon/mds and to
        # ok_to_stop_osd for osds; other types are always safe.
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            # [gap: original line 310 missing]
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            # [gap: original lines 318-321 missing — remaining args, closing,
            #  and presumably an `if r.retval:` guard for the error path]
        err = f'{err}: {r.stderr}' if r.stderr else err
        # [gap: original line 323 missing]
        return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        # [gap: original line 327 missing]
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about if it possible or not to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            # [gap: original line 334 missing — the list-comprehension head,
            #  likely `[daemon`]
            for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
            if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            # Grammatical helper for the warning strings below.
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left
        # [gap: original lines 346-347 missing — presumably `if alert:`]
        out = (f'ALERT: Cannot stop {names} in {service} service. '
               f'Not enough remaining {service} daemons. '
               f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        # [gap: original line 351 missing — presumably `else:`]
        out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
               f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
               f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        # [gap: original lines 355-356 missing, including the return statement]

    def pre_remove(self, daemon: DaemonDescription) -> None:
        # [gap: original lines 358, 360 missing — docstring delimiters
        #  around the following text]
        Called before the daemon is removed.
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription) -> None:
        # [gap: original lines 366, 368 missing — docstring delimiters
        #  around the following text]
        Called after the daemon is removed.
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')
class CephService(CephadmService):
    # Intermediate base for services that are real Ceph daemons (mon, mgr,
    # mds, osd, rgw, ...): supplies config+keyring generation and cephx
    # keyring lifecycle handling.

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        # No extra dependencies for plain Ceph daemons.
        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription) -> None:
        # After the daemon is gone, drop its cephx key as well.
        super().post_remove(daemon)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        # [gap: original lines 398, 400 missing — docstring delimiters
        #  around the following text]
        Map the daemon id to a cephx keyring entity name
        # despite this mapping entity names to daemons, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE == 'crash':
            # [gap: original line 406 missing — presumably `if not host:`,
            #  guarding the raise below]
            raise OrchestratorError("Host not provided to generate <crash> auth entity name")
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        # [gap: original line 413 missing — presumably `else:`]
        raise OrchestratorError("unknown daemon type")

    def get_config_and_keyring(self,
                               # [gap: original lines 417-419 missing —
                               #  daemon_type / daemon_id / host parameters,
                               #  all read below]
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               # [gap: original lines 422-424 missing —
                               #  closing, return annotation, docstring]
        # Fetch (or reuse) the daemon keyring and build the minimal ceph.conf.
        entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            # [gap: original lines 428-430 missing — 'entity' arg, closing]
        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config
        # [gap: original lines 435-440 missing, including the return statement]

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        # Remove the daemon's cephx key from the cluster (never for mons).
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        if daemon_id == 'mon':
            # do not remove the mon keyring
            # [gap: original lines 449-450 missing — presumably `return`]

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            # [gap: original lines 455-459 missing — 'prefix': 'auth rm'
            #  presumably, remaining args and closing — TODO confirm]
class MonService(CephService):
    # [gap: original lines 461-462 missing — presumably `TYPE = 'mon'`]

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        # [gap: original lines 464, 466 missing — docstring delimiters
        #  around the following text]
        Create a new monitor on the given host.
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # [gap: original lines 469-470 missing — likely a comment about
        #  fetching the existing mon keyring]
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
            # [gap: original lines 474-475 missing — closing braces]
        extra_config = '[mon.%s]\n' % name
        # [gap: original line 477 missing — presumably `if network:`]
        # infer whether this is a CIDR network, addrvec, or plain IP
        # [gap: original line 479 missing — presumably `if '/' in network:`]
        extra_config += 'public network = %s\n' % network
        elif network.startswith('[v') and network.endswith(']'):
            extra_config += 'public addrv = %s\n' % network
        elif is_ipv6(network):
            extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
        elif ':' not in network:
            extra_config += 'public addr = %s\n' % network
        # [gap: original line 487 missing — presumably `else:`]
        raise OrchestratorError(
            'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        # [gap: original line 490 missing — presumably the `else:` matching
        #  the `if network:` branch]
        # try to get the public_network from the config
        ret, network, err = self.mgr.check_mon_command({
            'prefix': 'config get',
            # [gap: original line 494 missing — likely 'who': 'mon']
            'key': 'public_network',
            # [gap: original line 496 missing — closing braces]
        network = network.strip() if network else network
        # [gap: original line 498 missing — presumably `if not network:`]
        raise OrchestratorError(
            'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
        if '/' not in network:
            raise OrchestratorError(
                'public_network is set but does not look like a CIDR network: \'%s\'' % network)
        extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        # [gap: original lines 510-512 missing, including the return statement]

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        # Raise OrchestratorError if removing this mon would break quorum.
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
            # [gap: original lines 516-519 missing — closing, and a
            #  `try: j = json.loads(out)` presumably, given `j` use below]
        raise OrchestratorError('failed to parse quorum status')
        # [gap: original line 521 missing]
        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                # [gap: original lines 525-526 missing — format args and a
                #  `return`, presumably]
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        # Quorum survives only if a strict majority of the remaining mons
        # would still be in quorum.
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
        # [gap: original line 532 missing — presumably `else:`]
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)
        # [gap: original line 538 missing]
        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            # [gap: original lines 546-550 missing — presumably
            #  'prefix': 'mon rm' and its args — TODO confirm]
class MgrService(CephService):
    # [gap: original lines 552-553 missing — presumably `TYPE = 'mgr'`]

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        # [gap: original lines 555, 557 missing — docstring delimiters
        #  around the following text]
        Create a new manager instance on a host.
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # [gap: original lines 560-561 missing]
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              # [gap: original lines 564-566
                                              #  missing — remaining caps and
                                              #  closing bracket]

        # Retrieve ports used by manager modules
        # In the case of the dashboard port and with several manager daemons
        # running in different hosts, it exists the possibility that the
        # user has decided to use different dashboard ports in each server
        # If this is the case then the dashboard port opened will be only the used
        # [gap: original lines 572-573 missing — likely `ports = []` given
        #  `ports.append` below]

        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
            # [gap: original lines 576-577 missing — closing braces]
        mgr_endpoints = json.loads(mgr_services)
        for end_point in mgr_endpoints.values():
            # Extract the port from URLs like 'http://host:port/'.
            port = re.search(r'\:\d+\/', end_point)
            # [gap: original line 581 missing — presumably `if port:`]
            ports.append(int(port[0][1:-1]))
            # [gap: original lines 583-584 missing — likely `if ports:`]
        daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        # [gap: original lines 590-592 missing, including the return statement]

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # The active mgr is the one this module is running in.
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                # [gap: original line 598 missing — presumably `return daemon`]
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()

    def fail_over(self) -> None:
        # Hand the active mgr role to a standby (requires one to exist).
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')
        # [gap: original lines 610-611 missing]
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
            # [gap: original lines 615-616 missing — closing braces]

    def mgr_map_has_standby(self) -> bool:
        # [gap: original lines 618, 621 missing — docstring delimiters
        #  around the following text]
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        # [gap: original lines 624-626 missing, including the return statement
        #  (presumably `return bool(num)`) — TODO confirm]

    # [gap: original line 627 missing — the `def ok_to_stop(self,` header]
            daemon_ids: List[str],
            # [gap: original line 629 missing — presumably `force: bool = False,`]
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr
        # [gap: original line 633 missing]
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        # [gap: original line 635 missing — presumably `if warn:`]
        return HandleCommandResult(-errno.EBUSY, '', warn_message)
        # [gap: original line 637 missing]
        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')
class MdsService(CephService):
    # [gap: original lines 648-649 missing — presumably `TYPE = 'mds'`]

    def allow_colo(self) -> bool:
        # [gap: original lines 651-652 missing — presumably `return True`]

    def config(self, spec: ServiceSpec, daemon_id: str) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
            # [gap: original lines 663-664 missing — closing braces]

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # [gap: original lines 668-669 missing]
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              # [gap: original line 673 missing
                                              #  — remaining caps and closing]
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        # [gap: original lines 677-679 missing, including the return statement]

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # Collect the ids of all up:active MDS daemons across filesystems,
        # then return the first matching daemon description.
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    # [gap: original line 691 missing — presumably
                    #  `return daemon`]
        # if no mds found, return empty Daemon Desc
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        # Undo the mds_join_fs setting made in config() above.
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            # [gap: original line 698 missing — presumably the 'who' key]
            'name': 'mds_join_fs',
            # [gap: original lines 700-702 missing — closing braces]
class RgwService(CephService):
    # [gap: original lines 704-705 missing — presumably `TYPE = 'rgw'`]

    def allow_colo(self) -> bool:
        # [gap: original lines 707-708 missing — presumably `return True`]

    def config(self, spec: RGWSpec, rgw_id: str) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        # [gap: original line 713 missing — presumably `if spec.rgw_realm:`]
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
            # [gap: original line 717 missing — presumably 'name': 'rgw_realm']
            'value': spec.rgw_realm,
            # [gap: original lines 719-720 missing — closing and the
            #  `if spec.rgw_zone:` guard, presumably]
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
            # [gap: original line 724 missing — presumably 'name': 'rgw_zone']
            'value': spec.rgw_zone,
            # [gap: original lines 726-727 missing — closing braces]
        if spec.rgw_frontend_ssl_certificate:
            # Certificate may be given as a list of PEM chunks or one string.
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            # [gap: original line 733 missing — presumably `else:`]
            raise OrchestratorError(
                'Invalid rgw_frontend_ssl_certificate: %s'
                % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                # [gap: original lines 740-742 missing — presumably
                #  'val': cert_data and closing]
        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
        # [gap: original line 752 missing]
        keyring = self.get_keyring(rgw_id)
        # [gap: original line 754 missing]
        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        # [gap: original line 757 missing — presumably `else:`]
        # this is a redeploy of older instance that doesn't have an explicitly
        # assigned port, in which case we can assume there is only 1 per host
        # and it matches the spec.
        port = spec.get_port()
        # [gap: original lines 762-764 missing — likely a comment and
        #  `args: List[str] = []` given `args.append` below]
        ftype = spec.rgw_frontend_type or "beast"
        # [gap: original lines 766-768 missing — the `if ftype == 'beast':`
        #  and ssl/ip branch guards, presumably]
        args.append(f"ssl_endpoint={daemon_spec.ip}:{port}")
        # [gap: original line 770 missing — an `else`-style guard, presumably]
        args.append(f"ssl_port={port}")
        args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
        # [gap: original lines 773-774 missing — non-ssl branch guards]
        args.append(f"endpoint={daemon_spec.ip}:{port}")
        # [gap: original line 776 missing]
        args.append(f"port={port}")
        elif ftype == 'civetweb':
            # [gap: original lines 779-780 missing — ssl/ip branch guards]
            args.append(f"port={daemon_spec.ip}:{port}s")  # note the 's' suffix on port
            # [gap: original line 782 missing]
            args.append(f"port={port}s")  # note the 's' suffix on port
            args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            # [gap: original lines 785-786 missing — non-ssl branch guards]
            args.append(f"port={daemon_spec.ip}:{port}")
            # [gap: original line 788 missing]
            args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'
        # [gap: original line 791 missing]
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            # [gap: original lines 796-798 missing — presumably
            #  'value': frontend and closing]
        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        # [gap: original lines 801-803 missing, including the return statement]

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             # [gap: original lines 806-807
                                             #  missing — leading caps entries]
                                             'osd', 'allow rwx tag rgw *=*'])
        # [gap: original lines 809-810 missing — presumably `return keyring`]

    def purge(self, service_name: str) -> None:
        # Remove the per-service config options and the stored certificate.
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            # [gap: original lines 815-816 missing — the 'name' key (likely
            #  rgw_realm) and closing]
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            # [gap: original lines 820-821 missing — the 'name' key (likely
            #  rgw_zone) and closing]
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
            # [gap: original lines 825-826 missing — closing braces]

    def post_remove(self, daemon: DaemonDescription) -> None:
        super().post_remove(daemon)
        # Drop the per-daemon rgw_frontends setting written in prepare_create.
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
            # [gap: original lines 833-835 missing — closing braces]

    # [gap: original line 836 missing — the `def ok_to_stop(self,` header]
            daemon_ids: List[str],
            # [gap: original line 838 missing — presumably `force: bool = False,`
            #  since `force` is used below]
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
        # if no load balancer, warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                # [gap: original lines 852-854 missing — presumably
                #  `return True` / `return False`]

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        # [gap: original line 857 missing — presumably `if warn:`]
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer present or force flag set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # [gap: original line 866 missing]
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)
class RbdMirrorService(CephService):
    # [gap: original lines 872-873 missing — presumably `TYPE = 'rbd-mirror'`]

    def allow_colo(self) -> bool:
        # [gap: original lines 875-876 missing — presumably `return True`]

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # [gap: original line 880 missing]
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])
        # [gap: original line 884 missing]
        daemon_spec.keyring = keyring
        # [gap: original line 886 missing]
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        # [gap: original lines 888-891 missing, including the return statement]

    # [gap: original line 892 missing — the `def ok_to_stop(self,` header]
            daemon_ids: List[str],
            # [gap: original line 894 missing — presumably `force: bool = False,`]
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        # [gap: original line 900 missing — presumably `if warn:`]
        return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')
class CrashService(CephService):
    # [gap: original lines 906-907 missing — presumably `TYPE = 'crash'`]

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        # [gap: original line 911 missing]
        # Crash entities are host-scoped, hence host= is passed here (see
        # CephService.get_auth_entity).
        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])
        # [gap: original line 915 missing]
        daemon_spec.keyring = keyring
        # [gap: original line 917 missing]
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        # [gap: original lines 919-922 missing, including the return statement]
923 class CephfsMirrorService(CephService
):
924 TYPE
= 'cephfs-mirror'
926 def prepare_create(self
, daemon_spec
: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec
:
927 assert self
.TYPE
== daemon_spec
.daemon_type
929 ret
, keyring
, err
= self
.mgr
.check_mon_command({
930 'prefix': 'auth get-or-create',
931 'entity': self
.get_auth_entity(daemon_spec
.daemon_id
),
932 'caps': ['mon', 'allow r',
934 'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
938 daemon_spec
.keyring
= keyring
939 daemon_spec
.final_config
, daemon_spec
.deps
= self
.generate_config(daemon_spec
)