import json
import errno
import logging
import re
import shlex
from collections import defaultdict
from configparser import ConfigParser
from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event

import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
    Any, Set, TYPE_CHECKING, cast, Iterator, Union, NamedTuple

import datetime
import six
import os
import random
import tempfile
import multiprocessing.pool
import subprocess

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
    NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
    CustomContainerSpec
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec

from mgr_module import MgrModule, HandleCommandResult
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
    CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
from orchestrator._interface import GenericSpec

from . import remotes
from . import utils
from .migrations import Migrations
from .services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
    RbdMirrorService, CrashService, CephadmService
from .services.container import CustomContainerService
from .services.iscsi import IscsiService
from .services.nfs import NFSService
from .services.osd import RemoveUtil, OSDQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
    NodeExporterService
from .schedule import HostAssignment
from .inventory import Inventory, SpecStore, HostCache, EventStore
from .upgrade import CEPH_UPGRADE_ORDER, CephadmUpgrade
from .template import TemplateMgr
from .utils import forall_hosts, CephadmNoImage, cephadmNoImage, \
    str_to_datetime, datetime_to_str

try:
    import remoto
    # NOTE(mattoliverau) Patch remoto until remoto PR
    # (https://github.com/alfredodeza/remoto/pull/56) lands
    from distutils.version import StrictVersion
    if StrictVersion(remoto.__version__) <= StrictVersion('1.2'):
        def remoto_has_connection(self):
            return self.gateway.hasreceiver()

        from remoto.backends import BaseConnection
        BaseConnection.has_connection = remoto_has_connection
    import remoto.process
    import execnet.gateway_bootstrap
except ImportError as e:
    remoto = None
    remoto_import_error = str(e)

try:
    from typing import List
except ImportError:
    pass

logger = logging.getLogger(__name__)

T = TypeVar('T')

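# Fallback client-side SSH configuration, used by _reconfig_ssh() when neither
# the `ssh_config_file` module option nor an `ssh_config` value in the
# config-key store has been set.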
DEFAULT_SSH_CONFIG = """
Host *
  User root
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null
  ConnectTimeout=30
"""

CEPH_DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'

CEPH_TYPES = set(CEPH_UPGRADE_ORDER)


f6b5b4d7 | 96 | class CephadmCompletion(orchestrator.Completion[T]): |
e306af50 TL |
97 | def evaluate(self): |
98 | self.finalize(None) | |
9f95a23c | 99 | |
f91f0fd5 | 100 | |
f6b5b4d7 | 101 | def trivial_completion(f: Callable[..., T]) -> Callable[..., CephadmCompletion[T]]: |
9f95a23c | 102 | """ |
e306af50 TL |
103 | Decorator to make CephadmCompletion methods return |
104 | a completion object that executes themselves. | |
9f95a23c | 105 | """ |
9f95a23c | 106 | |
9f95a23c TL |
107 | @wraps(f) |
108 | def wrapper(*args, **kwargs): | |
e306af50 TL |
109 | return CephadmCompletion(on_complete=lambda _: f(*args, **kwargs)) |
110 | ||
9f95a23c TL |
111 | return wrapper |
112 | ||
113 | ||
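# Illustrative usage (taken from the methods further below): orchestrator entry
# points are wrapped so callers receive a completion object rather than the
# result itself, e.g.
#
#   @trivial_completion
#   def add_host(self, spec: HostSpec) -> str:
#       return self._add_host(spec)
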
class ContainerInspectInfo(NamedTuple):
    image_id: str
    ceph_version: Optional[str]
    repo_digest: Optional[str]


@six.add_metaclass(CLICommandMeta)
class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule):

    _STORE_HOST_PREFIX = "host"

    instance = None
    NATIVE_OPTIONS = []  # type: List[Any]
    MODULE_OPTIONS: List[dict] = [
        {
            'name': 'ssh_config_file',
            'type': 'str',
            'default': None,
            'desc': 'customized SSH config file to connect to managed hosts',
        },
        {
            'name': 'device_cache_timeout',
            'type': 'secs',
            'default': 30 * 60,
            'desc': 'seconds to cache device inventory',
        },
        {
            'name': 'daemon_cache_timeout',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'seconds to cache service (daemon) inventory',
        },
        {
            'name': 'host_check_interval',
            'type': 'secs',
            'default': 10 * 60,
            'desc': 'how frequently to perform a host check',
        },
        {
            'name': 'mode',
            'type': 'str',
            'enum_allowed': ['root', 'cephadm-package'],
            'default': 'root',
            'desc': 'mode for remote execution of cephadm',
        },
        {
            'name': 'container_image_base',
            'default': 'docker.io/ceph/ceph',
            'desc': 'Container image name, without the tag',
            'runtime': True,
        },
        {
            'name': 'container_image_prometheus',
            'default': 'docker.io/prom/prometheus:v2.18.1',
            'desc': 'Prometheus container image',
        },
        {
            'name': 'container_image_grafana',
            'default': 'docker.io/ceph/ceph-grafana:6.6.2',
            'desc': 'Grafana container image',
        },
        {
            'name': 'container_image_alertmanager',
            'default': 'docker.io/prom/alertmanager:v0.20.0',
            'desc': 'Alertmanager container image',
        },
        {
            'name': 'container_image_node_exporter',
            'default': 'docker.io/prom/node-exporter:v0.18.1',
            'desc': 'Node exporter container image',
        },
        {
            'name': 'warn_on_stray_hosts',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected on a host '
                    'that is not managed by cephadm',
        },
        {
            'name': 'warn_on_stray_daemons',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if daemons are detected '
                    'that are not managed by cephadm',
        },
        {
            'name': 'warn_on_failed_host_check',
            'type': 'bool',
            'default': True,
            'desc': 'raise a health warning if the host check fails',
        },
        {
            'name': 'log_to_cluster',
            'type': 'bool',
            'default': True,
            'desc': 'log to the "cephadm" cluster log channel',
        },
        {
            'name': 'allow_ptrace',
            'type': 'bool',
            'default': False,
            'desc': 'allow SYS_PTRACE capability on ceph containers',
            'long_desc': 'The SYS_PTRACE capability is needed to attach to a '
                         'process with gdb or strace. Enabling this option '
                         'can allow debugging daemons that encounter problems '
                         'at runtime.',
        },
        {
            'name': 'container_init',
            'type': 'bool',
            'default': False,
            'desc': 'Run podman/docker with `--init`',
        },
        {
            'name': 'prometheus_alerts_path',
            'type': 'str',
            'default': '/etc/prometheus/ceph/ceph_default_alerts.yml',
            'desc': 'location of alerts to include in prometheus deployments',
        },
        {
            'name': 'migration_current',
            'type': 'int',
            'default': None,
            'desc': 'internal - do not modify',
            # used to track spec and other data migrations.
        },
        {
            'name': 'config_dashboard',
            'type': 'bool',
            'default': True,
            'desc': 'manage configs like API endpoints in Dashboard.'
        },
        {
            'name': 'manage_etc_ceph_ceph_conf',
            'type': 'bool',
            'default': False,
            'desc': 'Manage and own /etc/ceph/ceph.conf on the hosts.',
        },
        {
            'name': 'registry_url',
            'type': 'str',
            'default': None,
            'desc': 'Custom repository url'
        },
        {
            'name': 'registry_username',
            'type': 'str',
            'default': None,
            'desc': 'Custom repository username'
        },
        {
            'name': 'registry_password',
            'type': 'str',
            'default': None,
            'desc': 'Custom repository password'
        },
        {
            'name': 'use_repo_digest',
            'type': 'bool',
            'default': False,
            'desc': 'Automatically convert image tags to image digests. Make sure all daemons use the same image',
        }
    ]

    def __init__(self, *args, **kwargs):
        super(CephadmOrchestrator, self).__init__(*args, **kwargs)
        self._cluster_fsid = self.get('mon_map')['fsid']
        self.last_monmap: Optional[datetime.datetime] = None

        # for serve()
        self.run = True
        self.event = Event()

        if self.get_store('pause'):
            self.paused = True
        else:
            self.paused = False

        # for mypy which does not run the code
        if TYPE_CHECKING:
            self.ssh_config_file = None  # type: Optional[str]
            self.device_cache_timeout = 0
            self.daemon_cache_timeout = 0
            self.host_check_interval = 0
            self.mode = ''
            self.container_image_base = ''
            self.container_image_prometheus = ''
            self.container_image_grafana = ''
            self.container_image_alertmanager = ''
            self.container_image_node_exporter = ''
            self.warn_on_stray_hosts = True
            self.warn_on_stray_daemons = True
            self.warn_on_failed_host_check = True
            self.allow_ptrace = False
            self.container_init = False
            self.prometheus_alerts_path = ''
            self.migration_current = None
            self.config_dashboard = True
            self.manage_etc_ceph_ceph_conf = True
            self.registry_url: Optional[str] = None
            self.registry_username: Optional[str] = None
            self.registry_password: Optional[str] = None
            self.use_repo_digest = False

        self._cons: Dict[str, Tuple[remoto.backends.BaseConnection,
                                    remoto.backends.LegacyModuleExecute]] = {}

        self.notify('mon_map', None)
        self.config_notify()

        path = self.get_ceph_option('cephadm_path')
        try:
            with open(path, 'r') as f:
                self._cephadm = f.read()
        except (IOError, TypeError) as e:
            raise RuntimeError("unable to read cephadm at '%s': %s" % (
                path, str(e)))

        self._worker_pool = multiprocessing.pool.ThreadPool(10)

        self._reconfig_ssh()

        CephadmOrchestrator.instance = self

        self.upgrade = CephadmUpgrade(self)

        self.health_checks = {}

        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]

        self.inventory = Inventory(self)

        self.cache = HostCache(self)
        self.cache.load()

        self.rm_util = RemoveUtil(self)
        self.to_remove_osds = OSDQueue()
        self.rm_util.load_from_store()

        self.spec_store = SpecStore(self)
        self.spec_store.load()

        # ensure the host lists are in sync
        for h in self.inventory.keys():
            if h not in self.cache.daemons:
                self.cache.prime_empty_host(h)
        for h in self.cache.get_hosts():
            if h not in self.inventory:
                self.cache.rm_host(h)

        # in-memory only.
        self.events = EventStore(self)
        self.offline_hosts: Set[str] = set()

        self.migration = Migrations(self)

        # services:
        self.osd_service = OSDService(self)
        self.nfs_service = NFSService(self)
        self.mon_service = MonService(self)
        self.mgr_service = MgrService(self)
        self.mds_service = MdsService(self)
        self.rgw_service = RgwService(self)
        self.rbd_mirror_service = RbdMirrorService(self)
        self.grafana_service = GrafanaService(self)
        self.alertmanager_service = AlertmanagerService(self)
        self.prometheus_service = PrometheusService(self)
        self.node_exporter_service = NodeExporterService(self)
        self.crash_service = CrashService(self)
        self.iscsi_service = IscsiService(self)
        self.container_service = CustomContainerService(self)
        self.cephadm_services = {
            'mon': self.mon_service,
            'mgr': self.mgr_service,
            'osd': self.osd_service,
            'mds': self.mds_service,
            'rgw': self.rgw_service,
            'rbd-mirror': self.rbd_mirror_service,
            'nfs': self.nfs_service,
            'grafana': self.grafana_service,
            'alertmanager': self.alertmanager_service,
            'prometheus': self.prometheus_service,
            'node-exporter': self.node_exporter_service,
            'crash': self.crash_service,
            'iscsi': self.iscsi_service,
            'container': self.container_service,
        }

        self.template = TemplateMgr(self)

        self.requires_post_actions = set()

    def shutdown(self):
        self.log.debug('shutdown')
        self._worker_pool.close()
        self._worker_pool.join()
        self.run = False
        self.event.set()

    def _get_cephadm_service(self, service_type: str) -> CephadmService:
        assert service_type in ServiceSpec.KNOWN_SERVICE_TYPES
        return self.cephadm_services[service_type]

    def _kick_serve_loop(self):
        self.log.debug('_kick_serve_loop')
        self.event.set()

    # function responsible for logging a single host into the custom registry
    def _registry_login(self, host, url, username, password):
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through normal list of args
        args_str = ("{\"url\": \"" + url + "\", \"username\": \"" + username + "\", "
                    " \"password\": \"" + password + "\"}")
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        serve = CephadmServe(self)
        serve.serve()

    def set_container_image(self, entity: str, image):
        self.check_mon_command({
            'prefix': 'config set',
            'name': 'container_image',
            'value': image,
            'who': entity,
        })

    def config_notify(self):
        """
        This method is called whenever one of our config options is changed.

        TODO: this method should be moved into mgr_module.py
        """
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],  # type: ignore
                    self.get_module_option(opt['name']))  # type: ignore
            self.log.debug(' mgr option %s = %s',
                           opt['name'], getattr(self, opt['name']))  # type: ignore
        for opt in self.NATIVE_OPTIONS:
            setattr(self,
                    opt,  # type: ignore
                    self.get_ceph_option(opt))
            self.log.debug(' native option %s = %s', opt, getattr(self, opt))  # type: ignore

        self.event.set()

    def notify(self, notify_type, notify_id):
        if notify_type == "mon_map":
            # get monmap mtime so we can refresh configs when mons change
            monmap = self.get('mon_map')
            self.last_monmap = datetime.datetime.strptime(
                monmap['modified'], CEPH_DATEFMT)
            if self.last_monmap and self.last_monmap > datetime.datetime.utcnow():
                self.last_monmap = None  # just in case clocks are skewed
            if getattr(self, 'manage_etc_ceph_ceph_conf', False):
                # getattr, due to notify() being called before config_notify()
                self._kick_serve_loop()
        if notify_type == "pg_summary":
            self._trigger_osd_removal()

    def _trigger_osd_removal(self):
        data = self.get("osd_stats")
        for osd in data.get('osd_stats', []):
            if osd.get('num_pgs') == 0:
                # if _ANY_ osd that is currently in the queue appears to be empty,
                # start the removal process
                if int(osd.get('osd')) in self.to_remove_osds.as_osd_ids():
                    self.log.debug(f"Found empty osd. Starting removal process")
                    # if the osd that is now empty is also part of the removal queue
                    # start the process
                    self.rm_util.process_removal_queue()

    def pause(self):
        if not self.paused:
            self.log.info('Paused')
            self.set_store('pause', 'true')
            self.paused = True
            # wake loop so we update the health status
            self._kick_serve_loop()

    def resume(self):
        if self.paused:
            self.log.info('Resumed')
            self.paused = False
            self.set_store('pause', None)
        # unconditionally wake loop so that 'orch resume' can be used to kick
        # cephadm
        self._kick_serve_loop()

    def get_unique_name(self, daemon_type, host, existing, prefix=None,
                        forcename=None):
        # type: (str, str, List[orchestrator.DaemonDescription], Optional[str], Optional[str]) -> str
        """
        Generate a unique random service name
        """
        suffix = daemon_type not in [
            'mon', 'crash', 'nfs',
            'prometheus', 'node-exporter', 'grafana', 'alertmanager',
            'container'
        ]
        if forcename:
            if len([d for d in existing if d.daemon_id == forcename]):
                raise orchestrator.OrchestratorValidationError(
                    f'name {daemon_type}.{forcename} already in use')
            return forcename

        if '.' in host:
            host = host.split('.')[0]
        while True:
            if prefix:
                name = prefix + '.'
            else:
                name = ''
            name += host
            if suffix:
                name += '.' + ''.join(random.choice(string.ascii_lowercase)
                                      for _ in range(6))
            if len([d for d in existing if d.daemon_id == name]):
                if not suffix:
                    raise orchestrator.OrchestratorValidationError(
                        f'name {daemon_type}.{name} already in use')
                self.log.debug('name %s exists, trying again', name)
                continue
            return name

    def _reconfig_ssh(self):
        temp_files = []  # type: list
        ssh_options = []  # type: List[str]

        # ssh_config
        ssh_config_fname = self.ssh_config_file
        ssh_config = self.get_store("ssh_config")
        if ssh_config is not None or ssh_config_fname is None:
            if not ssh_config:
                ssh_config = DEFAULT_SSH_CONFIG
            f = tempfile.NamedTemporaryFile(prefix='cephadm-conf-')
            os.fchmod(f.fileno(), 0o600)
            f.write(ssh_config.encode('utf-8'))
            f.flush()  # make visible to other processes
            temp_files += [f]
            ssh_config_fname = f.name
        if ssh_config_fname:
            self.validate_ssh_config_fname(ssh_config_fname)
            ssh_options += ['-F', ssh_config_fname]
        self.ssh_config = ssh_config

        # identity
        ssh_key = self.get_store("ssh_identity_key")
        ssh_pub = self.get_store("ssh_identity_pub")
        self.ssh_pub = ssh_pub
        self.ssh_key = ssh_key
        if ssh_key and ssh_pub:
            tkey = tempfile.NamedTemporaryFile(prefix='cephadm-identity-')
            tkey.write(ssh_key.encode('utf-8'))
            os.fchmod(tkey.fileno(), 0o600)
            tkey.flush()  # make visible to other processes
            tpub = open(tkey.name + '.pub', 'w')
            os.fchmod(tpub.fileno(), 0o600)
            tpub.write(ssh_pub)
            tpub.flush()  # make visible to other processes
            temp_files += [tkey, tpub]
            ssh_options += ['-i', tkey.name]

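        # keep references to the NamedTemporaryFile objects so the files they
        # back are not removed while the ssh options still point at them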
        self._temp_files = temp_files
        if ssh_options:
            self._ssh_options = ' '.join(ssh_options)  # type: Optional[str]
        else:
            self._ssh_options = None

        if self.mode == 'root':
            self.ssh_user = self.get_store('ssh_user', default='root')
        elif self.mode == 'cephadm-package':
            self.ssh_user = 'cephadm'

        self._reset_cons()

    def validate_ssh_config_content(self, ssh_config):
        if ssh_config is None or len(ssh_config.strip()) == 0:
            raise OrchestratorValidationError('ssh_config cannot be empty')
        # StrictHostKeyChecking is [yes|no] ?
        l = re.findall(r'StrictHostKeyChecking\s+.*', ssh_config)
        if not l:
            raise OrchestratorValidationError('ssh_config requires StrictHostKeyChecking')
        for s in l:
            if 'ask' in s.lower():
                raise OrchestratorValidationError(f'ssh_config cannot contain: \'{s}\'')

    def validate_ssh_config_fname(self, ssh_config_fname):
        if not os.path.isfile(ssh_config_fname):
            raise OrchestratorValidationError("ssh_config \"{}\" does not exist".format(
                ssh_config_fname))

    def _reset_con(self, host):
        conn, r = self._cons.get(host, (None, None))
        if conn:
            self.log.debug('_reset_con close %s' % host)
            conn.exit()
            del self._cons[host]

    def _reset_cons(self):
        for host, conn_and_r in self._cons.items():
            self.log.debug('_reset_cons close %s' % host)
            conn, r = conn_and_r
            conn.exit()
        self._cons = {}

    def offline_hosts_remove(self, host):
        if host in self.offline_hosts:
            self.offline_hosts.remove(host)

    @staticmethod
    def can_run():
        if remoto is not None:
            return True, ""
        else:
            return False, "loading remoto library:{}".format(
                remoto_import_error)

    def available(self):
        """
        The cephadm orchestrator is always available.
        """
        ok, err = self.can_run()
        if not ok:
            return ok, err
        if not self.ssh_key or not self.ssh_pub:
            return False, 'SSH keys not set. Use `ceph cephadm set-priv-key` and `ceph cephadm set-pub-key` or `ceph cephadm generate-key`'
        return True, ''

    def process(self, completions):
        """
        Does nothing, as completions are processed in another thread.
        """
        if completions:
            self.log.debug("process: completions={0}".format(
                orchestrator.pretty_print(completions)))

            for p in completions:
                p.evaluate()

    @orchestrator._cli_write_command(
        prefix='cephadm set-ssh-config',
        desc='Set the ssh_config file (use -i <ssh_config>)')
    def _set_ssh_config(self, inbuf=None):
        """
        Set an ssh_config file provided from stdin
        """
        if inbuf == self.ssh_config:
            return 0, "value unchanged", ""
        self.validate_ssh_config_content(inbuf)
        self.set_store("ssh_config", inbuf)
        self.log.info('Set ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        prefix='cephadm clear-ssh-config',
        desc='Clear the ssh_config file')
    def _clear_ssh_config(self):
        """
        Clear the ssh_config file provided from stdin
        """
        self.set_store("ssh_config", None)
        self.ssh_config_tmp = None
        self.log.info('Cleared ssh_config')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_read_command(
        prefix='cephadm get-ssh-config',
        desc='Returns the ssh config as used by cephadm'
    )
    def _get_ssh_config(self):
        if self.ssh_config_file:
            self.validate_ssh_config_fname(self.ssh_config_file)
            with open(self.ssh_config_file) as f:
                return HandleCommandResult(stdout=f.read())
        ssh_config = self.get_store("ssh_config")
        if ssh_config:
            return HandleCommandResult(stdout=ssh_config)
        return HandleCommandResult(stdout=DEFAULT_SSH_CONFIG)

    @orchestrator._cli_write_command(
        'cephadm generate-key',
        desc='Generate a cluster SSH key (if not present)')
    def _generate_key(self):
        if not self.ssh_pub or not self.ssh_key:
            self.log.info('Generating ssh key...')
            tmp_dir = TemporaryDirectory()
            path = tmp_dir.name + '/key'
            try:
                subprocess.check_call([
                    '/usr/bin/ssh-keygen',
                    '-C', 'ceph-%s' % self._cluster_fsid,
                    '-N', '',
                    '-f', path
                ])
                with open(path, 'r') as f:
                    secret = f.read()
                with open(path + '.pub', 'r') as f:
                    pub = f.read()
            finally:
                os.unlink(path)
                os.unlink(path + '.pub')
                tmp_dir.cleanup()
            self.set_store('ssh_identity_key', secret)
            self.set_store('ssh_identity_pub', pub)
            self._reconfig_ssh()
        return 0, '', ''

    @orchestrator._cli_write_command(
        'cephadm set-priv-key',
        desc='Set cluster SSH private key (use -i <private_key>)')
    def _set_priv_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty private ssh key provided"
        if inbuf == self.ssh_key:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_key", inbuf)
        self.log.info('Set ssh private key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm set-pub-key',
        desc='Set cluster SSH public key (use -i <public_key>)')
    def _set_pub_key(self, inbuf=None):
        if inbuf is None or len(inbuf) == 0:
            return -errno.EINVAL, "", "empty public ssh key provided"
        if inbuf == self.ssh_pub:
            return 0, "value unchanged", ""
        self.set_store("ssh_identity_pub", inbuf)
        self.log.info('Set ssh public key')
        self._reconfig_ssh()
        return 0, "", ""

    @orchestrator._cli_write_command(
        'cephadm clear-key',
        desc='Clear cluster SSH key')
    def _clear_key(self):
        self.set_store('ssh_identity_key', None)
        self.set_store('ssh_identity_pub', None)
        self._reconfig_ssh()
        self.log.info('Cleared cluster SSH key')
        return 0, '', ''

    @orchestrator._cli_read_command(
        'cephadm get-pub-key',
        desc='Show SSH public key for connecting to cluster hosts')
    def _get_pub_key(self):
        if self.ssh_pub:
            return 0, self.ssh_pub, ''
        else:
            return -errno.ENOENT, '', 'No cluster SSH key defined'

    @orchestrator._cli_read_command(
        'cephadm get-user',
        desc='Show user for SSHing to cluster hosts')
    def _get_user(self):
        return 0, self.ssh_user, ''

    @orchestrator._cli_read_command(
        'cephadm set-user',
        'name=user,type=CephString',
        'Set user for SSHing to cluster hosts, passwordless sudo will be needed for non-root users')
    def set_ssh_user(self, user):
        current_user = self.ssh_user
        if user == current_user:
            return 0, "value unchanged", ""

        self.set_store('ssh_user', user)
        self._reconfig_ssh()

        host = self.cache.get_hosts()[0]
        r = CephadmServe(self)._check_host(host)
        if r is not None:
            # connection failed; reset the user
            self.set_store('ssh_user', current_user)
            self._reconfig_ssh()
            return -errno.EINVAL, '', 'ssh connection %s@%s failed' % (user, host)

        msg = 'ssh user set to %s' % user
        if user != 'root':
            msg += ' sudo will be used'
        self.log.info(msg)
        return 0, msg, ''

    @orchestrator._cli_read_command(
        'cephadm registry-login',
        "name=url,type=CephString,req=false "
        "name=username,type=CephString,req=false "
        "name=password,type=CephString,req=false",
        'Set custom registry login info by providing url, username and password or json file with login info (-i <file>)')
    def registry_login(self, url=None, username=None, password=None, inbuf=None):
        # if password not given in command line, get it through file input
        if not (url and username and password) and (inbuf is None or len(inbuf) == 0):
            return -errno.EINVAL, "", ("Invalid arguments. Please provide arguments <url> <username> <password> "
                                       "or -i <login credentials json file>")
        elif not (url and username and password):
            login_info = json.loads(inbuf)
            if "url" in login_info and "username" in login_info and "password" in login_info:
                url = login_info["url"]
                username = login_info["username"]
                password = login_info["password"]
            else:
                return -errno.EINVAL, "", ("json provided for custom registry login did not include all necessary fields. "
                                           "Please setup json file as\n"
                                           "{\n"
                                           " \"url\": \"REGISTRY_URL\",\n"
                                           " \"username\": \"REGISTRY_USERNAME\",\n"
                                           " \"password\": \"REGISTRY_PASSWORD\"\n"
                                           "}\n")
        # verify login info works by attempting login on random host
        host = None
        for host_name in self.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        r = self._registry_login(host, url, username, password)
        if r is not None:
            return 1, '', r
        # if logins succeeded, store info
        self.log.debug("Host logins successful. Storing login info.")
        self.set_module_option('registry_url', url)
        self.set_module_option('registry_username', username)
        self.set_module_option('registry_password', password)
        # distribute new login info to all hosts
        self.cache.distribute_new_registry_login_info()
        return 0, "registry login scheduled", ''

    @orchestrator._cli_read_command(
        'cephadm check-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Check whether we can access and manage a remote host')
    def check_host(self, host, addr=None):
        try:
            out, err, code = self._run_cephadm(host, cephadmNoImage, 'check-host',
                                               ['--expect-hostname', host],
                                               addr=addr,
                                               error_ok=True, no_fsid=True)
            if code:
                return 1, '', ('check-host failed:\n' + '\n'.join(err))
        except OrchestratorError as e:
            self.log.exception(f"check-host failed for '{host}'")
            return 1, '', ('check-host failed:\n' +
                           f"Host '{host}' not found. Use 'ceph orch host ls' to see all managed hosts.")
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_read_command(
        'cephadm prepare-host',
        'name=host,type=CephString '
        'name=addr,type=CephString,req=false',
        'Prepare a remote host for use with cephadm')
    def _prepare_host(self, host, addr=None):
        out, err, code = self._run_cephadm(host, cephadmNoImage, 'prepare-host',
                                           ['--expect-hostname', host],
                                           addr=addr,
                                           error_ok=True, no_fsid=True)
        if code:
            return 1, '', ('prepare-host failed:\n' + '\n'.join(err))
        # if we have an outstanding health alert for this host, give the
        # serve thread a kick
        if 'CEPHADM_HOST_CHECK_FAILED' in self.health_checks:
            for item in self.health_checks['CEPHADM_HOST_CHECK_FAILED']['detail']:
                if item.startswith('host %s ' % host):
                    self.event.set()
        return 0, '%s (%s) ok' % (host, addr), err

    @orchestrator._cli_write_command(
        prefix='cephadm set-extra-ceph-conf',
        desc="Text that is appended to all daemon's ceph.conf.\n"
             "Mainly a workaround, till `config generate-minimal-conf` generates\n"
             "a complete ceph.conf.\n\n"
             "Warning: this is a dangerous operation.")
    def _set_extra_ceph_conf(self, inbuf=None) -> HandleCommandResult:
        if inbuf:
            # sanity check.
            cp = ConfigParser()
            cp.read_string(inbuf, source='<infile>')

        self.set_store("extra_ceph_conf", json.dumps({
            'conf': inbuf,
            'last_modified': datetime_to_str(datetime.datetime.utcnow())
        }))
        self.log.info('Set extra_ceph_conf')
        self._kick_serve_loop()
        return HandleCommandResult()

    @orchestrator._cli_read_command(
        'cephadm get-extra-ceph-conf',
        desc='Get extra ceph conf that is appended')
    def _get_extra_ceph_conf(self) -> HandleCommandResult:
        return HandleCommandResult(stdout=self.extra_ceph_conf().conf)

    class ExtraCephConf(NamedTuple):
        conf: str
        last_modified: Optional[datetime.datetime]

    def extra_ceph_conf(self) -> 'CephadmOrchestrator.ExtraCephConf':
        data = self.get_store('extra_ceph_conf')
        if not data:
            return CephadmOrchestrator.ExtraCephConf('', None)
        try:
            j = json.loads(data)
        except ValueError:
            self.log.exception('unable to load extra_ceph_conf')
            return CephadmOrchestrator.ExtraCephConf('', None)
        return CephadmOrchestrator.ExtraCephConf(j['conf'], str_to_datetime(j['last_modified']))

    def extra_ceph_conf_is_newer(self, dt: datetime.datetime) -> bool:
        conf = self.extra_ceph_conf()
        if not conf.last_modified:
            return False
        return conf.last_modified > dt

    def _get_connection(self, host: str):
        """
        Setup a connection for running commands on remote host.
        """
        conn, r = self._cons.get(host, (None, None))
        if conn:
            if conn.has_connection():
                self.log.debug('Have connection to %s' % host)
                return conn, r
            else:
                self._reset_con(host)
        n = self.ssh_user + '@' + host
        self.log.debug("Opening connection to {} with ssh options '{}'".format(
            n, self._ssh_options))
        child_logger = self.log.getChild(n)
        child_logger.setLevel('WARNING')
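        # a non-root ssh_user relies on passwordless sudo on the remote host
        # (see `ceph cephadm set-user`)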
        conn = remoto.Connection(
            n,
            logger=child_logger,
            ssh_options=self._ssh_options,
            sudo=True if self.ssh_user != 'root' else False)

        r = conn.import_module(remotes)
        self._cons[host] = conn, r

        return conn, r

    def _executable_path(self, conn, executable):
        """
        Remote validator that accepts a connection object to ensure that a certain
        executable is available returning its full path if so.

        Otherwise an exception with thorough details will be raised, informing the
        user that the executable was not found.
        """
        executable_path = conn.remote_module.which(executable)
        if not executable_path:
            raise RuntimeError("Executable '{}' not found on host '{}'".format(
                executable, conn.hostname))
        self.log.debug("Found executable '{}' at path '{}'".format(executable,
                                                                   executable_path))
        return executable_path

    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.inventory:
            addr = self.inventory.get_addr(host)

        self.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self._get_connection(addr)
            except OSError as e:
                self._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.offline_hosts.add(host)
            self._reset_con(host)

            user = self.ssh_user if self.mode == 'root' else 'cephadm'
            msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{host}

To check that the host is reachable:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{host}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise

    def _get_container_image(self, daemon_name: str) -> Optional[str]:
        daemon_type = daemon_name.split('.', 1)[0]  # type: ignore
        if daemon_type in CEPH_TYPES or \
                daemon_type == 'nfs' or \
                daemon_type == 'iscsi':
            # get container image
            ret, image, err = self.check_mon_command({
                'prefix': 'config get',
                'who': utils.name_to_config_section(daemon_name),
                'key': 'container_image',
            })
            image = image.strip()  # type: ignore
        elif daemon_type == 'prometheus':
            image = self.container_image_prometheus
        elif daemon_type == 'grafana':
            image = self.container_image_grafana
        elif daemon_type == 'alertmanager':
            image = self.container_image_alertmanager
        elif daemon_type == 'node-exporter':
            image = self.container_image_node_exporter
        elif daemon_type == CustomContainerService.TYPE:
            # The image can't be resolved, the necessary information
            # is only available when a container is deployed (given
            # via spec).
            image = None
        else:
            assert False, daemon_type

        self.log.debug('%s container image %s' % (daemon_name, image))

        return image

    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            if not image and entity is not cephadmNoImage:
                image = self._get_container_image(entity)

            final_args = []

            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])
            final_args.append(command)

            if not no_fsid:
                final_args += ['--fsid', self._cluster_fsid]

            if self.container_init:
                final_args += ['--container-init']

            final_args += args

            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)
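                # Build the payload that the remote python interpreter will
                # execute: the argv (and optional stdin) are injected as
                # variables ahead of the full cephadm script, and the result
                # is piped to `python -u` over the SSH connection below.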
                script = 'injected_argv = ' + json.dumps(final_args) + '\n'
                if stdin:
                    script += 'injected_stdin = ' + json.dumps(stdin) + '\n'
                script += self._cephadm
                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, '-u'],
                        stdin=script.encode('utf-8'))
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            elif self.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

    def _hosts_with_daemon_inventory(self) -> List[HostSpec]:
        """
        Returns all hosts that went through _refresh_host_daemons().

        This mitigates a potential race, where a new host was added *after*
        ``_refresh_host_daemons()`` was called, but *before*
        ``_apply_all_specs()`` was called. Without this check we would end up
        with hosts where daemons might be running, but which we have not yet
        detected.
        """
        return [
            h for h in self.inventory.all_specs()
            if self.cache.host_had_daemon_refresh(h.hostname)
        ]

    def _add_host(self, spec):
        # type: (HostSpec) -> str
        """
        Add a host to be managed by the orchestrator.

        :param host: host name
        """
        assert_valid_host(spec.hostname)
        out, err, code = self._run_cephadm(spec.hostname, cephadmNoImage, 'check-host',
                                           ['--expect-hostname', spec.hostname],
                                           addr=spec.addr,
                                           error_ok=True, no_fsid=True)
        if code:
            raise OrchestratorError('New host %s (%s) failed check: %s' % (
                spec.hostname, spec.addr, err))

        self.inventory.add_host(spec)
        self.cache.prime_empty_host(spec.hostname)
        self.offline_hosts_remove(spec.hostname)
        self.event.set()  # refresh stray health check
        self.log.info('Added host %s' % spec.hostname)
        return "Added host '{}'".format(spec.hostname)

    @trivial_completion
    def add_host(self, spec: HostSpec) -> str:
        return self._add_host(spec)

    @trivial_completion
    def remove_host(self, host):
        # type: (str) -> str
        """
        Remove a host from orchestrator management.

        :param host: host name
        """
        self.inventory.rm_host(host)
        self.cache.rm_host(host)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Removed host %s' % host)
        return "Removed host '{}'".format(host)

    @trivial_completion
    def update_host_addr(self, host, addr) -> str:
        self.inventory.set_addr(host, addr)
        self._reset_con(host)
        self.event.set()  # refresh stray health check
        self.log.info('Set host %s addr to %s' % (host, addr))
        return "Updated host '{}' addr to '{}'".format(host, addr)

    @trivial_completion
    def get_hosts(self):
        # type: () -> List[orchestrator.HostSpec]
        """
        Return a list of hosts managed by the orchestrator.

        Notes:
          - skip async: manager reads from cache.
        """
        return list(self.inventory.all_specs())

    @trivial_completion
    def add_host_label(self, host, label) -> str:
        self.inventory.add_label(host, label)
        self.log.info('Added label %s to host %s' % (label, host))
        return 'Added label %s to host %s' % (label, host)

    @trivial_completion
    def remove_host_label(self, host, label) -> str:
        self.inventory.rm_label(host, label)
        self.log.info('Removed label %s from host %s' % (label, host))
        return 'Removed label %s from host %s' % (label, host)

    @trivial_completion
    def host_ok_to_stop(self, hostname: str):
        if hostname not in self.cache.get_hosts():
            raise OrchestratorError(f'Cannot find host "{hostname}"')

        daemons = self.cache.get_daemons()
        daemon_map = defaultdict(lambda: [])
        for dd in daemons:
            if dd.hostname == hostname:
                daemon_map[dd.daemon_type].append(dd.daemon_id)

        for daemon_type, daemon_ids in daemon_map.items():
            r = self.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            if r.retval:
                self.log.error(f'It is NOT safe to stop host {hostname}')
                raise orchestrator.OrchestratorError(
                    r.stderr,
                    errno=r.retval)

        msg = f'It is presumed safe to stop host {hostname}'
        self.log.info(msg)
        return msg

    def get_minimal_ceph_conf(self) -> str:
        _, config, _ = self.check_mon_command({
            "prefix": "config generate-minimal-conf",
        })
        extra = self.extra_ceph_conf().conf
        if extra:
            config += '\n\n' + extra.strip() + '\n'
        return config

    def _invalidate_daemons_and_kick_serve(self, filter_host=None):
        if filter_host:
            self.cache.invalidate_host_daemons(filter_host)
        else:
            for h in self.cache.get_hosts():
                # Also discover daemons deployed manually
                self.cache.invalidate_host_daemons(h)

        self._kick_serve_loop()

    @trivial_completion
    def describe_service(self, service_type: Optional[str] = None, service_name: Optional[str] = None,
                         refresh: bool = False) -> List[orchestrator.ServiceDescription]:
        if refresh:
            self._invalidate_daemons_and_kick_serve()
            self.log.info('Kicked serve() loop to refresh all services')

        # <service_map>
        sm: Dict[str, orchestrator.ServiceDescription] = {}
        osd_count = 0
        for h, dm in self.cache.get_daemons_with_volatile_status():
            for name, dd in dm.items():
                if service_type and service_type != dd.daemon_type:
                    continue
                n: str = dd.service_name()
                if service_name and service_name != n:
                    continue
                if dd.daemon_type == 'osd':
                    """
                    OSDs do not know the affinity to their spec out of the box.
                    """
                    n = f"osd.{dd.osdspec_affinity}"
                    if not dd.osdspec_affinity:
                        # If there is no osdspec_affinity, the spec should suffice for displaying
                        continue
                if n in self.spec_store.specs:
                    spec = self.spec_store.specs[n]
                else:
                    spec = ServiceSpec(
                        unmanaged=True,
                        service_type=dd.daemon_type,
                        service_id=dd.service_id(),
                        placement=PlacementSpec(
                            hosts=[dd.hostname]
                        )
                    )
                if n not in sm:
                    sm[n] = orchestrator.ServiceDescription(
                        last_refresh=dd.last_refresh,
                        container_image_id=dd.container_image_id,
                        container_image_name=dd.container_image_name,
                        spec=spec,
                        events=self.events.get_for_service(spec.service_name()),
                    )
                if n in self.spec_store.specs:
                    if dd.daemon_type == 'osd':
                        """
                        The osd count can't be determined by the Placement spec.
                        An accurate actual/expected representation cannot be
                        determined here, so we set running = size for now.
                        """
                        osd_count += 1
                        sm[n].size = osd_count
                    else:
                        sm[n].size = spec.placement.get_host_selection_size(
                            self.inventory.all_specs())

                    sm[n].created = self.spec_store.spec_created[n]
                    if service_type == 'nfs':
                        spec = cast(NFSServiceSpec, spec)
                        sm[n].rados_config_location = spec.rados_config_location()
                else:
                    sm[n].size = 0
                if dd.status == 1:
                    sm[n].running += 1
                if not sm[n].last_refresh or not dd.last_refresh or dd.last_refresh < sm[n].last_refresh:  # type: ignore
                    sm[n].last_refresh = dd.last_refresh
                if sm[n].container_image_id != dd.container_image_id:
                    sm[n].container_image_id = 'mix'
                if sm[n].container_image_name != dd.container_image_name:
                    sm[n].container_image_name = 'mix'
        for n, spec in self.spec_store.specs.items():
            if n in sm:
                continue
            if service_type is not None and service_type != spec.service_type:
                continue
            if service_name is not None and service_name != n:
                continue
            sm[n] = orchestrator.ServiceDescription(
                spec=spec,
                size=spec.placement.get_host_selection_size(self.inventory.all_specs()),
                running=0,
                events=self.events.get_for_service(spec.service_name()),
            )
            if service_type == 'nfs':
                spec = cast(NFSServiceSpec, spec)
                sm[n].rados_config_location = spec.rados_config_location()
        return list(sm.values())
9f95a23c TL |
1376 | |
1377 | @trivial_completion | |
f6b5b4d7 TL |
1378 | def list_daemons(self, |
1379 | service_name: Optional[str] = None, | |
1380 | daemon_type: Optional[str] = None, | |
1381 | daemon_id: Optional[str] = None, | |
1382 | host: Optional[str] = None, | |
1383 | refresh: bool = False) -> List[orchestrator.DaemonDescription]: | |
9f95a23c | 1384 | if refresh: |
f6b5b4d7 TL |
1385 | self._invalidate_daemons_and_kick_serve(host) |
1386 | self.log.info('Kicked serve() loop to refresh all daemons') | |
1387 | ||
9f95a23c | 1388 | result = [] |
1911f103 | 1389 | for h, dm in self.cache.get_daemons_with_volatile_status(): |
9f95a23c TL |
1390 | if host and h != host: |
1391 | continue | |
1392 | for name, dd in dm.items(): | |
801d1391 TL |
1393 | if daemon_type is not None and daemon_type != dd.daemon_type: |
1394 | continue | |
1395 | if daemon_id is not None and daemon_id != dd.daemon_id: | |
9f95a23c | 1396 | continue |
801d1391 | 1397 | if service_name is not None and service_name != dd.service_name(): |
9f95a23c TL |
1398 | continue |
1399 | result.append(dd) | |
1400 | return result | |
1401 | ||
e306af50 | 1402 | @trivial_completion |
f6b5b4d7 | 1403 | def service_action(self, action, service_name) -> List[str]: |
9f95a23c TL |
1404 | args = [] |
1405 | for host, dm in self.cache.daemons.items(): | |
1406 | for name, d in dm.items(): | |
1407 | if d.matches_service(service_name): | |
1408 | args.append((d.daemon_type, d.daemon_id, | |
1409 | d.hostname, action)) | |
1410 | self.log.info('%s service %s' % (action.capitalize(), service_name)) | |
1411 | return self._daemon_actions(args) | |
1412 | ||
e306af50 | 1413 | @forall_hosts |
f6b5b4d7 TL |
1414 | def _daemon_actions(self, daemon_type, daemon_id, host, action) -> str: |
1415 | with set_exception_subject('daemon', DaemonDescription( | |
1416 | daemon_type=daemon_type, | |
1417 | daemon_id=daemon_id | |
1418 | ).name()): | |
1419 | return self._daemon_action(daemon_type, daemon_id, host, action) | |
1420 | ||
f91f0fd5 | 1421 | def _daemon_action(self, daemon_type, daemon_id, host, action, image=None) -> str: |
f6b5b4d7 TL |
1422 | daemon_spec: CephadmDaemonSpec = CephadmDaemonSpec( |
1423 | host=host, | |
1424 | daemon_id=daemon_id, | |
1425 | daemon_type=daemon_type, | |
1426 | ) | |
1427 | ||
f91f0fd5 | 1428 | self._daemon_action_set_image(action, image, daemon_type, daemon_id) |
9f95a23c | 1429 | |
9f95a23c | 1430 | if action == 'redeploy': |
f91f0fd5 TL |
1431 | if self.daemon_is_self(daemon_type, daemon_id): |
1432 | self.mgr_service.fail_over() | |
1433 | return '' # unreachable | |
9f95a23c | 1434 | # stop, recreate the container+unit, then restart |
f6b5b4d7 | 1435 | return self._create_daemon(daemon_spec) |
9f95a23c | 1436 | elif action == 'reconfig': |
f6b5b4d7 | 1437 | return self._create_daemon(daemon_spec, reconfig=True) |
9f95a23c TL |
1438 | |
1439 | actions = { | |
1440 | 'start': ['reset-failed', 'start'], | |
1441 | 'stop': ['stop'], | |
1442 | 'restart': ['reset-failed', 'restart'], | |
1443 | } | |
f6b5b4d7 | 1444 | name = daemon_spec.name() |
9f95a23c | 1445 | for a in actions[action]: |
f6b5b4d7 TL |
1446 | try: |
1447 | out, err, code = self._run_cephadm( | |
1448 | host, name, 'unit', | |
1449 | ['--name', name, a]) | |
1450 | except Exception: | |
1451 | self.log.exception(f'`{host}: cephadm unit {name} {a}` failed') | |
1452 | self.cache.invalidate_host_daemons(daemon_spec.host) | |
1453 | msg = "{} {} from host '{}'".format(action, name, daemon_spec.host) | |
1454 | self.events.for_daemon(name, 'INFO', msg) | |
1455 | return msg | |
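# Illustrative sketch only (not part of the module): for a hypothetical
# daemon 'mon.host1' on host 'host1', action='restart' expands via the
# actions table above into two `cephadm unit` calls, roughly:
#
#   self._run_cephadm('host1', 'mon.host1', 'unit', ['--name', 'mon.host1', 'reset-failed'])
#   self._run_cephadm('host1', 'mon.host1', 'unit', ['--name', 'mon.host1', 'restart'])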
9f95a23c | 1456 | |
f91f0fd5 TL |
1457 | def _daemon_action_set_image(self, action: str, image: Optional[str], daemon_type: str, daemon_id: str): |
1458 | if image is not None: | |
1459 | if action != 'redeploy': | |
1460 | raise OrchestratorError( | |
1461 | f'Cannot execute {action} with new image. `action` needs to be `redeploy`') | |
1462 | if daemon_type not in CEPH_TYPES: | |
1463 | raise OrchestratorError( | |
1464 | f'Cannot redeploy {daemon_type}.{daemon_id} with a new image: Supported ' | |
1465 | f'types are: {", ".join(CEPH_TYPES)}') | |
1466 | ||
1467 | self.check_mon_command({ | |
1468 | 'prefix': 'config set', | |
1469 | 'name': 'container_image', | |
1470 | 'value': image, | |
1471 | 'who': utils.name_to_config_section(daemon_type + '.' + daemon_id), | |
1472 | }) | |
1473 | ||
e306af50 | 1474 | @trivial_completion |
f91f0fd5 | 1475 | def daemon_action(self, action: str, daemon_name: str, image: Optional[str] = None) -> str: |
f6b5b4d7 TL |
1476 | d = self.cache.get_daemon(daemon_name) |
1477 | ||
f91f0fd5 TL |
1478 | if action == 'redeploy' and self.daemon_is_self(d.daemon_type, d.daemon_id) \ |
1479 | and not self.mgr_service.mgr_map_has_standby(): | |
1480 | raise OrchestratorError( | |
1481 | f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') | |
1482 | ||
1483 | self._daemon_action_set_image(action, image, d.daemon_type, d.daemon_id) | |
1484 | ||
1485 | self.log.info(f'Schedule {action} daemon {daemon_name}') | |
1486 | return self._schedule_daemon_action(daemon_name, action) | |
1487 | ||
1488 | def daemon_is_self(self, daemon_type: str, daemon_id: str) -> bool: | |
1489 | return daemon_type == 'mgr' and daemon_id == self.get_mgr_id() | |
1490 | ||
1491 | def _schedule_daemon_action(self, daemon_name: str, action: str): | |
1492 | dd = self.cache.get_daemon(daemon_name) | |
1493 | if action == 'redeploy' and self.daemon_is_self(dd.daemon_type, dd.daemon_id) \ | |
1494 | and not self.mgr_service.mgr_map_has_standby(): | |
1495 | raise OrchestratorError( | |
1496 | f'Unable to schedule redeploy for {daemon_name}: No standby MGRs') | |
1497 | self.cache.schedule_daemon_action(dd.hostname, dd.name(), action) | |
1498 | msg = "Scheduled to {} {} on host '{}'".format(action, daemon_name, dd.hostname) | |
1499 | self._kick_serve_loop() | |
1500 | return msg | |
9f95a23c | 1501 | |
e306af50 | 1502 | @trivial_completion |
9f95a23c | 1503 | def remove_daemons(self, names): |
e306af50 | 1504 | # type: (List[str]) -> List[str] |
9f95a23c TL |
1505 | args = [] |
1506 | for host, dm in self.cache.daemons.items(): | |
1507 | for name in names: | |
1508 | if name in dm: | |
1509 | args.append((name, host)) | |
1510 | if not args: | |
1511 | raise OrchestratorError('Unable to find daemon(s) %s' % (names)) | |
1512 | self.log.info('Remove daemons %s' % [a[0] for a in args]) | |
1513 | return self._remove_daemons(args) | |
1514 | ||
1515 | @trivial_completion | |
f6b5b4d7 | 1516 | def remove_service(self, service_name) -> str: |
9f95a23c | 1517 | self.log.info('Remove service %s' % service_name) |
e306af50 | 1518 | self._trigger_preview_refresh(service_name=service_name) |
1911f103 TL |
1519 | found = self.spec_store.rm(service_name) |
1520 | if found: | |
1521 | self._kick_serve_loop() | |
f6b5b4d7 | 1522 | return 'Removed service %s' % service_name |
1911f103 TL |
1523 | else: |
1524 | # must be idempotent: still a success. | |
f6b5b4d7 | 1525 | return f'Failed to remove service. <{service_name}> was not found.' |
9f95a23c TL |
1526 | |
1527 | @trivial_completion | |
f6b5b4d7 | 1528 | def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh=False) -> List[orchestrator.InventoryHost]: |
9f95a23c TL |
1529 | """ |
1530 | Return the storage inventory of hosts matching the given filter. | |
1531 | ||
1532 | :param host_filter: host filter | |
1533 | ||
1534 | TODO: | |
1535 | - add filtering by label | |
1536 | """ | |
1537 | if refresh: | |
f6b5b4d7 TL |
1538 | if host_filter and host_filter.hosts: |
1539 | for h in host_filter.hosts: | |
1540 | self.cache.invalidate_host_devices(h) | |
9f95a23c | 1541 | else: |
f6b5b4d7 TL |
1542 | for h in self.cache.get_hosts(): |
1543 | self.cache.invalidate_host_devices(h) | |
1544 | ||
1545 | self.event.set() | |
1546 | self.log.info('Kicked serve() loop to refresh devices') | |
9f95a23c TL |
1547 | |
1548 | result = [] | |
1549 | for host, dls in self.cache.devices.items(): | |
f6b5b4d7 | 1550 | if host_filter and host_filter.hosts and host not in host_filter.hosts: |
9f95a23c TL |
1551 | continue |
1552 | result.append(orchestrator.InventoryHost(host, | |
1553 | inventory.Devices(dls))) | |
1554 | return result | |
1555 | ||
1556 | @trivial_completion | |
f6b5b4d7 | 1557 | def zap_device(self, host, path) -> str: |
9f95a23c TL |
1558 | self.log.info('Zap device %s:%s' % (host, path)) |
1559 | out, err, code = self._run_cephadm( | |
1560 | host, 'osd', 'ceph-volume', | |
1561 | ['--', 'lvm', 'zap', '--destroy', path], | |
1562 | error_ok=True) | |
1563 | self.cache.invalidate_host_devices(host) | |
1564 | if code: | |
1565 | raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err)) | |
1566 | return '\n'.join(out + err) | |
1567 | ||
e306af50 | 1568 | @trivial_completion |
f91f0fd5 TL |
1569 | def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: |
1570 | """ | |
1571 | Blink a device light. Calling something like:: | |
1572 | ||
1573 | lsmcli local-disk-ident-led-on --path $path | |
1574 | ||
1575 | If you must, you can customize this via:: | |
1576 | ||
1577 | ceph config-key set mgr/cephadm/blink_device_light_cmd '<my jinja2 template>' | |
1578 | ceph config-key set mgr/cephadm/<host>/blink_device_light_cmd '<my jinja2 template>' | |
1579 | ||
1580 | See templates/blink_device_light_cmd.j2 | |
1581 | """ | |
e306af50 | 1582 | @forall_hosts |
9f95a23c | 1583 | def blink(host, dev, path): |
f91f0fd5 TL |
1584 | cmd_line = self.template.render('blink_device_light_cmd.j2', |
1585 | { | |
1586 | 'on': on, | |
1587 | 'ident_fault': ident_fault, | |
1588 | 'dev': dev, | |
1589 | 'path': path | |
1590 | }, | |
1591 | host=host) | |
1592 | cmd_args = shlex.split(cmd_line) | |
1593 | ||
9f95a23c | 1594 | out, err, code = self._run_cephadm( |
f91f0fd5 | 1595 | host, 'osd', 'shell', ['--'] + cmd_args, |
9f95a23c TL |
1596 | error_ok=True) |
1597 | if code: | |
f6b5b4d7 | 1598 | raise OrchestratorError( |
9f95a23c | 1599 | 'Unable to affect %s light for %s:%s. Command: %s' % ( |
f91f0fd5 | 1600 | ident_fault, host, dev, ' '.join(cmd_args))) |
9f95a23c TL |
1601 | self.log.info('Set %s light for %s:%s %s' % ( |
1602 | ident_fault, host, dev, 'on' if on else 'off')) | |
1603 | return "Set %s light for %s:%s %s" % ( | |
1604 | ident_fault, host, dev, 'on' if on else 'off') | |
1605 | ||
1606 | return blink(locs) | |
1607 | ||
1608 | def get_osd_uuid_map(self, only_up=False): | |
1911f103 | 1609 | # type: (bool) -> Dict[str, str] |
9f95a23c TL |
1610 | osd_map = self.get('osd_map') |
1611 | r = {} | |
1612 | for o in osd_map['osds']: | |
1613 | # only include OSDs that have ever started in this map. this way | |
1614 | # an interrupted osd create can be repeated and succeed the second | |
1615 | # time around. | |
1911f103 TL |
1616 | osd_id = o.get('osd') |
1617 | if osd_id is None: | |
1618 | raise OrchestratorError("Could not retrieve osd_id from osd_map") | |
1619 | if not only_up or (o['up_from'] > 0): | |
1620 | r[str(osd_id)] = o.get('uuid', '') | |
9f95a23c TL |
1621 | return r |
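# Shape of the returned mapping, for illustration only (the ids and uuids
# here are placeholders from a hypothetical osd_map):
#   {'0': '6a5c1c2e-...', '3': '9b7d4e11-...'}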
1622 | ||
e306af50 TL |
1623 | def _trigger_preview_refresh(self, |
1624 | specs: Optional[List[DriveGroupSpec]] = None, | |
f6b5b4d7 TL |
1625 | service_name: Optional[str] = None, |
1626 | ) -> None: | |
1627 | # Only trigger a refresh when a spec has changed | |
1628 | trigger_specs = [] | |
1629 | if specs: | |
1630 | for spec in specs: | |
1631 | preview_spec = self.spec_store.spec_preview.get(spec.service_name()) | |
1632 | # If the to-be-previewed spec differs from the actual spec we need to | |
1633 | # trigger a refresh; if the spec has been removed (== None) we need to | |
1634 | # refresh as well. | |
1635 | if not preview_spec or spec != preview_spec: | |
1636 | trigger_specs.append(spec) | |
1637 | if service_name: | |
1638 | trigger_specs = [cast(DriveGroupSpec, self.spec_store.spec_preview.get(service_name))] | |
1639 | if not any(trigger_specs): | |
1640 | return None | |
1641 | ||
1642 | refresh_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=trigger_specs) | |
e306af50 TL |
1643 | for host in refresh_hosts: |
1644 | self.log.info(f"Marking host: {host} for OSDSpec preview refresh.") | |
1645 | self.cache.osdspec_previews_refresh_queue.append(host) | |
1646 | ||
9f95a23c | 1647 | @trivial_completion |
f6b5b4d7 TL |
1648 | def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> List[str]: |
1649 | """ | |
1650 | Deprecated. Please use `apply()` instead. | |
1651 | ||
1652 | Keeping this around to stay compatible with mgr/dashboard | |
1653 | """ | |
9f95a23c TL |
1654 | return [self._apply(spec) for spec in specs] |
1655 | ||
1656 | @trivial_completion | |
f6b5b4d7 TL |
1657 | def create_osds(self, drive_group: DriveGroupSpec) -> str: |
1658 | return self.osd_service.create_from_spec(drive_group) | |
9f95a23c | 1659 | |
f6b5b4d7 TL |
1660 | def _preview_osdspecs(self, |
1661 | osdspecs: Optional[List[DriveGroupSpec]] = None | |
1662 | ): | |
1663 | if not osdspecs: | |
1664 | return {'n/a': [{'error': True, | |
1665 | 'message': 'No OSDSpec or matching hosts found.'}]} | |
1666 | matching_hosts = self.osd_service.resolve_hosts_for_osdspecs(specs=osdspecs) | |
e306af50 TL |
1667 | if not matching_hosts: |
1668 | return {'n/a': [{'error': True, | |
1669 | 'message': 'No OSDSpec or matching hosts found.'}]} | |
1670 | # Is any host still loading previews | |
1671 | pending_hosts = {h for h in self.cache.loading_osdspec_preview if h in matching_hosts} | |
1672 | if pending_hosts: | |
1673 | # Report 'pending' when any of the matching hosts is still loading previews (flag is True) | |
1674 | return {'n/a': [{'error': True, | |
1675 | 'message': 'Preview data is being generated. ' | |
f6b5b4d7 TL |
1676 | 'Please re-run this command in a bit.'}]} |
1677 | # drop all keys that are not in matching_hosts and only select reports that match the requested osdspecs | |
1678 | previews_for_specs = {} | |
1679 | for host, raw_reports in self.cache.osdspec_previews.items(): | |
1680 | if host not in matching_hosts: | |
1681 | continue | |
1682 | osd_reports = [] | |
1683 | for osd_report in raw_reports: | |
1684 | if osd_report.get('osdspec') in [x.service_id for x in osdspecs]: | |
1685 | osd_reports.append(osd_report) | |
1686 | previews_for_specs.update({host: osd_reports}) | |
1687 | return previews_for_specs | |
9f95a23c TL |
1688 | |
1689 | def _calc_daemon_deps(self, daemon_type, daemon_id): | |
1690 | need = { | |
1691 | 'prometheus': ['mgr', 'alertmanager', 'node-exporter'], | |
1692 | 'grafana': ['prometheus'], | |
801d1391 | 1693 | 'alertmanager': ['mgr', 'alertmanager'], |
9f95a23c TL |
1694 | } |
1695 | deps = [] | |
1696 | for dep_type in need.get(daemon_type, []): | |
1697 | for dd in self.cache.get_daemons_by_service(dep_type): | |
1698 | deps.append(dd.name()) | |
1699 | return sorted(deps) | |
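# Worked example (assuming a single prometheus daemon named
# 'prometheus.host1' is known to the cache): _calc_daemon_deps('grafana', 'x')
# would return ['prometheus.host1'], since grafana only depends on prometheus
# in the table above.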
1700 | ||
e306af50 | 1701 | def _create_daemon(self, |
f6b5b4d7 | 1702 | daemon_spec: CephadmDaemonSpec, |
9f95a23c | 1703 | reconfig=False, |
e306af50 | 1704 | osd_uuid_map: Optional[Dict[str, Any]] = None, |
e306af50 TL |
1705 | ) -> str: |
1706 | ||
f6b5b4d7 TL |
1707 | with set_exception_subject('service', orchestrator.DaemonDescription( |
1708 | daemon_type=daemon_spec.daemon_type, | |
1709 | daemon_id=daemon_spec.daemon_id, | |
1710 | hostname=daemon_spec.host, | |
1711 | ).service_id(), overwrite=True): | |
1712 | ||
f91f0fd5 | 1713 | image = '' |
f6b5b4d7 | 1714 | start_time = datetime.datetime.utcnow() |
f91f0fd5 TL |
1715 | ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] |
1716 | ||
1717 | if daemon_spec.daemon_type == 'container': | |
1718 | spec: Optional[CustomContainerSpec] = daemon_spec.spec | |
1719 | if spec is None: | |
1720 | # Exit here immediately because the required service | |
1721 | # spec to create a daemon is not provided. This is only | |
1722 | # provided when a service is applied via 'orch apply' | |
1723 | # command. | |
1724 | msg = "Failed to {} daemon {} on {}: Required " \ | |
1725 | "service specification not provided".format( | |
1726 | 'reconfigure' if reconfig else 'deploy', | |
1727 | daemon_spec.name(), daemon_spec.host) | |
1728 | self.log.info(msg) | |
1729 | return msg | |
1730 | image = spec.image | |
1731 | if spec.ports: | |
1732 | ports.extend(spec.ports) | |
1733 | ||
1734 | cephadm_config, deps = self.cephadm_services[daemon_spec.daemon_type].generate_config( | |
1735 | daemon_spec) | |
f6b5b4d7 TL |
1736 | |
1737 | # TCP ports to open in the host firewall | |
f91f0fd5 TL |
1738 | if len(ports) > 0: |
1739 | daemon_spec.extra_args.extend([ | |
1740 | '--tcp-ports', ' '.join(map(str, ports)) | |
1741 | ]) | |
9f95a23c TL |
1742 | |
1743 | # osd deployments need an --osd-fsid arg | |
f6b5b4d7 | 1744 | if daemon_spec.daemon_type == 'osd': |
9f95a23c TL |
1745 | if not osd_uuid_map: |
1746 | osd_uuid_map = self.get_osd_uuid_map() | |
f6b5b4d7 | 1747 | osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) |
9f95a23c | 1748 | if not osd_uuid: |
f6b5b4d7 TL |
1749 | raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) |
1750 | daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) | |
9f95a23c | 1751 | |
f6b5b4d7 TL |
1752 | if reconfig: |
1753 | daemon_spec.extra_args.append('--reconfig') | |
1754 | if self.allow_ptrace: | |
1755 | daemon_spec.extra_args.append('--allow-ptrace') | |
9f95a23c | 1756 | |
f6b5b4d7 | 1757 | if self.cache.host_needs_registry_login(daemon_spec.host) and self.registry_url: |
f91f0fd5 TL |
1758 | self._registry_login(daemon_spec.host, self.registry_url, |
1759 | self.registry_username, self.registry_password) | |
1760 | ||
1761 | daemon_spec.extra_args.extend(['--config-json', '-']) | |
9f95a23c | 1762 | |
f6b5b4d7 TL |
1763 | self.log.info('%s daemon %s on %s' % ( |
1764 | 'Reconfiguring' if reconfig else 'Deploying', | |
1765 | daemon_spec.name(), daemon_spec.host)) | |
1766 | ||
1767 | out, err, code = self._run_cephadm( | |
1768 | daemon_spec.host, daemon_spec.name(), 'deploy', | |
1769 | [ | |
1770 | '--name', daemon_spec.name(), | |
1771 | ] + daemon_spec.extra_args, | |
f91f0fd5 TL |
1772 | stdin=json.dumps(cephadm_config), |
1773 | image=image) | |
f6b5b4d7 TL |
1774 | if not code and daemon_spec.host in self.cache.daemons: |
1775 | # prime cached service state with what we (should have) | |
1776 | # just created | |
1777 | sd = orchestrator.DaemonDescription() | |
1778 | sd.daemon_type = daemon_spec.daemon_type | |
1779 | sd.daemon_id = daemon_spec.daemon_id | |
1780 | sd.hostname = daemon_spec.host | |
1781 | sd.status = 1 | |
1782 | sd.status_desc = 'starting' | |
1783 | self.cache.add_daemon(daemon_spec.host, sd) | |
f91f0fd5 | 1784 | if daemon_spec.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager']: |
f6b5b4d7 TL |
1785 | self.requires_post_actions.add(daemon_spec.daemon_type) |
1786 | self.cache.invalidate_host_daemons(daemon_spec.host) | |
f91f0fd5 TL |
1787 | self.cache.update_daemon_config_deps( |
1788 | daemon_spec.host, daemon_spec.name(), deps, start_time) | |
f6b5b4d7 TL |
1789 | self.cache.save_host(daemon_spec.host) |
1790 | msg = "{} {} on host '{}'".format( | |
1791 | 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) | |
1792 | if not code: | |
1793 | self.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) | |
1794 | else: | |
1795 | what = 'reconfigure' if reconfig else 'deploy' | |
f91f0fd5 TL |
1796 | self.events.for_daemon( |
1797 | daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') | |
f6b5b4d7 | 1798 | return msg |
9f95a23c | 1799 | |
e306af50 TL |
1800 | @forall_hosts |
1801 | def _remove_daemons(self, name, host) -> str: | |
9f95a23c TL |
1802 | return self._remove_daemon(name, host) |
1803 | ||
e306af50 | 1804 | def _remove_daemon(self, name, host) -> str: |
9f95a23c TL |
1805 | """ |
1806 | Remove a daemon | |
1807 | """ | |
1808 | (daemon_type, daemon_id) = name.split('.', 1) | |
f91f0fd5 TL |
1809 | daemon = orchestrator.DaemonDescription( |
1810 | daemon_type=daemon_type, | |
1811 | daemon_id=daemon_id, | |
1812 | hostname=host) | |
9f95a23c | 1813 | |
f91f0fd5 | 1814 | with set_exception_subject('service', daemon.service_id(), overwrite=True): |
f6b5b4d7 | 1815 | |
f91f0fd5 | 1816 | self.cephadm_services[daemon_type].pre_remove(daemon) |
f6b5b4d7 TL |
1817 | |
1818 | args = ['--name', name, '--force'] | |
1819 | self.log.info('Removing daemon %s from %s' % (name, host)) | |
1820 | out, err, code = self._run_cephadm( | |
1821 | host, name, 'rm-daemon', args) | |
1822 | if not code: | |
1823 | # remove item from cache | |
1824 | self.cache.rm_daemon(host, name) | |
1825 | self.cache.invalidate_host_daemons(host) | |
e306af50 | 1826 | |
f91f0fd5 | 1827 | self.cephadm_services[daemon_type].post_remove(daemon) |
9f95a23c | 1828 | |
f91f0fd5 | 1829 | return "Removed {} from host '{}'".format(name, host) |
9f95a23c | 1830 | |
e306af50 TL |
1831 | def _check_pool_exists(self, pool, service_name): |
1832 | logger.info(f'Checking pool "{pool}" exists for service {service_name}') | |
1833 | if not self.rados.pool_exists(pool): | |
1834 | raise OrchestratorError(f'Cannot find pool "{pool}" for ' | |
1835 | f'service {service_name}') | |
1836 | ||
9f95a23c | 1837 | def _add_daemon(self, daemon_type, spec, |
f91f0fd5 | 1838 | create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]: |
9f95a23c TL |
1839 | """ |
1840 | Add (and place) a daemon. Require explicit host placement. Do not | |
1841 | schedule, and do not apply the related scheduling limitations. | |
1842 | """ | |
1843 | self.log.debug('_add_daemon %s spec %s' % (daemon_type, spec.placement)) | |
1844 | if not spec.placement.hosts: | |
1845 | raise OrchestratorError('must specify host(s) to deploy on') | |
1846 | count = spec.placement.count or len(spec.placement.hosts) | |
1847 | daemons = self.cache.get_daemons_by_service(spec.service_name()) | |
1848 | return self._create_daemons(daemon_type, spec, daemons, | |
1849 | spec.placement.hosts, count, | |
1850 | create_func, config_func) | |
1851 | ||
1852 | def _create_daemons(self, daemon_type, spec, daemons, | |
1853 | hosts, count, | |
f91f0fd5 | 1854 | create_func: Callable[..., CephadmDaemonSpec], config_func=None) -> List[str]: |
9f95a23c TL |
1855 | if count > len(hosts): |
1856 | raise OrchestratorError('too few hosts: want %d, have %s' % ( | |
1857 | count, hosts)) | |
1858 | ||
f6b5b4d7 | 1859 | did_config = False |
9f95a23c | 1860 | |
f6b5b4d7 | 1861 | args = [] # type: List[CephadmDaemonSpec] |
9f95a23c TL |
1862 | for host, network, name in hosts: |
1863 | daemon_id = self.get_unique_name(daemon_type, host, daemons, | |
e306af50 TL |
1864 | prefix=spec.service_id, |
1865 | forcename=name) | |
f6b5b4d7 TL |
1866 | |
1867 | if not did_config and config_func: | |
1868 | if daemon_type == 'rgw': | |
1869 | config_func(spec, daemon_id) | |
1870 | else: | |
1871 | config_func(spec) | |
1872 | did_config = True | |
1873 | ||
f91f0fd5 TL |
1874 | daemon_spec = self.cephadm_services[daemon_type].make_daemon_spec( |
1875 | host, daemon_id, network, spec) | |
9f95a23c TL |
1876 | self.log.debug('Placing %s.%s on host %s' % ( |
1877 | daemon_type, daemon_id, host)) | |
f6b5b4d7 | 1878 | args.append(daemon_spec) |
9f95a23c TL |
1879 | |
1880 | # add to daemon list so next name(s) will also be unique | |
1881 | sd = orchestrator.DaemonDescription( | |
1882 | hostname=host, | |
1883 | daemon_type=daemon_type, | |
1884 | daemon_id=daemon_id, | |
1885 | ) | |
1886 | daemons.append(sd) | |
1887 | ||
e306af50 | 1888 | @forall_hosts |
9f95a23c | 1889 | def create_func_map(*args): |
f91f0fd5 TL |
1890 | daemon_spec = create_func(*args) |
1891 | return self._create_daemon(daemon_spec) | |
9f95a23c TL |
1892 | |
1893 | return create_func_map(args) | |
1894 | ||
1895 | @trivial_completion | |
f6b5b4d7 | 1896 | def apply_mon(self, spec) -> str: |
9f95a23c TL |
1897 | return self._apply(spec) |
1898 | ||
e306af50 | 1899 | @trivial_completion |
9f95a23c | 1900 | def add_mon(self, spec): |
e306af50 | 1901 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 1902 | return self._add_daemon('mon', spec, self.mon_service.prepare_create) |
9f95a23c | 1903 | |
e306af50 TL |
1904 | @trivial_completion |
1905 | def add_mgr(self, spec): | |
1906 | # type: (ServiceSpec) -> List[str] | |
f91f0fd5 | 1907 | return self._add_daemon('mgr', spec, self.mgr_service.prepare_create) |
9f95a23c | 1908 | |
e306af50 TL |
1909 | def _apply(self, spec: GenericSpec) -> str: |
1910 | if spec.service_type == 'host': | |
1911 | return self._add_host(cast(HostSpec, spec)) | |
9f95a23c | 1912 | |
f6b5b4d7 TL |
1913 | if spec.service_type == 'osd': |
1914 | # _trigger preview refresh needs to be smart and | |
1915 | # should only refresh if a change has been detected | |
1916 | self._trigger_preview_refresh(specs=[cast(DriveGroupSpec, spec)]) | |
1917 | ||
e306af50 | 1918 | return self._apply_service_spec(cast(ServiceSpec, spec)) |
9f95a23c | 1919 | |
f6b5b4d7 TL |
1920 | def _plan(self, spec: ServiceSpec): |
1921 | if spec.service_type == 'osd': | |
1922 | return {'service_name': spec.service_name(), | |
1923 | 'service_type': spec.service_type, | |
1924 | 'data': self._preview_osdspecs(osdspecs=[cast(DriveGroupSpec, spec)])} | |
1925 | ||
1926 | ha = HostAssignment( | |
1927 | spec=spec, | |
f91f0fd5 | 1928 | hosts=self._hosts_with_daemon_inventory(), |
f6b5b4d7 TL |
1929 | get_daemons_func=self.cache.get_daemons_by_service, |
1930 | ) | |
1931 | ha.validate() | |
1932 | hosts = ha.place() | |
1933 | ||
1934 | add_daemon_hosts = ha.add_daemon_hosts(hosts) | |
1935 | remove_daemon_hosts = ha.remove_daemon_hosts(hosts) | |
1936 | ||
1937 | return { | |
1938 | 'service_name': spec.service_name(), | |
1939 | 'service_type': spec.service_type, | |
1940 | 'add': [hs.hostname for hs in add_daemon_hosts], | |
1941 | 'remove': [d.hostname for d in remove_daemon_hosts] | |
1942 | } | |
1943 | ||
1944 | @trivial_completion | |
1945 | def plan(self, specs: List[GenericSpec]) -> List: | |
1946 | results = [{'warning': 'WARNING! Dry-Runs are snapshots of a certain point in time and are bound \n' | |
1947 | 'to the current inventory setup. If any of these conditions changes, the \n' | |
1948 | 'preview will be invalid. Please make sure to have a minimal \n' | |
1949 | 'timeframe between planning and applying the specs.'}] | |
1950 | if any([spec.service_type == 'host' for spec in specs]): | |
1951 | return [{'error': 'Found <HostSpec>. Previews that include Host Specifications are not supported yet.'}] | |
1952 | for spec in specs: | |
1953 | results.append(self._plan(cast(ServiceSpec, spec))) | |
1954 | return results | |
1955 | ||
e306af50 | 1956 | def _apply_service_spec(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
1957 | if spec.placement.is_empty(): |
1958 | # fill in default placement | |
1959 | defaults = { | |
1960 | 'mon': PlacementSpec(count=5), | |
1961 | 'mgr': PlacementSpec(count=2), | |
1962 | 'mds': PlacementSpec(count=2), | |
1963 | 'rgw': PlacementSpec(count=2), | |
1911f103 | 1964 | 'iscsi': PlacementSpec(count=1), |
9f95a23c | 1965 | 'rbd-mirror': PlacementSpec(count=2), |
801d1391 | 1966 | 'nfs': PlacementSpec(count=1), |
9f95a23c TL |
1967 | 'grafana': PlacementSpec(count=1), |
1968 | 'alertmanager': PlacementSpec(count=1), | |
1969 | 'prometheus': PlacementSpec(count=1), | |
1970 | 'node-exporter': PlacementSpec(host_pattern='*'), | |
1971 | 'crash': PlacementSpec(host_pattern='*'), | |
f91f0fd5 | 1972 | 'container': PlacementSpec(count=1), |
9f95a23c TL |
1973 | } |
1974 | spec.placement = defaults[spec.service_type] | |
1975 | elif spec.service_type in ['mon', 'mgr'] and \ | |
f91f0fd5 TL |
1976 | spec.placement.count is not None and \ |
1977 | spec.placement.count < 1: | |
9f95a23c TL |
1978 | raise OrchestratorError('cannot scale %s service below 1' % ( |
1979 | spec.service_type)) | |
1980 | ||
1981 | HostAssignment( | |
1982 | spec=spec, | |
f91f0fd5 | 1983 | hosts=self.inventory.all_specs(), # All hosts, even those without daemon refresh |
9f95a23c TL |
1984 | get_daemons_func=self.cache.get_daemons_by_service, |
1985 | ).validate() | |
1986 | ||
1987 | self.log.info('Saving service %s spec with placement %s' % ( | |
1988 | spec.service_name(), spec.placement.pretty_str())) | |
1989 | self.spec_store.save(spec) | |
1990 | self._kick_serve_loop() | |
1911f103 | 1991 | return "Scheduled %s update..." % spec.service_name() |
9f95a23c TL |
1992 | |
1993 | @trivial_completion | |
f6b5b4d7 | 1994 | def apply(self, specs: List[GenericSpec]) -> List[str]: |
e306af50 TL |
1995 | results = [] |
1996 | for spec in specs: | |
1997 | results.append(self._apply(spec)) | |
1998 | return results | |
9f95a23c TL |
1999 | |
2000 | @trivial_completion | |
f6b5b4d7 | 2001 | def apply_mgr(self, spec) -> str: |
9f95a23c TL |
2002 | return self._apply(spec) |
2003 | ||
e306af50 | 2004 | @trivial_completion |
f6b5b4d7 | 2005 | def add_mds(self, spec: ServiceSpec) -> List[str]: |
f91f0fd5 | 2006 | return self._add_daemon('mds', spec, self.mds_service.prepare_create, self.mds_service.config) |
9f95a23c TL |
2007 | |
2008 | @trivial_completion | |
f6b5b4d7 | 2009 | def apply_mds(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2010 | return self._apply(spec) |
2011 | ||
e306af50 | 2012 | @trivial_completion |
f6b5b4d7 | 2013 | def add_rgw(self, spec) -> List[str]: |
f91f0fd5 | 2014 | return self._add_daemon('rgw', spec, self.rgw_service.prepare_create, self.rgw_service.config) |
9f95a23c TL |
2015 | |
2016 | @trivial_completion | |
f6b5b4d7 | 2017 | def apply_rgw(self, spec) -> str: |
9f95a23c TL |
2018 | return self._apply(spec) |
2019 | ||
e306af50 | 2020 | @trivial_completion |
1911f103 | 2021 | def add_iscsi(self, spec): |
e306af50 | 2022 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 2023 | return self._add_daemon('iscsi', spec, self.iscsi_service.prepare_create, self.iscsi_service.config) |
1911f103 TL |
2024 | |
2025 | @trivial_completion | |
f6b5b4d7 | 2026 | def apply_iscsi(self, spec) -> str: |
1911f103 TL |
2027 | return self._apply(spec) |
2028 | ||
e306af50 | 2029 | @trivial_completion |
f6b5b4d7 | 2030 | def add_rbd_mirror(self, spec) -> List[str]: |
f91f0fd5 | 2031 | return self._add_daemon('rbd-mirror', spec, self.rbd_mirror_service.prepare_create) |
9f95a23c TL |
2032 | |
2033 | @trivial_completion | |
f6b5b4d7 | 2034 | def apply_rbd_mirror(self, spec) -> str: |
9f95a23c TL |
2035 | return self._apply(spec) |
2036 | ||
e306af50 | 2037 | @trivial_completion |
f6b5b4d7 | 2038 | def add_nfs(self, spec) -> List[str]: |
f91f0fd5 | 2039 | return self._add_daemon('nfs', spec, self.nfs_service.prepare_create, self.nfs_service.config) |
801d1391 TL |
2040 | |
2041 | @trivial_completion | |
f6b5b4d7 | 2042 | def apply_nfs(self, spec) -> str: |
801d1391 TL |
2043 | return self._apply(spec) |
2044 | ||
9f95a23c TL |
2045 | def _get_dashboard_url(self): |
2046 | # type: () -> str | |
2047 | return self.get('mgr_map').get('services', {}).get('dashboard', '') | |
2048 | ||
e306af50 | 2049 | @trivial_completion |
f6b5b4d7 | 2050 | def add_prometheus(self, spec) -> List[str]: |
f91f0fd5 | 2051 | return self._add_daemon('prometheus', spec, self.prometheus_service.prepare_create) |
9f95a23c TL |
2052 | |
2053 | @trivial_completion | |
f6b5b4d7 | 2054 | def apply_prometheus(self, spec) -> str: |
9f95a23c TL |
2055 | return self._apply(spec) |
2056 | ||
e306af50 | 2057 | @trivial_completion |
9f95a23c | 2058 | def add_node_exporter(self, spec): |
e306af50 | 2059 | # type: (ServiceSpec) -> List[str] |
9f95a23c | 2060 | return self._add_daemon('node-exporter', spec, |
f91f0fd5 | 2061 | self.node_exporter_service.prepare_create) |
9f95a23c TL |
2062 | |
2063 | @trivial_completion | |
f6b5b4d7 | 2064 | def apply_node_exporter(self, spec) -> str: |
9f95a23c TL |
2065 | return self._apply(spec) |
2066 | ||
e306af50 | 2067 | @trivial_completion |
9f95a23c | 2068 | def add_crash(self, spec): |
e306af50 | 2069 | # type: (ServiceSpec) -> List[str] |
9f95a23c | 2070 | return self._add_daemon('crash', spec, |
f91f0fd5 | 2071 | self.crash_service.prepare_create) |
9f95a23c TL |
2072 | |
2073 | @trivial_completion | |
f6b5b4d7 | 2074 | def apply_crash(self, spec) -> str: |
9f95a23c TL |
2075 | return self._apply(spec) |
2076 | ||
e306af50 | 2077 | @trivial_completion |
9f95a23c | 2078 | def add_grafana(self, spec): |
e306af50 | 2079 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 2080 | return self._add_daemon('grafana', spec, self.grafana_service.prepare_create) |
9f95a23c TL |
2081 | |
2082 | @trivial_completion | |
f6b5b4d7 | 2083 | def apply_grafana(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2084 | return self._apply(spec) |
2085 | ||
e306af50 | 2086 | @trivial_completion |
9f95a23c | 2087 | def add_alertmanager(self, spec): |
e306af50 | 2088 | # type: (ServiceSpec) -> List[str] |
f91f0fd5 | 2089 | return self._add_daemon('alertmanager', spec, self.alertmanager_service.prepare_create) |
9f95a23c TL |
2090 | |
2091 | @trivial_completion | |
f6b5b4d7 | 2092 | def apply_alertmanager(self, spec: ServiceSpec) -> str: |
9f95a23c TL |
2093 | return self._apply(spec) |
2094 | ||
f91f0fd5 TL |
2095 | @trivial_completion |
2096 | def add_container(self, spec: ServiceSpec) -> List[str]: | |
2097 | return self._add_daemon('container', spec, | |
2098 | self.container_service.prepare_create) | |
2099 | ||
2100 | @trivial_completion | |
2101 | def apply_container(self, spec: ServiceSpec) -> str: | |
2102 | return self._apply(spec) | |
2103 | ||
2104 | def _get_container_image_info(self, image_name) -> ContainerInspectInfo: | |
9f95a23c TL |
2105 | # pick a random host... |
2106 | host = None | |
e306af50 | 2107 | for host_name in self.inventory.keys(): |
9f95a23c TL |
2108 | host = host_name |
2109 | break | |
2110 | if not host: | |
2111 | raise OrchestratorError('no hosts defined') | |
f6b5b4d7 | 2112 | if self.cache.host_needs_registry_login(host) and self.registry_url: |
f91f0fd5 TL |
2113 | self._registry_login(host, self.registry_url, |
2114 | self.registry_username, self.registry_password) | |
9f95a23c | 2115 | out, err, code = self._run_cephadm( |
f6b5b4d7 | 2116 | host, '', 'pull', [], |
9f95a23c TL |
2117 | image=image_name, |
2118 | no_fsid=True, | |
2119 | error_ok=True) | |
2120 | if code: | |
2121 | raise OrchestratorError('Failed to pull %s on %s: %s' % ( | |
2122 | image_name, host, '\n'.join(out))) | |
f91f0fd5 TL |
2123 | try: |
2124 | j = json.loads('\n'.join(out)) | |
2125 | r = ContainerInspectInfo( | |
2126 | j['image_id'], | |
2127 | j.get('ceph_version'), | |
2128 | j.get('repo_digest') | |
2129 | ) | |
2130 | self.log.debug(f'image {image_name} -> {r}') | |
2131 | return r | |
2132 | except (ValueError, KeyError) as _: | |
2133 | msg = 'Failed to pull %s on %s: %s' % (image_name, host, '\n'.join(out)) | |
2134 | self.log.exception(msg) | |
2135 | raise OrchestratorError(msg) | |
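# Sketch of the JSON this method expects back from `cephadm ... pull`,
# limited to the keys read above (the real output may carry more fields;
# values are placeholders):
#   {"image_id": "sha256:...", "ceph_version": "ceph version 15.2.x (...)",
#    "repo_digest": "docker.io/ceph/ceph@sha256:..."}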
9f95a23c TL |
2136 | |
2137 | @trivial_completion | |
f6b5b4d7 | 2138 | def upgrade_check(self, image, version) -> str: |
9f95a23c TL |
2139 | if version: |
2140 | target_name = self.container_image_base + ':v' + version | |
2141 | elif image: | |
2142 | target_name = image | |
2143 | else: | |
2144 | raise OrchestratorError('must specify either image or version') | |
2145 | ||
f91f0fd5 TL |
2146 | image_info = self._get_container_image_info(target_name) |
2147 | self.log.debug(f'image info {image} -> {image_info}') | |
9f95a23c TL |
2148 | r = { |
2149 | 'target_name': target_name, | |
f91f0fd5 TL |
2150 | 'target_id': image_info.image_id, |
2151 | 'target_version': image_info.ceph_version, | |
9f95a23c TL |
2152 | 'needs_update': dict(), |
2153 | 'up_to_date': list(), | |
2154 | } | |
2155 | for host, dm in self.cache.daemons.items(): | |
2156 | for name, dd in dm.items(): | |
f91f0fd5 | 2157 | if image_info.image_id == dd.container_image_id: |
9f95a23c TL |
2158 | r['up_to_date'].append(dd.name()) |
2159 | else: | |
2160 | r['needs_update'][dd.name()] = { | |
2161 | 'current_name': dd.container_image_name, | |
2162 | 'current_id': dd.container_image_id, | |
2163 | 'current_version': dd.version, | |
2164 | } | |
f91f0fd5 TL |
2165 | if self.use_repo_digest: |
2166 | r['target_digest'] = image_info.repo_digest | |
2167 | ||
9f95a23c TL |
2168 | return json.dumps(r, indent=4, sort_keys=True) |
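# Example shape of the report assembled above (all values are placeholders):
#   {
#     "target_name": "<image or base:vX.Y.Z>",
#     "target_id": "<image id>",
#     "target_version": "<ceph version>",
#     "needs_update": {"osd.1": {"current_name": "...", "current_id": "...", "current_version": "..."}},
#     "up_to_date": ["mgr.a"],
#     "target_digest": "..."  # only present when use_repo_digest is set
#   }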
2169 | ||
2170 | @trivial_completion | |
f6b5b4d7 | 2171 | def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: |
e306af50 | 2172 | return self.upgrade.upgrade_status() |
9f95a23c TL |
2173 | |
2174 | @trivial_completion | |
f6b5b4d7 | 2175 | def upgrade_start(self, image, version) -> str: |
e306af50 | 2176 | return self.upgrade.upgrade_start(image, version) |
9f95a23c TL |
2177 | |
2178 | @trivial_completion | |
f6b5b4d7 | 2179 | def upgrade_pause(self) -> str: |
e306af50 | 2180 | return self.upgrade.upgrade_pause() |
9f95a23c TL |
2181 | |
2182 | @trivial_completion | |
f6b5b4d7 | 2183 | def upgrade_resume(self) -> str: |
e306af50 | 2184 | return self.upgrade.upgrade_resume() |
9f95a23c TL |
2185 | |
2186 | @trivial_completion | |
f6b5b4d7 | 2187 | def upgrade_stop(self) -> str: |
e306af50 | 2188 | return self.upgrade.upgrade_stop() |
9f95a23c TL |
2189 | |
2190 | @trivial_completion | |
2191 | def remove_osds(self, osd_ids: List[str], | |
2192 | replace: bool = False, | |
f6b5b4d7 | 2193 | force: bool = False) -> str: |
9f95a23c TL |
2194 | """ |
2195 | Takes a list of OSDs and schedules them for removal. | |
2196 | The function that takes care of the actual removal is | |
f6b5b4d7 | 2197 | process_removal_queue(). |
9f95a23c TL |
2198 | """ |
2199 | ||
f6b5b4d7 TL |
2200 | daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd') |
2201 | to_remove_daemons = list() | |
9f95a23c | 2202 | for daemon in daemons: |
f6b5b4d7 TL |
2203 | if daemon.daemon_id in osd_ids: |
2204 | to_remove_daemons.append(daemon) | |
2205 | if not to_remove_daemons: | |
2206 | return f"Unable to find OSDs: {osd_ids}" | |
9f95a23c | 2207 | |
f6b5b4d7 TL |
2208 | for daemon in to_remove_daemons: |
2209 | try: | |
2210 | self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id), | |
2211 | replace=replace, | |
2212 | force=force, | |
2213 | hostname=daemon.hostname, | |
2214 | fullname=daemon.name(), | |
2215 | process_started_at=datetime.datetime.utcnow(), | |
2216 | remove_util=self.rm_util)) | |
2217 | except NotFoundError: | |
2218 | return f"Unable to find OSDs: {osd_ids}" | |
9f95a23c TL |
2219 | |
2220 | # trigger the serve loop to initiate the removal | |
2221 | self._kick_serve_loop() | |
2222 | return "Scheduled OSD(s) for removal" | |
2223 | ||
f6b5b4d7 TL |
2224 | @trivial_completion |
2225 | def stop_remove_osds(self, osd_ids: List[str]): | |
2226 | """ | |
2227 | Stops a `removal` process for a List of OSDs. | |
2228 | This will revert their weight and remove them from the osds_to_remove queue | |
2229 | """ | |
2230 | for osd_id in osd_ids: | |
2231 | try: | |
2232 | self.to_remove_osds.rm(OSD(osd_id=int(osd_id), | |
2233 | remove_util=self.rm_util)) | |
2234 | except (NotFoundError, KeyError): | |
2235 | return f'Unable to find OSD in the queue: {osd_id}' | |
2236 | ||
2237 | # trigger the serve loop to halt the removal | |
2238 | self._kick_serve_loop() | |
2239 | return "Stopped OSD(s) removal" | |
2240 | ||
9f95a23c TL |
2241 | @trivial_completion |
2242 | def remove_osds_status(self): | |
2243 | """ | |
2244 | The CLI call to retrieve an osd removal report | |
2245 | """ | |
f6b5b4d7 | 2246 | return self.to_remove_osds.all_osds() |