import json
import errno
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence, Type

import datetime
import os
import random
import tempfile
import multiprocessing.pool
import subprocess
from prettytable import PrettyTable

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    ServiceSpec, PlacementSpec, assert_valid_host, \
    HostPlacementSpec, IngressSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec

from mgr_module import MgrModule, HandleCommandResult, Option
from mgr_util import create_self_signed_cert
import secrets
import orchestrator
from orchestrator.module import to_format, Format

from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, DaemonDescription, DaemonDescriptionStatus, handle_orch_error, \
    service_to_daemon_types
from orchestrator._interface import GenericSpec
from orchestrator._interface import daemon_type_to_service

from . import remotes
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService, CephfsMirrorService
from .services.ingress import IngressService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .services.exporter import CephadmExporter, CephadmExporterConfig
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore, ClientKeyringStore, ClientKeyringSpec
from .upgrade import CephadmUpgrade
from .template import TemplateMgr
from .utils import CEPH_IMAGE_TYPES, forall_hosts, cephadmNoImage
from .configchecks import CephadmConfigChecks

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self: Any) -> bool:
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

logger = logging.getLogger(__name__)

T = TypeVar('T')

DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'
DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.18.1'
DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v0.18.1'
DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.20.0'
DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:6.7.4'
DEFAULT_HAPROXY_IMAGE = 'docker.io/library/haproxy:2.3'
DEFAULT_KEEPALIVED_IMAGE = 'docker.io/arcts/keepalived'
# ------------------------------------------------------------------------------

def service_inactive(spec_name: str) -> Callable:
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            obj = args[0]
            if obj.get_store(f"spec.{spec_name}") is not None:
                return 1, "", f"Unable to change configuration of an active service {spec_name}"
            return func(*args, **kwargs)
        return wrapper
    return inner
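
# `service_inactive` guards a CLI handler so that settings can only be changed while
# no spec for the named service is stored; the cephadm-exporter config commands further
# below apply it as `@service_inactive('cephadm-exporter')`.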


def host_exists(hostname_position: int = 1) -> Callable:
    """Check that a hostname exists in the inventory"""
    def inner(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            this = args[0]  # self object
            hostname = args[hostname_position]
            if hostname not in this.cache.get_hosts():
                candidates = ','.join([h for h in this.cache.get_hosts() if h.startswith(hostname)])
                help_msg = f"Did you mean {candidates}?" if candidates else ""
                raise OrchestratorError(
                    f"Cannot find host '{hostname}' in the inventory. {help_msg}")

            return func(*args, **kwargs)
        return wrapper
    return inner
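
# `host_exists` decorates CLI handlers whose positional argument at `hostname_position`
# (default 1, i.e. the first argument after self) is a host name; unknown hosts raise
# OrchestratorError, and the message suggests inventory hosts sharing the given prefix.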


class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
                          metaclass=CLICommandMeta):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS = [
        Option(
            'ssh_config_file',
            type='str',
            default=None,
            desc='customized SSH config file to connect to managed hosts',
        ),
        Option(
            'device_cache_timeout',
            type='secs',
            default=30 * 60,
            desc='seconds to cache device inventory',
        ),
        Option(
            'device_enhanced_scan',
            type='bool',
            default=False,
            desc='Use libstoragemgmt during device scans',
        ),
        Option(
            'daemon_cache_timeout',
            type='secs',
            default=10 * 60,
            desc='seconds to cache service (daemon) inventory',
        ),
        Option(
            'facts_cache_timeout',
            type='secs',
            default=1 * 60,
            desc='seconds to cache host facts data',
        ),
        Option(
            'host_check_interval',
            type='secs',
            default=10 * 60,
            desc='how frequently to perform a host check',
        ),
        Option(
            'mode',
            type='str',
            enum_allowed=['root', 'cephadm-package'],
            default='root',
            desc='mode for remote execution of cephadm',
        ),
        Option(
            'container_image_base',
            default=DEFAULT_IMAGE,
            desc='Container image name, without the tag',
            runtime=True,
        ),
        Option(
            'container_image_prometheus',
            default=DEFAULT_PROMETHEUS_IMAGE,
            desc='Prometheus container image',
        ),
        Option(
            'container_image_grafana',
            default=DEFAULT_GRAFANA_IMAGE,
            desc='Grafana container image',
        ),
        Option(
            'container_image_alertmanager',
            default=DEFAULT_ALERT_MANAGER_IMAGE,
            desc='Alertmanager container image',
        ),
        Option(
            'container_image_node_exporter',
            default=DEFAULT_NODE_EXPORTER_IMAGE,
            desc='Node exporter container image',
        ),
        Option(
            'container_image_haproxy',
            default=DEFAULT_HAPROXY_IMAGE,
            desc='HAproxy container image',
        ),
        Option(
            'container_image_keepalived',
            default=DEFAULT_KEEPALIVED_IMAGE,
            desc='Keepalived container image',
        ),
        Option(
            'warn_on_stray_hosts',
            type='bool',
            default=True,
            desc='raise a health warning if daemons are detected on a host '
                 'that is not managed by cephadm',
        ),
        Option(
            'warn_on_stray_daemons',
            type='bool',
            default=True,
            desc='raise a health warning if daemons are detected '
                 'that are not managed by cephadm',
        ),
        Option(
            'warn_on_failed_host_check',
            type='bool',
            default=True,
            desc='raise a health warning if the host check fails',
        ),
        Option(
            'log_to_cluster',
            type='bool',
            default=True,
            desc='log to the "cephadm" cluster log channel',
        ),
        Option(
            'allow_ptrace',
            type='bool',
            default=False,
            desc='allow SYS_PTRACE capability on ceph containers',
            long_desc='The SYS_PTRACE capability is needed to attach to a '
                      'process with gdb or strace. Enabling this option '
                      'can allow debugging daemons that encounter problems '
                      'at runtime.',
        ),
        Option(
            'container_init',
            type='bool',
            default=True,
            desc='Run podman/docker with `--init`'
        ),
        Option(
            'prometheus_alerts_path',
            type='str',
            default='/etc/prometheus/ceph/ceph_default_alerts.yml',
            desc='location of alerts to include in prometheus deployments',
        ),
        Option(
            'migration_current',
            type='int',
            default=None,
            desc='internal - do not modify',
            # used to track spec and other data migrations.
        ),
        Option(
            'config_dashboard',
            type='bool',
            default=True,
            desc='manage configs like API endpoints in Dashboard.'
        ),
        Option(
            'manage_etc_ceph_ceph_conf',
            type='bool',
            default=False,
            desc='Manage and own /etc/ceph/ceph.conf on the hosts.',
        ),
        Option(
            'manage_etc_ceph_ceph_conf_hosts',
            type='str',
            default='*',
            desc='PlacementSpec describing on which hosts to manage /etc/ceph/ceph.conf',
        ),
        Option(
            'registry_url',
            type='str',
            default=None,
            desc='Custom registry url'
        ),
        Option(
            'registry_username',
            type='str',
            default=None,
            desc='Custom registry username'
        ),
        Option(
            'registry_password',
            type='str',
            default=None,
            desc='Custom registry password'
        ),
        Option(
            'registry_insecure',
            type='bool',
            default=False,
            desc='Registry is to be considered insecure (no TLS available). Only for development purposes.'
        ),
        Option(
            'use_repo_digest',
            type='bool',
            default=True,
            desc='Automatically convert image tags to image digest. Make sure all daemons use the same image',
        ),
        Option(
            'config_checks_enabled',
            type='bool',
            default=False,
            desc='Enable or disable the cephadm configuration analysis',
        ),
        Option(
            'default_registry',
            type='str',
            default='docker.io',
            desc='Registry to which we should normalize unqualified image names',
        ),
        Option(
            'max_count_per_host',
            type='int',
            default=10,
            desc='max number of daemons per service per host',
        ),
        Option(
            'autotune_memory_target_ratio',
            type='float',
            default=.7,
            desc='ratio of total system memory to divide amongst autotuned daemons'
        ),
        Option(
            'autotune_interval',
            type='secs',
            default=10 * 60,
            desc='how frequently to autotune daemon memory'
        ),
        Option(
            'max_osd_draining_count',
            type='int',
            default=10,
            desc='max number of osds that will be drained simultaneously when osds are removed'
        ),
    ]

    def __init__(self, *args: Any, **kwargs: Any):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid: str = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.max_count_per_host = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.container_image_haproxy = ''
            self.container_image_keepalived = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = True
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.manage_etc_ceph_ceph_conf_hosts = '*'
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.registry_insecure: bool = False
            self.use_repo_digest = True
            self.default_registry = ''
            self.autotune_memory_target_ratio = 0.0
            self.autotune_interval = 0
            self.apply_spec_fails: List[Tuple[str, str]] = []
            self.max_osd_draining_count = 10

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            assert isinstance(path, str)
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self.cephadm_binary_path = self._get_cephadm_binary_path()

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        self.keys = ClientKeyringStore(self)
        self.keys.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        _service_clses: Sequence[Type[CephadmService]] = [
            OSDService, NFSService, MonService, MgrService, MdsService,
            RgwService, RbdMirrorService, GrafanaService, AlertmanagerService,
            PrometheusService, NodeExporterService, CrashService, IscsiService,
            IngressService, CustomContainerService, CephadmExporter, CephfsMirrorService
        ]

        # https://github.com/python/mypy/issues/8993
        self.cephadm_services: Dict[str, CephadmService] = {
            cls.TYPE: cls(self) for cls in _service_clses}  # type: ignore

        self.mgr_service: MgrService = cast(MgrService, self.cephadm_services['mgr'])
        self.osd_service: OSDService = cast(OSDService, self.cephadm_services['osd'])

        self.template = TemplateMgr(self)

        self.requires_post_actions: Set[str] = set()
        self.need_connect_dashboard_rgw = False

        self.config_checker = CephadmConfigChecks(self)

    def shutdown(self) -> None:
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

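    # _get_cephadm_binary_path() names the remote copy of the cephadm script after the
    # sha256 of the script bundled with this mgr, i.e. a path of the form
    # /var/lib/ceph/<fsid>/cephadm.<sha256 hexdigest>, presumably so that different mgr
    # versions never clobber each other's copy on managed hosts.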
    def _get_cephadm_binary_path(self) -> str:
        import hashlib
        m = hashlib.sha256()
        m.update(self._cephadm.encode())
        return f'/var/lib/ceph/{self._cluster_fsid}/cephadm.{m.hexdigest()}'

    def _kick_serve_loop(self) -> None:
        self.log.debug('_kick_serve_loop')
        self.event.set()

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image: str) -> None:
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()

    def _trigger_osd_removal(self) -> None:
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug('Found empty osd. Starting removal process')
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self._kick_serve_loop()

    def pause(self) -> None:
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self) -> None:
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
            # unconditionally wake loop so that 'orch resume' can be used to kick
            # cephadm
            self._kick_serve_loop()

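    # get_unique_name() builds daemon ids of the form
    # [<prefix>.][<rank>.<rank_generation>.]<short host name>[.<6 random lowercase letters>],
    # where the random suffix is omitted for the daemon types listed below (mon, crash,
    # the monitoring daemons, container and cephadm-exporter).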
    def get_unique_name(
            self,
            daemon_type: str,
            host: str,
            existing: List[orchestrator.DaemonDescription],
            prefix: Optional[str] = None,
            forcename: Optional[str] = None,
            rank: Optional[int] = None,
            rank_generation: Optional[int] = None,
    ) -> str:
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container', 'cephadm-exporter',
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            if rank is not None and rank_generation is not None:
                name += f'{rank}.{rank_generation}.'
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name

    def _reconfig_ssh(self) -> None:
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None:
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        res = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not res:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in res:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname: str) -> None:
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host: str) -> None:
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self) -> None:
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host: str) -> None:
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run() -> Tuple[bool, str]:
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Report whether the cephadm orchestrator is available, plus some status detail.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err, {}
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`', {}

        # mypy is unable to determine type for _processes since it's private
        worker_count: int = self._worker_pool._processes  # type: ignore
        ret = {
            "workers": worker_count,
            "paused": self.paused,
        }

        return True, err, ret

    def _validate_and_set_ssh_val(self, what: str, new: Optional[str], old: Optional[str]) -> None:
        self.set_store(what, new)
        self._reconfig_ssh()
        if self.cache.get_hosts():
            # Can't check anything without hosts
            host = self.cache.get_hosts()[0]
            r = CephadmServe(self)._check_host(host)
            if r is not None:
                # connection failed reset user
                self.set_store(what, old)
                self._reconfig_ssh()
                raise OrchestratorError('ssh connection %s@%s failed' % (self.ssh_user, host))
        self.log.info(f'Set ssh {what}')

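    # The SSH settings below are all driven through `ceph cephadm ...` commands, e.g.
    # (illustrative invocations; the file path is a placeholder):
    #   ceph cephadm set-ssh-config -i /path/to/ssh_config
    #   ceph cephadm generate-key
    #   ceph cephadm get-pub-key
    #   ceph cephadm set-user <user>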
    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config')
    def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set the ssh_config file (use -i <ssh_config>)
        """
        # Set an ssh_config file provided from stdin

        old = self.ssh_config
        if inbuf == old:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self._validate_and_set_ssh_val('ssh_config', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command('cephadm clear-ssh-config')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file
        """
        # Clear the ssh_config file provided from stdin
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command('cephadm get-ssh-config')
    def _get_ssh_config(self) -> HandleCommandResult:
        """
        Returns the ssh config as used by cephadm
        """
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command('cephadm generate-key')
    def _generate_key(self) -> Tuple[int, str, str]:
        """
        Generate a cluster SSH key (if not present)
        """
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH private key (use -i <private_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        old = self.ssh_key
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_key', inbuf, old)
        self.log.info('Set ssh private key')
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Set cluster SSH public key (use -i <public_key>)"""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        old = self.ssh_pub
        if inbuf == old:
            return 0, "value unchanged", ""
        self._validate_and_set_ssh_val('ssh_identity_pub', inbuf, old)
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key')
    def _clear_key(self) -> Tuple[int, str, str]:
        """Clear cluster SSH key"""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key')
    def _get_pub_key(self) -> Tuple[int, str, str]:
        """Show SSH public key for connecting to cluster hosts"""
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user')
    def _get_user(self) -> Tuple[int, str, str]:
        """
        Show user for SSHing to cluster hosts
        """
        if self.ssh_user is None:
            return -errno.ENOENT, '', 'No cluster SSH user configured'
        else:
            return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user')
    def set_ssh_user(self, user: str) -> Tuple[int, str, str]:
        """
        Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users
        """
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self._validate_and_set_ssh_val('ssh_user', user, current_user)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += '. sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login')
    def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom registry login info by providing url, username and password or json file with login info (-i <file>)
        """
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            assert isinstance(inbuf, str)
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = CephadmServe(self)._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

    @orchestrator._cli_read_command('cephadm check-host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Check whether we can access and manage a remote host"""
        try:
            out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'check-host',
                                                             ['--expect-hostname', host],
                                                             addr=addr,
                                                             error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n'
                           + f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_read_command(
        'cephadm prepare-host')
    def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Prepare a remote host for use with cephadm"""
        out, err, code = CephadmServe(self)._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                                         ['--expect-hostname', host],
                                                         addr=addr,
                                                         error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf')
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        """
        Text that is appended to all daemons' ceph.conf.
        Mainly a workaround, till `config generate-minimal-conf` generates
        a complete ceph.conf.

        Warning: this is a dangerous operation.
        """
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        """
        Get extra ceph conf that is appended
        """
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    def _set_exporter_config(self, config: Dict[str, str]) -> None:
        self.set_store('exporter_config', json.dumps(config))

    def _get_exporter_config(self) -> Dict[str, str]:
        cfg_str = self.get_store('exporter_config')
        return json.loads(cfg_str) if cfg_str else {}

    def _set_exporter_option(self, option: str, value: Optional[str] = None) -> None:
        kv_option = f'exporter_{option}'
        self.set_store(kv_option, value)

    def _get_exporter_option(self, option: str) -> Optional[str]:
        kv_option = f'exporter_{option}'
        return self.get_store(kv_option)

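    # The cephadm-exporter settings managed by the commands below are persisted in the
    # mgr key/value store: 'exporter_config' holds a JSON blob with crt, key, token and
    # port (see _set_exporter_defaults()), and per-option flags use an
    # 'exporter_<option>' key, e.g. 'exporter_enabled'.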
    @orchestrator._cli_write_command(
        prefix='cephadm generate-exporter-config')
    @service_inactive('cephadm-exporter')
    def _generate_exporter_config(self) -> Tuple[int, str, str]:
        """
        Generate default SSL crt/key and token for cephadm exporter daemons
        """
        self._set_exporter_defaults()
        self.log.info('Default settings created for cephadm exporter(s)')
        return 0, "", ""

    def _set_exporter_defaults(self) -> None:
        crt, key = self._generate_exporter_ssl()
        token = self._generate_exporter_token()
        self._set_exporter_config({
            "crt": crt,
            "key": key,
            "token": token,
            "port": CephadmExporterConfig.DEFAULT_PORT
        })
        self._set_exporter_option('enabled', 'true')

    def _generate_exporter_ssl(self) -> Tuple[str, str]:
        return create_self_signed_cert(dname={"O": "Ceph", "OU": "cephadm-exporter"})

    def _generate_exporter_token(self) -> str:
        return secrets.token_hex(32)

    @orchestrator._cli_write_command(
        prefix='cephadm clear-exporter-config')
    @service_inactive('cephadm-exporter')
    def _clear_exporter_config(self) -> Tuple[int, str, str]:
        """
        Clear the SSL configuration used by cephadm exporter daemons
        """
        self._clear_exporter_config_settings()
        self.log.info('Cleared cephadm exporter configuration')
        return 0, "", ""

    def _clear_exporter_config_settings(self) -> None:
        self.set_store('exporter_config', None)
        self._set_exporter_option('enabled', None)

    @orchestrator._cli_write_command(
        prefix='cephadm set-exporter-config')
    @service_inactive('cephadm-exporter')
    def _store_exporter_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """
        Set custom cephadm-exporter configuration from a json file (-i <file>). JSON must contain crt, key, token and port
        """
        if not inbuf:
            return 1, "", "JSON configuration has not been provided (-i <filename>)"

        cfg = CephadmExporterConfig(self)
        rc, reason = cfg.load_from_json(inbuf)
        if rc:
            return 1, "", reason

        rc, reason = cfg.validate_config()
        if rc:
            return 1, "", reason

        self._set_exporter_config({
            "crt": cfg.crt,
            "key": cfg.key,
            "token": cfg.token,
            "port": cfg.port
        })
        self.log.info("Loaded and verified the TLS configuration")
        return 0, "", ""

    @orchestrator._cli_read_command(
        'cephadm get-exporter-config')
    def _show_exporter_config(self) -> Tuple[int, str, str]:
        """
        Show the current cephadm-exporter configuration (JSON)
        """
        cfg = self._get_exporter_config()
        return 0, json.dumps(cfg, indent=2), ""

    @orchestrator._cli_read_command('cephadm config-check ls')
    def _config_checks_list(self, format: Format = Format.plain) -> HandleCommandResult:
        """List the available configuration checks and their current state"""

        if format not in [Format.plain, Format.json, Format.json_pretty]:
            return HandleCommandResult(
                retval=1,
                stderr="Requested format is not supported when listing configuration checks"
            )

        if format in [Format.json, Format.json_pretty]:
            return HandleCommandResult(
                stdout=to_format(self.config_checker.health_checks,
                                 format,
                                 many=True,
                                 cls=None))

        # plain formatting
        table = PrettyTable(
            ['NAME',
             'HEALTHCHECK',
             'STATUS',
             'DESCRIPTION'
             ], border=False)
        table.align['NAME'] = 'l'
        table.align['HEALTHCHECK'] = 'l'
        table.align['STATUS'] = 'l'
        table.align['DESCRIPTION'] = 'l'
        table.left_padding_width = 0
        table.right_padding_width = 2
        for c in self.config_checker.health_checks:
            table.add_row((
                c.name,
                c.healthcheck_name,
                c.status,
                c.description,
            ))

        return HandleCommandResult(stdout=table.get_string())

    @orchestrator._cli_read_command('cephadm config-check status')
    def _config_check_status(self) -> HandleCommandResult:
        """Show whether the configuration checker feature is enabled/disabled"""
        status = self.get_module_option('config_checks_enabled')
        return HandleCommandResult(stdout="Enabled" if status else "Disabled")

    @orchestrator._cli_write_command('cephadm config-check enable')
    def _config_check_enable(self, check_name: str) -> HandleCommandResult:
        """Enable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'enabled')
        if err:
            return HandleCommandResult(
                retval=err,
                stderr=f"Failed to enable check '{check_name}' : {msg}")

        return HandleCommandResult(stdout="ok")

    @orchestrator._cli_write_command('cephadm config-check disable')
    def _config_check_disable(self, check_name: str) -> HandleCommandResult:
        """Disable a specific configuration check"""
        if not self._config_check_valid(check_name):
            return HandleCommandResult(retval=1, stderr="Invalid check name")

        err, msg = self._update_config_check(check_name, 'disabled')
        if err:
            return HandleCommandResult(retval=err, stderr=f"Failed to disable check '{check_name}': {msg}")
        else:
            # drop any outstanding raised healthcheck for this check
            config_check = self.config_checker.lookup_check(check_name)
            if config_check:
                if config_check.healthcheck_name in self.health_checks:
                    self.health_checks.pop(config_check.healthcheck_name, None)
                    self.set_health_checks(self.health_checks)
            else:
                self.log.error(
                    f"Unable to resolve a check name ({check_name}) to a healthcheck definition?")

        return HandleCommandResult(stdout="ok")

    def _config_check_valid(self, check_name: str) -> bool:
        return check_name in [chk.name for chk in self.config_checker.health_checks]

    def _update_config_check(self, check_name: str, status: str) -> Tuple[int, str]:
        checks_raw = self.get_store('config_checks')
        if not checks_raw:
            return 1, "config_checks setting is not available"

        checks = json.loads(checks_raw)
        checks.update({
            check_name: status
        })
        self.log.info(f"updated config check '{check_name}' : {status}")
        self.set_store('config_checks', json.dumps(checks))
        return 0, ""

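    # The text set via `cephadm set-extra-ceph-conf` above is persisted in the key/value
    # store as JSON of the form {"conf": "<ini text>", "last_modified": "<timestamp>"};
    # ExtraCephConf below is the parsed, typed view of that blob.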
    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt

    @orchestrator._cli_write_command(
        'cephadm osd activate'
    )
    def _osd_activate(self, host: List[str]) -> HandleCommandResult:
        """
        Start OSD containers for existing OSDs
        """

        @forall_hosts
        def run(h: str) -> str:
            return self.osd_service.deploy_osd_daemons_for_existing_osds(h, 'osd')

        return HandleCommandResult(stdout='\n'.join(run(host)))

    @orchestrator._cli_read_command('orch client-keyring ls')
    def _client_keyring_ls(self, format: Format = Format.plain) -> HandleCommandResult:
        """
        List client keyrings under cephadm management
        """
        if format != Format.plain:
            output = to_format(self.keys.keys.values(), format, many=True, cls=ClientKeyringSpec)
        else:
            table = PrettyTable(
                ['ENTITY', 'PLACEMENT', 'MODE', 'OWNER', 'PATH'],
                border=False)
            table.align = 'l'
            table.left_padding_width = 0
            table.right_padding_width = 2
            for ks in sorted(self.keys.keys.values(), key=lambda ks: ks.entity):
                table.add_row((
                    ks.entity, ks.placement.pretty_str(),
                    utils.file_mode_to_str(ks.mode),
                    f'{ks.uid}:{ks.gid}',
                    ks.path,
                ))
            output = table.get_string()
        return HandleCommandResult(stdout=output)

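    # Illustrative keyring management (flag spelling follows the usual mgr CLI mapping
    # of optional parameters; entity/placement values are placeholders):
    #   ceph orch client-keyring set client.foo label:foo --owner 0:0 --mode 640
    #   ceph orch client-keyring rm client.foo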
1272 | @orchestrator._cli_write_command('orch client-keyring set') | |
1273 | def _client_keyring_set( | |
1274 | self, | |
1275 | entity: str, | |
1276 | placement: str, | |
1277 | owner: Optional[str] = None, | |
1278 | mode: Optional[str] = None, | |
1279 | ) -> HandleCommandResult: | |
522d829b TL |
1280 | """ |
1281 | Add or update client keyring under cephadm management | |
1282 | """ | |
b3b6e05e TL |
1283 | if not entity.startswith('client.'): |
1284 | raise OrchestratorError('entity must start with client.') | |
1285 | if owner: | |
1286 | try: | |
1287 | uid, gid = map(int, owner.split(':')) | |
1288 | except Exception: | |
1289 | raise OrchestratorError('owner must look like "<uid>:<gid>", e.g., "0:0"') | |
1290 | else: | |
1291 | uid = 0 | |
1292 | gid = 0 | |
1293 | if mode: | |
1294 | try: | |
1295 | imode = int(mode, 8) | |
1296 | except Exception: | |
1297 | raise OrchestratorError('mode must be an octal mode, e.g. "600"') | |
1298 | else: | |
1299 | imode = 0o600 | |
1300 | pspec = PlacementSpec.from_string(placement) | |
1301 | ks = ClientKeyringSpec(entity, pspec, mode=imode, uid=uid, gid=gid) | |
1302 | self.keys.update(ks) | |
1303 | self._kick_serve_loop() | |
1304 | return HandleCommandResult() | |
1305 | ||
1306 | @orchestrator._cli_write_command('orch client-keyring rm') | |
1307 | def _client_keyring_rm( | |
1308 | self, | |
1309 | entity: str, | |
1310 | ) -> HandleCommandResult: | |
522d829b TL |
1311 | """ |
1312 | Remove client keyring from cephadm management | |
1313 | """ | |
b3b6e05e TL |
1314 | self.keys.rm(entity) |
1315 | self._kick_serve_loop() | |
1316 | return HandleCommandResult() | |
1317 | ||
adb31ebb TL |
1318 | def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection', |
1319 | 'remoto.backends.LegacyModuleExecute']: | |
9f95a23c TL |
1320 | """ |
1321 | Setup a connection for running commands on remote host. | |
1322 | """ | |
e306af50 TL |
1323 | conn, r = self._cons.get(host, (None, None)) |
1324 | if conn: | |
1325 | if conn.has_connection(): | |
1326 | self.log.debug('Have connection to %s' % host) | |
1327 | return conn, r | |
1328 | else: | |
1329 | self._reset_con(host) | |
f67539c2 | 1330 | assert self.ssh_user |
9f95a23c TL |
1331 | n = self.ssh_user + '@' + host |
1332 | self.log.debug("Opening connection to {} with ssh options '{}'".format( | |
1333 | n, self._ssh_options)) | |
f91f0fd5 | 1334 | child_logger = self.log.getChild(n) |
9f95a23c TL |
1335 | child_logger.setLevel('WARNING') |
1336 | conn = remoto.Connection( | |
1337 | n, | |
1338 | logger=child_logger, | |
f6b5b4d7 TL |
1339 | ssh_options=self._ssh_options, |
1340 | sudo=True if self.ssh_user != 'root' else False) | |
9f95a23c TL |
1341 | |
1342 | r = conn.import_module(remotes) | |
1343 | self._cons[host] = conn, r | |
1344 | ||
1345 | return conn, r | |
1346 | ||
adb31ebb | 1347 | def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str: |
9f95a23c TL |
1348 | """ |
1349 | Remote validator that accepts a connection object to ensure that a certain | |
1350 | executable is available returning its full path if so. | |
1351 | ||
1352 | Otherwise an exception with thorough details will be raised, informing the | |
1353 | user that the executable was not found. | |
1354 | """ | |
1355 | executable_path = conn.remote_module.which(executable) | |
1356 | if not executable_path: | |
1357 | raise RuntimeError("Executable '{}' not found on host '{}'".format( | |
1358 | executable, conn.hostname)) | |
1359 | self.log.debug("Found executable '{}' at path '{}'".format(executable, | |
f91f0fd5 | 1360 | executable_path)) |
9f95a23c TL |
1361 | return executable_path |
1362 | ||
f91f0fd5 | 1363 | def _get_container_image(self, daemon_name: str) -> Optional[str]: |
f6b5b4d7 | 1364 | daemon_type = daemon_name.split('.', 1)[0] # type: ignore |
f67539c2 | 1365 | image: Optional[str] = None |
a4b75251 | 1366 | if daemon_type in CEPH_IMAGE_TYPES: |
f6b5b4d7 | 1367 | # get container image |
f67539c2 TL |
1368 | image = str(self.get_foreign_ceph_option( |
1369 | utils.name_to_config_section(daemon_name), | |
1370 | 'container_image' | |
1371 | )).strip() | |
f6b5b4d7 TL |
1372 | elif daemon_type == 'prometheus': |
1373 | image = self.container_image_prometheus | |
1374 | elif daemon_type == 'grafana': | |
1375 | image = self.container_image_grafana | |
1376 | elif daemon_type == 'alertmanager': | |
1377 | image = self.container_image_alertmanager | |
1378 | elif daemon_type == 'node-exporter': | |
1379 | image = self.container_image_node_exporter | |
f67539c2 TL |
1380 | elif daemon_type == 'haproxy': |
1381 | image = self.container_image_haproxy | |
1382 | elif daemon_type == 'keepalived': | |
1383 | image = self.container_image_keepalived | |
f91f0fd5 TL |
1384 | elif daemon_type == CustomContainerService.TYPE: |
1385 | # The image can't be resolved; the necessary information |
1386 | # is only available when a container is deployed (given | |
1387 | # via spec). | |
1388 | image = None | |
f6b5b4d7 TL |
1389 | else: |
1390 | assert False, daemon_type | |
1391 | ||
1392 | self.log.debug('%s container image %s' % (daemon_name, image)) | |
1393 | ||
1394 | return image | |
1395 | ||
b3b6e05e | 1396 | def _schedulable_hosts(self) -> List[HostSpec]: |
f91f0fd5 | 1397 | """ |
f67539c2 | 1398 | Returns all usable hosts that went through _refresh_host_daemons(). |
9f95a23c | 1399 | |
f91f0fd5 TL |
1400 | This mitigates a potential race, where a new host was added *after* |
1401 | ``_refresh_host_daemons()`` was called, but *before* |
1402 | ``_apply_all_specs()`` was called. Thus we could end up with hosts |
1403 | where daemons might be running, but we have not yet detected them. |
1404 | """ | |
1405 | return [ | |
1406 | h for h in self.inventory.all_specs() | |
b3b6e05e TL |
1407 | if ( |
1408 | self.cache.host_had_daemon_refresh(h.hostname) | |
b3b6e05e TL |
1409 | and '_no_schedule' not in h.labels |
1410 | ) | |
f91f0fd5 | 1411 | ] |
9f95a23c | 1412 | |
522d829b TL |
1413 | def _unreachable_hosts(self) -> List[HostSpec]: |
1414 | """ | |
1415 | Return all hosts that are offline or in maintenance mode. | |
1416 | ||
1417 | The idea is that we should not touch the daemons on these hosts (since |
1418 | in theory the hosts are inaccessible, so we CAN'T touch them), but we |
1419 | still want to count daemons that exist on these hosts toward the |
1420 | placement, so daemons on these hosts aren't simply moved elsewhere. |
1421 | """ | |
1422 | return [ | |
1423 | h for h in self.inventory.all_specs() | |
1424 | if ( | |
1425 | h.status.lower() in ['maintenance', 'offline'] | |
1426 | or h.hostname in self.offline_hosts | |
1427 | ) | |
1428 | ] | |
1429 | ||
b3b6e05e | 1430 | def _check_valid_addr(self, host: str, addr: str) -> str: |
a4b75251 TL |
1431 | # make sure mgr is not resolving own ip |
1432 | if addr in self.get_mgr_id(): | |
1433 | raise OrchestratorError( | |
1434 | "Can not automatically resolve ip address of host where active mgr is running. Please explicitly provide the address.") | |
1435 | ||
b3b6e05e TL |
1436 | # make sure hostname is resolvable before trying to make a connection |
1437 | try: | |
1438 | ip_addr = utils.resolve_ip(addr) | |
1439 | except OrchestratorError as e: | |
1440 | msg = str(e) + f''' | |
1441 | You may need to supply an address for {addr} | |
1442 | ||
1443 | Please make sure that the host is reachable and accepts connections using the cephadm SSH key | |
1444 | To add the cephadm SSH key to the host: | |
1445 | > ceph cephadm get-pub-key > ~/ceph.pub | |
1446 | > ssh-copy-id -f -i ~/ceph.pub {self.ssh_user}@{addr} | |
1447 | ||
1448 | To check that the host is reachable open a new shell with the --no-hosts flag: | |
1449 | > cephadm shell --no-hosts | |
1450 | ||
1451 | Then run the following: | |
1452 | > ceph cephadm get-ssh-config > ssh_config | |
1453 | > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key | |
1454 | > chmod 0600 ~/cephadm_private_key | |
1455 | > ssh -F ssh_config -i ~/cephadm_private_key {self.ssh_user}@{addr}''' | |
1456 | raise OrchestratorError(msg) | |
1457 | ||
1458 | out, err, code = CephadmServe(self)._run_cephadm( | |
1459 | host, cephadmNoImage, 'check-host', | |
1460 | ['--expect-hostname', host], | |
1461 | addr=addr, | |
1462 | error_ok=True, no_fsid=True) | |
1463 | if code: | |
a4b75251 | 1464 | msg = 'check-host failed:\n' + '\n'.join(err) |
b3b6e05e TL |
1465 | # err will contain stdout and stderr, so we filter on the message text to |
1466 | # only show the errors | |
1467 | errors = [_i.replace("ERROR: ", "") for _i in err if _i.startswith('ERROR')] | |
a4b75251 TL |
1468 | if errors: |
1469 | msg = f'Host {host} ({addr}) failed check(s): {errors}' | |
1470 | raise OrchestratorError(msg) | |
b3b6e05e TL |
1471 | return ip_addr |
1472 | ||
e306af50 | 1473 | def _add_host(self, spec): |
9f95a23c TL |
1474 | # type: (HostSpec) -> str |
1475 | """ | |
1476 | Add a host to be managed by the orchestrator. | |
1477 | ||
1478 | :param host: host name | |
1479 | """ | |
1480 | assert_valid_host(spec.hostname) | |
b3b6e05e TL |
1481 | ip_addr = self._check_valid_addr(spec.hostname, spec.addr) |
1482 | if spec.addr == spec.hostname and ip_addr: | |
1483 | spec.addr = ip_addr | |
9f95a23c | 1484 | |
a4b75251 TL |
1485 | if spec.hostname in self.inventory and self.inventory.get_addr(spec.hostname) != spec.addr: |
1486 | self.cache.refresh_all_host_info(spec.hostname) | |
1487 | ||
b3b6e05e TL |
1488 | # prime crush map? |
1489 | if spec.location: | |
1490 | self.check_mon_command({ | |
1491 | 'prefix': 'osd crush add-bucket', | |
1492 | 'name': spec.hostname, | |
1493 | 'type': 'host', | |
1494 | 'args': [f'{k}={v}' for k, v in spec.location.items()], | |
1495 | }) | |
1496 | ||
1497 | if spec.hostname not in self.inventory: | |
1498 | self.cache.prime_empty_host(spec.hostname) | |
e306af50 | 1499 | self.inventory.add_host(spec) |
1911f103 | 1500 | self.offline_hosts_remove(spec.hostname) |
a4b75251 TL |
1501 | if spec.status == 'maintenance': |
1502 | self._set_maintenance_healthcheck() | |
9f95a23c TL |
1503 | self.event.set() # refresh stray health check |
1504 | self.log.info('Added host %s' % spec.hostname) | |
b3b6e05e | 1505 | return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr) |
9f95a23c | 1506 | |
f67539c2 | 1507 | @handle_orch_error |
e306af50 TL |
1508 | def add_host(self, spec: HostSpec) -> str: |
1509 | return self._add_host(spec) | |
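# Illustrative use of add_host via the orchestrator CLI (hostname and address
# are placeholders):
#   ceph orch host add host1 10.0.0.1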
1510 | ||
f67539c2 | 1511 | @handle_orch_error |
522d829b | 1512 | def remove_host(self, host: str, force: bool = False, offline: bool = False) -> str: |
9f95a23c TL |
1513 | """ |
1514 | Remove a host from orchestrator management. | |
1515 | ||
1516 | :param host: host name | |
522d829b TL |
1517 | :param force: bypass running daemons check |
1518 | :param offline: remove offline host | |
9f95a23c | 1519 | """ |
522d829b TL |
1520 | |
1521 | # check if host is offline | |
1522 | host_offline = host in self.offline_hosts | |
1523 | ||
1524 | if host_offline and not offline: | |
1525 | return "{} is offline, please use --offline and --force to remove this host. This can potentially cause data loss".format(host) | |
1526 | ||
1527 | if not host_offline and offline: | |
1528 | return "{} is online, please remove host without --offline.".format(host) | |
1529 | ||
1530 | if offline and not force: | |
1531 | return "Removing an offline host requires --force" | |
1532 | ||
1533 | # check if there are daemons on the host | |
1534 | if not force: | |
1535 | daemons = self.cache.get_daemons_by_host(host) | |
1536 | if daemons: | |
1537 | self.log.warning(f"Blocked {host} removal. Daemons running: {daemons}") | |
1538 | ||
1539 | daemons_table = "" | |
1540 | daemons_table += "{:<20} {:<15}\n".format("type", "id") | |
1541 | daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15) | |
1542 | for d in daemons: | |
1543 | daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id) | |
1544 | ||
1545 | return "Not allowed to remove %s from cluster. " \ | |
1546 | "The following daemons are running in the host:" \ | |
1547 | "\n%s\nPlease run 'ceph orch host drain %s' to remove daemons from host" % ( | |
1548 | host, daemons_table, host) | |
1549 | ||
1550 | def run_cmd(cmd_args: dict) -> None: | |
1551 | ret, out, err = self.mon_command(cmd_args) | |
1552 | if ret != 0: | |
1553 | self.log.debug(f"ran {cmd_args} with mon_command") | |
1554 | self.log.error( | |
1555 | f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})") | |
1556 | self.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}") | |
1557 | ||
1558 | if offline: | |
1559 | daemons = self.cache.get_daemons_by_host(host) | |
1560 | for d in daemons: | |
1561 | self.log.info(f"removing: {d.name()}") | |
1562 | ||
1563 | if d.daemon_type != 'osd': | |
1564 | self.cephadm_services[str(d.daemon_type)].pre_remove(d) | |
a4b75251 | 1565 | self.cephadm_services[str(d.daemon_type)].post_remove(d, is_failed_deploy=False) |
522d829b TL |
1566 | else: |
1567 | cmd_args = { | |
1568 | 'prefix': 'osd purge-actual', | |
1569 | 'id': int(str(d.daemon_id)), | |
1570 | 'yes_i_really_mean_it': True | |
1571 | } | |
1572 | run_cmd(cmd_args) | |
1573 | ||
1574 | cmd_args = { | |
1575 | 'prefix': 'osd crush rm', | |
1576 | 'name': host | |
1577 | } | |
1578 | run_cmd(cmd_args) | |
1579 | ||
e306af50 | 1580 | self.inventory.rm_host(host) |
9f95a23c TL |
1581 | self.cache.rm_host(host) |
1582 | self._reset_con(host) | |
1583 | self.event.set() # refresh stray health check | |
1584 | self.log.info('Removed host %s' % host) | |
522d829b | 1585 | return "Removed {} host '{}'".format('offline' if offline else '', host) |
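# Usage sketch for the removal paths handled above (hostname is a placeholder):
#   ceph orch host rm host1                      # refused while daemons remain
#   ceph orch host rm host1 --offline --force    # offline branch shown above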
9f95a23c | 1586 | |
f67539c2 | 1587 | @handle_orch_error |
adb31ebb | 1588 | def update_host_addr(self, host: str, addr: str) -> str: |
b3b6e05e | 1589 | self._check_valid_addr(host, addr) |
e306af50 | 1590 | self.inventory.set_addr(host, addr) |
9f95a23c TL |
1591 | self._reset_con(host) |
1592 | self.event.set() # refresh stray health check | |
1593 | self.log.info('Set host %s addr to %s' % (host, addr)) | |
1594 | return "Updated host '{}' addr to '{}'".format(host, addr) | |
1595 | ||
f67539c2 | 1596 | @handle_orch_error |
9f95a23c TL |
1597 | def get_hosts(self): |
1598 | # type: () -> List[orchestrator.HostSpec] | |
1599 | """ | |
1600 | Return a list of hosts managed by the orchestrator. | |
1601 | ||
1602 | Notes: | |
1603 | - skip async: manager reads from cache. | |
1604 | """ | |
e306af50 | 1605 | return list(self.inventory.all_specs()) |
9f95a23c | 1606 | |
a4b75251 TL |
1607 | @handle_orch_error |
1608 | def get_facts(self, hostname: Optional[str] = None) -> List[Dict[str, Any]]: | |
1609 | """ | |
1610 | Return a list of hosts' metadata (gather_facts) managed by the orchestrator. |
1611 | ||
1612 | Notes: | |
1613 | - skip async: manager reads from cache. | |
1614 | """ | |
1615 | if hostname: | |
1616 | return [self.cache.get_facts(hostname)] | |
1617 | ||
1618 | return [self.cache.get_facts(hostname) for hostname in self.cache.get_hosts()] | |
1619 | ||
f67539c2 | 1620 | @handle_orch_error |
adb31ebb | 1621 | def add_host_label(self, host: str, label: str) -> str: |
e306af50 | 1622 | self.inventory.add_label(host, label) |
9f95a23c | 1623 | self.log.info('Added label %s to host %s' % (label, host)) |
b3b6e05e | 1624 | self._kick_serve_loop() |
9f95a23c TL |
1625 | return 'Added label %s to host %s' % (label, host) |
1626 | ||
f67539c2 | 1627 | @handle_orch_error |
adb31ebb | 1628 | def remove_host_label(self, host: str, label: str) -> str: |
e306af50 | 1629 | self.inventory.rm_label(host, label) |
9f95a23c | 1630 | self.log.info('Removed label %s to host %s' % (label, host)) |
b3b6e05e | 1631 | self._kick_serve_loop() |
9f95a23c TL |
1632 | return 'Removed label %s from host %s' % (label, host) |
1633 | ||
f67539c2 TL |
1634 | def _host_ok_to_stop(self, hostname: str, force: bool = False) -> Tuple[int, str]: |
1635 | self.log.debug("running host-ok-to-stop checks") | |
f6b5b4d7 | 1636 | daemons = self.cache.get_daemons() |
f67539c2 | 1637 | daemon_map: Dict[str, List[str]] = defaultdict(lambda: []) |
f6b5b4d7 | 1638 | for dd in daemons: |
f67539c2 TL |
1639 | assert dd.hostname is not None |
1640 | assert dd.daemon_type is not None | |
1641 | assert dd.daemon_id is not None | |
f6b5b4d7 TL |
1642 | if dd.hostname == hostname: |
1643 | daemon_map[dd.daemon_type].append(dd.daemon_id) | |
1644 | ||
f67539c2 TL |
1645 | notifications: List[str] = [] |
1646 | error_notifications: List[str] = [] | |
1647 | okay: bool = True | |
f91f0fd5 | 1648 | for daemon_type, daemon_ids in daemon_map.items(): |
f67539c2 TL |
1649 | r = self.cephadm_services[daemon_type_to_service( |
1650 | daemon_type)].ok_to_stop(daemon_ids, force=force) | |
f6b5b4d7 | 1651 | if r.retval: |
f67539c2 TL |
1652 | okay = False |
1653 | # collect error notifications so user can see every daemon causing host | |
1654 | # to not be okay to stop | |
1655 | error_notifications.append(r.stderr) | |
1656 | if r.stdout: | |
1657 | # if extra notifications to print for user, add them to notifications list | |
1658 | notifications.append(r.stdout) | |
1659 | ||
1660 | if not okay: | |
1661 | # at least one daemon is not okay to stop | |
1662 | return 1, '\n'.join(error_notifications) | |
1663 | ||
1664 | if notifications: | |
1665 | return 0, (f'It is presumed safe to stop host {hostname}. ' | |
1666 | + 'Note the following:\n\n' + '\n'.join(notifications)) | |
1667 | return 0, f'It is presumed safe to stop host {hostname}' | |
1668 | ||
1669 | @handle_orch_error | |
1670 | def host_ok_to_stop(self, hostname: str) -> str: | |
1671 | if hostname not in self.cache.get_hosts(): | |
1672 | raise OrchestratorError(f'Cannot find host "{hostname}"') | |
1673 | ||
1674 | rc, msg = self._host_ok_to_stop(hostname) | |
1675 | if rc: | |
1676 | raise OrchestratorError(msg, errno=rc) | |
f6b5b4d7 | 1677 | |
f6b5b4d7 TL |
1678 | self.log.info(msg) |
1679 | return msg | |
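# Illustrative check backed by host_ok_to_stop (hostname is a placeholder):
#   ceph orch host ok-to-stop host1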
1680 | ||
f67539c2 TL |
1681 | def _set_maintenance_healthcheck(self) -> None: |
1682 | """Raise/update or clear the maintenance health check as needed""" | |
1683 | ||
1684 | in_maintenance = self.inventory.get_host_with_state("maintenance") | |
1685 | if not in_maintenance: | |
a4b75251 | 1686 | self.remove_health_warning('HOST_IN_MAINTENANCE') |
f67539c2 TL |
1687 | else: |
1688 | s = "host is" if len(in_maintenance) == 1 else "hosts are" | |
a4b75251 TL |
1689 | self.set_health_warning("HOST_IN_MAINTENANCE", f"{len(in_maintenance)} {s} in maintenance mode", 1, [ |
1690 | f"{h} is in maintenance" for h in in_maintenance]) | |
f67539c2 TL |
1691 | |
1692 | @handle_orch_error | |
1693 | @host_exists() | |
1694 | def enter_host_maintenance(self, hostname: str, force: bool = False) -> str: | |
1695 | """ Attempt to place a cluster host in maintenance | |
1696 | ||
1697 | Placing a host into maintenance disables the cluster's ceph target in systemd | |
1698 | and stops all ceph daemons. If the host is an osd host, we apply the noout flag |
1699 | for the host subtree in crush to prevent data movement during a host maintenance | |
1700 | window. | |
1701 | ||
1702 | :param hostname: (str) name of the host (must match an inventory hostname) | |
1703 | ||
1704 | :raises OrchestratorError: Hostname is invalid, host is already in maintenance | |
1705 | """ | |
1706 | if len(self.cache.get_hosts()) == 1: | |
1707 | raise OrchestratorError("Maintenance feature is not supported on single node clusters") | |
1708 | ||
1709 | # if upgrade is active, deny | |
1710 | if self.upgrade.upgrade_state: | |
1711 | raise OrchestratorError( | |
1712 | f"Unable to place {hostname} in maintenance with upgrade active/paused") | |
1713 | ||
1714 | tgt_host = self.inventory._inventory[hostname] | |
1715 | if tgt_host.get("status", "").lower() == "maintenance": | |
1716 | raise OrchestratorError(f"Host {hostname} is already in maintenance") | |
1717 | ||
1718 | host_daemons = self.cache.get_daemon_types(hostname) | |
1719 | self.log.debug("daemons on host {}".format(','.join(host_daemons))) | |
1720 | if host_daemons: | |
1721 | # daemons on this host, so check the daemons can be stopped | |
1722 | # and if so, place the host into maintenance by disabling the target | |
1723 | rc, msg = self._host_ok_to_stop(hostname, force) | |
1724 | if rc: | |
1725 | raise OrchestratorError( | |
1726 | msg + '\nNote: Warnings can be bypassed with the --force flag', errno=rc) | |
1727 | ||
1728 | # call the host-maintenance function | |
1729 | _out, _err, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, "host-maintenance", | |
1730 | ["enter"], | |
1731 | error_ok=True) | |
a4b75251 TL |
1732 | returned_msg = _err[0].split('\n')[-1] |
1733 | if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): | |
f67539c2 TL |
1734 | raise OrchestratorError( |
1735 | f"Failed to place {hostname} into maintenance for cluster {self._cluster_fsid}") | |
1736 | ||
1737 | if "osd" in host_daemons: | |
1738 | crush_node = hostname if '.' not in hostname else hostname.split('.')[0] | |
1739 | rc, out, err = self.mon_command({ | |
1740 | 'prefix': 'osd set-group', | |
1741 | 'flags': 'noout', | |
1742 | 'who': [crush_node], | |
1743 | 'format': 'json' | |
1744 | }) | |
1745 | if rc: | |
1746 | self.log.warning( | |
1747 | f"maintenance mode request for {hostname} failed to SET the noout group (rc={rc})") | |
1748 | raise OrchestratorError( | |
1749 | f"Unable to set the osds on {hostname} to noout (rc={rc})") | |
1750 | else: | |
1751 | self.log.info( | |
1752 | f"maintenance mode request for {hostname} has SET the noout group") | |
1753 | ||
1754 | # update the host status in the inventory | |
1755 | tgt_host["status"] = "maintenance" | |
1756 | self.inventory._inventory[hostname] = tgt_host | |
1757 | self.inventory.save() | |
1758 | ||
1759 | self._set_maintenance_healthcheck() | |
522d829b | 1760 | return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode' |
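# Usage sketch (hostname is a placeholder; --force bypasses ok-to-stop warnings
# as described above):
#   ceph orch host maintenance enter host1 --force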
f67539c2 TL |
1761 | |
1762 | @handle_orch_error | |
1763 | @host_exists() | |
1764 | def exit_host_maintenance(self, hostname: str) -> str: | |
1765 | """Exit maintenance mode and return a host to an operational state | |
1766 | ||
1767 | Returning from maintenance will enable the cluster's systemd target and |
1768 | start it, and remove any noout flag that has been added for the host if |
1769 | the host has osd daemons. |
1770 | ||
1771 | :param hostname: (str) host name | |
1772 | ||
1773 | :raises OrchestratorError: Unable to return from maintenance, or unset the | |
1774 | noout flag | |
1775 | """ | |
1776 | tgt_host = self.inventory._inventory[hostname] | |
1777 | if tgt_host['status'] != "maintenance": | |
1778 | raise OrchestratorError(f"Host {hostname} is not in maintenance mode") | |
1779 | ||
1780 | outs, errs, _code = CephadmServe(self)._run_cephadm(hostname, cephadmNoImage, 'host-maintenance', | |
1781 | ['exit'], | |
1782 | error_ok=True) | |
a4b75251 TL |
1783 | returned_msg = errs[0].split('\n')[-1] |
1784 | if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'): | |
f67539c2 TL |
1785 | raise OrchestratorError( |
1786 | f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}") | |
1787 | ||
1788 | if "osd" in self.cache.get_daemon_types(hostname): | |
1789 | crush_node = hostname if '.' not in hostname else hostname.split('.')[0] | |
1790 | rc, _out, _err = self.mon_command({ | |
1791 | 'prefix': 'osd unset-group', | |
1792 | 'flags': 'noout', | |
1793 | 'who': [crush_node], | |
1794 | 'format': 'json' | |
1795 | }) | |
1796 | if rc: | |
1797 | self.log.warning( | |
1798 | f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})") | |
1799 | raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})") | |
1800 | else: | |
1801 | self.log.info( | |
1802 | f"exit maintenance request has UNSET for the noout group on host {hostname}") | |
1803 | ||
1804 | # update the host record status | |
1805 | tgt_host['status'] = "" | |
1806 | self.inventory._inventory[hostname] = tgt_host | |
1807 | self.inventory.save() | |
1808 | ||
1809 | self._set_maintenance_healthcheck() | |
1810 | ||
1811 | return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode" | |
1812 | ||
f91f0fd5 TL |
1813 | def get_minimal_ceph_conf(self) -> str: |
1814 | _, config, _ = self.check_mon_command({ | |
f6b5b4d7 TL |
1815 | "prefix": "config generate-minimal-conf", |
1816 | }) | |
f91f0fd5 TL |
1817 | extra = self.extra_ceph_conf().conf |
1818 | if extra: | |
1819 | config += '\n\n' + extra.strip() + '\n' | |
1820 | return config | |
f6b5b4d7 | 1821 | |
adb31ebb | 1822 | def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None: |
f6b5b4d7 TL |
1823 | if filter_host: |
1824 | self.cache.invalidate_host_daemons(filter_host) | |
1825 | else: | |
1826 | for h in self.cache.get_hosts(): | |
1827 | # Also discover daemons deployed manually | |
1828 | self.cache.invalidate_host_daemons(h) | |
1829 | ||
1830 | self._kick_serve_loop() | |
1831 | ||
f67539c2 | 1832 | @handle_orch_error |
f6b5b4d7 TL |
1833 | def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None, |
1834 | refresh: bool = False) -> List[orchestrator.ServiceDescription]: | |
9f95a23c | 1835 | if refresh: |
f6b5b4d7 | 1836 | self._invalidate_daemons_and_kick_serve() |
b3b6e05e | 1837 | self.log.debug('Kicked serve() loop to refresh all services') |
f6b5b4d7 | 1838 | |
f6b5b4d7 | 1839 | sm: Dict[str, orchestrator.ServiceDescription] = {} |
f67539c2 TL |
1840 | |
1841 | # known services | |
1842 | for nm, spec in self.spec_store.all_specs.items(): | |
1843 | if service_type is not None and service_type != spec.service_type: | |
1844 | continue | |
1845 | if service_name is not None and service_name != nm: | |
1846 | continue | |
1847 | sm[nm] = orchestrator.ServiceDescription( | |
1848 | spec=spec, | |
b3b6e05e | 1849 | size=spec.placement.get_target_count(self._schedulable_hosts()), |
f67539c2 TL |
1850 | running=0, |
1851 | events=self.events.get_for_service(spec.service_name()), | |
1852 | created=self.spec_store.spec_created[nm], | |
1853 | deleted=self.spec_store.spec_deleted.get(nm, None), | |
1854 | virtual_ip=spec.get_virtual_ip(), | |
1855 | ports=spec.get_port_start(), | |
1856 | ) | |
f67539c2 TL |
1857 | if spec.service_type == 'ingress': |
1858 | # ingress has 2 daemons running per host | |
1859 | sm[nm].size *= 2 | |
1860 | ||
1861 | # factor daemons into status | |
1911f103 | 1862 | for h, dm in self.cache.get_daemons_with_volatile_status(): |
9f95a23c | 1863 | for name, dd in dm.items(): |
f67539c2 TL |
1864 | assert dd.hostname is not None, f'no hostname for {dd!r}' |
1865 | assert dd.daemon_type is not None, f'no daemon_type for {dd!r}' | |
1866 | ||
9f95a23c | 1867 | n: str = dd.service_name() |
f67539c2 TL |
1868 | |
1869 | if ( | |
1870 | service_type | |
b3b6e05e | 1871 | and service_type != daemon_type_to_service(dd.daemon_type) |
f67539c2 TL |
1872 | ): |
1873 | continue | |
9f95a23c TL |
1874 | if service_name and service_name != n: |
1875 | continue | |
f67539c2 TL |
1876 | |
1877 | if n not in sm: | |
1878 | # new unmanaged service | |
1911f103 TL |
1879 | spec = ServiceSpec( |
1880 | unmanaged=True, | |
f67539c2 | 1881 | service_type=daemon_type_to_service(dd.daemon_type), |
1911f103 | 1882 | service_id=dd.service_id(), |
1911f103 | 1883 | ) |
9f95a23c | 1884 | sm[n] = orchestrator.ServiceDescription( |
9f95a23c TL |
1885 | last_refresh=dd.last_refresh, |
1886 | container_image_id=dd.container_image_id, | |
1887 | container_image_name=dd.container_image_name, | |
1888 | spec=spec, | |
f67539c2 | 1889 | size=0, |
9f95a23c | 1890 | ) |
f67539c2 TL |
1891 | |
1892 | if dd.status == DaemonDescriptionStatus.running: | |
9f95a23c | 1893 | sm[n].running += 1 |
f67539c2 TL |
1894 | if dd.daemon_type == 'osd': |
1895 | # The osd count can't be determined by the Placement spec. | |
1896 | # An accurate actual/expected representation cannot be determined |
1897 | # here, so we're setting running = size for now. |
1898 | sm[n].size += 1 | |
1899 | if ( | |
1900 | not sm[n].last_refresh | |
1901 | or not dd.last_refresh | |
1902 | or dd.last_refresh < sm[n].last_refresh # type: ignore | |
1903 | ): | |
9f95a23c | 1904 | sm[n].last_refresh = dd.last_refresh |
f67539c2 | 1905 | |
1911f103 | 1906 | return list(sm.values()) |
9f95a23c | 1907 | |
f67539c2 | 1908 | @handle_orch_error |
f6b5b4d7 TL |
1909 | def list_daemons(self, |
1910 | service_name: Optional[str] = None, | |
1911 | daemon_type: Optional[str] = None, | |
1912 | daemon_id: Optional[str] = None, | |
1913 | host: Optional[str] = None, | |
1914 | refresh: bool = False) -> List[orchestrator.DaemonDescription]: | |
9f95a23c | 1915 | if refresh: |
f6b5b4d7 | 1916 | self._invalidate_daemons_and_kick_serve(host) |
b3b6e05e | 1917 | self.log.debug('Kicked serve() loop to refresh all daemons') |
f6b5b4d7 | 1918 | |
9f95a23c | 1919 | result = [] |
1911f103 | 1920 | for h, dm in self.cache.get_daemons_with_volatile_status(): |
9f95a23c TL |
1921 | if host and h != host: |
1922 | continue | |
1923 | for name, dd in dm.items(): | |
801d1391 TL |
1924 | if daemon_type is not None and daemon_type != dd.daemon_type: |
1925 | continue | |
1926 | if daemon_id is not None and daemon_id != dd.daemon_id: | |
9f95a23c | 1927 | continue |
801d1391 | 1928 | if service_name is not None and service_name != dd.service_name(): |
9f95a23c | 1929 | continue |
b3b6e05e TL |
1930 | if not dd.memory_request and dd.daemon_type in ['osd', 'mon']: |
1931 | dd.memory_request = cast(Optional[int], self.get_foreign_ceph_option( | |
1932 | dd.name(), | |
1933 | f"{dd.daemon_type}_memory_target" | |
1934 | )) | |
9f95a23c TL |
1935 | result.append(dd) |
1936 | return result | |
1937 | ||
f67539c2 | 1938 | @handle_orch_error |
adb31ebb TL |
1939 | def service_action(self, action: str, service_name: str) -> List[str]: |
1940 | dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name) | |
f67539c2 TL |
1941 | if not dds: |
1942 | raise OrchestratorError(f'No daemons exist under service name "{service_name}".' | |
1943 | + ' View currently running services using "ceph orch ls"') | |
522d829b TL |
1944 | if action == 'stop' and service_name.split('.')[0].lower() in ['mgr', 'mon', 'osd']: |
1945 | return [f'Stopping entire {service_name} service is prohibited.'] | |
9f95a23c | 1946 | self.log.info('%s service %s' % (action.capitalize(), service_name)) |
adb31ebb TL |
1947 | return [ |
1948 | self._schedule_daemon_action(dd.name(), action) | |
1949 | for dd in dds | |
1950 | ] | |
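# Illustrative service-level actions routed through service_action (service
# names are placeholders; stop is refused for mon/mgr/osd as noted above):
#   ceph orch restart rgw.myrgw
#   ceph orch stop nfs.foo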
f6b5b4d7 | 1951 | |
f67539c2 TL |
1952 | def _daemon_action(self, |
1953 | daemon_spec: CephadmDaemonDeploySpec, | |
1954 | action: str, | |
1955 | image: Optional[str] = None) -> str: | |
1956 | self._daemon_action_set_image(action, image, daemon_spec.daemon_type, | |
1957 | daemon_spec.daemon_id) | |
f6b5b4d7 | 1958 | |
b3b6e05e TL |
1959 | if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(daemon_spec.daemon_type, |
1960 | daemon_spec.daemon_id): | |
f67539c2 TL |
1961 | self.mgr_service.fail_over() |
1962 | return '' # unreachable | |
9f95a23c | 1963 | |
f67539c2 TL |
1964 | if action == 'redeploy' or action == 'reconfig': |
1965 | if daemon_spec.daemon_type != 'osd': | |
1966 | daemon_spec = self.cephadm_services[daemon_type_to_service( | |
1967 | daemon_spec.daemon_type)].prepare_create(daemon_spec) | |
1968 | return CephadmServe(self)._create_daemon(daemon_spec, reconfig=(action == 'reconfig')) | |
9f95a23c TL |
1969 | |
1970 | actions = { | |
1971 | 'start': ['reset-failed', 'start'], | |
1972 | 'stop': ['stop'], | |
1973 | 'restart': ['reset-failed', 'restart'], | |
1974 | } | |
f6b5b4d7 | 1975 | name = daemon_spec.name() |
9f95a23c | 1976 | for a in actions[action]: |
f6b5b4d7 | 1977 | try: |
f67539c2 TL |
1978 | out, err, code = CephadmServe(self)._run_cephadm( |
1979 | daemon_spec.host, name, 'unit', | |
f6b5b4d7 TL |
1980 | ['--name', name, a]) |
1981 | except Exception: | |
f67539c2 | 1982 | self.log.exception(f'`{daemon_spec.host}: cephadm unit {name} {a}` failed') |
f6b5b4d7 TL |
1983 | self.cache.invalidate_host_daemons(daemon_spec.host) |
1984 | msg = "{} {} from host '{}'".format(action, name, daemon_spec.host) | |
1985 | self.events.for_daemon(name, 'INFO', msg) | |
1986 | return msg | |
9f95a23c | 1987 | |
adb31ebb | 1988 | def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None: |
f91f0fd5 TL |
1989 | if image is not None: |
1990 | if action != 'redeploy': | |
1991 | raise OrchestratorError( | |
1992 | f'Cannot execute {action} with new image. `action` needs to be `redeploy`') | |
a4b75251 | 1993 | if daemon_type not in CEPH_IMAGE_TYPES: |
f91f0fd5 TL |
1994 | raise OrchestratorError( |
1995 | f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported ' | |
a4b75251 | 1996 | f'types are: {", ".join(CEPH_IMAGE_TYPES)}') |
f91f0fd5 TL |
1997 | |
1998 | self.check_mon_command({ | |
1999 | 'prefix': 'config set', | |
2000 | 'name': 'container_image', | |
2001 | 'value': image, | |
2002 | 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id), | |
2003 | }) | |
2004 | ||
f67539c2 | 2005 | @handle_orch_error |
f91f0fd5 | 2006 | def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str: |
f6b5b4d7 | 2007 | d = self.cache.get_daemon(daemon_name) |
f67539c2 TL |
2008 | assert d.daemon_type is not None |
2009 | assert d.daemon_id is not None | |
f6b5b4d7 | 2010 | |
b3b6e05e | 2011 | if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(d.daemon_type, d.daemon_id) \ |
f91f0fd5 TL |
2012 | and not self.mgr_service.mgr_map_has_standby(): |
2013 | raise OrchestratorError( | |
2014 | f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') | |
2015 | ||
2016 | self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id) | |
2017 | ||
2018 | self.log.info(f'Schedule {action} daemon {daemon_name}') | |
2019 | return self._schedule_daemon_action(daemon_name, action) | |
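# Illustrative daemon-level action (daemon name is a placeholder):
#   ceph orch daemon restart mgr.host1.abcdef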
2020 | ||
2021 | def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: | |
2022 | return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() | |
2023 | ||
f67539c2 TL |
2024 | def get_active_mgr_digests(self) -> List[str]: |
2025 | digests = self.mgr_service.get_active_daemon( | |
2026 | self.cache.get_daemons_by_type('mgr')).container_image_digests | |
2027 | return digests if digests else [] | |
2028 | ||
adb31ebb | 2029 | def _schedule_daemon_action(self, daemon_name: str, action: str) -> str: |
f91f0fd5 | 2030 | dd = self.cache.get_daemon(daemon_name) |
f67539c2 TL |
2031 | assert dd.daemon_type is not None |
2032 | assert dd.daemon_id is not None | |
2033 | assert dd.hostname is not None | |
b3b6e05e | 2034 | if (action == 'redeploy' or action == 'restart') and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \ |
f91f0fd5 TL |
2035 | and not self.mgr_service.mgr_map_has_standby(): |
2036 | raise OrchestratorError( | |
2037 | f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') | |
2038 | self.cache.schedule_daemon_action(dd.hostname, dd.name(), action) | |
2039 | msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname) | |
2040 | self._kick_serve_loop() | |
2041 | return msg | |
9f95a23c | 2042 | |
f67539c2 | 2043 | @handle_orch_error |
9f95a23c | 2044 | def remove_daemons(self, names): |
e306af50 | 2045 | # type: (List[str]) -> List[str] |
9f95a23c TL |
2046 | args = [] |
2047 | for host, dm in self.cache.daemons.items(): | |
2048 | for name in names: | |
2049 | if name in dm: | |
2050 | args.append((name, host)) | |
2051 | if not args: | |
2052 | raise OrchestratorError('Unable to find daemon(s) %s' % (names)) | |
f67539c2 | 2053 | self.log.info('Remove daemons %s' % ' '.join([a[0] for a in args])) |
9f95a23c TL |
2054 | return self._remove_daemons(args) |
2055 | ||
f67539c2 | 2056 | @handle_orch_error |
a4b75251 | 2057 | def remove_service(self, service_name: str, force: bool = False) -> str: |
9f95a23c | 2058 | self.log.info('Remove service %s' % service_name) |
e306af50 | 2059 | self._trigger_preview_refresh(service_name=service_name) |
f67539c2 TL |
2060 | if service_name in self.spec_store: |
2061 | if self.spec_store[service_name].spec.service_type in ('mon', 'mgr'): | |
2062 | return f'Unable to remove {service_name} service.\n' \ | |
2063 | f'Note, you might want to mark the {service_name} service as "unmanaged"' | |
b3b6e05e | 2064 | |
a4b75251 TL |
2065 | # Report list of affected OSDs? |
2066 | if not force and service_name.startswith('osd.'): | |
2067 | osds_msg = {} | |
b3b6e05e TL |
2068 | for h, dm in self.cache.get_daemons_with_volatile_status(): |
2069 | osds_to_remove = [] | |
2070 | for name, dd in dm.items(): | |
2071 | if dd.daemon_type == 'osd' and (dd.service_name() == service_name or not dd.osdspec_affinity): | |
2072 | osds_to_remove.append(str(dd.daemon_id)) | |
2073 | if osds_to_remove: | |
2074 | osds_msg[h] = osds_to_remove | |
b3b6e05e | 2075 | if osds_msg: |
a4b75251 TL |
2076 | msg = '' |
2077 | for h, ls in osds_msg.items(): | |
2078 | msg += f'\thost {h}: {" ".join([f"osd.{id}" for id in ls])}' | |
2079 | raise OrchestratorError(f'If {service_name} is removed then the following OSDs will remain; use --force to proceed anyway\n{msg}') |
2080 | ||
2081 | found = self.spec_store.rm(service_name) | |
2082 | if found and service_name.startswith('osd.'): | |
2083 | self.spec_store.finally_rm(service_name) | |
2084 | self._kick_serve_loop() | |
2085 | return f'Removed service {service_name}' | |
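# Usage sketch for removing a managed service (service names are placeholders):
#   ceph orch rm rgw.myrgw
#   ceph orch rm osd.my_drivegroup --force   # skips the remaining-OSD check above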
9f95a23c | 2086 | |
f67539c2 | 2087 | @handle_orch_error |
adb31ebb | 2088 | def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: |
9f95a23c TL |
2089 | """ |
2090 | Return the storage inventory of hosts matching the given filter. | |
2091 | ||
2092 | :param host_filter: host filter | |
2093 | ||
2094 | TODO: | |
2095 | - add filtering by label | |
2096 | """ | |
2097 | if refresh: | |
f6b5b4d7 TL |
2098 | if host_filter and host_filter.hosts: |
2099 | for h in host_filter.hosts: | |
b3b6e05e | 2100 | self.log.debug(f'will refresh {h} devs') |
f6b5b4d7 | 2101 | self.cache.invalidate_host_devices(h) |
9f95a23c | 2102 | else: |
f6b5b4d7 | 2103 | for h in self.cache.get_hosts(): |
b3b6e05e | 2104 | self.log.debug(f'will refresh {h} devs') |
f6b5b4d7 TL |
2105 | self.cache.invalidate_host_devices(h) |
2106 | ||
2107 | self.event.set() | |
b3b6e05e | 2108 | self.log.debug('Kicked serve() loop to refresh devices') |
9f95a23c TL |
2109 | |
2110 | result = [] | |
2111 | for host, dls in self.cache.devices.items(): | |
f6b5b4d7 | 2112 | if host_filter and host_filter.hosts and host not in host_filter.hosts: |
9f95a23c TL |
2113 | continue |
2114 | result.append(orchestrator.InventoryHost(host, | |
2115 | inventory.Devices(dls))) | |
2116 | return result | |
2117 | ||
f67539c2 | 2118 | @handle_orch_error |
adb31ebb | 2119 | def zap_device(self, host: str, path: str) -> str: |
a4b75251 TL |
2120 | """Zap a device on a managed host. |
2121 | ||
2122 | Use ceph-volume zap to return a device to an unused/free state | |
2123 | ||
2124 | Args: | |
2125 | host (str): hostname of the cluster host | |
2126 | path (str): device path | |
2127 | ||
2128 | Raises: | |
2129 | OrchestratorError: host is not a cluster host | |
2130 | OrchestratorError: host is in maintenance and therefore unavailable | |
2131 | OrchestratorError: device path not found on the host | |
2132 | OrchestratorError: device is known to a different ceph cluster | |
2133 | OrchestratorError: device holds active osd | |
2134 | OrchestratorError: device cache hasn't been populated yet |
2135 | ||
2136 | Returns: | |
2137 | str: output from the zap command | |
2138 | """ | |
2139 | ||
9f95a23c | 2140 | self.log.info('Zap device %s:%s' % (host, path)) |
a4b75251 TL |
2141 | |
2142 | if host not in self.inventory.keys(): | |
2143 | raise OrchestratorError( | |
2144 | f"Host '{host}' is not a member of the cluster") | |
2145 | ||
2146 | host_info = self.inventory._inventory.get(host, {}) | |
2147 | if host_info.get('status', '').lower() == 'maintenance': | |
2148 | raise OrchestratorError( | |
2149 | f"Host '{host}' is in maintenance mode, which prevents any actions against it.") | |
2150 | ||
2151 | if host not in self.cache.devices: | |
2152 | raise OrchestratorError( | |
2153 | f"Host '{host} hasn't been scanned yet to determine it's inventory. Please try again later.") | |
2154 | ||
2155 | host_devices = self.cache.devices[host] | |
2156 | path_found = False | |
2157 | osd_id_list: List[str] = [] | |
2158 | ||
2159 | for dev in host_devices: | |
2160 | if dev.path == path: | |
2161 | # match, so look a little deeper | |
2162 | if dev.lvs: | |
2163 | for lv in cast(List[Dict[str, str]], dev.lvs): | |
2164 | if lv.get('osd_id', ''): | |
2165 | lv_fsid = lv.get('cluster_fsid') | |
2166 | if lv_fsid != self._cluster_fsid: | |
2167 | raise OrchestratorError( | |
2168 | f"device {path} has lv's from a different Ceph cluster ({lv_fsid})") | |
2169 | osd_id_list.append(lv.get('osd_id', '')) | |
2170 | path_found = True | |
2171 | break | |
2172 | if not path_found: | |
2173 | raise OrchestratorError( | |
2174 | f"Device path '{path}' not found on host '{host}'") | |
2175 | ||
2176 | if osd_id_list: | |
2177 | dev_name = os.path.basename(path) | |
2178 | active_osds: List[str] = [] | |
2179 | for osd_id in osd_id_list: | |
2180 | metadata = self.get_metadata('osd', str(osd_id)) | |
2181 | if metadata: | |
2182 | if metadata.get('hostname', '') == host and dev_name in metadata.get('devices', '').split(','): | |
2183 | active_osds.append("osd." + osd_id) | |
2184 | if active_osds: | |
2185 | raise OrchestratorError( | |
2186 | f"Unable to zap: device '{path}' on {host} has {len(active_osds)} active " | |
2187 | f"OSD{'s' if len(active_osds) > 1 else ''}" | |
2188 | f" ({', '.join(active_osds)}). Use 'ceph orch osd rm' first.") | |
2189 | ||
f67539c2 | 2190 | out, err, code = CephadmServe(self)._run_cephadm( |
9f95a23c TL |
2191 | host, 'osd', 'ceph-volume', |
2192 | ['--', 'lvm', 'zap', '--destroy', path], | |
2193 | error_ok=True) | |
a4b75251 | 2194 | |
9f95a23c TL |
2195 | self.cache.invalidate_host_devices(host) |
2196 | if code: | |
2197 | raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) | |
a4b75251 TL |
2198 | msg = f'zap successful for {path} on {host}' |
2199 | self.log.info(msg) | |
2200 | ||
2201 | return msg + '\n' | |
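# Illustrative zap call (host and device path are placeholders):
#   ceph orch device zap host1 /dev/sdb --force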
9f95a23c | 2202 | |
f67539c2 | 2203 | @handle_orch_error |
f91f0fd5 TL |
2204 | def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: |
2205 | """ | |
2206 | Blink a device light. Calling something like:: | |
2207 | ||
2208 | lsmcli local-disk-ident-led-on --path $path | |
2209 | ||
2210 | If you must, you can customize this via:: | |
2211 | ||
2212 | ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>' | |
2213 | ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>' | |
2214 | ||
2215 | See templates/blink_device_light_cmd.j2 | |
2216 | """ | |
e306af50 | 2217 | @forall_hosts |
adb31ebb | 2218 | def blink(host: str, dev: str, path: str) -> str: |
f91f0fd5 TL |
2219 | cmd_line = self.template.render('blink_device_light_cmd.j2', |
2220 | { | |
2221 | 'on': on, | |
2222 | 'ident_fault': ident_fault, | |
2223 | 'dev': dev, | |
2224 | 'path': path | |
2225 | }, | |
2226 | host=host) | |
2227 | cmd_args = shlex.split(cmd_line) | |
2228 | ||
f67539c2 | 2229 | out, err, code = CephadmServe(self)._run_cephadm( |
f91f0fd5 | 2230 | host, 'osd', 'shell', ['--'] + cmd_args, |
9f95a23c TL |
2231 | error_ok=True) |
2232 | if code: | |
f6b5b4d7 | 2233 | raise OrchestratorError( |
9f95a23c | 2234 | 'Unable to affect %s light for %s:%s. Command: %s' % ( |
f91f0fd5 | 2235 | ident_fault, host, dev, ' '.join(cmd_args))) |
9f95a23c TL |
2236 | self.log.info('Set %s light for %s:%s %s' % ( |
2237 | ident_fault, host, dev, 'on' if on else 'off')) | |
2238 | return "Set %s light for %s:%s %s" % ( | |
2239 | ident_fault, host, dev, 'on' if on else 'off') | |
2240 | ||
2241 | return blink(locs) | |
2242 | ||
2243 | def get_osd_uuid_map(self, only_up=False): | |
1911f103 | 2244 | # type: (bool) -> Dict[str, str] |
9f95a23c TL |
2245 | osd_map = self.get('osd_map') |
2246 | r = {} | |
2247 | for o in osd_map['osds']: | |
2248 | # only include OSDs that have ever started in this map. this way | |
2249 | # an interrupted osd create can be repeated and succeed the second | |
2250 | # time around. | |
1911f103 TL |
2251 | osd_id = o.get('osd') |
2252 | if osd_id is None: | |
2253 | raise OrchestratorError("Could not retrieve osd_id from osd_map") | |
2254 | if not only_up or (o['up_from'] > 0): | |
2255 | r[str(osd_id)] = o.get('uuid', '') | |
9f95a23c TL |
2256 | return r |
2257 | ||
e306af50 TL |
2258 | def _trigger_preview_refresh(self, |
2259 | specs: Optional[List[DriveGroupSpec]] = None, | |
f6b5b4d7 TL |
2260 | service_name: Optional[str] = None, |
2261 | ) -> None: | |
2262 | # Only trigger a refresh when a spec has changed | |
2263 | trigger_specs = [] | |
2264 | if specs: | |
2265 | for spec in specs: | |
2266 | preview_spec = self.spec_store.spec_preview.get(spec.service_name()) | |
2267 | # If the to-be-previewed spec != the actual spec, we need to |
2268 | # trigger a refresh; if the spec has been removed (== None) we need to |
2269 | # refresh as well. |
2270 | if not preview_spec or spec != preview_spec: | |
2271 | trigger_specs.append(spec) | |
2272 | if service_name: | |
2273 | trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))] | |
2274 | if not any(trigger_specs): | |
2275 | return None | |
2276 | ||
2277 | refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs) | |
e306af50 TL |
2278 | for host in refresh_hosts: |
2279 | self.log.info(f"Marking host: {host} for OSDSpec preview refresh.") | |
2280 | self.cache.osdspec_previews_refresh_queue.append(host) | |
2281 | ||
f67539c2 | 2282 | @handle_orch_error |
f6b5b4d7 TL |
2283 | def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]: |
2284 | """ | |
2285 | Deprecated. Please use `apply()` instead. | |
2286 | ||
2287 | Keeping this around to be compatible with mgr/dashboard. |
2288 | """ | |
9f95a23c TL |
2289 | return [self._apply(spec) for spec in specs] |
2290 | ||
f67539c2 | 2291 | @handle_orch_error |
f6b5b4d7 TL |
2292 | def create_osds(self, drive_group: DriveGroupSpec) -> str: |
2293 | return self.osd_service.create_from_spec(drive_group) | |
9f95a23c | 2294 | |
f6b5b4d7 TL |
2295 | def _preview_osdspecs(self, |
2296 | osdspecs: Optional[List[DriveGroupSpec]] = None | |
adb31ebb | 2297 | ) -> dict: |
f6b5b4d7 TL |
2298 | if not osdspecs: |
2299 | return {'n/a': [{'error': True, | |
2300 | 'message': 'No OSDSpec or matching hosts found.'}]} | |
2301 | matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs) | |
e306af50 TL |
2302 | if not matching_hosts: |
2303 | return {'n/a': [{'error': True, | |
2304 | 'message': 'No OSDSpec or matching hosts found.'}]} | |
adb31ebb | 2305 | # Is any host still loading previews or still in the queue to be previewed |
e306af50 | 2306 | pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts} |
adb31ebb | 2307 | if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts): |
e306af50 TL |
2308 | # Report 'pending' when any of the matching hosts is still loading previews (flag is True) |
2309 | return {'n/a': [{'error': True, | |
2310 | 'message': 'Preview data is being generated.. ' | |
f6b5b4d7 TL |
2311 | 'Please re-run this command in a bit.'}]} |
2312 | # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs | |
2313 | previews_for_specs = {} | |
2314 | for host, raw_reports in self.cache.osdspec_previews.items(): | |
2315 | if host not in matching_hosts: | |
2316 | continue | |
2317 | osd_reports = [] | |
2318 | for osd_report in raw_reports: | |
2319 | if osd_report.get('osdspec') in [x.service_id for x in osdspecs]: | |
2320 | osd_reports.append(osd_report) | |
2321 | previews_for_specs.update({host: osd_reports}) | |
2322 | return previews_for_specs | |
9f95a23c | 2323 | |
f67539c2 TL |
2324 | def _calc_daemon_deps(self, |
2325 | spec: Optional[ServiceSpec], | |
2326 | daemon_type: str, | |
2327 | daemon_id: str) -> List[str]: | |
9f95a23c | 2328 | deps = [] |
f67539c2 TL |
2329 | if daemon_type == 'haproxy': |
2330 | # because cephadm creates new daemon instances whenever | |
2331 | # port or ip changes, identifying daemons by name is | |
2332 | # sufficient to detect changes. | |
2333 | if not spec: | |
2334 | return [] | |
2335 | ingress_spec = cast(IngressSpec, spec) | |
2336 | assert ingress_spec.backend_service | |
2337 | daemons = self.cache.get_daemons_by_service(ingress_spec.backend_service) | |
2338 | deps = [d.name() for d in daemons] | |
2339 | elif daemon_type == 'keepalived': | |
2340 | # because cephadm creates new daemon instances whenever | |
2341 | # port or ip changes, identifying daemons by name is | |
2342 | # sufficient to detect changes. | |
2343 | if not spec: | |
2344 | return [] | |
2345 | daemons = self.cache.get_daemons_by_service(spec.service_name()) | |
2346 | deps = [d.name() for d in daemons if d.daemon_type == 'haproxy'] | |
a4b75251 TL |
2347 | elif daemon_type == 'iscsi': |
2348 | deps = [self.get_mgr_ip()] | |
f67539c2 TL |
2349 | else: |
2350 | need = { | |
2351 | 'prometheus': ['mgr', 'alertmanager', 'node-exporter', 'ingress'], | |
2352 | 'grafana': ['prometheus'], | |
2353 | 'alertmanager': ['mgr', 'alertmanager'], | |
2354 | } | |
2355 | for dep_type in need.get(daemon_type, []): | |
2356 | for dd in self.cache.get_daemons_by_type(dep_type): | |
2357 | deps.append(dd.name()) | |
9f95a23c TL |
2358 | return sorted(deps) |
2359 | ||
e306af50 | 2360 | @forall_hosts |
adb31ebb | 2361 | def _remove_daemons(self, name: str, host: str) -> str: |
f67539c2 | 2362 | return CephadmServe(self)._remove_daemon(name, host) |
9f95a23c | 2363 | |
adb31ebb | 2364 | def _check_pool_exists(self, pool: str, service_name: str) -> None: |
e306af50 TL |
2365 | logger.info(f'Checking pool "{pool}" exists for service {service_name}') |
2366 | if not self.rados.pool_exists(pool): | |
2367 | raise OrchestratorError(f'Cannot find pool "{pool}" for ' | |
2368 | f'service {service_name}') | |
2369 | ||
adb31ebb TL |
2370 | def _add_daemon(self, |
2371 | daemon_type: str, | |
f67539c2 | 2372 | spec: ServiceSpec) -> List[str]: |
9f95a23c TL |
2373 | """ |
2374 | Add (and place) a daemon. Require explicit host placement. Do not | |
2375 | schedule, and do not apply the related scheduling limitations. | |
2376 | """ | |
f67539c2 TL |
2377 | if spec.service_name() not in self.spec_store: |
2378 | raise OrchestratorError('Unable to add a Daemon without Service.\n' | |
2379 | 'Please use `ceph orch apply ...` to create a Service.\n' | |
2380 | 'Note, you might want to create the service with "unmanaged=true"') | |
2381 | ||
9f95a23c TL |
2382 | self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement)) |
2383 | if not spec.placement.hosts: | |
2384 | raise OrchestratorError('must specify host(s) to deploy on') | |
2385 | count = spec.placement.count or len(spec.placement.hosts) | |
2386 | daemons = self.cache.get_daemons_by_service(spec.service_name()) | |
2387 | return self._create_daemons(daemon_type, spec, daemons, | |
f67539c2 | 2388 | spec.placement.hosts, count) |
9f95a23c | 2389 | |
adb31ebb TL |
2390 | def _create_daemons(self, |
2391 | daemon_type: str, | |
2392 | spec: ServiceSpec, | |
2393 | daemons: List[DaemonDescription], | |
2394 | hosts: List[HostPlacementSpec], | |
f67539c2 | 2395 | count: int) -> List[str]: |
9f95a23c TL |
2396 | if count > len(hosts): |
2397 | raise OrchestratorError('too few hosts: want %d, have %s' % ( | |
2398 | count, hosts)) | |
2399 | ||
f6b5b4d7 | 2400 | did_config = False |
f67539c2 | 2401 | service_type = daemon_type_to_service(daemon_type) |
9f95a23c | 2402 | |
f67539c2 | 2403 | args = [] # type: List[CephadmDaemonDeploySpec] |
9f95a23c TL |
2404 | for host, network, name in hosts: |
2405 | daemon_id = self.get_unique_name(daemon_type, host, daemons, | |
e306af50 TL |
2406 | prefix=spec.service_id, |
2407 | forcename=name) | |
f6b5b4d7 | 2408 | |
f67539c2 | 2409 | if not did_config: |
522d829b | 2410 | self.cephadm_services[service_type].config(spec) |
f6b5b4d7 TL |
2411 | did_config = True |
2412 | ||
f67539c2 TL |
2413 | daemon_spec = self.cephadm_services[service_type].make_daemon_spec( |
2414 | host, daemon_id, network, spec, | |
2415 | # NOTE: this does not consider port conflicts! | |
2416 | ports=spec.get_port_start()) | |
9f95a23c TL |
2417 | self.log.debug('Placing %s.%s on host %s' % ( |
2418 | daemon_type, daemon_id, host)) | |
f6b5b4d7 | 2419 | args.append(daemon_spec) |
9f95a23c TL |
2420 | |
2421 | # add to daemon list so next name(s) will also be unique | |
2422 | sd = orchestrator.DaemonDescription( | |
2423 | hostname=host, | |
2424 | daemon_type=daemon_type, | |
2425 | daemon_id=daemon_id, | |
2426 | ) | |
2427 | daemons.append(sd) | |
2428 | ||
f67539c2 | 2429 | @ forall_hosts |
adb31ebb | 2430 | def create_func_map(*args: Any) -> str: |
f67539c2 TL |
2431 | daemon_spec = self.cephadm_services[daemon_type].prepare_create(*args) |
2432 | return CephadmServe(self)._create_daemon(daemon_spec) | |
9f95a23c TL |
2433 | |
2434 | return create_func_map(args) | |
2435 | ||
f67539c2 TL |
2436 | @handle_orch_error |
2437 | def add_daemon(self, spec: ServiceSpec) -> List[str]: | |
2438 | ret: List[str] = [] | |
2439 | for d_type in service_to_daemon_types(spec.service_type): | |
2440 | ret.extend(self._add_daemon(d_type, spec)) | |
2441 | return ret | |
2442 | ||
2443 | @handle_orch_error | |
adb31ebb | 2444 | def apply_mon(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2445 | return self._apply(spec) |
2446 | ||
e306af50 TL |
2447 | def _apply(self, spec: GenericSpec) -> str: |
2448 | if spec.service_type == 'host': | |
2449 | return self._add_host(cast(HostSpec, spec)) | |
9f95a23c | 2450 | |
f6b5b4d7 TL |
2451 | if spec.service_type == 'osd': |
2452 | # _trigger preview refresh needs to be smart and | |
2453 | # should only refresh if a change has been detected | |
2454 | self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)]) | |
2455 | ||
e306af50 | 2456 | return self._apply_service_spec(cast(ServiceSpec, spec)) |
9f95a23c | 2457 | |
a4b75251 TL |
2458 | def set_health_warning(self, name: str, summary: str, count: int, detail: List[str]) -> None: |
2459 | self.health_checks[name] = { | |
2460 | 'severity': 'warning', | |
2461 | 'summary': summary, | |
2462 | 'count': count, | |
2463 | 'detail': detail, | |
2464 | } | |
2465 | self.set_health_checks(self.health_checks) | |
2466 | ||
2467 | def remove_health_warning(self, name: str) -> None: | |
2468 | if name in self.health_checks: | |
2469 | del self.health_checks[name] | |
2470 | self.set_health_checks(self.health_checks) | |
2471 | ||
adb31ebb | 2472 | def _plan(self, spec: ServiceSpec) -> dict: |
f6b5b4d7 TL |
2473 | if spec.service_type == 'osd': |
2474 | return {'service_name': spec.service_name(), | |
2475 | 'service_type': spec.service_type, | |
2476 | 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])} | |
2477 | ||
b3b6e05e | 2478 | svc = self.cephadm_services[spec.service_type] |
f6b5b4d7 TL |
2479 | ha = HostAssignment( |
2480 | spec=spec, | |
b3b6e05e | 2481 | hosts=self._schedulable_hosts(), |
522d829b | 2482 | unreachable_hosts=self._unreachable_hosts(), |
f67539c2 TL |
2483 | networks=self.cache.networks, |
2484 | daemons=self.cache.get_daemons_by_service(spec.service_name()), | |
b3b6e05e TL |
2485 | allow_colo=svc.allow_colo(), |
2486 | rank_map=self.spec_store[spec.service_name()].rank_map if svc.ranked() else None | |
f6b5b4d7 TL |
2487 | ) |
2488 | ha.validate() | |
f67539c2 | 2489 | hosts, to_add, to_remove = ha.place() |
f6b5b4d7 TL |
2490 | |
2491 | return { | |
2492 | 'service_name': spec.service_name(), | |
2493 | 'service_type': spec.service_type, | |
f67539c2 | 2494 | 'add': [hs.hostname for hs in to_add], |
b3b6e05e | 2495 | 'remove': [d.name() for d in to_remove] |
f6b5b4d7 TL |
2496 | } |
2497 | ||
f67539c2 TL |
2498 | @handle_orch_error |
2499 | def plan(self, specs: Sequence[GenericSpec]) -> List: | |
f6b5b4d7 TL |
2500 | results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n' |
2501 | 'to the current inventory setup. If any of these conditions changes, the \n' |
2502 | 'preview will be invalid. Please make sure to have a minimal \n' | |
2503 | 'timeframe between planning and applying the specs.'}] | |
2504 | if any([spec.service_type == 'host' for spec in specs]): | |
2505 | return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}] | |
2506 | for spec in specs: | |
2507 | results.append(self._plan(cast(ServiceSpec, spec))) | |
2508 | return results | |
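# A dry-run preview exercising plan() (spec file name is a placeholder):
#   ceph orch apply -i my_specs.yaml --dry-run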
2509 | ||
e306af50 | 2510 | def _apply_service_spec(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2511 | if spec.placement.is_empty(): |
2512 | # fill in default placement | |
2513 | defaults = { | |
2514 | 'mon': PlacementSpec(count=5), | |
2515 | 'mgr': PlacementSpec(count=2), | |
2516 | 'mds': PlacementSpec(count=2), | |
2517 | 'rgw': PlacementSpec(count=2), | |
f67539c2 | 2518 | 'ingress': PlacementSpec(count=2), |
1911f103 | 2519 | 'iscsi': PlacementSpec(count=1), |
9f95a23c | 2520 | 'rbd-mirror': PlacementSpec(count=2), |
f67539c2 | 2521 | 'cephfs-mirror': PlacementSpec(count=1), |
801d1391 | 2522 | 'nfs': PlacementSpec(count=1), |
9f95a23c TL |
2523 | 'grafana': PlacementSpec(count=1), |
2524 | 'alertmanager': PlacementSpec(count=1), | |
2525 | 'prometheus': PlacementSpec(count=1), | |
2526 | 'node-exporter': PlacementSpec(host_pattern='*'), | |
2527 | 'crash': PlacementSpec(host_pattern='*'), | |
f91f0fd5 | 2528 | 'container': PlacementSpec(count=1), |
f67539c2 | 2529 | 'cephadm-exporter': PlacementSpec(host_pattern='*'), |
9f95a23c TL |
2530 | } |
2531 | spec.placement = defaults[spec.service_type] | |
2532 | elif spec.service_type in ['mon', 'mgr'] and \ | |
f91f0fd5 TL |
2533 | spec.placement.count is not None and \ |
2534 | spec.placement.count < 1: | |
9f95a23c TL |
2535 | raise OrchestratorError('cannot scale %s service below 1' % ( |
2536 | spec.service_type)) | |
2537 | ||
        host_count = len(self.inventory.keys())
        max_count = self.max_count_per_host

        if spec.placement.count is not None:
            if spec.service_type in ['mon', 'mgr']:
                if spec.placement.count > max(5, host_count):
                    raise OrchestratorError(
                        f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {max(5, host_count)}.')
            elif spec.service_type != 'osd':
                if spec.placement.count > (max_count * host_count):
                    raise OrchestratorError(
                        f'The maximum number of {spec.service_type} daemons allowed with {host_count} hosts is {host_count * max_count} ({host_count}x{max_count}).'
                        ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option')

        if spec.placement.count_per_host is not None and spec.placement.count_per_host > max_count and spec.service_type != 'osd':
            raise OrchestratorError(
                f'The maximum count_per_host allowed is {max_count}.'
                ' This limit can be adjusted by changing the mgr/cephadm/max_count_per_host config option')

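        # Worked example (illustrative numbers): with 3 hosts and
        # max_count_per_host = 10, a non-osd service may request at most
        # 3 * 10 = 30 daemons in total and 10 per host, while mon/mgr are
        # capped at max(5, host_count) = 5.
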
        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            unreachable_hosts=self._unreachable_hosts(),
            networks=self.cache.networks,
            daemons=self.cache.get_daemons_by_service(spec.service_name()),
            allow_colo=self.cephadm_services[spec.service_type].allow_colo(),
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()

    @handle_orch_error
    def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
        results = []
        for spec in specs:
            if no_overwrite:
                if spec.service_type == 'host' and cast(HostSpec, spec).hostname in self.inventory:
                    results.append('Skipped %s host spec. To change %s spec omit --no-overwrite flag'
                                   % (cast(HostSpec, spec).hostname, spec.service_type))
                    continue
                elif cast(ServiceSpec, spec).service_name() in self.spec_store:
                    results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
                                   % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
                    continue
            results.append(self._apply(spec))
        return results

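    # Example (illustrative): applying a spec file with `--no-overwrite` is
    # expected to skip any host or service spec that is already stored,
    # reporting a "Skipped ..." message instead of replacing it.
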
    @handle_orch_error
    def apply_mgr(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_mds(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_rgw(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_ingress(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_iscsi(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_rbd_mirror(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_nfs(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    def _get_dashboard_url(self):
        # type: () -> str
        return self.get('mgr_map').get('services', {}).get('dashboard', '')

    @handle_orch_error
    def apply_prometheus(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_node_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_crash(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_grafana(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_alertmanager(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_container(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def apply_cephadm_exporter(self, spec: ServiceSpec) -> str:
        return self._apply(spec)

    @handle_orch_error
    def upgrade_check(self, image: str, version: str) -> str:
        if self.inventory.get_host_with_state("maintenance"):
            raise OrchestratorError("check aborted - you have hosts in maintenance state")

        if version:
            target_name = self.container_image_base + ':v' + version
        elif image:
            target_name = image
        else:
            raise OrchestratorError('must specify either image or version')

        image_info = CephadmServe(self)._get_container_image_info(target_name)

        ceph_image_version = image_info.ceph_version
        if not ceph_image_version:
            return f'Unable to extract ceph version from {target_name}.'
        if ceph_image_version.startswith('ceph version '):
            ceph_image_version = ceph_image_version.split(' ')[2]
        version_error = self.upgrade._check_target_version(ceph_image_version)
        if version_error:
            return f'Incompatible upgrade: {version_error}'

        self.log.debug(f'image info {image} -> {image_info}')
        r: dict = {
            'target_name': target_name,
            'target_id': image_info.image_id,
            'target_version': image_info.ceph_version,
            'needs_update': dict(),
            'up_to_date': list(),
            'non_ceph_image_daemons': list()
        }
        for host, dm in self.cache.daemons.items():
            for name, dd in dm.items():
                if image_info.image_id == dd.container_image_id:
                    r['up_to_date'].append(dd.name())
                elif dd.daemon_type in CEPH_IMAGE_TYPES:
                    r['needs_update'][dd.name()] = {
                        'current_name': dd.container_image_name,
                        'current_id': dd.container_image_id,
                        'current_version': dd.version,
                    }
                else:
                    r['non_ceph_image_daemons'].append(dd.name())
        if self.use_repo_digest and image_info.repo_digests:
            # FIXME: we assume the first digest is the best one to use
            r['target_digest'] = image_info.repo_digests[0]

        return json.dumps(r, indent=4, sort_keys=True)

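    # Sketch of the JSON report built above (field names come from the code;
    # values are purely illustrative):
    #   {
    #       "needs_update": {"osd.3": {"current_name": "...", "current_id": "...", "current_version": "16.2.5"}},
    #       "non_ceph_image_daemons": ["node-exporter.host1"],
    #       "target_id": "...",
    #       "target_name": "quay.io/ceph/ceph:v16.2.6",
    #       "target_version": "16.2.6",
    #       "up_to_date": ["mgr.host1.abcdef"]
    #   }
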
    @handle_orch_error
    def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
        return self.upgrade.upgrade_status()

    @handle_orch_error
    def upgrade_ls(self, image: Optional[str], tags: bool) -> Dict[Any, Any]:
        return self.upgrade.upgrade_ls(image, tags)

    @handle_orch_error
    def upgrade_start(self, image: str, version: str) -> str:
        if self.inventory.get_host_with_state("maintenance"):
            raise OrchestratorError("upgrade aborted - you have host(s) in maintenance state")
        return self.upgrade.upgrade_start(image, version)

    @handle_orch_error
    def upgrade_pause(self) -> str:
        return self.upgrade.upgrade_pause()

    @handle_orch_error
    def upgrade_resume(self) -> str:
        return self.upgrade.upgrade_resume()

    @handle_orch_error
    def upgrade_stop(self) -> str:
        return self.upgrade.upgrade_stop()

    @handle_orch_error
    def remove_osds(self, osd_ids: List[str],
                    replace: bool = False,
                    force: bool = False,
                    zap: bool = False) -> str:
        """
        Takes a list of OSDs and schedules them for removal.
        The function that takes care of the actual removal is
        process_removal_queue().
        """

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
        to_remove_daemons = list()
        for daemon in daemons:
            if daemon.daemon_id in osd_ids:
                to_remove_daemons.append(daemon)
        if not to_remove_daemons:
            return f"Unable to find OSDs: {osd_ids}"

        for daemon in to_remove_daemons:
            assert daemon.daemon_id is not None
            try:
                self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                replace=replace,
                                                force=force,
                                                zap=zap,
                                                hostname=daemon.hostname,
                                                process_started_at=datetime_now(),
                                                remove_util=self.to_remove_osds.rm_util))
            except NotFoundError:
                return f"Unable to find OSDs: {osd_ids}"

        # trigger the serve loop to initiate the removal
        self._kick_serve_loop()
        return "Scheduled OSD(s) for removal"

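    # Illustrative only: a call such as `ceph orch osd rm 0 1 --replace` is
    # expected to reach this method with osd_ids=['0', '1'] and replace=True;
    # the OSDs are only queued here, and the serve loop drains and removes
    # them asynchronously.
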
    @handle_orch_error
    def stop_remove_osds(self, osd_ids: List[str]) -> str:
        """
        Stops a `removal` process for a list of OSDs.
        This will revert their weight and remove them from the osds_to_remove queue.
        """
        for osd_id in osd_ids:
            try:
                self.to_remove_osds.rm(OSD(osd_id=int(osd_id),
                                           remove_util=self.to_remove_osds.rm_util))
            except (NotFoundError, KeyError, ValueError):
                return f'Unable to find OSD in the queue: {osd_id}'

        # trigger the serve loop to halt the removal
        self._kick_serve_loop()
        return "Stopped OSD(s) removal"

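    # Illustrative only: `ceph orch osd rm stop 0` is expected to land here and
    # pull OSD 0 back out of the removal queue before the drain completes.
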
    @handle_orch_error
    def remove_osds_status(self) -> List[OSD]:
        """
        The CLI call to retrieve an osd removal report
        """
        return self.to_remove_osds.all_osds()

    @handle_orch_error
    def drain_host(self, hostname):
        # type: (str) -> str
        """
        Drain all daemons from a host.
        :param hostname: host name
        """
        self.add_host_label(hostname, '_no_schedule')

        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_host(hostname)

        osds_to_remove = [d.daemon_id for d in daemons if d.daemon_type == 'osd']
        self.remove_osds(osds_to_remove)

        daemons_table = ""
        daemons_table += "{:<20} {:<15}\n".format("type", "id")
        daemons_table += "{:<20} {:<15}\n".format("-" * 20, "-" * 15)
        for d in daemons:
            daemons_table += "{:<20} {:<15}\n".format(d.daemon_type, d.daemon_id)

        return "Scheduled to remove the following daemons from host '{}'\n{}".format(hostname, daemons_table)

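    # Note (sketch of the flow above): draining labels the host with
    # '_no_schedule' so the scheduler stops placing daemons there, queues its
    # OSDs for removal, and leaves the remaining daemons to be removed by the
    # serve loop on later passes.
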
    def trigger_connect_dashboard_rgw(self) -> None:
        self.need_connect_dashboard_rgw = True
        self.event.set()