# ceph/src/pybind/mgr/cephadm/services/cephadmservice.py
import errno
import json
import logging
import re
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, TypeVar, \
    Optional, Dict, Any, Tuple, NewType, cast

from mgr_module import HandleCommandResult, MonCommandFailed

from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from mgr_util import build_url
from orchestrator import OrchestratorError, DaemonDescription, DaemonDescriptionStatus
from orchestrator._interface import daemon_type_to_service
from cephadm import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

ServiceSpecs = TypeVar('ServiceSpecs', bound=ServiceSpec)
AuthEntity = NewType('AuthEntity', str)

class CephadmDaemonDeploySpec:
    # typing.NamedTuple + Generic is broken in py36
    def __init__(self, host: str, daemon_id: str,
                 service_name: str,
                 network: Optional[str] = None,
                 keyring: Optional[str] = None,
                 extra_args: Optional[List[str]] = None,
                 ceph_conf: str = '',
                 extra_files: Optional[Dict[str, Any]] = None,
                 daemon_type: Optional[str] = None,
                 ip: Optional[str] = None,
                 ports: Optional[List[int]] = None,
                 rank: Optional[int] = None,
                 rank_generation: Optional[int] = None,
                 extra_container_args: Optional[List[str]] = None):
        """
        A data structure to encapsulate `cephadm deploy ...`
        """
        self.host: str = host
        self.daemon_id = daemon_id
        self.service_name = service_name
        daemon_type = daemon_type or (service_name.split('.')[0])
        assert daemon_type is not None
        self.daemon_type: str = daemon_type

        # mons
        self.network = network

        # for run_cephadm.
        self.keyring: Optional[str] = keyring

        # For run_cephadm. Would be great to have more expressive names.
        self.extra_args: List[str] = extra_args or []

        self.ceph_conf = ceph_conf
        self.extra_files = extra_files or {}

        # TCP ports used by the daemon
        self.ports: List[int] = ports or []
        self.ip: Optional[str] = ip

        # values to be populated during generate_config calls
        # and then used in _run_cephadm
        self.final_config: Dict[str, Any] = {}
        self.deps: List[str] = []

        self.rank: Optional[int] = rank
        self.rank_generation: Optional[int] = rank_generation

        self.extra_container_args = extra_container_args

    def name(self) -> str:
        return '%s.%s' % (self.daemon_type, self.daemon_id)

    def config_get_files(self) -> Dict[str, Any]:
        files = self.extra_files
        if self.ceph_conf:
            files['config'] = self.ceph_conf

        return files

    @staticmethod
    def from_daemon_description(dd: DaemonDescription) -> 'CephadmDaemonDeploySpec':
        assert dd.hostname
        assert dd.daemon_id
        assert dd.daemon_type
        return CephadmDaemonDeploySpec(
            host=dd.hostname,
            daemon_id=dd.daemon_id,
            daemon_type=dd.daemon_type,
            service_name=dd.service_name(),
            ip=dd.ip,
            ports=dd.ports,
            rank=dd.rank,
            rank_generation=dd.rank_generation,
            extra_container_args=dd.extra_container_args,
        )

    def to_daemon_description(self, status: DaemonDescriptionStatus, status_desc: str) -> DaemonDescription:
        return DaemonDescription(
            daemon_type=self.daemon_type,
            daemon_id=self.daemon_id,
            service_name=self.service_name,
            hostname=self.host,
            status=status,
            status_desc=status_desc,
            ip=self.ip,
            ports=self.ports,
            rank=self.rank,
            rank_generation=self.rank_generation,
            extra_container_args=self.extra_container_args,
        )
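
    # Illustrative sketch (not part of the original module): the two methods
    # above round-trip the shared fields between DaemonDescription and
    # CephadmDaemonDeploySpec. Values below are hypothetical.
    #
    #     dd = DaemonDescription(daemon_type='mgr', daemon_id='a', hostname='host1')
    #     spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
    #     assert spec.name() == 'mgr.a'
    #     dd2 = spec.to_daemon_description(DaemonDescriptionStatus.running, 'running')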


class CephadmService(metaclass=ABCMeta):
    """
    Base class for service types. Often providing a create() and config() fn.
    """

    @property
    @abstractmethod
    def TYPE(self) -> str:
        pass

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr

    def allow_colo(self) -> bool:
        """
        Return True if multiple daemons of the same type can colocate on
        the same host.
        """
        return False

    def primary_daemon_type(self) -> str:
        """
        This is the type of the primary (usually only) daemon to be deployed.
        """
        return self.TYPE

    def per_host_daemon_type(self) -> Optional[str]:
        """
        If defined, this type of daemon will be deployed once for each host
        containing one or more daemons of the primary type.
        """
        return None

    def ranked(self) -> bool:
        """
        If True, we will assign a stable rank (0, 1, ...) and monotonically increasing
        generation (0, 1, ...) to each daemon we create/deploy.
        """
        return False

    def fence_old_ranks(self,
                        spec: ServiceSpec,
                        rank_map: Dict[int, Dict[int, Optional[str]]],
                        num_ranks: int) -> None:
        assert False

    def make_daemon_spec(
            self,
            host: str,
            daemon_id: str,
            network: str,
            spec: ServiceSpecs,
            daemon_type: Optional[str] = None,
            ports: Optional[List[int]] = None,
            ip: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> CephadmDaemonDeploySpec:
        try:
            eca = spec.extra_container_args
        except AttributeError:
            eca = None
        return CephadmDaemonDeploySpec(
            host=host,
            daemon_id=daemon_id,
            service_name=spec.service_name(),
            network=network,
            daemon_type=daemon_type,
            ports=ports,
            ip=ip,
            rank=rank,
            rank_generation=rank_generation,
            extra_container_args=eca,
        )

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        raise NotImplementedError()

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        raise NotImplementedError()

    def config(self, spec: ServiceSpec) -> None:
        """
        Configure the cluster for this service. Only called *once* per
        service apply. Not for every daemon.
        """
        pass

    def daemon_check_post(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Post actions to be taken after daemons are checked."""
        if self.mgr.config_dashboard:
            if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                self.config_dashboard(daemon_descrs)
            else:
                logger.debug('Dashboard is not enabled. Skip configuration.')

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        """Configure dashboard settings."""
        raise NotImplementedError()

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        # if this is called for a service type where it hasn't explicitly been
        # defined, return an empty DaemonDescription
        return DaemonDescription()

    def get_keyring_with_caps(self, entity: AuthEntity, caps: List[str]) -> str:
        ret, keyring, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': entity,
            'caps': caps,
        })
        if err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': entity,
                'caps': caps,
            })
            if err:
                self.mgr.log.warning(f"Unable to update caps for {entity}")
        return keyring
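
    # Illustrative sketch (not part of the original code): caps are passed as
    # alternating subsystem/capability pairs; entity and caps below are
    # hypothetical.
    #
    #     keyring = self.get_keyring_with_caps(AuthEntity('client.example'),
    #                                          ['mon', 'allow r', 'osd', 'allow rwx'])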

    def _inventory_get_addr(self, hostname: str) -> str:
        """Get a host's address with its hostname."""
        return self.mgr.inventory.get_addr(hostname)

    def _set_service_url_on_dashboard(self,
                                      service_name: str,
                                      get_mon_cmd: str,
                                      set_mon_cmd: str,
                                      service_url: str) -> None:
        """A helper to get and set service_url via Dashboard's MON command.

        If the result of get_mon_cmd differs from service_url, set_mon_cmd will
        be sent to set the service_url.
        """
        def get_set_cmd_dicts(out: str) -> List[dict]:
            cmd_dict = {
                'prefix': set_mon_cmd,
                'value': service_url
            }
            return [cmd_dict] if service_url != out else []

        self._check_and_set_dashboard(
            service_name=service_name,
            get_cmd=get_mon_cmd,
            get_set_cmd_dicts=get_set_cmd_dicts
        )
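
    # Illustrative sketch (hypothetical Grafana values, mirroring how cephadm
    # services typically use this helper):
    #
    #     self._set_service_url_on_dashboard(
    #         'Grafana',
    #         'dashboard get-grafana-api-url',
    #         'dashboard set-grafana-api-url',
    #         'https://grafana-host:3000')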

    def _check_and_set_dashboard(self,
                                 service_name: str,
                                 get_cmd: str,
                                 get_set_cmd_dicts: Callable[[str], List[dict]]) -> None:
        """A helper to set configs in the Dashboard.

        The method is useful for the pattern:
        - Getting a config from Dashboard by using a Dashboard command. e.g. current iSCSI
          gateways.
        - Parse or deserialize previous output. e.g. Dashboard command returns a JSON string.
        - Determine if the config needs to be updated. NOTE: This step is important because if a
          Dashboard command modified Ceph config, cephadm's config_notify() is called, which
          kicks the serve() loop and the logic using this method is likely to be called again.
          A config should be updated only when needed.
        - Update a config in Dashboard by using a Dashboard command.

        :param service_name: the service name to be used for logging
        :type service_name: str
        :param get_cmd: Dashboard command prefix to get config. e.g. dashboard get-grafana-api-url
        :type get_cmd: str
        :param get_set_cmd_dicts: function to create a list, and each item is a command dictionary.
                                  e.g.
                                  [
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@aaa:5000',
                                          'name': 'aaa'
                                      },
                                      {
                                          'prefix': 'dashboard iscsi-gateway-add',
                                          'service_url': 'http://admin:admin@bbb:5000',
                                          'name': 'bbb'
                                      }
                                  ]
                                  The function should return an empty list if no command needs
                                  to be sent.
        :type get_set_cmd_dicts: Callable[[str], List[dict]]
        """

        try:
            _, out, _ = self.mgr.check_mon_command({
                'prefix': get_cmd
            })
        except MonCommandFailed as e:
            logger.warning('Failed to get Dashboard config for %s: %s', service_name, e)
            return
        cmd_dicts = get_set_cmd_dicts(out.strip())
        for cmd_dict in list(cmd_dicts):
            try:
                inbuf = cmd_dict.pop('inbuf', None)
                _, out, _ = self.mgr.check_mon_command(cmd_dict, inbuf)
            except MonCommandFailed as e:
                logger.warning('Failed to set Dashboard config for %s: %s', service_name, e)

    def ok_to_stop_osd(
            self,
            osds: List[str],
            known: Optional[List[str]] = None,  # output argument
            force: bool = False) -> HandleCommandResult:
        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': "osd ok-to-stop",
            'ids': osds,
            'max': 16,
        }))
        j = None
        try:
            j = json.loads(r.stdout)
        except json.decoder.JSONDecodeError:
            self.mgr.log.warning("osd ok-to-stop didn't return structured result")
            raise
        if r.retval:
            return r
        if known is not None and j and j.get('ok_to_stop'):
            self.mgr.log.debug(f"got {j}")
            known.extend([f'osd.{x}' for x in j.get('osds', [])])
        return HandleCommandResult(
            0,
            f'{",".join(["osd.%s" % o for o in osds])} {"is" if len(osds) == 1 else "are"} safe to restart',
            ''
        )
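
    # Illustrative sketch (assumed reply shape, inferred from the parsing
    # above): a structured "osd ok-to-stop" result looks roughly like
    #
    #     {"ok_to_stop": true, "osds": [3, 7], ...}
    #
    # in which case `known` (if provided) is extended with ['osd.3', 'osd.7'].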

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        names = [f'{self.TYPE}.{d_id}' for d_id in daemon_ids]
        out = f'It appears safe to stop {",".join(names)}'
        err = f'It is NOT safe to stop {",".join(names)} at this time'

        if self.TYPE not in ['mon', 'osd', 'mds']:
            logger.debug(out)
            return HandleCommandResult(0, out)

        if self.TYPE == 'osd':
            return self.ok_to_stop_osd(daemon_ids, known, force)

        r = HandleCommandResult(*self.mgr.mon_command({
            'prefix': f'{self.TYPE} ok-to-stop',
            'ids': daemon_ids,
        }))

        if r.retval:
            err = f'{err}: {r.stderr}' if r.stderr else err
            logger.debug(err)
            return HandleCommandResult(r.retval, r.stdout, err)

        out = f'{out}: {r.stdout}' if r.stdout else out
        logger.debug(out)
        return HandleCommandResult(r.retval, out, r.stderr)

    def _enough_daemons_to_stop(self, daemon_type: str, daemon_ids: List[str], service: str, low_limit: int, alert: bool = False) -> Tuple[bool, str]:
        # Provides a warning about whether it is possible to stop <n> daemons in a service
        names = [f'{daemon_type}.{d_id}' for d_id in daemon_ids]
        number_of_running_daemons = len(
            [daemon
             for daemon in self.mgr.cache.get_daemons_by_type(daemon_type)
             if daemon.status == DaemonDescriptionStatus.running])
        if (number_of_running_daemons - len(daemon_ids)) >= low_limit:
            return False, f'It is presumed safe to stop {names}'

        num_daemons_left = number_of_running_daemons - len(daemon_ids)

        def plural(count: int) -> str:
            return 'daemon' if count == 1 else 'daemons'

        left_count = "no" if num_daemons_left == 0 else num_daemons_left

        if alert:
            out = (f'ALERT: Cannot stop {names} in {service} service. '
                   f'Not enough remaining {service} daemons. '
                   f'Please deploy at least {low_limit + 1} {service} daemons before stopping {names}. ')
        else:
            out = (f'WARNING: Stopping {len(daemon_ids)} out of {number_of_running_daemons} daemons in {service} service. '
                   f'Service will not be operational with {left_count} {plural(num_daemons_left)} left. '
                   f'At least {low_limit} {plural(low_limit)} must be running to guarantee service. ')
        return True, out
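
    # Illustrative usage (as the subclasses below do): gate ok_to_stop() on
    # this helper and surface the message on failure.
    #
    #     warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
    #     if warn:
    #         return HandleCommandResult(-errno.EBUSY, '', warn_message)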

    def pre_remove(self, daemon: DaemonDescription) -> None:
        """
        Called before the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Pre remove daemon {self.TYPE}.{daemon.daemon_id}')

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        """
        Called after the daemon is removed.
        """
        assert daemon.daemon_type is not None
        assert self.TYPE == daemon_type_to_service(daemon.daemon_type)
        logger.debug(f'Post remove daemon {self.TYPE}.{daemon.daemon_id}')

    def purge(self, service_name: str) -> None:
        """Called to carry out any purge tasks following service removal"""
        logger.debug(f'Purge called for {self.TYPE} - no action taken')


class CephService(CephadmService):
    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        # Ceph daemons (mon, mgr, mds, osd, etc.)
        cephadm_config = self.get_config_and_keyring(
            daemon_spec.daemon_type,
            daemon_spec.daemon_id,
            host=daemon_spec.host,
            keyring=daemon_spec.keyring,
            extra_ceph_config=daemon_spec.ceph_conf)

        if daemon_spec.config_get_files():
            cephadm_config.update({'files': daemon_spec.config_get_files()})

        return cephadm_config, []

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.remove_keyring(daemon)

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
        """
        Map the daemon id to a cephx keyring entity name
        """
        # despite this mapping entity names to daemons, self.TYPE within
        # the CephService class refers to service types, not daemon types
        if self.TYPE in ['rgw', 'rbd-mirror', 'cephfs-mirror', 'nfs', "iscsi", 'ingress']:
            return AuthEntity(f'client.{self.TYPE}.{daemon_id}')
        elif self.TYPE in ['crash', 'agent']:
            if host == "":
                raise OrchestratorError(
                    f'Host not provided to generate <{self.TYPE}> auth entity name')
            return AuthEntity(f'client.{self.TYPE}.{host}')
        elif self.TYPE == 'mon':
            return AuthEntity('mon.')
        elif self.TYPE in ['mgr', 'osd', 'mds']:
            return AuthEntity(f'{self.TYPE}.{daemon_id}')
        else:
            raise OrchestratorError("unknown daemon type")
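
    # Illustrative examples of the mapping above (hypothetical daemon ids):
    #
    #     rgw daemon 'foo.host1'  -> 'client.rgw.foo.host1'
    #     crash on host 'node2'   -> 'client.crash.node2'
    #     mon daemon 'a'          -> 'mon.'
    #     mgr daemon 'x'          -> 'mgr.x'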

    def get_config_and_keyring(self,
                               daemon_type: str,
                               daemon_id: str,
                               host: str,
                               keyring: Optional[str] = None,
                               extra_ceph_config: Optional[str] = None
                               ) -> Dict[str, Any]:
        # keyring
        if not keyring:
            entity: AuthEntity = self.get_auth_entity(daemon_id, host=host)
            ret, keyring, err = self.mgr.check_mon_command({
                'prefix': 'auth get',
                'entity': entity,
            })

        config = self.mgr.get_minimal_ceph_conf()

        if extra_ceph_config:
            config += extra_ceph_config

        return {
            'config': config,
            'keyring': keyring,
        }

    def remove_keyring(self, daemon: DaemonDescription) -> None:
        assert daemon.daemon_id is not None
        assert daemon.hostname is not None
        daemon_id: str = daemon.daemon_id
        host: str = daemon.hostname

        assert daemon.daemon_type != 'mon'

        entity = self.get_auth_entity(daemon_id, host=host)

        logger.info(f'Removing key for {entity}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth rm',
            'entity': entity,
        })


class MonService(CephService):
    TYPE = 'mon'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new monitor on the given host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        name, _, network = daemon_spec.daemon_id, daemon_spec.host, daemon_spec.network

        # get mon. key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': self.get_auth_entity(name),
        })

        extra_config = '[mon.%s]\n' % name
        if network:
            # infer whether this is a CIDR network, addrvec, or plain IP
            if '/' in network:
                extra_config += 'public network = %s\n' % network
            elif network.startswith('[v') and network.endswith(']'):
                extra_config += 'public addrv = %s\n' % network
            elif is_ipv6(network):
                extra_config += 'public addr = %s\n' % unwrap_ipv6(network)
            elif ':' not in network:
                extra_config += 'public addr = %s\n' % network
            else:
                raise OrchestratorError(
                    'Must specify a CIDR network, ceph addrvec, or plain IP: \'%s\'' % network)
        else:
            # try to get the public_network from the config
            ret, network, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            network = network.strip() if network else network
            if not network:
                raise OrchestratorError(
                    'Must set public_network config option or specify a CIDR network, ceph addrvec, or plain IP')
            if '/' not in network:
                raise OrchestratorError(
                    'public_network is set but does not look like a CIDR network: \'%s\'' % network)
            extra_config += 'public network = %s\n' % network

        daemon_spec.ceph_conf = extra_config
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def _check_safe_to_destroy(self, mon_id: str) -> None:
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'quorum_status',
        })
        try:
            j = json.loads(out)
        except Exception:
            raise OrchestratorError('failed to parse quorum status')

        mons = [m['name'] for m in j['monmap']['mons']]
        if mon_id not in mons:
            logger.info('Safe to remove mon.%s: not in monmap (%s)' % (
                mon_id, mons))
            return
        new_mons = [m for m in mons if m != mon_id]
        new_quorum = [m for m in j['quorum_names'] if m != mon_id]
        if len(new_quorum) > len(new_mons) / 2:
            logger.info('Safe to remove mon.%s: new quorum should be %s (from %s)' %
                        (mon_id, new_quorum, new_mons))
            return
        raise OrchestratorError(
            'Removing %s would break mon quorum (new quorum %s, new mons %s)' % (mon_id, new_quorum, new_mons))
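
    # Illustrative sketch of the quorum arithmetic above (hypothetical
    # cluster): with mons = ['a', 'b', 'c'] all in quorum, removing 'c' gives
    # new_mons = ['a', 'b'] and new_quorum = ['a', 'b']; 2 > 2/2 holds, so the
    # removal is considered safe. If only 'a' were in quorum, 1 > 2/2 would
    # fail and an OrchestratorError would be raised.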

    def pre_remove(self, daemon: DaemonDescription) -> None:
        super().pre_remove(daemon)

        assert daemon.daemon_id is not None
        daemon_id: str = daemon.daemon_id
        self._check_safe_to_destroy(daemon_id)

        # remove mon from quorum before we destroy the daemon
        logger.info('Removing monitor %s from monmap...' % daemon_id)
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mon rm',
            'name': daemon_id,
        })

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the mon keyring.
        # super().post_remove(daemon)
        pass


class MgrService(CephService):
    TYPE = 'mgr'

    def allow_colo(self) -> bool:
        if self.mgr.get_ceph_option('mgr_standby_modules'):
            # traditional mgr mode: standby daemons' modules listen on
            # ports and redirect to the primary. we must not schedule
            # multiple mgrs on the same host or else ports will
            # conflict.
            return False
        else:
            # standby daemons do nothing, and therefore port conflicts
            # are not a concern.
            return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        """
        Create a new manager instance on a host.
        """
        assert self.TYPE == daemon_spec.daemon_type
        mgr_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mgr. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mgr_id),
                                             ['mon', 'profile mgr',
                                              'osd', 'allow *',
                                              'mds', 'allow *'])

        # Retrieve ports used by manager modules.
        # In the case of the dashboard port, with several manager daemons
        # running on different hosts, it is possible that the user has chosen
        # a different dashboard port on each server. If so, only the default
        # dashboard port will be opened here.
        ports = []
        ret, mgr_services, err = self.mgr.check_mon_command({
            'prefix': 'mgr services',
        })
        if mgr_services:
            mgr_endpoints = json.loads(mgr_services)
            for end_point in mgr_endpoints.values():
                port = re.search(r'\:\d+\/', end_point)
                if port:
                    ports.append(int(port[0][1:-1]))

        if ports:
            daemon_spec.ports = ports

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
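
    # Illustrative sketch (hypothetical output): 'mgr services' returns JSON
    # such as
    #
    #     {"dashboard": "https://host1:8443/", "prometheus": "http://host1:9283/"}
    #
    # from which the regex above extracts ports 8443 and 9283.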

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        for daemon in daemon_descrs:
            assert daemon.daemon_type is not None
            assert daemon.daemon_id is not None
            if self.mgr.daemon_is_self(daemon.daemon_type, daemon.daemon_id):
                return daemon
        # if no active mgr found, return an empty DaemonDescription
        return DaemonDescription()

    def fail_over(self) -> None:
        if not self.mgr_map_has_standby():
            raise OrchestratorError('Need standby mgr daemon', event_kind_subject=(
                'daemon', 'mgr' + self.mgr.get_mgr_id()))

        self.mgr.events.for_daemon('mgr' + self.mgr.get_mgr_id(),
                                   'INFO', 'Failing over to other MGR')
        logger.info('Failing over to other MGR')

        # fail over
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'mgr fail',
            'who': self.mgr.get_mgr_id(),
        })

    def mgr_map_has_standby(self) -> bool:
        """
        This is a bit safer than asking our inventory. If the mgr joined the mgr map,
        we know it joined the cluster.
        """
        mgr_map = self.mgr.get('mgr_map')
        num = len(mgr_map.get('standbys'))
        return bool(num)

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # ok to stop if there is more than 1 mgr and not trying to stop the active mgr

        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'Mgr', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        mgr_daemons = self.mgr.cache.get_daemons_by_type(self.TYPE)
        active = self.get_active_daemon(mgr_daemons).daemon_id
        if active in daemon_ids:
            warn_message = 'ALERT: Cannot stop active Mgr daemon. Please switch active Mgrs with \'ceph mgr fail %s\'' % active
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        return HandleCommandResult(0, warn_message, '')


class MdsService(CephService):
    TYPE = 'mds'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: ServiceSpec) -> None:
        assert self.TYPE == spec.service_type
        assert spec.service_id

        # ensure mds_join_fs is set for these daemons
        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': 'mds.' + spec.service_id,
            'name': 'mds_join_fs',
            'value': spec.service_id,
        })

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        mds_id, _ = daemon_spec.daemon_id, daemon_spec.host

        # get mds. key
        keyring = self.get_keyring_with_caps(self.get_auth_entity(mds_id),
                                             ['mon', 'profile mds',
                                              'osd', 'allow rw tag cephfs *=*',
                                              'mds', 'allow'])
        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def get_active_daemon(self, daemon_descrs: List[DaemonDescription]) -> DaemonDescription:
        active_mds_strs = list()
        for fs in self.mgr.get('fs_map')['filesystems']:
            mds_map = fs['mdsmap']
            if mds_map is not None:
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        active_mds_strs.append(mds_status['name'])
        if len(active_mds_strs) != 0:
            for daemon in daemon_descrs:
                if daemon.daemon_id in active_mds_strs:
                    return daemon
        # if no mds found, return an empty DaemonDescription
        return DaemonDescription()

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': service_name,
            'name': 'mds_join_fs',
        })


class RgwService(CephService):
    TYPE = 'rgw'

    def allow_colo(self) -> bool:
        return True

    def config(self, spec: RGWSpec) -> None:  # type: ignore
        assert self.TYPE == spec.service_type

        # set rgw_realm and rgw_zone, if present
        if spec.rgw_realm:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_realm',
                'value': spec.rgw_realm,
            })
        if spec.rgw_zone:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config set',
                'who': f"{utils.name_to_config_section('rgw')}.{spec.service_id}",
                'name': 'rgw_zone',
                'value': spec.rgw_zone,
            })

        if spec.rgw_frontend_ssl_certificate:
            if isinstance(spec.rgw_frontend_ssl_certificate, list):
                cert_data = '\n'.join(spec.rgw_frontend_ssl_certificate)
            elif isinstance(spec.rgw_frontend_ssl_certificate, str):
                cert_data = spec.rgw_frontend_ssl_certificate
            else:
                raise OrchestratorError(
                    'Invalid rgw_frontend_ssl_certificate: %s'
                    % spec.rgw_frontend_ssl_certificate)
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config-key set',
                'key': f'rgw/cert/{spec.service_name()}',
                'val': cert_data,
            })

        # TODO: fail, if we don't have a spec
        logger.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.mgr.spec_store.save(spec)
        self.mgr.trigger_connect_dashboard_rgw()

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        rgw_id, _ = daemon_spec.daemon_id, daemon_spec.host
        spec = cast(RGWSpec, self.mgr.spec_store[daemon_spec.service_name].spec)

        keyring = self.get_keyring(rgw_id)

        if daemon_spec.ports:
            port = daemon_spec.ports[0]
        else:
            # this is a redeploy of an older instance that doesn't have an explicitly
            # assigned port, in which case we can assume there is only 1 per host
            # and it matches the spec.
            port = spec.get_port()

        # configure frontend
        args = []
        ftype = spec.rgw_frontend_type or "beast"
        if ftype == 'beast':
            if spec.ssl:
                if daemon_spec.ip:
                    args.append(
                        f"ssl_endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"ssl_port={port}")
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"endpoint={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        elif ftype == 'civetweb':
            if spec.ssl:
                if daemon_spec.ip:
                    # note the 's' suffix on port
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}s")
                else:
                    args.append(f"port={port}s")  # note the 's' suffix on port
                args.append(f"ssl_certificate=config://rgw/cert/{spec.service_name()}")
            else:
                if daemon_spec.ip:
                    args.append(f"port={build_url(host=daemon_spec.ip, port=port).lstrip('/')}")
                else:
                    args.append(f"port={port}")
        frontend = f'{ftype} {" ".join(args)}'

        ret, out, err = self.mgr.check_mon_command({
            'prefix': 'config set',
            'who': utils.name_to_config_section(daemon_spec.name()),
            'name': 'rgw_frontends',
            'value': frontend
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec
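
    # Illustrative sketch (hypothetical values): the assembled rgw_frontends
    # strings look like
    #
    #     beast, SSL, with IP:   'beast ssl_endpoint=10.0.0.5:443 ssl_certificate=config://rgw/cert/rgw.foo'
    #     beast, no SSL, no IP:  'beast port=80'
    #     civetweb, SSL, no IP:  'civetweb port=443s ssl_certificate=config://rgw/cert/rgw.foo'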

    def get_keyring(self, rgw_id: str) -> str:
        keyring = self.get_keyring_with_caps(self.get_auth_entity(rgw_id),
                                             ['mon', 'allow *',
                                              'mgr', 'allow rw',
                                              'osd', 'allow rwx tag rgw *=*'])
        return keyring

    def purge(self, service_name: str) -> None:
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_realm',
        })
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(service_name),
            'name': 'rgw_zone',
        })
        self.mgr.check_mon_command({
            'prefix': 'config-key rm',
            'key': f'rgw/cert/{service_name}',
        })
        self.mgr.trigger_connect_dashboard_rgw()

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        super().post_remove(daemon, is_failed_deploy=is_failed_deploy)
        self.mgr.check_mon_command({
            'prefix': 'config rm',
            'who': utils.name_to_config_section(daemon.name()),
            'name': 'rgw_frontends',
        })

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if a load balancer (ingress) is present: block if only 1 daemon is up, otherwise ok
        # if no load balancer: warn if > 1 daemon, block if only 1 daemon
        def ingress_present() -> bool:
            running_ingress_daemons = [
                daemon for daemon in self.mgr.cache.get_daemons_by_type('ingress') if daemon.status == 1]
            running_haproxy_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'haproxy']
            running_keepalived_daemons = [
                daemon for daemon in running_ingress_daemons if daemon.daemon_type == 'keepalived']
            # check that there is at least one haproxy and one keepalived daemon running
            if running_haproxy_daemons and running_keepalived_daemons:
                return True
            return False

        # if only 1 rgw, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(self.TYPE, daemon_ids, 'RGW', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)

        # if reached here, there is > 1 rgw daemon.
        # Say okay if load balancer is present or the force flag is set
        if ingress_present() or force:
            return HandleCommandResult(0, warn_message, '')

        # if reached here, > 1 RGW daemon, no load balancer and no force flag.
        # Provide warning
        warn_message = "WARNING: Removing RGW daemons can cause clients to lose connectivity. "
        return HandleCommandResult(-errno.EBUSY, '', warn_message)

    def config_dashboard(self, daemon_descrs: List[DaemonDescription]) -> None:
        self.mgr.trigger_connect_dashboard_rgw()


class RbdMirrorService(CephService):
    TYPE = 'rbd-mirror'

    def allow_colo(self) -> bool:
        return True

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, _ = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id),
                                             ['mon', 'profile rbd-mirror',
                                              'osd', 'profile rbd'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def ok_to_stop(
            self,
            daemon_ids: List[str],
            force: bool = False,
            known: Optional[List[str]] = None  # output argument
    ) -> HandleCommandResult:
        # if only 1 rbd-mirror, alert user (this is not passable with --force)
        warn, warn_message = self._enough_daemons_to_stop(
            self.TYPE, daemon_ids, 'Rbdmirror', 1, True)
        if warn:
            return HandleCommandResult(-errno.EBUSY, '', warn_message)
        return HandleCommandResult(0, warn_message, '')


class CrashService(CephService):
    TYPE = 'crash'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host),
                                             ['mon', 'profile crash',
                                              'mgr', 'profile crash'])

        daemon_spec.keyring = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec


class CephfsMirrorService(CephService):
    TYPE = 'cephfs-mirror'

    def config(self, spec: ServiceSpec) -> None:
        # make sure mirroring module is enabled
        mgr_map = self.mgr.get('mgr_map')
        mod_name = 'mirroring'
        if mod_name not in mgr_map.get('services', {}):
            self.mgr.check_mon_command({
                'prefix': 'mgr module enable',
                'module': mod_name
            })
            # we shouldn't get here (mon will tell the mgr to respawn), but no
            # harm done if we do.

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type

        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get-or-create',
            'entity': self.get_auth_entity(daemon_spec.daemon_id),
            'caps': ['mon', 'profile cephfs-mirror',
                     'mds', 'allow r',
                     'osd', 'allow rw tag cephfs metadata=*, allow r tag cephfs data=*',
                     'mgr', 'allow r'],
        })

        daemon_spec.keyring = keyring
        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
        return daemon_spec


class CephadmAgent(CephService):
    TYPE = 'agent'

    def prepare_create(self, daemon_spec: CephadmDaemonDeploySpec) -> CephadmDaemonDeploySpec:
        assert self.TYPE == daemon_spec.daemon_type
        daemon_id, host = daemon_spec.daemon_id, daemon_spec.host

        if not self.mgr.cherrypy_thread:
            raise OrchestratorError('Cannot deploy agent before creating cephadm endpoint')

        keyring = self.get_keyring_with_caps(self.get_auth_entity(daemon_id, host=host), [])
        daemon_spec.keyring = keyring
        self.mgr.agent_cache.agent_keys[host] = keyring

        daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)

        return daemon_spec

    def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
        try:
            assert self.mgr.cherrypy_thread
            assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            assert self.mgr.cherrypy_thread.server_port
        except Exception:
            raise OrchestratorError(
                'Cannot deploy agent daemons until cephadm endpoint has finished generating certs')

        cfg = {'target_ip': self.mgr.get_mgr_ip(),
               'target_port': self.mgr.cherrypy_thread.server_port,
               'refresh_period': self.mgr.agent_refresh_rate,
               'listener_port': self.mgr.agent_starting_port,
               'host': daemon_spec.host,
               'device_enhanced_scan': str(self.mgr.device_enhanced_scan)}

        listener_cert, listener_key = self.mgr.cherrypy_thread.ssl_certs.generate_cert(
            self.mgr.inventory.get_addr(daemon_spec.host))
        config = {
            'agent.json': json.dumps(cfg),
            'keyring': daemon_spec.keyring,
            'root_cert.pem': self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
            'listener.crt': listener_cert,
            'listener.key': listener_key,
        }

        return config, sorted([str(self.mgr.get_mgr_ip()), str(self.mgr.cherrypy_thread.server_port),
                               self.mgr.cherrypy_thread.ssl_certs.get_root_cert(),
                               str(self.mgr.get_module_option('device_enhanced_scan'))])
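
    # Illustrative sketch (hypothetical values): the deployed agent receives
    # an agent.json along the lines of
    #
    #     {"target_ip": "10.0.0.1", "target_port": 7150, "refresh_period": 20,
    #      "listener_port": 7160, "host": "node1", "device_enhanced_scan": "False"}
    #
    # plus the keyring, the cephadm root CA cert, and a per-host listener
    # cert/key pair signed by it.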