]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import ceph 16.2.7
[ceph.git] / ceph / src / pybind / mgr / cephadm / services / cephadmservice.py
CommitLineData
f67539c2 1import errno
f6b5b4d7 2import json
e306af50 3import logging
f67539c2 4import re
f6b5b4d7 5from abc import ABCMeta, abstractmethod
f67539c2
TL
6from typing import TYPE_CHECKING, List, Callable, TypeVar, \
7 Optional, Dict, Any, Tuple, NewType, cast
e306af50 8
f6b5b4d7 9from mgr_module import HandleCommandResult, MonCommandFailed
e306af50
TL
10
11from ceph.deployment.service_spec import ServiceSpec, RGWSpec
f91f0fd5 12from ceph.deployment.utils import is_ipv6, unwrap_ipv6
a4b75251 13from mgr_util import build_url
f67539c2
TL
14from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
15from orchestrator._interface import daemon_type_to_service
e306af50
TL
16from cephadm import utils
17
18if TYPE_CHECKING:
19 from cephadm.module import CephadmOrchestrator
20
21logger = logging.getLogger(__name__)
22
f6b5b4d7 23ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
f91f0fd5 24AuthEntity = NewType('AuthEntity', str)
e306af50 25
f6b5b4d7 26
class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None):
        """
        A data structure that encapsulates everything needed for a
        `cephadm deploy ...` invocation of one daemon.
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name

        # default the daemon type from the service name ("mon.foo" -> "mon")
        derived_type = daemon_type or service_name.split('.')[0]
        assert derived_type is not None
        self.daemon_type: str = derived_type

        # mons only: the public network/addrvec/addr to bind to
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        # stable rank / generation for ranked services (see CephadmService.ranked)
        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

    def name(self) -> str:
        """Return the daemon name, e.g. 'mon.a'."""
        return f'{self.daemon_type}.{self.daemon_id}'

    def config_get_files(self) -> Dict[str, Any]:
        """Return the extra files to deploy; folds ceph_conf in under 'config'."""
        files = self.extra_files
        if self.ceph_conf:
            # NOTE: this writes into extra_files itself, so repeated calls
            # return the same dict object.
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        """Build a deploy spec from an existing DaemonDescription."""
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        """Convert back to a DaemonDescription with the given status."""
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
        )
114
f6b5b4d7
TL
115
class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        # service type name, e.g. 'mon' / 'mgr' / 'rgw'
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        # only meaningful for ranked services; such subclasses must override
        assert False

    def make_daemon_spec(
        self,
        host: str,
        daemon_id: str,
        network: str,
        spec: ServiceSpecs,
        daemon_type: Optional[str] = None,
        ports: Optional[List[int]] = None,
        ip: Optional[str] = None,
        rank: Optional[int] = None,
        rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        """Construct a CephadmDaemonDeploySpec for one daemon of this service."""
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        # subclasses fill in keyring/config before deployment
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # returns (cephadm config dict, dependency list)
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """The post actions needed to be done after daemons are checked"""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Config dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explcitly been
        # defined, return empty Daemon Desc
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        """Fetch (or create) the keyring for entity and try to ensure it has caps."""
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            # the entity likely exists with different caps; try updating them
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring

    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address with its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            if out == service_url:
                # already up to date; nothing to send
                return []
            return [{
                'prefix': set_mon_cmd,
                'value': service_url
            }]

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config need to be update. NOTE: This step is important because if a
          Dashboard command modified Ceph config, cephadm's config_notify() is called. Which
          kicks the serve() loop and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
        The function should return empty list if no command need to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """
        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        for cmd_dict in list(get_set_cmd_dicts(out.strip())):
            try:
                # 'inbuf' (if any) is passed out-of-band, not in the command dict
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        """Ask the mons whether the given OSDs are safe to stop."""
        res = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        parsed = None
        try:
            parsed = json.loads(res.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if res.retval:
            return res
        if known is not None and parsed and parsed.get('ok_to_stop'):
            self.mgr.log.debug(f"got {parsed}")
            known.extend([f'osd.{x}' for x in parsed.get('osds', [])])
        osd_names = ",".join(["osd.%s" % o for o in osds])
        verb = "is" if len(osds) == 1 else "are"
        return HandleCommandResult(
            0,
            f'{osd_names} {verb} safe to restart',
            ''
        )

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        """Generic safety check before stopping daemons of this service."""
        joined = ",".join([f'{self.TYPE}.{d_id}' for d_id in daemon_ids])
        out = f'It appears safe to stop {joined}'
        err = f'It is NOT safe to stop {joined} at this time'

        # only mon/osd/mds have a mon-side ok-to-stop check
        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        res = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if res.retval:
            err = f'{err}: {res.stderr}' if res.stderr else err
            logger.debug(err)
            return HandleCommandResult(res.retval, res.stdout, err)

        out = f'{out}: {res.stdout}' if res.stdout else out
        logger.debug(out)
        return HandleCommandResult(res.retval, out, res.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about if it possible or not to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')
416
f91f0fd5
TL
417
class CephService(CephadmService):
    """Shared behavior for Ceph daemon services (mon, mgr, mds, rgw, ...)."""

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph.daemons (mon, mgr, mds, osd, etc)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        files = daemon_spec.config_get_files()
        if files:
            cephadm_config['files'] = files

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # in addition to the generic cleanup, drop the daemon's cephx key
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # despite this mapping entity names to daemons, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        if self.TYPE == 'crash':
            if host == "":
                raise OrchestratorError("Host not provided to generate <crash> auth entity name")
            return AuthEntity(f'client.{self.TYPE}.{host}')
        if self.TYPE == 'mon':
            return AuthEntity('mon.')
        if self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        raise OrchestratorError("unknown daemon type")

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        """Assemble the minimal ceph.conf and keyring for one daemon."""
        # fetch the keyring from the mons when one wasn't supplied
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        """Remove the cephx key of a deleted daemon (never called for mons)."""
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })
496
497
class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        """Raise OrchestratorError unless removing mon_id preserves quorum."""
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            quorum = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in quorum['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in quorum['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass
592
e306af50 593
class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        # standby daemons do nothing, and therefore port conflicts
        # are not a concern.
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules
        # In the case of the dashboard port and with several manager daemons
        # running in different hosts, it exists the possibility that the
        # user has decided to use different dashboard ports in each server
        # If this is the case then the dashboard port opened will be only the used
        # as default.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            for end_point in json.loads(mgr_services).values():
                match = re.search(r'\:\d+\/', end_point)
                if match:
                    # strip the leading ':' and trailing '/'
                    ports.append(int(match[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return empty Daemon Desc
        return DaemonDescription()

    def fail_over(self) -> None:
        """Fail over to a standby mgr; raises when no standby exists."""
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')

        # fail over
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
        })

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster
        """
        mgr_map = self.mgr.get('mgr_map')
        return bool(len(mgr_map.get('standbys')))

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon, Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')
700
e306af50 701
class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        """Return the first daemon in daemon_descrs that is an active MDS."""
        active_names = []
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is None:
                continue
            for mds_status in mds_map['info'].values():
                if mds_status['state'] == 'up:active':
                    active_names.append(mds_status['name'])
        if active_names:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_names:
                    return daemon
        # if no mds found, return empty Daemon Desc
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        # undo the mds_join_fs setting made in config()
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })
756
e306af50 757
f91f0fd5 758class RgwService(CephService):
f6b5b4d7
TL
759 TYPE = 'rgw'
760
f67539c2
TL
761 def allow_colo(self) -> bool:
762 return True
f6b5b4d7 763
522d829b 764 def config(self, spec: RGWSpec) -> None: # type: ignore
f67539c2 765 assert self.TYPE == spec.service_type
f6b5b4d7 766
f67539c2
TL
767 # set rgw_realm and rgw_zone, if present
768 if spec.rgw_realm:
769 ret, out, err = self.mgr.check_mon_command({
770 'prefix': 'config set',
771 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
772 'name': 'rgw_realm',
773 'value': spec.rgw_realm,
774 })
775 if spec.rgw_zone:
776 ret, out, err = self.mgr.check_mon_command({
777 'prefix': 'config set',
778 'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
779 'name': 'rgw_zone',
780 'value': spec.rgw_zone,
781 })
e306af50
TL
782
783 if spec.rgw_frontend_ssl_certificate:
784 if isinstance(spec.rgw_frontend_ssl_certificate, list):
785 cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
786 elif isinstance(spec.rgw_frontend_ssl_certificate, str):
787 cert_data = spec.rgw_frontend_ssl_certificate
788 else:
789 raise OrchestratorError(
f91f0fd5
TL
790 'Invalid rgw_frontend_ssl_certificate: %s'
791 % spec.rgw_frontend_ssl_certificate)
e306af50
TL
792 ret, out, err = self.mgr.check_mon_command({
793 'prefix': 'config-key set',
f67539c2 794 'key': f'rgw/cert/{spec.service_name()}',
e306af50
TL
795 'val': cert_data,
796 })
797
f67539c2 798 # TODO: fail, if we don't have a spec
e306af50
TL
799 logger.info('Saving service %s spec with placement %s' % (
800 spec.service_name(), spec.placement.pretty_str()))
801 self.mgr.spec_store.save(spec)
522d829b 802 self.mgr.trigger_connect_dashboard_rgw()
e306af50 803
f67539c2 804 def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
f6b5b4d7 805 assert self.TYPE == daemon_spec.daemon_type
f67539c2
TL
806 rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
807 spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)
f6b5b4d7
TL
808
809 keyring = self.get_keyring(rgw_id)
810
f67539c2
TL
811 if daemon_spec.ports:
812 port = daemon_spec.ports[0]
813 else:
814 # this is a redeploy of older instance that doesn't have an explicitly
815 # assigned port, in which case we can assume there is only 1 per host
816 # and it matches the spec.
817 port = spec.get_port()
818
819 # configure frontend
820 args = []
821 ftype = spec.rgw_frontend_type or "beast"
822 if ftype == 'beast':
823 if spec.ssl:
824 if daemon_spec.ip:
a4b75251
TL
825 args.append(
826 f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
f67539c2
TL
827 else:
828 args.append(f"ssl_port={port}")
829 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
830 else:
831 if daemon_spec.ip:
a4b75251 832 args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
f67539c2
TL
833 else:
834 args.append(f"port={port}")
835 elif ftype == 'civetweb':
836 if spec.ssl:
837 if daemon_spec.ip:
a4b75251
TL
838 # note the 's' suffix on port
839 args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
f67539c2
TL
840 else:
841 args.append(f"port={port}s") # note the 's' suffix on port
842 args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
843 else:
844 if daemon_spec.ip:
a4b75251 845 args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
f67539c2
TL
846 else:
847 args.append(f"port={port}")
848 frontend = f'{ftype} {" ".join(args)}'
849
850 ret, out, err = self.mgr.check_mon_command({
851 'prefix': 'config set',
852 'who': utils.name_to_config_section(daemon_spec.name()),
853 'name': 'rgw_frontends',
854 'value': frontend
855 })
856
f6b5b4d7 857 daemon_spec.keyring = keyring
f67539c2 858 daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
f6b5b4d7 859
f91f0fd5 860 return daemon_spec
f6b5b4d7 861
f91f0fd5 862 def get_keyring(self, rgw_id: str) -> str:
f67539c2
TL
863 keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
864 ['mon', 'allow *',
865 'mgr', 'allow rw',
866 'osd', 'allow rwx tag rgw *=*'])
f6b5b4d7
TL
867 return keyring
868
f67539c2
TL
869 def purge(self, service_name: str) -> None:
870 self.mgr.check_mon_command({
871 'prefix': 'config rm',
872 'who': utils.name_to_config_section(service_name),
873 'name': 'rgw_realm',
874 })
875 self.mgr.check_mon_command({
876 'prefix': 'config rm',
877 'who': utils.name_to_config_section(service_name),
878 'name': 'rgw_zone',
879 })
880 self.mgr.check_mon_command({
881 'prefix': 'config-key rm',
882 'key': f'rgw/cert/{service_name}',
883 })
522d829b 884 self.mgr.trigger_connect_dashboard_rgw()
f67539c2 885
a4b75251
TL
886 def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
887 super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
f67539c2
TL
888 self.mgr.check_mon_command({
889 'prefix': 'config rm',
890 'who': utils.name_to_config_section(daemon.name()),
891 'name': 'rgw_frontends',
892 })
893
894 def ok_to_stop(
895 self,
896 daemon_ids: List[str],
897 force: bool = False,
898 known: Optional[List[str]] = None # output argument
899 ) -> HandleCommandResult:
900 # if load balancer (ingress) is present block if only 1 daemon up otherwise ok
901 # if no load balancer, warn if > 1 daemon, block if only 1 daemon
902 def ingress_present() -> bool:
903 running_ingress_daemons = [
904 daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
905 running_haproxy_daemons = [
906 daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
907 running_keepalived_daemons = [
908 daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
909 # check that there is at least one haproxy and keepalived daemon running
910 if running_haproxy_daemons and running_keepalived_daemons:
911 return True
912 return False
913
914 # if only 1 rgw, alert user (this is not passable with --force)
915 warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
916 if warn:
917 return HandleCommandResult(-errno.EBUSY, '', warn_message)
918
919 # if reached here, there is > 1 rgw daemon.
920 # Say okay if load balancer present or force flag set
921 if ingress_present() or force:
922 return HandleCommandResult(0, warn_message, '')
923
924 # if reached here, > 1 RGW daemon, no load balancer and no force flag.
925 # Provide warning
926 warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
927 return HandleCommandResult(-errno.EBUSY, '', warn_message)
e306af50 928
522d829b
TL
    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        # ask the mgr to (re)connect the dashboard to RGW whenever the set
        # of RGW daemons changes; the daemon list itself is not needed here
        self.mgr.trigger_connect_dashboard_rgw()
931
e306af50 932
class RbdMirrorService(CephService):
    """cephadm service handler for rbd-mirror daemons."""

    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        # multiple rbd-mirror daemons may be colocated on one host
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Populate keyring and final config for a new rbd-mirror daemon."""
        assert self.TYPE == daemon_spec.daemon_type

        daemon_spec.keyring = self.get_keyring_with_caps(
            self.get_auth_entity(daemon_spec.daemon_id),
            ['mon', 'profile rbd-mirror',
             'osd', 'profile rbd'])
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec

    def ok_to_stop(
        self,
        daemon_ids: List[str],
        force: bool = False,
        known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        """Refuse to stop the only rbd-mirror daemon; otherwise allow.

        The single-daemon refusal is not overridable with --force.
        """
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')
965
e306af50 966
class CrashService(CephService):
    """cephadm service handler for the per-host crash collector daemon."""

    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Populate keyring and final config for a new crash daemon."""
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        # crash auth entities are host-scoped, so the host name is part of
        # the entity (unlike most other ceph daemon types)
        daemon_spec.keyring = self.get_keyring_with_caps(
            self.get_auth_entity(daemon_id, host=host),
            ['mon', 'profile crash',
             'mgr', 'profile crash'])
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec
983
984
class CephfsMirrorService(CephService):
    """cephadm service handler for cephfs-mirror daemons."""

    TYPE = 'cephfs-mirror'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """Create the auth entity and final config for a new cephfs-mirror daemon."""
        assert self.TYPE == daemon_spec.daemon_type

        # fetch (creating if necessary) a keyring with read access to all
        # cephfs metadata pools, rw on metadata / r on data via pool tags,
        # and the cephfs-mirror mon profile
        _, keyring, _ = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec