]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | import json |
2 | import errno | |
3 | import logging | |
f91f0fd5 TL |
4 | import re |
5 | import shlex | |
e306af50 | 6 | from collections import defaultdict |
f91f0fd5 | 7 | from configparser import ConfigParser |
f6b5b4d7 | 8 | from contextlib import contextmanager |
9f95a23c | 9 | from functools import wraps |
e306af50 TL |
10 | from tempfile import TemporaryDirectory |
11 | from threading import Event | |
9f95a23c TL |
12 | |
13 | import string | |
e306af50 | 14 | from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \ |
f91f0fd5 | 15 | Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple |
9f95a23c TL |
16 | |
17 | import datetime | |
18 | import six | |
19 | import os | |
20 | import random | |
21 | import tempfile | |
22 | import multiprocessing.pool | |
9f95a23c | 23 | import subprocess |
9f95a23c | 24 | |
e306af50 | 25 | from ceph.deployment import inventory |
9f95a23c | 26 | from ceph.deployment.drive_group import DriveGroupSpec |
801d1391 | 27 | from ceph.deployment.service_spec import \ |
f91f0fd5 | 28 | NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \ |
adb31ebb TL |
29 | CustomContainerSpec, HostPlacementSpec |
30 | from ceph.utils import str_to_datetime, datetime_to_str, datetime_now | |
f91f0fd5 | 31 | from cephadm.serve import CephadmServe |
f6b5b4d7 | 32 | from cephadm.services.cephadmservice import CephadmDaemonSpec |
9f95a23c | 33 | |
801d1391 | 34 | from mgr_module import MgrModule, HandleCommandResult |
9f95a23c TL |
35 | import orchestrator |
36 | from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \ | |
f6b5b4d7 | 37 | CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription |
e306af50 | 38 | from orchestrator._interface import GenericSpec |
9f95a23c TL |
39 | |
40 | from . import remotes | |
801d1391 | 41 | from . import utils |
f6b5b4d7 | 42 | from .migrations import Migrations |
e306af50 TL |
43 | from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \ |
44 | RbdMirrorService, CrashService, CephadmService | |
f91f0fd5 | 45 | from .services.container import CustomContainerService |
e306af50 TL |
46 | from .services.iscsi import IscsiService |
47 | from .services.nfs import NFSService | |
adb31ebb | 48 | from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError |
e306af50 TL |
49 | from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \ |
50 | NodeExporterService | |
f91f0fd5 | 51 | from .schedule import HostAssignment |
f6b5b4d7 | 52 | from .inventory import Inventory, SpecStore, HostCache, EventStore |
e306af50 TL |
53 | from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade |
54 | from .template import TemplateMgr | |
adb31ebb | 55 | from .utils import forall_hosts, CephadmNoImage, cephadmNoImage |
9f95a23c TL |
56 | |
57 | try: | |
58 | import remoto | |
e306af50 TL |
59 | # NOTE(mattoliverau) Patch remoto until remoto PR |
60 | # (https://github.com/alfredodeza/remoto/pull/56) lands | |
61 | from distutils.version import StrictVersion | |
62 | if StrictVersion(remoto.__version__) <= StrictVersion('1.2'): | |
adb31ebb | 63 | def remoto_has_connection(self: Any) -> bool: |
e306af50 TL |
64 | return self.gateway.hasreceiver() |
65 | ||
66 | from remoto.backends import BaseConnection | |
67 | BaseConnection.has_connection = remoto_has_connection | |
9f95a23c TL |
68 | import remoto.process |
69 | import execnet.gateway_bootstrap | |
70 | except ImportError as e: | |
71 | remoto = None | |
72 | remoto_import_error = str(e) | |
73 | ||
74 | try: | |
75 | from typing import List | |
76 | except ImportError: | |
77 | pass | |
78 | ||
79 | logger = logging.getLogger(__name__) | |
80 | ||
e306af50 TL |
81 | T = TypeVar('T') |
82 | ||
1911f103 TL |
83 | DEFAULT_SSH_CONFIG = """ |
84 | Host * | |
85 | User root | |
86 | StrictHostKeyChecking no | |
87 | UserKnownHostsFile /dev/null | |
88 | ConnectTimeout=30 | |
89 | """ | |
9f95a23c | 90 | |
9f95a23c TL |
91 | CEPH_TYPES = set(CEPH_UPGRADE_ORDER) |
92 | ||
93 | ||
class CephadmCompletion(orchestrator.Completion[T]):
    """Completion whose wrapped callable is executed synchronously via evaluate()."""

    def evaluate(self) -> None:
        # finalize(None) invokes the on_complete callable installed at
        # construction time; its return value becomes the completion result.
        self.finalize(None)
9f95a23c | 97 | |
f91f0fd5 | 98 | |
def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]:
    """Decorator: make *f* return a CephadmCompletion that runs it on evaluation."""
    @wraps(f)
    def decorated(*call_args: Any, **call_kwargs: Any) -> CephadmCompletion:
        # Defer the actual call until the completion is evaluated.
        def run(_: Any) -> T:
            return f(*call_args, **call_kwargs)
        return CephadmCompletion(on_complete=run)
    return decorated
110 | ||
111 | ||
f91f0fd5 TL |
class ContainerInspectInfo(NamedTuple):
    """Metadata extracted by inspecting a container image."""
    # image id (digest-like string)
    image_id: str
    # ceph version baked into the image, if known
    ceph_version: Optional[str]
    # repo digest for the image, if known
    repo_digest: Optional[str]
116 | ||
117 | ||
9f95a23c TL |
118 | @six.add_metaclass(CLICommandMeta) |
119 | class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule): | |
120 | ||
121 | _STORE_HOST_PREFIX = "host" | |
122 | ||
123 | instance = None | |
124 | NATIVE_OPTIONS = [] # type: List[Any] | |
f6b5b4d7 | 125 | MODULE_OPTIONS: List[dict] = [ |
9f95a23c TL |
126 | { |
127 | 'name': 'ssh_config_file', | |
128 | 'type': 'str', | |
129 | 'default': None, | |
130 | 'desc': 'customized SSH config file to connect to managed hosts', | |
131 | }, | |
132 | { | |
133 | 'name': 'device_cache_timeout', | |
134 | 'type': 'secs', | |
135 | 'default': 30 * 60, | |
136 | 'desc': 'seconds to cache device inventory', | |
137 | }, | |
138 | { | |
139 | 'name': 'daemon_cache_timeout', | |
140 | 'type': 'secs', | |
141 | 'default': 10 * 60, | |
142 | 'desc': 'seconds to cache service (daemon) inventory', | |
143 | }, | |
adb31ebb TL |
144 | { |
145 | 'name': 'facts_cache_timeout', | |
146 | 'type': 'secs', | |
147 | 'default': 1 * 60, | |
148 | 'desc': 'seconds to cache host facts data', | |
149 | }, | |
9f95a23c TL |
150 | { |
151 | 'name': 'host_check_interval', | |
152 | 'type': 'secs', | |
153 | 'default': 10 * 60, | |
154 | 'desc': 'how frequently to perform a host check', | |
155 | }, | |
156 | { | |
157 | 'name': 'mode', | |
158 | 'type': 'str', | |
159 | 'enum_allowed': ['root', 'cephadm-package'], | |
160 | 'default': 'root', | |
161 | 'desc': 'mode for remote execution of cephadm', | |
162 | }, | |
163 | { | |
164 | 'name': 'container_image_base', | |
801d1391 | 165 | 'default': 'docker.io/ceph/ceph', |
9f95a23c TL |
166 | 'desc': 'Container image name, without the tag', |
167 | 'runtime': True, | |
168 | }, | |
e306af50 TL |
169 | { |
170 | 'name': 'container_image_prometheus', | |
f91f0fd5 | 171 | 'default': 'docker.io/prom/prometheus:v2.18.1', |
e306af50 TL |
172 | 'desc': 'Prometheus container image', |
173 | }, | |
174 | { | |
175 | 'name': 'container_image_grafana', | |
cd265ab1 | 176 | 'default': 'docker.io/ceph/ceph-grafana:6.7.4', |
e306af50 TL |
177 | 'desc': 'Prometheus container image', |
178 | }, | |
179 | { | |
180 | 'name': 'container_image_alertmanager', | |
f91f0fd5 | 181 | 'default': 'docker.io/prom/alertmanager:v0.20.0', |
e306af50 TL |
182 | 'desc': 'Prometheus container image', |
183 | }, | |
184 | { | |
185 | 'name': 'container_image_node_exporter', | |
f91f0fd5 | 186 | 'default': 'docker.io/prom/node-exporter:v0.18.1', |
e306af50 TL |
187 | 'desc': 'Prometheus container image', |
188 | }, | |
9f95a23c TL |
189 | { |
190 | 'name': 'warn_on_stray_hosts', | |
191 | 'type': 'bool', | |
192 | 'default': True, | |
193 | 'desc': 'raise a health warning if daemons are detected on a host ' | |
194 | 'that is not managed by cephadm', | |
195 | }, | |
196 | { | |
197 | 'name': 'warn_on_stray_daemons', | |
198 | 'type': 'bool', | |
199 | 'default': True, | |
200 | 'desc': 'raise a health warning if daemons are detected ' | |
201 | 'that are not managed by cephadm', | |
202 | }, | |
203 | { | |
204 | 'name': 'warn_on_failed_host_check', | |
205 | 'type': 'bool', | |
206 | 'default': True, | |
207 | 'desc': 'raise a health warning if the host check fails', | |
208 | }, | |
209 | { | |
210 | 'name': 'log_to_cluster', | |
211 | 'type': 'bool', | |
212 | 'default': True, | |
213 | 'desc': 'log to the "cephadm" cluster log channel"', | |
214 | }, | |
215 | { | |
216 | 'name': 'allow_ptrace', | |
217 | 'type': 'bool', | |
218 | 'default': False, | |
219 | 'desc': 'allow SYS_PTRACE capability on ceph containers', | |
220 | 'long_desc': 'The SYS_PTRACE capability is needed to attach to a ' | |
221 | 'process with gdb or strace. Enabling this options ' | |
222 | 'can allow debugging daemons that encounter problems ' | |
223 | 'at runtime.', | |
224 | }, | |
f91f0fd5 TL |
225 | { |
226 | 'name': 'container_init', | |
227 | 'type': 'bool', | |
228 | 'default': False, | |
229 | 'desc': 'Run podman/docker with `--init`', | |
230 | }, | |
801d1391 TL |
231 | { |
232 | 'name': 'prometheus_alerts_path', | |
233 | 'type': 'str', | |
234 | 'default': '/etc/prometheus/ceph/ceph_default_alerts.yml', | |
235 | 'desc': 'location of alerts to include in prometheus deployments', | |
236 | }, | |
f6b5b4d7 TL |
237 | { |
238 | 'name': 'migration_current', | |
239 | 'type': 'int', | |
240 | 'default': None, | |
241 | 'desc': 'internal - do not modify', | |
242 | # used to track track spec and other data migrations. | |
243 | }, | |
244 | { | |
245 | 'name': 'config_dashboard', | |
246 | 'type': 'bool', | |
247 | 'default': True, | |
248 | 'desc': 'manage configs like API endpoints in Dashboard.' | |
249 | }, | |
250 | { | |
251 | 'name': 'manage_etc_ceph_ceph_conf', | |
252 | 'type': 'bool', | |
253 | 'default': False, | |
254 | 'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.', | |
255 | }, | |
256 | { | |
257 | 'name': 'registry_url', | |
258 | 'type': 'str', | |
259 | 'default': None, | |
260 | 'desc': 'Custom repository url' | |
261 | }, | |
262 | { | |
263 | 'name': 'registry_username', | |
264 | 'type': 'str', | |
265 | 'default': None, | |
266 | 'desc': 'Custom repository username' | |
267 | }, | |
268 | { | |
269 | 'name': 'registry_password', | |
270 | 'type': 'str', | |
271 | 'default': None, | |
272 | 'desc': 'Custom repository password' | |
273 | }, | |
f91f0fd5 TL |
274 | { |
275 | 'name': 'use_repo_digest', | |
276 | 'type': 'bool', | |
277 | 'default': False, | |
278 | 'desc': 'Automatically convert image tags to image digest. Make sure all daemons use the same image', | |
279 | } | |
9f95a23c TL |
280 | ] |
281 | ||
    def __init__(self, *args: Any, **kwargs: Any):
        """Initialize module state, load persisted caches/specs, and wire up
        the per-service-type CephadmService helpers.

        NOTE(review): ordering matters here — config_notify()/notify() run
        before the caches are loaded, and several helpers assume instance
        attributes created earlier in this method.
        """
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        # mtime of the last seen monmap; None until notify('mon_map') runs
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        # 'pause' is persisted in the kv store so it survives mgr restarts
        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.facts_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = False
            self.prometheus_alerts_path = ''
            self.migration_current: Optional[int] = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.use_repo_digest = False

        # host -> (connection, module-executor) cache for remoto SSH sessions
        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        # read the cephadm binary once; it is shipped to hosts over SSH
        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks: Dict[str, dict] = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.to_remove_osds = OSDRemovalQueue(self)
        self.to_remove_osds.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        # services:
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.container_service = CustomContainerService(self)
        # dispatch table: service type name -> CephadmService instance
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
            'container': self.container_service,
        }

        self.template = TemplateMgr(self)

        # daemon names that still need post-deploy actions (e.g. dashboard config)
        self.requires_post_actions: Set[str] = set()
f6b5b4d7 | 409 | |
    def shutdown(self) -> None:
        """Stop background work: drain the worker pool, then signal serve() to exit."""
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        # wake serve() so it notices self.run is now False
        self.event.set()
416 | ||
e306af50 TL |
417 | def _get_cephadm_service(self, service_type: str) -> CephadmService: |
418 | assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES | |
419 | return self.cephadm_services[service_type] | |
420 | ||
    def _kick_serve_loop(self) -> None:
        """Wake the serve() loop so it re-evaluates state immediately."""
        self.log.debug('_kick_serve_loop')
        self.event.set()
424 | ||
f6b5b4d7 | 425 | # function responsible for logging single host into custom registry |
adb31ebb | 426 | def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]: |
f6b5b4d7 TL |
427 | self.log.debug(f"Attempting to log host {host} into custom registry @ {url}") |
428 | # want to pass info over stdin rather than through normal list of args | |
adb31ebb TL |
429 | args_str = json.dumps({ |
430 | 'url': url, | |
431 | 'username': username, | |
432 | 'password': password, | |
433 | }) | |
f6b5b4d7 TL |
434 | out, err, code = self._run_cephadm( |
435 | host, 'mon', 'registry-login', | |
436 | ['--registry-json', '-'], stdin=args_str, error_ok=True) | |
437 | if code: | |
438 | return f"Host {host} failed to login to {url} as {username} with given password" | |
adb31ebb | 439 | return None |
9f95a23c | 440 | |
f6b5b4d7 TL |
441 | def serve(self) -> None: |
442 | """ | |
443 | The main loop of cephadm. | |
444 | ||
445 | A command handler will typically change the declarative state | |
446 | of cephadm. This loop will then attempt to apply this new state. | |
447 | """ | |
f91f0fd5 TL |
448 | serve = CephadmServe(self) |
449 | serve.serve() | |
450 | ||
    def set_container_image(self, entity: str, image: str) -> None:
        """Persist `container_image` in the mon config db for the given entity."""
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })
f6b5b4d7 | 458 | |
    def config_notify(self) -> None:
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        # Mirror every MODULE_OPTION / NATIVE_OPTION into an instance attribute
        # of the same name so the rest of the module can read self.<option>.
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        # wake serve() so new settings take effect promptly
        self.event.set()
478 | ||
    def notify(self, notify_type: str, notify_id: Optional[str]) -> None:
        """React to mgr notifications (monmap changes and pg stats)."""
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = str_to_datetime(monmap['modified'])
            if self.last_monmap and self.last_monmap > datetime_now():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()
491 | ||
adb31ebb | 492 | def _trigger_osd_removal(self) -> None: |
f6b5b4d7 TL |
493 | data = self.get("osd_stats") |
494 | for osd in data.get('osd_stats', []): | |
495 | if osd.get('num_pgs') == 0: | |
496 | # if _ANY_ osd that is currently in the queue appears to be empty, | |
497 | # start the removal process | |
498 | if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids(): | |
499 | self.log.debug(f"Found empty osd. Starting removal process") | |
500 | # if the osd that is now empty is also part of the removal queue | |
501 | # start the process | |
adb31ebb | 502 | self._kick_serve_loop() |
9f95a23c | 503 | |
adb31ebb | 504 | def pause(self) -> None: |
9f95a23c TL |
505 | if not self.paused: |
506 | self.log.info('Paused') | |
507 | self.set_store('pause', 'true') | |
508 | self.paused = True | |
509 | # wake loop so we update the health status | |
510 | self._kick_serve_loop() | |
511 | ||
adb31ebb | 512 | def resume(self) -> None: |
9f95a23c TL |
513 | if self.paused: |
514 | self.log.info('Resumed') | |
515 | self.paused = False | |
516 | self.set_store('pause', None) | |
517 | # unconditionally wake loop so that 'orch resume' can be used to kick | |
518 | # cephadm | |
519 | self._kick_serve_loop() | |
520 | ||
    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        # These daemon types get no random suffix; their names must be stable.
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container'
        ]
        if forcename:
            # caller-chosen name must not collide with an existing daemon
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        # use only the short hostname in daemon names
        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                # random 6-letter suffix to avoid collisions; retried below
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    # no suffix means nothing to retry with — hard failure
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name
556 | ||
    def _reconfig_ssh(self) -> None:
        """(Re)build SSH options from config/kv-store state.

        Materializes the stored ssh_config and identity keys as temp files
        (references kept on self._temp_files so they are not deleted), sets
        self._ssh_options / self.ssh_user, and drops all open connections so
        they are re-established with the new settings.
        """
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        # a kv-store config (or no configured file) wins over ssh_config_file
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            # the .pub file must sit next to the private key for ssh -i
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

        # keep references so the NamedTemporaryFiles stay alive
        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        # existing connections were built with the old options; drop them
        self._reset_cons()
607 | ||
adb31ebb | 608 | def validate_ssh_config_content(self, ssh_config: Optional[str]) -> None: |
f91f0fd5 TL |
609 | if ssh_config is None or len(ssh_config.strip()) == 0: |
610 | raise OrchestratorValidationError('ssh_config cannot be empty') | |
611 | # StrictHostKeyChecking is [yes|no] ? | |
612 | l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config) | |
613 | if not l: | |
614 | raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking') | |
615 | for s in l: | |
616 | if 'ask' in s.lower(): | |
617 | raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'') | |
618 | ||
adb31ebb | 619 | def validate_ssh_config_fname(self, ssh_config_fname: str) -> None: |
801d1391 TL |
620 | if not os.path.isfile(ssh_config_fname): |
621 | raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format( | |
622 | ssh_config_fname)) | |
623 | ||
adb31ebb | 624 | def _reset_con(self, host: str) -> None: |
9f95a23c TL |
625 | conn, r = self._cons.get(host, (None, None)) |
626 | if conn: | |
627 | self.log.debug('_reset_con close %s' % host) | |
628 | conn.exit() | |
629 | del self._cons[host] | |
630 | ||
adb31ebb | 631 | def _reset_cons(self) -> None: |
9f95a23c TL |
632 | for host, conn_and_r in self._cons.items(): |
633 | self.log.debug('_reset_cons close %s' % host) | |
634 | conn, r = conn_and_r | |
635 | conn.exit() | |
636 | self._cons = {} | |
637 | ||
adb31ebb | 638 | def offline_hosts_remove(self, host: str) -> None: |
1911f103 TL |
639 | if host in self.offline_hosts: |
640 | self.offline_hosts.remove(host) | |
641 | ||
9f95a23c | 642 | @staticmethod |
adb31ebb | 643 | def can_run() -> Tuple[bool, str]: |
9f95a23c TL |
644 | if remoto is not None: |
645 | return True, "" | |
646 | else: | |
647 | return False, "loading remoto library:{}".format( | |
f91f0fd5 | 648 | remoto_import_error) |
9f95a23c | 649 | |
    def available(self) -> Tuple[bool, str]:
        """
        The cephadm orchestrator is always available.
        """
        # ... provided remoto is importable and SSH keys are configured.
        ok, err = self.can_run()
        if not ok:
            return ok, err
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
        return True, ''
9f95a23c | 660 | |
    def process(self, completions: List[CephadmCompletion]) -> None:
        """
        Does nothing, as completions are processed in another thread.
        """
        if completions:
            self.log.debug("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            # evaluate() runs each completion's wrapped callable synchronously
            for p in completions:
                p.evaluate()
9f95a23c TL |
671 | |
672 | @orchestrator._cli_write_command( | |
673 | prefix='cephadm set-ssh-config', | |
674 | desc='Set the ssh_config file (use -i <ssh_config>)') | |
adb31ebb | 675 | def _set_ssh_config(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]: |
9f95a23c TL |
676 | """ |
677 | Set an ssh_config file provided from stdin | |
9f95a23c | 678 | """ |
f6b5b4d7 TL |
679 | if inbuf == self.ssh_config: |
680 | return 0, "value unchanged", "" | |
f91f0fd5 | 681 | self.validate_ssh_config_content(inbuf) |
9f95a23c TL |
682 | self.set_store("ssh_config", inbuf) |
683 | self.log.info('Set ssh_config') | |
f6b5b4d7 | 684 | self._reconfig_ssh() |
9f95a23c TL |
685 | return 0, "", "" |
686 | ||
    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self) -> Tuple[int, str, str]:
        """
        Clear the ssh_config file provided from stdin
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        # rebuild SSH state with the default config
        self._reconfig_ssh()
        return 0, "", ""
699 | ||
801d1391 TL |
    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self) -> HandleCommandResult:
        """Return the effective ssh config: file > kv store > built-in default."""
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)
713 | ||
9f95a23c TL |
714 | @orchestrator._cli_write_command( |
715 | 'cephadm generate-key', | |
716 | desc='Generate a cluster SSH key (if not present)') | |
adb31ebb | 717 | def _generate_key(self) -> Tuple[int, str, str]: |
9f95a23c TL |
718 | if not self.ssh_pub or not self.ssh_key: |
719 | self.log.info('Generating ssh key...') | |
720 | tmp_dir = TemporaryDirectory() | |
721 | path = tmp_dir.name + '/key' | |
722 | try: | |
1911f103 | 723 | subprocess.check_call([ |
9f95a23c TL |
724 | '/usr/bin/ssh-keygen', |
725 | '-C', 'ceph-%s' % self._cluster_fsid, | |
726 | '-N', '', | |
727 | '-f', path | |
728 | ]) | |
729 | with open(path, 'r') as f: | |
730 | secret = f.read() | |
731 | with open(path + '.pub', 'r') as f: | |
732 | pub = f.read() | |
733 | finally: | |
734 | os.unlink(path) | |
735 | os.unlink(path + '.pub') | |
736 | tmp_dir.cleanup() | |
737 | self.set_store('ssh_identity_key', secret) | |
738 | self.set_store('ssh_identity_pub', pub) | |
739 | self._reconfig_ssh() | |
740 | return 0, '', '' | |
741 | ||
e306af50 TL |
    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Store the SSH private key from stdin and rebuild SSH state."""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        if inbuf == self.ssh_key:
            # avoid a pointless reconfig when nothing changed
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""
754 | ||
    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
        """Store the SSH public key from stdin and rebuild SSH state."""
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        if inbuf == self.ssh_pub:
            # avoid a pointless reconfig when nothing changed
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""
767 | ||
9f95a23c TL |
    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self) -> Tuple[int, str, str]:
        """Remove both halves of the cluster SSH key pair from the kv store."""
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''
777 | ||
778 | @orchestrator._cli_read_command( | |
779 | 'cephadm get-pub-key', | |
780 | desc='Show SSH public key for connecting to cluster hosts') | |
adb31ebb | 781 | def _get_pub_key(self) -> Tuple[int, str, str]: |
9f95a23c TL |
782 | if self.ssh_pub: |
783 | return 0, self.ssh_pub, '' | |
784 | else: | |
785 | return -errno.ENOENT, '', 'No cluster SSH key defined' | |
786 | ||
    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self) -> Tuple[int, str, str]:
        # Read-only: report the currently configured SSH user on stdout.
        return 0, self.ssh_user, ''
f6b5b4d7 TL |
793 | @orchestrator._cli_read_command( |
794 | 'cephadm set-user', | |
795 | 'name=user,type=CephString', | |
796 | 'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users') | |
adb31ebb | 797 | def set_ssh_user(self, user: str) -> Tuple[int, str, str]: |
f6b5b4d7 TL |
798 | current_user = self.ssh_user |
799 | if user == current_user: | |
800 | return 0, "value unchanged", "" | |
801 | ||
802 | self.set_store('ssh_user', user) | |
803 | self._reconfig_ssh() | |
804 | ||
805 | host = self.cache.get_hosts()[0] | |
f91f0fd5 | 806 | r = CephadmServe(self)._check_host(host) |
f6b5b4d7 | 807 | if r is not None: |
f91f0fd5 | 808 | # connection failed reset user |
f6b5b4d7 TL |
809 | self.set_store('ssh_user', current_user) |
810 | self._reconfig_ssh() | |
811 | return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host) | |
812 | ||
813 | msg = 'ssh user set to %s' % user | |
814 | if user != 'root': | |
815 | msg += ' sudo will be used' | |
816 | self.log.info(msg) | |
817 | return 0, msg, '' | |
818 | ||
819 | @orchestrator._cli_read_command( | |
820 | 'cephadm registry-login', | |
821 | "name=url,type=CephString,req=false " | |
822 | "name=username,type=CephString,req=false " | |
823 | "name=password,type=CephString,req=false", | |
824 | 'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)') | |
adb31ebb | 825 | def registry_login(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, inbuf: Optional[str] = None) -> Tuple[int, str, str]: |
f6b5b4d7 TL |
826 | # if password not given in command line, get it through file input |
827 | if not (url and username and password) and (inbuf is None or len(inbuf) == 0): | |
828 | return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> " | |
f91f0fd5 | 829 | "or -i <login credentials json file>") |
f6b5b4d7 | 830 | elif not (url and username and password): |
adb31ebb | 831 | assert isinstance(inbuf, str) |
f6b5b4d7 TL |
832 | login_info = json.loads(inbuf) |
833 | if "url" in login_info and "username" in login_info and "password" in login_info: | |
834 | url = login_info["url"] | |
835 | username = login_info["username"] | |
836 | password = login_info["password"] | |
837 | else: | |
838 | return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. " | |
f91f0fd5 TL |
839 | "Please setup json file as\n" |
840 | "{\n" | |
841 | " \"url\": \"REGISTRY_URL\",\n" | |
842 | " \"username\": \"REGISTRY_USERNAME\",\n" | |
843 | " \"password\": \"REGISTRY_PASSWORD\"\n" | |
844 | "}\n") | |
f6b5b4d7 TL |
845 | # verify login info works by attempting login on random host |
846 | host = None | |
847 | for host_name in self.inventory.keys(): | |
848 | host = host_name | |
849 | break | |
850 | if not host: | |
851 | raise OrchestratorError('no hosts defined') | |
852 | r = self._registry_login(host, url, username, password) | |
853 | if r is not None: | |
854 | return 1, '', r | |
855 | # if logins succeeded, store info | |
856 | self.log.debug("Host logins successful. Storing login info.") | |
857 | self.set_module_option('registry_url', url) | |
858 | self.set_module_option('registry_username', username) | |
859 | self.set_module_option('registry_password', password) | |
860 | # distribute new login info to all hosts | |
861 | self.cache.distribute_new_registry_login_info() | |
862 | return 0, "registry login scheduled", '' | |
863 | ||
    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]:
        """Run `cephadm check-host` on the remote node and report the result."""
        try:
            # error_ok=True: a non-zero exit is reported via `code`, not raised.
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError as e:
            # Raised e.g. when the host is unknown and no address was given;
            # point the user at `orch host ls`.
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), '\n'.join(err)
889 | @orchestrator._cli_read_command( | |
890 | 'cephadm prepare-host', | |
891 | 'name=host,type=CephString ' | |
892 | 'name=addr,type=CephString,req=false', | |
893 | 'Prepare a remote host for use with cephadm') | |
adb31ebb | 894 | def _prepare_host(self, host: str, addr: Optional[str] = None) -> Tuple[int, str, str]: |
f6b5b4d7 | 895 | out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host', |
9f95a23c TL |
896 | ['--expect-hostname', host], |
897 | addr=addr, | |
898 | error_ok=True, no_fsid=True) | |
899 | if code: | |
900 | return 1, '', ('prepare-host failed:\n' + '\n'.join(err)) | |
901 | # if we have an outstanding health alert for this host, give the | |
902 | # serve thread a kick | |
903 | if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks: | |
904 | for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']: | |
905 | if item.startswith('host %s ' % host): | |
906 | self.event.set() | |
adb31ebb | 907 | return 0, '%s (%s) ok' % (host, addr), '\n'.join(err) |
9f95a23c | 908 | |
    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf',
        desc="Text that is appended to all daemon's ceph.conf.\n"
             "Mainly a workaround, till `config generate-minimal-conf` generates\n"
             "a complete ceph.conf.\n\n"
             "Warning: this is a dangerous operation.")
    def _set_extra_ceph_conf(self, inbuf: Optional[str] = None) -> HandleCommandResult:
        # Store free-form ceph.conf text (from -i <file>) to be appended to
        # every daemon's generated configuration.
        if inbuf:
            # sanity check: let ConfigParser raise on syntactically bad input
            # before anything is persisted.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        # Persist together with a timestamp so consumers can detect staleness
        # (see extra_ceph_conf_is_newer()).
        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime_now())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()
    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf',
        desc='Get extra ceph conf that is appended')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        # Return only the stored conf text; the timestamp is internal.
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)
    class ExtraCephConf(NamedTuple):
        # Free-form ceph.conf text appended to every daemon's config
        # ('' when nothing is stored).
        conf: str
        # When the text was last stored; None if unset or undecodable.
        last_modified: Optional[datetime.datetime]
    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        """Load the stored extra ceph.conf snippet; empty default on any error."""
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            msg = 'Unable to load extra_ceph_conf: Cannot decode JSON'
            self.log.exception('%s: \'%s\'', msg, data)
            return CephadmOrchestrator.ExtraCephConf('', None)
        # NOTE(review): assumes both 'conf' and 'last_modified' keys exist in
        # the stored JSON; a malformed entry would raise KeyError here — TODO confirm.
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))
951 | def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool: | |
952 | conf = self.extra_ceph_conf() | |
953 | if not conf.last_modified: | |
954 | return False | |
955 | return conf.last_modified > dt | |
956 | ||
    def _get_connection(self, host: str) -> Tuple['remoto.backends.BaseConnection',
                                                  'remoto.backends.LegacyModuleExecute']:
        """
        Setup a connection for running commands on remote host.
        """
        # Reuse a cached, still-live connection when we have one.
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                # Stale entry: drop it and reconnect below.
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        # Quiet remoto's per-connection logger; we log at this layer instead.
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            # Non-root SSH users need passwordless sudo on the remote side.
            sudo=True if self.ssh_user != 'root' else False)

        # Ship our remote helpers module over the connection for RPC use.
        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r
adb31ebb | 985 | def _executable_path(self, conn: 'remoto.backends.BaseConnection', executable: str) -> str: |
9f95a23c TL |
986 | """ |
987 | Remote validator that accepts a connection object to ensure that a certain | |
988 | executable is available returning its full path if so. | |
989 | ||
990 | Otherwise an exception with thorough details will be raised, informing the | |
991 | user that the executable was not found. | |
992 | """ | |
993 | executable_path = conn.remote_module.which(executable) | |
994 | if not executable_path: | |
995 | raise RuntimeError("Executable '{}' not found on host '{}'".format( | |
996 | executable, conn.hostname)) | |
997 | self.log.debug("Found executable '{}' at path '{}'".format(executable, | |
f91f0fd5 | 998 | executable_path)) |
9f95a23c TL |
999 | return executable_path |
1000 | ||
    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        """
        Context manager yielding an open (connection, remote-module) pair for
        *host*, translating low-level connection failures into
        OrchestratorError with actionable instructions.
        """
        # Resolve the address from the inventory unless explicitly given.
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        # Optimistically clear the offline marker; re-added below on failure.
        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                # Re-raise as HostNotFound so the handler below builds the message.
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            if str(e).startswith("Can't communicate"):
                msg = str(e)
            else:
                msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{host}

To check that the host is reachable:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            # Unexpected failure (including from the body using the connection):
            # log with traceback and propagate unchanged.
            self.log.exception(ex)
            raise
f91f0fd5 | 1050 | def _get_container_image(self, daemon_name: str) -> Optional[str]: |
f6b5b4d7 TL |
1051 | daemon_type = daemon_name.split('.', 1)[0] # type: ignore |
1052 | if daemon_type in CEPH_TYPES or \ | |
1053 | daemon_type == 'nfs' or \ | |
1054 | daemon_type == 'iscsi': | |
1055 | # get container image | |
1056 | ret, image, err = self.check_mon_command({ | |
1057 | 'prefix': 'config get', | |
1058 | 'who': utils.name_to_config_section(daemon_name), | |
1059 | 'key': 'container_image', | |
1060 | }) | |
1061 | image = image.strip() # type: ignore | |
1062 | elif daemon_type == 'prometheus': | |
1063 | image = self.container_image_prometheus | |
1064 | elif daemon_type == 'grafana': | |
1065 | image = self.container_image_grafana | |
1066 | elif daemon_type == 'alertmanager': | |
1067 | image = self.container_image_alertmanager | |
1068 | elif daemon_type == 'node-exporter': | |
1069 | image = self.container_image_node_exporter | |
f91f0fd5 TL |
1070 | elif daemon_type == CustomContainerService.TYPE: |
1071 | # The image can't be resolved, the necessary information | |
1072 | # is only available when a container is deployed (given | |
1073 | # via spec). | |
1074 | image = None | |
f6b5b4d7 TL |
1075 | else: |
1076 | assert False, daemon_type | |
1077 | ||
1078 | self.log.debug('%s container image %s' % (daemon_name, image)) | |
1079 | ||
1080 | return image | |
1081 | ||
    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]

        Returns (stdout-lines, stderr-lines, exit-code). With error_ok=True
        failures are returned instead of raised.
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            # Resolve the image from the entity unless explicitly provided.
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            # Build the cephadm command line: env vars, image, command,
            # fsid/container-init flags, then caller args.
            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]

            if self.container_init:
                final_args += ['--container-init']

            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                # 'root' mode: pipe our bundled cephadm script into the remote
                # python, injecting argv (and stdin) as module globals.
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    # Connection-level failure: drop the cached connection.
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                # Package mode: run the distro-installed cephadm binary via sudo.
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code
f91f0fd5 TL |
1171 | def _hosts_with_daemon_inventory(self) -> List[HostSpec]: |
1172 | """ | |
1173 | Returns all hosts that went through _refresh_host_daemons(). | |
9f95a23c | 1174 | |
f91f0fd5 TL |
1175 | This mitigates a potential race, where new host was added *after* |
1176 | ``_refresh_host_daemons()`` was called, but *before* | |
1177 | ``_apply_all_specs()`` was called. thus we end up with a hosts | |
1178 | where daemons might be running, but we have not yet detected them. | |
1179 | """ | |
1180 | return [ | |
1181 | h for h in self.inventory.all_specs() | |
1182 | if self.cache.host_had_daemon_refresh(h.hostname) | |
1183 | ] | |
9f95a23c | 1184 | |
    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param spec: host spec (hostname, address, labels)
        :raises OrchestratorError: if the remote check-host fails
        """
        assert_valid_host(spec.hostname)
        # Verify connectivity and that the host answers to the expected name
        # before committing anything.
        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory.add_host(spec)
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)
    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        # Orchestrator entry point; see _add_host() for the actual work.
        return self._add_host(spec)
    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        # Drop any cached SSH connection to the removed host.
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)
    @trivial_completion
    def update_host_addr(self, host: str, addr: str) -> str:
        """Update the address used to reach *host* and reset its connection."""
        self.inventory.set_addr(host, addr)
        # The cached connection targets the old address; drop it.
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)
    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        # Copy into a list so callers can't mutate the inventory's view.
        return list(self.inventory.all_specs())
e306af50 | 1246 | @trivial_completion |
adb31ebb | 1247 | def add_host_label(self, host: str, label: str) -> str: |
e306af50 | 1248 | self.inventory.add_label(host, label) |
9f95a23c TL |
1249 | self.log.info('Added label %s to host %s' % (label, host)) |
1250 | return 'Added label %s to host %s' % (label, host) | |
1251 | ||
e306af50 | 1252 | @trivial_completion |
adb31ebb | 1253 | def remove_host_label(self, host: str, label: str) -> str: |
e306af50 | 1254 | self.inventory.rm_label(host, label) |
9f95a23c TL |
1255 | self.log.info('Removed label %s to host %s' % (label, host)) |
1256 | return 'Removed label %s from host %s' % (label, host) | |
1257 | ||
f6b5b4d7 | 1258 | @trivial_completion |
adb31ebb | 1259 | def host_ok_to_stop(self, hostname: str) -> str: |
f6b5b4d7 TL |
1260 | if hostname not in self.cache.get_hosts(): |
1261 | raise OrchestratorError(f'Cannot find host "{hostname}"') | |
1262 | ||
1263 | daemons = self.cache.get_daemons() | |
1264 | daemon_map = defaultdict(lambda: []) | |
1265 | for dd in daemons: | |
1266 | if dd.hostname == hostname: | |
1267 | daemon_map[dd.daemon_type].append(dd.daemon_id) | |
1268 | ||
f91f0fd5 | 1269 | for daemon_type, daemon_ids in daemon_map.items(): |
f6b5b4d7 TL |
1270 | r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids) |
1271 | if r.retval: | |
1272 | self.log.error(f'It is NOT safe to stop host {hostname}') | |
1273 | raise orchestrator.OrchestratorError( | |
f91f0fd5 TL |
1274 | r.stderr, |
1275 | errno=r.retval) | |
f6b5b4d7 TL |
1276 | |
1277 | msg = f'It is presumed safe to stop host {hostname}' | |
1278 | self.log.info(msg) | |
1279 | return msg | |
1280 | ||
f91f0fd5 TL |
1281 | def get_minimal_ceph_conf(self) -> str: |
1282 | _, config, _ = self.check_mon_command({ | |
f6b5b4d7 TL |
1283 | "prefix": "config generate-minimal-conf", |
1284 | }) | |
f91f0fd5 TL |
1285 | extra = self.extra_ceph_conf().conf |
1286 | if extra: | |
1287 | config += '\n\n' + extra.strip() + '\n' | |
1288 | return config | |
f6b5b4d7 | 1289 | |
adb31ebb | 1290 | def _invalidate_daemons_and_kick_serve(self, filter_host: Optional[str] = None) -> None: |
f6b5b4d7 TL |
1291 | if filter_host: |
1292 | self.cache.invalidate_host_daemons(filter_host) | |
1293 | else: | |
1294 | for h in self.cache.get_hosts(): | |
1295 | # Also discover daemons deployed manually | |
1296 | self.cache.invalidate_host_daemons(h) | |
1297 | ||
1298 | self._kick_serve_loop() | |
1299 | ||
    @trivial_completion
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        """
        Summarize services by merging running daemons (from the cache) with
        stored service specs, optionally filtered by type and/or name.
        """
        if refresh:
            # Asynchronous: kick serve() and report current cache contents.
            self._invalidate_daemons_and_kick_serve()
            self.log.info('Kicked serve() loop to refresh all services')

        # <service_map>
        sm: Dict[str, orchestrator.ServiceDescription] = {}
        osd_count = 0
        # Pass 1: aggregate per-service state from the running daemons.
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                    if not dd.osdspec_affinity:
                        # If there is no osdspec_affinity, the spec should suffice for displaying
                        continue
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    # No stored spec: synthesize an unmanaged one from the daemon.
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        events=self.events.get_for_service(spec.service_name()),
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement spec.
                        Showing an actual/expected representation cannot be determined
                        here. So we're setting running = size for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(
                            self.inventory.all_specs())

                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0
                if dd.status == 1:
                    sm[n].running += 1
                # Track the oldest refresh time across the service's daemons.
                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                # Differing images across daemons are reported as 'mix'.
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        # Pass 2: add specs that currently have no running daemons at all.
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                running=0,
                events=self.events.get_for_service(spec.service_name()),
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())
1389 | @trivial_completion | |
f6b5b4d7 TL |
1390 | def list_daemons(self, |
1391 | service_name: Optional[str] = None, | |
1392 | daemon_type: Optional[str] = None, | |
1393 | daemon_id: Optional[str] = None, | |
1394 | host: Optional[str] = None, | |
1395 | refresh: bool = False) -> List[orchestrator.DaemonDescription]: | |
9f95a23c | 1396 | if refresh: |
f6b5b4d7 TL |
1397 | self._invalidate_daemons_and_kick_serve(host) |
1398 | self.log.info('Kicked serve() loop to refresh all daemons') | |
1399 | ||
9f95a23c | 1400 | result = [] |
1911f103 | 1401 | for h, dm in self.cache.get_daemons_with_volatile_status(): |
9f95a23c TL |
1402 | if host and h != host: |
1403 | continue | |
1404 | for name, dd in dm.items(): | |
801d1391 TL |
1405 | if daemon_type is not None and daemon_type != dd.daemon_type: |
1406 | continue | |
1407 | if daemon_id is not None and daemon_id != dd.daemon_id: | |
9f95a23c | 1408 | continue |
801d1391 | 1409 | if service_name is not None and service_name != dd.service_name(): |
9f95a23c TL |
1410 | continue |
1411 | result.append(dd) | |
1412 | return result | |
1413 | ||
    @trivial_completion
    def service_action(self, action: str, service_name: str) -> List[str]:
        """Schedule *action* (start/stop/restart/...) for every daemon of a service."""
        dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
        self.log.info('%s service %s' % (action.capitalize(), service_name))
        # One scheduling message per daemon; serve() performs the actions.
        return [
            self._schedule_daemon_action(dd.name(), action)
            for dd in dds
        ]
    def _daemon_action(self, daemon_type: str, daemon_id: str, host: str, action: str, image: Optional[str] = None) -> str:
        """
        Perform *action* on one daemon immediately (not scheduled).

        redeploy/reconfig recreate the daemon via _create_daemon(); other
        actions map to systemd unit operations run through cephadm.
        """
        daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec(
            host=host,
            daemon_id=daemon_id,
            daemon_type=daemon_type,
        )

        # Persist an explicit image (redeploy only) before acting.
        self._daemon_action_set_image(action, image, daemon_type, daemon_id)

        if action == 'redeploy':
            if self.daemon_is_self(daemon_type, daemon_id):
                # Redeploying the active mgr: hand over to a standby first.
                self.mgr_service.fail_over()
                return ''  # unreachable
            # stop, recreate the container+unit, then restart
            return self._create_daemon(daemon_spec)
        elif action == 'reconfig':
            return self._create_daemon(daemon_spec, reconfig=True)

        # Map the orchestrator action onto the systemd operations to run.
        actions = {
            'start': ['reset-failed', 'start'],
            'stop': ['stop'],
            'restart': ['reset-failed', 'restart'],
        }
        name = daemon_spec.name()
        for a in actions[action]:
            try:
                out, err, code = self._run_cephadm(
                    host, name, 'unit',
                    ['--name', name, a])
            except Exception:
                # Best effort: log and continue with the remaining operations.
                self.log.exception(f'`{host}: cephadm unit {name} {a}` failed')
        self.cache.invalidate_host_daemons(daemon_spec.host)
        msg = "{} {} from host '{}'".format(action, name, daemon_spec.host)
        self.events.for_daemon(name, 'INFO', msg)
        return msg
    def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str) -> None:
        """
        When an explicit *image* accompanies a redeploy, persist it as the
        daemon's `container_image` config option; reject an image for any
        other action or for non-Ceph daemon types.
        """
        if image is not None:
            if action != 'redeploy':
                raise OrchestratorError(
                    f'Cannot execute {action} with new image. `action` needs to be `redeploy`')
            if daemon_type not in CEPH_TYPES:
                raise OrchestratorError(
                    f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported '
                    f'types are: {", ".join(CEPH_TYPES)}')

            self.check_mon_command({
                'prefix': 'config set',
                'name': 'container_image',
                'value': image,
                'who': utils.name_to_config_section(daemon_type + '.' + daemon_id),
            })
    @trivial_completion
    def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str:
        """Validate and schedule *action* for a single named daemon."""
        d = self.cache.get_daemon(daemon_name)

        # Refuse to redeploy the active mgr when no standby could take over.
        if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \
                and not self.mgr_service.mgr_map_has_standby():
            raise OrchestratorError(
                f'Unable to schedule redeploy for {daemon_name}: No standby MGRs')

        # Persist an explicit image (redeploy only) before scheduling.
        self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id)

        self.log.info(f'Schedule {action} daemon {daemon_name}')
        return self._schedule_daemon_action(daemon_name, action)
1490 | def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: | |
1491 | return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() | |
1492 | ||
adb31ebb | 1493 | def _schedule_daemon_action(self, daemon_name: str, action: str) -> str: |
f91f0fd5 TL |
1494 | dd = self.cache.get_daemon(daemon_name) |
1495 | if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \ | |
1496 | and not self.mgr_service.mgr_map_has_standby(): | |
1497 | raise OrchestratorError( | |
1498 | f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') | |
1499 | self.cache.schedule_daemon_action(dd.hostname, dd.name(), action) | |
1500 | msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname) | |
1501 | self._kick_serve_loop() | |
1502 | return msg | |
9f95a23c | 1503 | |
e306af50 | 1504 | @trivial_completion |
9f95a23c | 1505 | def remove_daemons(self, names): |
e306af50 | 1506 | # type: (List[str]) -> List[str] |
9f95a23c TL |
1507 | args = [] |
1508 | for host, dm in self.cache.daemons.items(): | |
1509 | for name in names: | |
1510 | if name in dm: | |
1511 | args.append((name, host)) | |
1512 | if not args: | |
1513 | raise OrchestratorError('Unable to find daemon(s) %s' % (names)) | |
1514 | self.log.info('Remove daemons %s' % [a[0] for a in args]) | |
1515 | return self._remove_daemons(args) | |
1516 | ||
1517 | @trivial_completion | |
adb31ebb | 1518 | def remove_service(self, service_name: str) -> str: |
9f95a23c | 1519 | self.log.info('Remove service %s' % service_name) |
e306af50 | 1520 | self._trigger_preview_refresh(service_name=service_name) |
1911f103 TL |
1521 | found = self.spec_store.rm(service_name) |
1522 | if found: | |
1523 | self._kick_serve_loop() | |
f6b5b4d7 | 1524 | return 'Removed service %s' % service_name |
1911f103 TL |
1525 | else: |
1526 | # must be idempotent: still a success. | |
f6b5b4d7 | 1527 | return f'Failed to remove service. <{service_name}> was not found.' |
9f95a23c TL |
1528 | |
1529 | @trivial_completion | |
adb31ebb | 1530 | def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: |
9f95a23c TL |
1531 | """ |
1532 | Return the storage inventory of hosts matching the given filter. | |
1533 | ||
1534 | :param host_filter: host filter | |
1535 | ||
1536 | TODO: | |
1537 | - add filtering by label | |
1538 | """ | |
1539 | if refresh: | |
f6b5b4d7 TL |
1540 | if host_filter and host_filter.hosts: |
1541 | for h in host_filter.hosts: | |
1542 | self.cache.invalidate_host_devices(h) | |
9f95a23c | 1543 | else: |
f6b5b4d7 TL |
1544 | for h in self.cache.get_hosts(): |
1545 | self.cache.invalidate_host_devices(h) | |
1546 | ||
1547 | self.event.set() | |
1548 | self.log.info('Kicked serve() loop to refresh devices') | |
9f95a23c TL |
1549 | |
1550 | result = [] | |
1551 | for host, dls in self.cache.devices.items(): | |
f6b5b4d7 | 1552 | if host_filter and host_filter.hosts and host not in host_filter.hosts: |
9f95a23c TL |
1553 | continue |
1554 | result.append(orchestrator.InventoryHost(host, | |
1555 | inventory.Devices(dls))) | |
1556 | return result | |
1557 | ||
1558 | @trivial_completion | |
adb31ebb | 1559 | def zap_device(self, host: str, path: str) -> str: |
9f95a23c TL |
1560 | self.log.info('Zap device %s:%s' % (host, path)) |
1561 | out, err, code = self._run_cephadm( | |
1562 | host, 'osd', 'ceph-volume', | |
1563 | ['--', 'lvm', 'zap', '--destroy', path], | |
1564 | error_ok=True) | |
1565 | self.cache.invalidate_host_devices(host) | |
1566 | if code: | |
1567 | raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) | |
1568 | return '\n'.join(out + err) | |
1569 | ||
e306af50 | 1570 | @trivial_completion |
f91f0fd5 TL |
1571 | def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: |
1572 | """ | |
1573 | Blink a device light. Calling something like:: | |
1574 | ||
1575 | lsmcli local-disk-ident-led-on --path $path | |
1576 | ||
1577 | If you must, you can customize this via:: | |
1578 | ||
1579 | ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>' | |
1580 | ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>' | |
1581 | ||
1582 | See templates/blink_device_light_cmd.j2 | |
1583 | """ | |
e306af50 | 1584 | @forall_hosts |
adb31ebb | 1585 | def blink(host: str, dev: str, path: str) -> str: |
f91f0fd5 TL |
1586 | cmd_line = self.template.render('blink_device_light_cmd.j2', |
1587 | { | |
1588 | 'on': on, | |
1589 | 'ident_fault': ident_fault, | |
1590 | 'dev': dev, | |
1591 | 'path': path | |
1592 | }, | |
1593 | host=host) | |
1594 | cmd_args = shlex.split(cmd_line) | |
1595 | ||
9f95a23c | 1596 | out, err, code = self._run_cephadm( |
f91f0fd5 | 1597 | host, 'osd', 'shell', ['--'] + cmd_args, |
9f95a23c TL |
1598 | error_ok=True) |
1599 | if code: | |
f6b5b4d7 | 1600 | raise OrchestratorError( |
9f95a23c | 1601 | 'Unable to affect %s light for %s:%s. Command: %s' % ( |
f91f0fd5 | 1602 | ident_fault, host, dev, ' '.join(cmd_args))) |
9f95a23c TL |
1603 | self.log.info('Set %s light for %s:%s %s' % ( |
1604 | ident_fault, host, dev, 'on' if on else 'off')) | |
1605 | return "Set %s light for %s:%s %s" % ( | |
1606 | ident_fault, host, dev, 'on' if on else 'off') | |
1607 | ||
1608 | return blink(locs) | |
1609 | ||
1610 | def get_osd_uuid_map(self, only_up=False): | |
1911f103 | 1611 | # type: (bool) -> Dict[str, str] |
9f95a23c TL |
1612 | osd_map = self.get('osd_map') |
1613 | r = {} | |
1614 | for o in osd_map['osds']: | |
1615 | # only include OSDs that have ever started in this map. this way | |
1616 | # an interrupted osd create can be repeated and succeed the second | |
1617 | # time around. | |
1911f103 TL |
1618 | osd_id = o.get('osd') |
1619 | if osd_id is None: | |
1620 | raise OrchestratorError("Could not retrieve osd_id from osd_map") | |
1621 | if not only_up or (o['up_from'] > 0): | |
1622 | r[str(osd_id)] = o.get('uuid', '') | |
9f95a23c TL |
1623 | return r |
1624 | ||
e306af50 TL |
1625 | def _trigger_preview_refresh(self, |
1626 | specs: Optional[List[DriveGroupSpec]] = None, | |
f6b5b4d7 TL |
1627 | service_name: Optional[str] = None, |
1628 | ) -> None: | |
1629 | # Only trigger a refresh when a spec has changed | |
1630 | trigger_specs = [] | |
1631 | if specs: | |
1632 | for spec in specs: | |
1633 | preview_spec = self.spec_store.spec_preview.get(spec.service_name()) | |
1634 | # the to-be-preview spec != the actual spec, this means we need to | |
1635 | # trigger a refresh, if the spec has been removed (==None) we need to | |
1636 | # refresh as well. | |
1637 | if not preview_spec or spec != preview_spec: | |
1638 | trigger_specs.append(spec) | |
1639 | if service_name: | |
1640 | trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))] | |
1641 | if not any(trigger_specs): | |
1642 | return None | |
1643 | ||
1644 | refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs) | |
e306af50 TL |
1645 | for host in refresh_hosts: |
1646 | self.log.info(f"Marking host: {host} for OSDSpec preview refresh.") | |
1647 | self.cache.osdspec_previews_refresh_queue.append(host) | |
1648 | ||
9f95a23c | 1649 | @trivial_completion |
f6b5b4d7 TL |
1650 | def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]: |
1651 | """ | |
1652 | Deprecated. Please use `apply()` instead. | |
1653 | ||
1654 | Keeping this around to be compapatible to mgr/dashboard | |
1655 | """ | |
9f95a23c TL |
1656 | return [self._apply(spec) for spec in specs] |
1657 | ||
1658 | @trivial_completion | |
f6b5b4d7 TL |
1659 | def create_osds(self, drive_group: DriveGroupSpec) -> str: |
1660 | return self.osd_service.create_from_spec(drive_group) | |
9f95a23c | 1661 | |
f6b5b4d7 TL |
1662 | def _preview_osdspecs(self, |
1663 | osdspecs: Optional[List[DriveGroupSpec]] = None | |
adb31ebb | 1664 | ) -> dict: |
f6b5b4d7 TL |
1665 | if not osdspecs: |
1666 | return {'n/a': [{'error': True, | |
1667 | 'message': 'No OSDSpec or matching hosts found.'}]} | |
1668 | matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs) | |
e306af50 TL |
1669 | if not matching_hosts: |
1670 | return {'n/a': [{'error': True, | |
1671 | 'message': 'No OSDSpec or matching hosts found.'}]} | |
adb31ebb | 1672 | # Is any host still loading previews or still in the queue to be previewed |
e306af50 | 1673 | pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts} |
adb31ebb | 1674 | if pending_hosts or any(item in self.cache.osdspec_previews_refresh_queue for item in matching_hosts): |
e306af50 TL |
1675 | # Report 'pending' when any of the matching hosts is still loading previews (flag is True) |
1676 | return {'n/a': [{'error': True, | |
1677 | 'message': 'Preview data is being generated.. ' | |
f6b5b4d7 TL |
1678 | 'Please re-run this command in a bit.'}]} |
1679 | # drop all keys that are not in search_hosts and only select reports that match the requested osdspecs | |
1680 | previews_for_specs = {} | |
1681 | for host, raw_reports in self.cache.osdspec_previews.items(): | |
1682 | if host not in matching_hosts: | |
1683 | continue | |
1684 | osd_reports = [] | |
1685 | for osd_report in raw_reports: | |
1686 | if osd_report.get('osdspec') in [x.service_id for x in osdspecs]: | |
1687 | osd_reports.append(osd_report) | |
1688 | previews_for_specs.update({host: osd_reports}) | |
1689 | return previews_for_specs | |
9f95a23c | 1690 | |
adb31ebb | 1691 | def _calc_daemon_deps(self, daemon_type: str, daemon_id: str) -> List[str]: |
9f95a23c TL |
1692 | need = { |
1693 | 'prometheus': ['mgr', 'alertmanager', 'node-exporter'], | |
1694 | 'grafana': ['prometheus'], | |
801d1391 | 1695 | 'alertmanager': ['mgr', 'alertmanager'], |
9f95a23c TL |
1696 | } |
1697 | deps = [] | |
1698 | for dep_type in need.get(daemon_type, []): | |
1699 | for dd in self.cache.get_daemons_by_service(dep_type): | |
1700 | deps.append(dd.name()) | |
1701 | return sorted(deps) | |
1702 | ||
    def _create_daemon(self,
                       daemon_spec: CephadmDaemonSpec,
                       reconfig: bool = False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       ) -> str:
        """Deploy (or reconfigure) one daemon on its host via 'cephadm deploy'.

        Builds the cephadm CLI arguments on ``daemon_spec.extra_args``, pipes
        the generated config as JSON on stdin, then primes/invalidates the
        daemon cache for the host.  Returns a human-readable status message.
        """
        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            image = ''
            start_time = datetime_now()
            ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

            if daemon_spec.daemon_type == 'container':
                spec: Optional[CustomContainerSpec] = daemon_spec.spec
                if spec is None:
                    # Exit here immediately because the required service
                    # spec to create a daemon is not provided. This is only
                    # provided when a service is applied via 'orch apply'
                    # command.
                    msg = "Failed to {} daemon {} on {}: Required " \
                          "service specification not provided".format(
                              'reconfigure' if reconfig else 'deploy',
                              daemon_spec.name(), daemon_spec.host)
                    self.log.info(msg)
                    return msg
                # Custom containers bring their own image and extra ports.
                image = spec.image
                if spec.ports:
                    ports.extend(spec.ports)

            cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config(
                daemon_spec)

            # TCP port to open in the host firewall
            if len(ports) > 0:
                daemon_spec.extra_args.extend([
                    '--tcp-ports', ' '.join(map(str, ports))
                ])

            # osd deployments needs an --osd-uuid arg
            if daemon_spec.daemon_type == 'osd':
                if not osd_uuid_map:
                    osd_uuid_map = self.get_osd_uuid_map()
                osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                if not osd_uuid:
                    raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

            if reconfig:
                daemon_spec.extra_args.append('--reconfig')
            if self.allow_ptrace:
                daemon_spec.extra_args.append('--allow-ptrace')

            # Log in to the configured registry first so the pull can succeed.
            if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url:
                self._registry_login(daemon_spec.host, self.registry_url,
                                     self.registry_username, self.registry_password)

            # Config is delivered on stdin, see the json.dumps() below.
            daemon_spec.extra_args.extend(['--config-json', '-'])

            self.log.info('%s daemon %s on %s' % (
                'Reconfiguring' if reconfig else 'Deploying',
                daemon_spec.name(), daemon_spec.host))

            out, err, code = self._run_cephadm(
                daemon_spec.host, daemon_spec.name(), 'deploy',
                [
                    '--name', daemon_spec.name(),
                ] + daemon_spec.extra_args,
                stdin=json.dumps(cephadm_config),
                image=image)
            if not code and daemon_spec.host in self.cache.daemons:
                # prime cached service state with what we (should have)
                # just created
                sd = orchestrator.DaemonDescription()
                sd.daemon_type = daemon_spec.daemon_type
                sd.daemon_id = daemon_spec.daemon_id
                sd.hostname = daemon_spec.host
                sd.status = 1
                sd.status_desc = 'starting'
                self.cache.add_daemon(daemon_spec.host, sd)
                # These daemons need follow-up config (e.g. dashboard wiring).
                if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']:
                    self.requires_post_actions.add(daemon_spec.daemon_type)
            self.cache.invalidate_host_daemons(daemon_spec.host)
            self.cache.update_daemon_config_deps(
                daemon_spec.host, daemon_spec.name(), deps, start_time)
            self.cache.save_host(daemon_spec.host)
            msg = "{} {} on host '{}'".format(
                'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
            if not code:
                self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
            else:
                what = 'reconfigure' if reconfig else 'deploy'
                self.events.for_daemon(
                    daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
            return msg
9f95a23c | 1801 | |
e306af50 | 1802 | @forall_hosts |
adb31ebb | 1803 | def _remove_daemons(self, name: str, host: str) -> str: |
9f95a23c TL |
1804 | return self._remove_daemon(name, host) |
1805 | ||
adb31ebb | 1806 | def _remove_daemon(self, name: str, host: str) -> str: |
9f95a23c TL |
1807 | """ |
1808 | Remove a daemon | |
1809 | """ | |
1810 | (daemon_type, daemon_id) = name.split('.', 1) | |
f91f0fd5 TL |
1811 | daemon = orchestrator.DaemonDescription( |
1812 | daemon_type=daemon_type, | |
1813 | daemon_id=daemon_id, | |
1814 | hostname=host) | |
9f95a23c | 1815 | |
f91f0fd5 | 1816 | with set_exception_subject('service', daemon.service_id(), overwrite=True): |
f6b5b4d7 | 1817 | |
f91f0fd5 | 1818 | self.cephadm_services[daemon_type].pre_remove(daemon) |
f6b5b4d7 TL |
1819 | |
1820 | args = ['--name', name, '--force'] | |
1821 | self.log.info('Removing daemon %s from %s' % (name, host)) | |
1822 | out, err, code = self._run_cephadm( | |
1823 | host, name, 'rm-daemon', args) | |
1824 | if not code: | |
1825 | # remove item from cache | |
1826 | self.cache.rm_daemon(host, name) | |
1827 | self.cache.invalidate_host_daemons(host) | |
e306af50 | 1828 | |
f91f0fd5 | 1829 | self.cephadm_services[daemon_type].post_remove(daemon) |
9f95a23c | 1830 | |
f91f0fd5 | 1831 | return "Removed {} from host '{}'".format(name, host) |
9f95a23c | 1832 | |
    def _check_pool_exists(self, pool: str, service_name: str) -> None:
        """Raise OrchestratorError unless *pool* exists in RADOS."""
        logger.info(f'Checking pool "{pool}" exists for service {service_name}')
        if not self.rados.pool_exists(pool):
            raise OrchestratorError(f'Cannot find pool "{pool}" for '
                                    f'service {service_name}')
1838 | ||
adb31ebb TL |
    def _add_daemon(self,
                    daemon_type: str,
                    spec: ServiceSpec,
                    create_func: Callable[..., CephadmDaemonSpec],
                    config_func: Optional[Callable] = None) -> List[str]:
        """
        Add (and place) a daemon. Require explicit host placement. Do not
        schedule, and do not apply the related scheduling limitations.
        """
        self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement))
        if not spec.placement.hosts:
            raise OrchestratorError('must specify host(s) to deploy on')
        # Default the count to one daemon per explicitly listed host.
        count = spec.placement.count or len(spec.placement.hosts)
        daemons = self.cache.get_daemons_by_service(spec.service_name())
        return self._create_daemons(daemon_type, spec, daemons,
                                    spec.placement.hosts, count,
                                    create_func, config_func)
1856 | ||
adb31ebb TL |
    def _create_daemons(self,
                        daemon_type: str,
                        spec: ServiceSpec,
                        daemons: List[DaemonDescription],
                        hosts: List[HostPlacementSpec],
                        count: int,
                        create_func: Callable[..., CephadmDaemonSpec],
                        config_func: Optional[Callable] = None) -> List[str]:
        """Create *count* daemons on the given hosts, in parallel per host.

        *daemons* (the service's existing daemons) is mutated: each newly
        placed daemon is appended so subsequent generated names stay unique.
        """
        if count > len(hosts):
            raise OrchestratorError('too few hosts: want %d, have %s' % (
                count, hosts))

        # Run config_func at most once per batch (rgw additionally gets the
        # daemon_id of the first placement).
        did_config = False

        args = []  # type: List[CephadmDaemonSpec]
        for host, network, name in hosts:
            daemon_id = self.get_unique_name(daemon_type, host, daemons,
                                             prefix=spec.service_id,
                                             forcename=name)

            if not did_config and config_func:
                if daemon_type == 'rgw':
                    config_func(spec, daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec(
                host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))
            args.append(daemon_spec)

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        @forall_hosts
        def create_func_map(*args: Any) -> str:
            # create_func builds the concrete spec; _create_daemon deploys it.
            daemon_spec = create_func(*args)
            return self._create_daemon(daemon_spec)

        return create_func_map(args)
1904 | ||
1905 | @trivial_completion | |
adb31ebb | 1906 | def apply_mon(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
1907 | return self._apply(spec) |
1908 | ||
e306af50 | 1909 | @trivial_completion |
9f95a23c | 1910 | def add_mon(self, spec): |
e306af50 | 1911 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 1912 | return self._add_daemon('mon', spec, self.mon_service.prepare_create) |
9f95a23c | 1913 | |
e306af50 TL |
1914 | @trivial_completion |
1915 | def add_mgr(self, spec): | |
1916 | # type: (ServiceSpec) -> List[str] | |
f91f0fd5 | 1917 | return self._add_daemon('mgr', spec, self.mgr_service.prepare_create) |
9f95a23c | 1918 | |
e306af50 TL |
    def _apply(self, spec: GenericSpec) -> str:
        """Dispatch one generic spec: host specs are added directly, service
        specs are stored for the scheduler in serve()."""
        if spec.service_type == 'host':
            return self._add_host(cast(HostSpec, spec))

        if spec.service_type == 'osd':
            # _trigger preview refresh needs to be smart and
            # should only refresh if a change has been detected
            self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)])

        return self._apply_service_spec(cast(ServiceSpec, spec))
9f95a23c | 1929 | |
adb31ebb | 1930 | def _plan(self, spec: ServiceSpec) -> dict: |
f6b5b4d7 TL |
1931 | if spec.service_type == 'osd': |
1932 | return {'service_name': spec.service_name(), | |
1933 | 'service_type': spec.service_type, | |
1934 | 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])} | |
1935 | ||
1936 | ha = HostAssignment( | |
1937 | spec=spec, | |
f91f0fd5 | 1938 | hosts=self._hosts_with_daemon_inventory(), |
f6b5b4d7 TL |
1939 | get_daemons_func=self.cache.get_daemons_by_service, |
1940 | ) | |
1941 | ha.validate() | |
1942 | hosts = ha.place() | |
1943 | ||
1944 | add_daemon_hosts = ha.add_daemon_hosts(hosts) | |
1945 | remove_daemon_hosts = ha.remove_daemon_hosts(hosts) | |
1946 | ||
1947 | return { | |
1948 | 'service_name': spec.service_name(), | |
1949 | 'service_type': spec.service_type, | |
1950 | 'add': [hs.hostname for hs in add_daemon_hosts], | |
1951 | 'remove': [d.hostname for d in remove_daemon_hosts] | |
1952 | } | |
1953 | ||
1954 | @trivial_completion | |
1955 | def plan(self, specs: List[GenericSpec]) -> List: | |
1956 | results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n' | |
1957 | 'to the current inventory setup. If any on these conditions changes, the \n' | |
1958 | 'preview will be invalid. Please make sure to have a minimal \n' | |
1959 | 'timeframe between planning and applying the specs.'}] | |
1960 | if any([spec.service_type == 'host' for spec in specs]): | |
1961 | return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported, yet.'}] | |
1962 | for spec in specs: | |
1963 | results.append(self._plan(cast(ServiceSpec, spec))) | |
1964 | return results | |
1965 | ||
    def _apply_service_spec(self, spec: ServiceSpec) -> str:
        """Validate *spec*, fill in default placement, persist it and kick
        the serve() loop which performs the actual (re)scheduling."""
        if spec.placement.is_empty():
            # fill in default placement
            defaults = {
                'mon': PlacementSpec(count=5),
                'mgr': PlacementSpec(count=2),
                'mds': PlacementSpec(count=2),
                'rgw': PlacementSpec(count=2),
                'iscsi': PlacementSpec(count=1),
                'rbd-mirror': PlacementSpec(count=2),
                'nfs': PlacementSpec(count=1),
                'grafana': PlacementSpec(count=1),
                'alertmanager': PlacementSpec(count=1),
                'prometheus': PlacementSpec(count=1),
                'node-exporter': PlacementSpec(host_pattern='*'),
                'crash': PlacementSpec(host_pattern='*'),
                'container': PlacementSpec(count=1),
            }
            spec.placement = defaults[spec.service_type]
        elif spec.service_type in ['mon', 'mgr'] and \
                spec.placement.count is not None and \
                spec.placement.count < 1:
            raise OrchestratorError('cannot scale %s service below 1' % (
                spec.service_type))

        # Validate placement up-front so a bad spec is rejected synchronously.
        HostAssignment(
            spec=spec,
            hosts=self.inventory.all_specs(),  # All hosts, even those without daemon refresh
            get_daemons_func=self.cache.get_daemons_by_service,
        ).validate()

        self.log.info('Saving service %s spec with placement %s' % (
            spec.service_name(), spec.placement.pretty_str()))
        self.spec_store.save(spec)
        self._kick_serve_loop()
        return "Scheduled %s update..." % spec.service_name()
9f95a23c TL |
2002 | |
2003 | @trivial_completion | |
f6b5b4d7 | 2004 | def apply(self, specs: List[GenericSpec]) -> List[str]: |
e306af50 TL |
2005 | results = [] |
2006 | for spec in specs: | |
2007 | results.append(self._apply(spec)) | |
2008 | return results | |
9f95a23c TL |
2009 | |
2010 | @trivial_completion | |
adb31ebb | 2011 | def apply_mgr(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2012 | return self._apply(spec) |
2013 | ||
e306af50 | 2014 | @trivial_completion |
f6b5b4d7 | 2015 | def add_mds(self, spec: ServiceSpec) -> List[str]: |
f91f0fd5 | 2016 | return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config) |
9f95a23c TL |
2017 | |
2018 | @trivial_completion | |
f6b5b4d7 | 2019 | def apply_mds(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2020 | return self._apply(spec) |
2021 | ||
e306af50 | 2022 | @trivial_completion |
adb31ebb | 2023 | def add_rgw(self, spec: ServiceSpec) -> List[str]: |
f91f0fd5 | 2024 | return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config) |
9f95a23c TL |
2025 | |
2026 | @trivial_completion | |
adb31ebb | 2027 | def apply_rgw(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2028 | return self._apply(spec) |
2029 | ||
e306af50 | 2030 | @trivial_completion |
1911f103 | 2031 | def add_iscsi(self, spec): |
e306af50 | 2032 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 2033 | return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config) |
1911f103 TL |
2034 | |
2035 | @trivial_completion | |
adb31ebb | 2036 | def apply_iscsi(self, spec: ServiceSpec) -> str: |
1911f103 TL |
2037 | return self._apply(spec) |
2038 | ||
e306af50 | 2039 | @trivial_completion |
adb31ebb | 2040 | def add_rbd_mirror(self, spec: ServiceSpec) -> List[str]: |
f91f0fd5 | 2041 | return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create) |
9f95a23c TL |
2042 | |
2043 | @trivial_completion | |
adb31ebb | 2044 | def apply_rbd_mirror(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2045 | return self._apply(spec) |
2046 | ||
e306af50 | 2047 | @trivial_completion |
adb31ebb | 2048 | def add_nfs(self, spec: ServiceSpec) -> List[str]: |
f91f0fd5 | 2049 | return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config) |
801d1391 TL |
2050 | |
2051 | @trivial_completion | |
adb31ebb | 2052 | def apply_nfs(self, spec: ServiceSpec) -> str: |
801d1391 TL |
2053 | return self._apply(spec) |
2054 | ||
9f95a23c TL |
2055 | def _get_dashboard_url(self): |
2056 | # type: () -> str | |
2057 | return self.get('mgr_map').get('services', {}).get('dashboard', '') | |
2058 | ||
e306af50 | 2059 | @trivial_completion |
adb31ebb | 2060 | def add_prometheus(self, spec: ServiceSpec) -> List[str]: |
f91f0fd5 | 2061 | return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create) |
9f95a23c TL |
2062 | |
2063 | @trivial_completion | |
adb31ebb | 2064 | def apply_prometheus(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2065 | return self._apply(spec) |
2066 | ||
e306af50 | 2067 | @trivial_completion |
9f95a23c | 2068 | def add_node_exporter(self, spec): |
e306af50 | 2069 | # type: (ServiceSpec) -> List[str] |
9f95a23c | 2070 | return self._add_daemon('node-exporter', spec, |
f91f0fd5 | 2071 | self.node_exporter_service.prepare_create) |
9f95a23c TL |
2072 | |
2073 | @trivial_completion | |
adb31ebb | 2074 | def apply_node_exporter(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2075 | return self._apply(spec) |
2076 | ||
e306af50 | 2077 | @trivial_completion |
9f95a23c | 2078 | def add_crash(self, spec): |
e306af50 | 2079 | # type: (ServiceSpec) -> List[str] |
9f95a23c | 2080 | return self._add_daemon('crash', spec, |
f91f0fd5 | 2081 | self.crash_service.prepare_create) |
9f95a23c TL |
2082 | |
2083 | @trivial_completion | |
adb31ebb | 2084 | def apply_crash(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2085 | return self._apply(spec) |
2086 | ||
e306af50 | 2087 | @trivial_completion |
9f95a23c | 2088 | def add_grafana(self, spec): |
e306af50 | 2089 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 2090 | return self._add_daemon('grafana', spec, self.grafana_service.prepare_create) |
9f95a23c TL |
2091 | |
2092 | @trivial_completion | |
f6b5b4d7 | 2093 | def apply_grafana(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2094 | return self._apply(spec) |
2095 | ||
e306af50 | 2096 | @trivial_completion |
9f95a23c | 2097 | def add_alertmanager(self, spec): |
e306af50 | 2098 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 2099 | return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create) |
9f95a23c TL |
2100 | |
2101 | @trivial_completion | |
f6b5b4d7 | 2102 | def apply_alertmanager(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2103 | return self._apply(spec) |
2104 | ||
f91f0fd5 TL |
2105 | @trivial_completion |
2106 | def add_container(self, spec: ServiceSpec) -> List[str]: | |
2107 | return self._add_daemon('container', spec, | |
2108 | self.container_service.prepare_create) | |
2109 | ||
2110 | @trivial_completion | |
2111 | def apply_container(self, spec: ServiceSpec) -> str: | |
2112 | return self._apply(spec) | |
2113 | ||
adb31ebb | 2114 | def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: |
9f95a23c TL |
2115 | # pick a random host... |
2116 | host = None | |
e306af50 | 2117 | for host_name in self.inventory.keys(): |
9f95a23c TL |
2118 | host = host_name |
2119 | break | |
2120 | if not host: | |
2121 | raise OrchestratorError('no hosts defined') | |
f6b5b4d7 | 2122 | if self.cache.host_needs_registry_login(host) and self.registry_url: |
f91f0fd5 TL |
2123 | self._registry_login(host, self.registry_url, |
2124 | self.registry_username, self.registry_password) | |
9f95a23c | 2125 | out, err, code = self._run_cephadm( |
f6b5b4d7 | 2126 | host, '', 'pull', [], |
9f95a23c TL |
2127 | image=image_name, |
2128 | no_fsid=True, | |
2129 | error_ok=True) | |
2130 | if code: | |
2131 | raise OrchestratorError('Failed to pull %s on %s: %s' % ( | |
2132 | image_name, host, '\n'.join(out))) | |
f91f0fd5 TL |
2133 | try: |
2134 | j = json.loads('\n'.join(out)) | |
2135 | r = ContainerInspectInfo( | |
2136 | j['image_id'], | |
2137 | j.get('ceph_version'), | |
2138 | j.get('repo_digest') | |
2139 | ) | |
2140 | self.log.debug(f'image {image_name} -> {r}') | |
2141 | return r | |
2142 | except (ValueError, KeyError) as _: | |
adb31ebb TL |
2143 | msg = 'Failed to pull %s on %s: Cannot decode JSON' % (image_name, host) |
2144 | self.log.exception('%s: \'%s\'' % (msg, '\n'.join(out))) | |
f91f0fd5 | 2145 | raise OrchestratorError(msg) |
9f95a23c TL |
2146 | |
2147 | @trivial_completion | |
adb31ebb | 2148 | def upgrade_check(self, image: str, version: str) -> str: |
9f95a23c TL |
2149 | if version: |
2150 | target_name = self.container_image_base + ':v' + version | |
2151 | elif image: | |
2152 | target_name = image | |
2153 | else: | |
2154 | raise OrchestratorError('must specify either image or version') | |
2155 | ||
f91f0fd5 TL |
2156 | image_info = self._get_container_image_info(target_name) |
2157 | self.log.debug(f'image info {image} -> {image_info}') | |
adb31ebb | 2158 | r: dict = { |
9f95a23c | 2159 | 'target_name': target_name, |
f91f0fd5 TL |
2160 | 'target_id': image_info.image_id, |
2161 | 'target_version': image_info.ceph_version, | |
9f95a23c TL |
2162 | 'needs_update': dict(), |
2163 | 'up_to_date': list(), | |
2164 | } | |
2165 | for host, dm in self.cache.daemons.items(): | |
2166 | for name, dd in dm.items(): | |
f91f0fd5 | 2167 | if image_info.image_id == dd.container_image_id: |
9f95a23c TL |
2168 | r['up_to_date'].append(dd.name()) |
2169 | else: | |
2170 | r['needs_update'][dd.name()] = { | |
2171 | 'current_name': dd.container_image_name, | |
2172 | 'current_id': dd.container_image_id, | |
2173 | 'current_version': dd.version, | |
2174 | } | |
f91f0fd5 TL |
2175 | if self.use_repo_digest: |
2176 | r['target_digest'] = image_info.repo_digest | |
2177 | ||
9f95a23c TL |
2178 | return json.dumps(r, indent=4, sort_keys=True) |
2179 | ||
2180 | @trivial_completion | |
f6b5b4d7 | 2181 | def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: |
e306af50 | 2182 | return self.upgrade.upgrade_status() |
9f95a23c TL |
2183 | |
2184 | @trivial_completion | |
adb31ebb | 2185 | def upgrade_start(self, image: str, version: str) -> str: |
e306af50 | 2186 | return self.upgrade.upgrade_start(image, version) |
9f95a23c TL |
2187 | |
2188 | @trivial_completion | |
f6b5b4d7 | 2189 | def upgrade_pause(self) -> str: |
e306af50 | 2190 | return self.upgrade.upgrade_pause() |
9f95a23c TL |
2191 | |
2192 | @trivial_completion | |
f6b5b4d7 | 2193 | def upgrade_resume(self) -> str: |
e306af50 | 2194 | return self.upgrade.upgrade_resume() |
9f95a23c TL |
2195 | |
2196 | @trivial_completion | |
f6b5b4d7 | 2197 | def upgrade_stop(self) -> str: |
e306af50 | 2198 | return self.upgrade.upgrade_stop() |
9f95a23c TL |
2199 | |
2200 | @trivial_completion | |
2201 | def remove_osds(self, osd_ids: List[str], | |
2202 | replace: bool = False, | |
f6b5b4d7 | 2203 | force: bool = False) -> str: |
9f95a23c TL |
2204 | """ |
2205 | Takes a list of OSDs and schedules them for removal. | |
2206 | The function that takes care of the actual removal is | |
f6b5b4d7 | 2207 | process_removal_queue(). |
9f95a23c TL |
2208 | """ |
2209 | ||
f6b5b4d7 TL |
2210 | daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd') |
2211 | to_remove_daemons = list() | |
9f95a23c | 2212 | for daemon in daemons: |
f6b5b4d7 TL |
2213 | if daemon.daemon_id in osd_ids: |
2214 | to_remove_daemons.append(daemon) | |
2215 | if not to_remove_daemons: | |
2216 | return f"Unable to find OSDs: {osd_ids}" | |
9f95a23c | 2217 | |
f6b5b4d7 TL |
2218 | for daemon in to_remove_daemons: |
2219 | try: | |
2220 | self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), | |
2221 | replace=replace, | |
2222 | force=force, | |
2223 | hostname=daemon.hostname, | |
2224 | fullname=daemon.name(), | |
adb31ebb TL |
2225 | process_started_at=datetime_now(), |
2226 | remove_util=self.to_remove_osds.rm_util)) | |
f6b5b4d7 TL |
2227 | except NotFoundError: |
2228 | return f"Unable to find OSDs: {osd_ids}" | |
9f95a23c TL |
2229 | |
2230 | # trigger the serve loop to initiate the removal | |
2231 | self._kick_serve_loop() | |
2232 | return "Scheduled OSD(s) for removal" | |
2233 | ||
f6b5b4d7 | 2234 | @trivial_completion |
adb31ebb | 2235 | def stop_remove_osds(self, osd_ids: List[str]) -> str: |
f6b5b4d7 TL |
2236 | """ |
2237 | Stops a `removal` process for a List of OSDs. | |
2238 | This will revert their weight and remove it from the osds_to_remove queue | |
2239 | """ | |
2240 | for osd_id in osd_ids: | |
2241 | try: | |
2242 | self.to_remove_osds.rm(OSD(osd_id=int(osd_id), | |
adb31ebb | 2243 | remove_util=self.to_remove_osds.rm_util)) |
f6b5b4d7 TL |
2244 | except (NotFoundError, KeyError): |
2245 | return f'Unable to find OSD in the queue: {osd_id}' | |
2246 | ||
2247 | # trigger the serve loop to halt the removal | |
2248 | self._kick_serve_loop() | |
2249 | return "Stopped OSD(s) removal" | |
2250 | ||
9f95a23c | 2251 | @trivial_completion |
adb31ebb | 2252 | def remove_osds_status(self) -> List[OSD]: |
9f95a23c TL |
2253 | """ |
2254 | The CLI call to retrieve an osd removal report | |
2255 | """ | |
f6b5b4d7 | 2256 | return self.to_remove_osds.all_osds() |