import hashlib
import json
import logging
import uuid
from collections import defaultdict
from contextlib import contextmanager
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator

from cephadm import remotes

try:
    import remoto
    import execnet.gateway_bootstrap
except ImportError:
    remoto = None

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_now

import orchestrator
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
    DaemonDescriptionStatus, daemon_type_to_service
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.schedule import HostAssignment
from cephadm.autotune import MemoryAutotuner
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
    CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
from mgr_module import MonCommandFailed
from mgr_util import format_bytes

from . import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator
    from remoto.backends import BaseConnection

logger = logging.getLogger(__name__)


class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    Please see the `Note regarding network calls from CLI handlers`
    chapter in the cephadm developer guide.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """
53 | ||
54 | def __init__(self, mgr: "CephadmOrchestrator"): | |
55 | self.mgr: "CephadmOrchestrator" = mgr | |
56 | self.log = logger | |
57 | ||
58 | def serve(self) -> None: | |
59 | """ | |
60 | The main loop of cephadm. | |
61 | ||
62 | A command handler will typically change the declarative state | |
63 | of cephadm. This loop will then attempt to apply this new state. | |
64 | """ | |
65 | self.log.debug("serve starting") | |
f67539c2 TL |
66 | self.mgr.config_checker.load_network_config() |
67 | ||
f91f0fd5 TL |
68 | while self.mgr.run: |
69 | ||
70 | try: | |
71 | ||
72 | self.convert_tags_to_repo_digest() | |
73 | ||
74 | # refresh daemons | |
75 | self.log.debug('refreshing hosts and daemons') | |
76 | self._refresh_hosts_and_daemons() | |
77 | ||
78 | self._check_for_strays() | |
79 | ||
80 | self._update_paused_health() | |
81 | ||
82 | if not self.mgr.paused: | |
adb31ebb | 83 | self.mgr.to_remove_osds.process_removal_queue() |
f91f0fd5 TL |
84 | |
85 | self.mgr.migration.migrate() | |
86 | if self.mgr.migration.is_migration_ongoing(): | |
87 | continue | |
88 | ||
89 | if self._apply_all_services(): | |
90 | continue # did something, refresh | |
91 | ||
92 | self._check_daemons() | |
93 | ||
f67539c2 TL |
94 | self._purge_deleted_services() |
95 | ||
f91f0fd5 TL |
96 | if self.mgr.upgrade.continue_upgrade(): |
97 | continue | |
98 | ||
99 | except OrchestratorError as e: | |
100 | if e.event_subject: | |
101 | self.mgr.events.from_orch_error(e) | |
102 | ||
103 | self._serve_sleep() | |
104 | self.log.debug("serve exit") | |
105 | ||
adb31ebb | 106 | def _serve_sleep(self) -> None: |
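        # Sleep for the shortest of the configured check/cache intervals,
        # but never less than 30 seconds, so the loop wakes up before the
        # next refresh is due without spinning.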
        sleep_interval = max(
            30,
            min(
                self.mgr.host_check_interval,
                self.mgr.facts_cache_timeout,
                self.mgr.daemon_cache_timeout,
                self.mgr.device_cache_timeout,
            )
        )
        self.log.debug('Sleeping for %d seconds', sleep_interval)
        self.mgr.event.wait(sleep_interval)
        self.mgr.event.clear()

    def _update_paused_health(self) -> None:
        if self.mgr.paused:
            self.mgr.health_checks['CEPHADM_PAUSED'] = {
                'severity': 'warning',
                'summary': 'cephadm background work is paused',
                'count': 1,
                'detail': ["'ceph orch resume' to resume"],
            }
            self.mgr.set_health_checks(self.mgr.health_checks)
        else:
            if 'CEPHADM_PAUSED' in self.mgr.health_checks:
                del self.mgr.health_checks['CEPHADM_PAUSED']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _autotune_host_memory(self, host: str) -> None:
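        # The memory budget is the host's total RAM scaled by
        # autotune_memory_target_ratio; MemoryAutotuner divides it among the
        # daemons on the host, and the result is applied through a
        # host-masked osd_memory_target setting.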
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': f'osd/host:{host}',
                'name': 'osd_memory_target',
            })
        self.mgr.cache.update_autotune(host)

    def _refresh_hosts_and_daemons(self) -> None:
        bad_hosts = []
        failures = []

        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
            config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
            config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())

            if self.mgr.manage_etc_ceph_ceph_conf:
                try:
                    pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                    ha = HostAssignment(
                        spec=ServiceSpec('mon', placement=pspec),
                        hosts=self.mgr._schedulable_hosts(),
                        daemons=[],
                        networks=self.mgr.cache.networks,
                    )
                    all_slots, _, _ = ha.place()
                    for host in {s.hostname for s in all_slots}:
                        if host not in client_files:
                            client_files[host] = {}
                        client_files[host]['/etc/ceph/ceph.conf'] = (
                            0o644, 0, 0, bytes(config), str(config_digest)
                        )
                except Exception as e:
                    self.mgr.log.warning(
                        f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

            # client keyrings
            for ks in self.mgr.keys.keys.values():
                assert config
                assert config_digest
                try:
                    ret, keyring, err = self.mgr.mon_command({
                        'prefix': 'auth get',
                        'entity': ks.entity,
                    })
                    if ret:
                        self.log.warning(f'unable to fetch keyring for {ks.entity}')
                        continue
                    digest = ''.join('%02x' % c for c in hashlib.sha256(
                        keyring.encode('utf-8')).digest())
                    ha = HostAssignment(
                        spec=ServiceSpec('mon', placement=ks.placement),
                        hosts=self.mgr._schedulable_hosts(),
                        daemons=[],
                        networks=self.mgr.cache.networks,
                    )
                    all_slots, _, _ = ha.place()
                    for host in {s.hostname for s in all_slots}:
                        if host not in client_files:
                            client_files[host] = {}
                        client_files[host]['/etc/ceph/ceph.conf'] = (
                            0o644, 0, 0, bytes(config), str(config_digest)
                        )
                        client_files[host][ks.path] = (
                            ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
                        )
                except Exception as e:
                    self.log.warning(
                        f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')

        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)
            if self.mgr.cache.host_needs_daemon_refresh(host):
                self.log.debug('refreshing %s daemons' % host)
                r = self._refresh_host_daemons(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self._registry_login(host, self.mgr.registry_url,
                                         self.mgr.registry_username, self.mgr.registry_password)
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_device_refresh(host):
                self.log.debug('refreshing %s devices' % host)
                r = self._refresh_host_devices(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_facts_refresh(host):
                self.log.debug('Refreshing %s facts' % host)
                r = self._refresh_facts(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if (
                    self.mgr.cache.host_needs_autotune_memory(host)
                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

            # client files
            updated_files = False
            old_files = self.mgr.cache.get_host_client_files(host).copy()
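            # Compare each desired file against the cached (digest, mode,
            # uid, gid); entries that match are dropped from old_files, so
            # whatever remains afterwards is stale and is removed below.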
            for path, m in client_files.get(host, {}).items():
                mode, uid, gid, content, digest = m
                if path in old_files:
                    match = old_files[path] == (digest, mode, uid, gid)
                    del old_files[path]
                    if match:
                        continue
                self.log.info(f'Updating {host}:{path}')
                self._write_remote_file(host, path, content, mode, uid, gid)
                self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
                updated_files = True
            for path in old_files.keys():
                self.log.info(f'Removing {host}:{path}')
                with self._remote_connection(host) as tpl:
                    conn, connr = tpl
                    out, err, code = remoto.process.check(
                        conn,
                        ['rm', '-f', path])
                updated_files = True
                self.mgr.cache.removed_client_file(host, path)
            if updated_files:
                self.mgr.cache.save_host(host)

        refresh(self.mgr.cache.get_hosts())

        self.mgr.config_checker.run_checks()

        health_changed = False
        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_FAILED_DAEMON',
                'CEPHADM_REFRESH_FAILED',
        ]:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
                health_changed = True
        if bad_hosts:
            self.mgr.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
                'severity': 'warning',
                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
                'count': len(bad_hosts),
                'detail': bad_hosts,
            }
            health_changed = True
        if failures:
            self.mgr.health_checks['CEPHADM_REFRESH_FAILED'] = {
                'severity': 'warning',
                'summary': 'failed to probe daemons or devices',
                'count': len(failures),
                'detail': failures,
            }
            health_changed = True
        failed_daemons = []
        for dd in self.mgr.cache.get_daemons():
            if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        if failed_daemons:
            self.mgr.health_checks['CEPHADM_FAILED_DAEMON'] = {
                'severity': 'warning',
                'summary': '%d failed cephadm daemon(s)' % len(failed_daemons),
                'count': len(failed_daemons),
                'detail': failed_daemons,
            }
            health_changed = True
        if health_changed:
            self.mgr.set_health_checks(self.mgr.health_checks)

    def _check_host(self, host: str) -> Optional[str]:
        if host not in self.mgr.inventory:
            return None
        self.log.debug(' checking %s' % host)
        try:
            out, err, code = self._run_cephadm(
                host, cephadmNoImage, 'check-host', [],
                error_ok=True, no_fsid=True)
            self.mgr.cache.update_last_host_check(host)
            self.mgr.cache.save_host(host)
            if code:
                self.log.debug(' host %s failed check' % host)
                if self.mgr.warn_on_failed_host_check:
                    return 'host %s failed check: %s' % (host, err)
            else:
                self.log.debug(' host %s ok' % host)
        except Exception as e:
            self.log.debug(' host %s failed check' % host)
            return 'host %s failed check: %s' % (host, e)
        return None

    def _refresh_host_daemons(self, host: str) -> Optional[str]:
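        # 'cephadm ls' reports every daemon on the host; keep only the ones
        # cephadm manages for this cluster and convert each entry into a
        # DaemonDescription for the host cache.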
        try:
            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)
        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self.mgr._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime_now()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, str_to_datetime(d[k]))
            sd.daemon_type = d['name'].split('.')[0]
            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.container_image_digests = d.get('container_image_digests')
            sd.memory_usage = d.get('memory_usage')
            sd.memory_request = d.get('memory_request')
            sd.memory_limit = d.get('memory_limit')
            sd._service_name = d.get('service_name')
            sd.deployed_by = d.get('deployed_by')
            sd.version = d.get('version')
            sd.ports = d.get('ports')
            sd.ip = d.get('ip')
            sd.rank = int(d['rank']) if d.get('rank') is not None else None
            sd.rank_generation = int(d['rank_generation']) if d.get(
                'rank_generation') is not None else None
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                sd.status_desc = d['state']
                sd.status = {
                    'running': DaemonDescriptionStatus.running,
                    'stopped': DaemonDescriptionStatus.stopped,
                    'error': DaemonDescriptionStatus.error,
                    'unknown': DaemonDescriptionStatus.error,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.mgr.cache.update_host_daemons(host, dm)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_facts(self, host: str) -> Optional[str]:
        try:
            val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.mgr.cache.update_host_facts(host, val)

        return None

    def _refresh_host_devices(self, host: str) -> Optional[str]:

        with_lsm = self.mgr.get_module_option('device_enhanced_scan')
        inventory_args = ['--', 'inventory',
                          '--format=json',
                          '--filter-for-batch']
        if with_lsm:
            inventory_args.insert(-1, "--with-lsm")

        try:
            try:
                devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                 inventory_args)
            except OrchestratorError as e:
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    rerun_args = inventory_args.copy()
                    rerun_args.remove('--filter-for-batch')
                    devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                     rerun_args)
                else:
                    raise

            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
            host, len(devices), len(networks)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return None

    def update_osdspec_previews(self, search_host: str = '') -> None:
        # Set global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.add(search_host)
        previews = []
        # query OSDSpecs for host <search host> and generate/get the preview
        # There can be multiple previews for one host due to multiple OSDSpecs.
        previews.extend(self.mgr.osd_service.get_previews(search_host))
        self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
        self.mgr.cache.osdspec_previews[search_host] = previews
        # Unset global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.remove(search_host)

    def _check_for_strays(self) -> None:
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )

                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.health_checks['CEPHADM_STRAY_HOST'] = {
                    'severity': 'warning',
                    'summary': '%d stray host(s) with %s daemon(s) '
                    'not managed by cephadm' % (
                        len(host_detail), host_num_daemons),
                    'count': len(host_detail),
                    'detail': host_detail,
                }
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.health_checks['CEPHADM_STRAY_DAEMON'] = {
                    'severity': 'warning',
                    'summary': '%d stray daemon(s) not managed by cephadm' % (
                        len(daemon_detail)),
                    'count': len(daemon_detail),
                    'detail': daemon_detail,
                }
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _apply_all_services(self) -> bool:
        r = False
        specs = []  # type: List[ServiceSpec]
        for sn, spec in self.mgr.spec_store.active_specs.items():
            specs.append(spec)
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                self.log.exception('Failed to apply %s spec %s: %s' % (
                    spec.service_name(), spec, e))
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))

        return r

    def _apply_service_config(self, spec: ServiceSpec) -> None:
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            for k, v in spec.config.items():
                try:
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    self.log.warning(
                        f'Ignoring invalid {spec.service_name()} config option {k}'
                    )
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        self.log.warning(
                            f'Failed to set {spec.service_name()} option {k}: {e}'
                        )

    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.
        """
        self.mgr.migration.verify_no_migration()

        service_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        self._apply_service_config(spec)

        if service_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            # can't know if daemon count changed; create_from_spec doesn't
            # return a solid indication
            return False

        svc = self.mgr.cephadm_services[service_type]
        daemons = self.mgr.cache.get_daemons_by_service(service_name)

        public_networks: List[str] = []
        if service_type == 'mon':
            out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
            if '/' in out:
                public_networks = [x.strip() for x in out.split(',')]
                self.log.debug('mon public_network(s) is %s' % public_networks)

        def matches_network(host):
            # type: (str) -> bool
            # make sure we have 1 or more IPs for any of those networks on that
            # host
            for network in public_networks:
                if len(self.mgr.cache.networks[host].get(network, [])) > 0:
                    return True
            self.log.info(
                f"Filtered out host {host}: does not belong to mon public_network"
                f" ({','.join(public_networks)})"
            )
            return False

        rank_map = None
        if svc.ranked():
            rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
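        # Ask the scheduler for the full placement: every slot the spec
        # wants, the subset that still needs a daemon, and the daemons that
        # no longer match the placement.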
        ha = HostAssignment(
            spec=spec,
            hosts=self.mgr._schedulable_hosts(),
            daemons=daemons,
            networks=self.mgr.cache.networks,
            filter_new_host=(
                matches_network if service_type == 'mon'
                else None
            ),
            allow_colo=svc.allow_colo(),
            primary_daemon_type=svc.primary_daemon_type(),
            per_host_daemon_type=svc.per_host_daemon_type(),
            rank_map=rank_map,
        )

        try:
            all_slots, slots_to_add, daemons_to_remove = ha.place()
            daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
                'status', '').lower() not in ['maintenance', 'offline'])]
            self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
        except OrchestratorError as e:
            self.log.error('Failed to apply %s spec %s: %s' % (
                spec.service_name(), spec, e))
            self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
            return False

        r = None

        # sanity check
        final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
        if service_type in ['mon', 'mgr'] and final_count < 1:
            self.log.debug('cannot scale mon|mgr below 1')
            return False

        # progress
        progress_id = str(uuid.uuid4())
        delta: List[str] = []
        if slots_to_add:
            delta += [f'+{len(slots_to_add)}']
        if daemons_to_remove:
            delta += [f'-{len(daemons_to_remove)}']
        progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
        progress_total = len(slots_to_add) + len(daemons_to_remove)
        progress_done = 0

        def update_progress() -> None:
            self.mgr.remote(
                'progress', 'update', progress_id,
                ev_msg=progress_title,
                ev_progress=(progress_done / progress_total),
                add_to_ceph_s=True,
            )

        if progress_total:
            update_progress()

        # add any?
        did_config = False

        self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
        self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)

        # assign names
        for i in range(len(slots_to_add)):
            slot = slots_to_add[i]
            slot = slot.assign_name(self.mgr.get_unique_name(
                slot.daemon_type,
                slot.hostname,
                daemons,
                prefix=spec.service_id,
                forcename=slot.name,
                rank=slot.rank,
                rank_generation=slot.rank_generation,
            ))
            slots_to_add[i] = slot
            if rank_map is not None:
                assert slot.rank is not None
                assert slot.rank_generation is not None
                assert rank_map[slot.rank][slot.rank_generation] is None
                rank_map[slot.rank][slot.rank_generation] = slot.name

        if rank_map:
            # record the rank_map before we make changes so that if we fail the
            # next mgr will clean up.
            self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)

            # remove daemons now, since we are going to fence them anyway
            for d in daemons_to_remove:
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)
            daemons_to_remove = []

            # fence them
            svc.fence_old_ranks(spec, rank_map, len(all_slots))

        # create daemons
        for slot in slots_to_add:
            # first remove daemon on conflicting port?
            if slot.ports:
                for d in daemons_to_remove:
                    if d.hostname != slot.hostname:
                        continue
                    if not (set(d.ports or []) & set(slot.ports)):
                        continue
                    if d.ip and slot.ip and d.ip != slot.ip:
                        continue
                    self.log.info(
                        f'Removing {d.name()} before deploying to {slot} to avoid a port conflict'
                    )
                    # NOTE: we don't check ok-to-stop here to avoid starvation if
                    # there is only 1 gateway.
                    self._remove_daemon(d.name(), d.hostname)
                    daemons_to_remove.remove(d)
                    progress_done += 1
                    break

            # deploy new daemon
            daemon_id = slot.name
            if not did_config:
                svc.config(spec, daemon_id)
                did_config = True

            daemon_spec = svc.make_daemon_spec(
                slot.hostname, daemon_id, slot.network, spec,
                daemon_type=slot.daemon_type,
                ports=slot.ports,
                ip=slot.ip,
                rank=slot.rank,
                rank_generation=slot.rank_generation,
            )
            self.log.debug('Placing %s.%s on host %s' % (
                slot.daemon_type, daemon_id, slot.hostname))

            try:
                daemon_spec = svc.prepare_create(daemon_spec)
                self._create_daemon(daemon_spec)
                r = True
                progress_done += 1
                update_progress()
            except (RuntimeError, OrchestratorError) as e:
                msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                       f"on {slot.hostname}: {e}")
                self.mgr.events.for_service(spec, 'ERROR', msg)
                self.mgr.log.error(msg)
                # only return "no change" if no one else has already succeeded.
                # later successes will also change to True
                if r is None:
                    r = False
                progress_done += 1
                update_progress()
                continue

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=slot.hostname,
                daemon_type=slot.daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        # remove any?
        def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
            daemon_ids = [d.daemon_id for d in remove_daemons]
            assert None not in daemon_ids
            # setting force flag retains previous behavior
            r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
            return not r.retval

        while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
            # let's find a subset that is ok-to-stop
            daemons_to_remove.pop()
        for d in daemons_to_remove:
            r = True
            assert d.hostname is not None
            self._remove_daemon(d.name(), d.hostname)

            progress_done += 1
            update_progress()

        self.mgr.remote('progress', 'complete', progress_id)

        if r is None:
            r = False
        return r

    def _check_daemons(self) -> None:

        daemons = self.mgr.cache.get_daemons()
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                        # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                        # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            if daemon_type in self.mgr.requires_post_actions:
                self.mgr.requires_post_actions.remove(daemon_type)
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)

    def _purge_deleted_services(self) -> None:
        existing_services = self.mgr.spec_store.all_specs.items()
        for service_name, spec in list(existing_services):
            if service_name not in self.mgr.spec_store.spec_deleted:
                continue
            if self.mgr.cache.get_daemons_by_service(service_name):
                continue
            if spec.service_type in ['mon', 'mgr']:
                continue

            logger.info(f'Purge service {service_name}')

            self.mgr.cephadm_services[spec.service_type].purge(service_name)
            self.mgr.spec_store.finally_rm(service_name)

    def convert_tags_to_repo_digest(self) -> None:
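        # Resolve image *tags* in the distinct container_image settings to
        # immutable repo@digest references so that every host pulls exactly
        # the same image.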
        if not self.mgr.use_repo_digest:
            return
        settings = self.mgr.upgrade.get_distinct_container_image_settings()
        digests: Dict[str, ContainerInspectInfo] = {}
        for container_image_ref in set(settings.values()):
            if not is_repo_digest(container_image_ref):
                image_info = self._get_container_image_info(container_image_ref)
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    assert is_repo_digest(image_info.repo_digests[0]), image_info
                digests[container_image_ref] = image_info

        for entity, container_image_ref in settings.items():
            if not is_repo_digest(container_image_ref):
                image_info = digests[container_image_ref]
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    self.mgr.set_container_image(entity, image_info.repo_digests[0])

    def _create_daemon(self,
                       daemon_spec: CephadmDaemonDeploySpec,
                       reconfig: bool = False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            try:
                image = ''
                start_time = datetime_now()
                ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

                if daemon_spec.daemon_type == 'container':
                    spec = cast(CustomContainerSpec,
                                self.mgr.spec_store[daemon_spec.service_name].spec)
                    image = spec.image
                    if spec.ports:
                        ports.extend(spec.ports)

                if daemon_spec.daemon_type == 'cephadm-exporter':
                    if not reconfig:
                        assert daemon_spec.host
                        self._deploy_cephadm_binary(daemon_spec.host)

                # TCP ports to open in the host firewall
                if len(ports) > 0:
                    daemon_spec.extra_args.extend([
                        '--tcp-ports', ' '.join(map(str, ports))
                    ])

                # osd deployments need an --osd-uuid arg
                if daemon_spec.daemon_type == 'osd':
                    if not osd_uuid_map:
                        osd_uuid_map = self.mgr.get_osd_uuid_map()
                    osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                    if not osd_uuid:
                        raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

                if reconfig:
                    daemon_spec.extra_args.append('--reconfig')
                if self.mgr.allow_ptrace:
                    daemon_spec.extra_args.append('--allow-ptrace')

                if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
                    self._registry_login(daemon_spec.host, self.mgr.registry_url,
                                         self.mgr.registry_username, self.mgr.registry_password)

                self.log.info('%s daemon %s on %s' % (
                    'Reconfiguring' if reconfig else 'Deploying',
                    daemon_spec.name(), daemon_spec.host))

                out, err, code = self._run_cephadm(
                    daemon_spec.host, daemon_spec.name(), 'deploy',
                    [
                        '--name', daemon_spec.name(),
                        '--meta-json', json.dumps({
                            'service_name': daemon_spec.service_name,
                            'ports': daemon_spec.ports,
                            'ip': daemon_spec.ip,
                            'deployed_by': self.mgr.get_active_mgr_digests(),
                            'rank': daemon_spec.rank,
                            'rank_generation': daemon_spec.rank_generation,
                        }),
                        '--config-json', '-',
                    ] + daemon_spec.extra_args,
                    stdin=json.dumps(daemon_spec.final_config),
                    image=image)

                # refresh daemon state?  (ceph daemon reconfig does not need it)
                if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                    if not code and daemon_spec.host in self.mgr.cache.daemons:
                        # prime cached service state with what we (should have)
                        # just created
                        sd = daemon_spec.to_daemon_description(
                            DaemonDescriptionStatus.running, 'starting')
                        self.mgr.cache.add_daemon(daemon_spec.host, sd)
                        if daemon_spec.daemon_type in [
                            'grafana', 'iscsi', 'prometheus', 'alertmanager'
                        ]:
                            self.mgr.requires_post_actions.add(daemon_spec.daemon_type)
                    self.mgr.cache.invalidate_host_daemons(daemon_spec.host)

                self.mgr.cache.update_daemon_config_deps(
                    daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
                self.mgr.cache.save_host(daemon_spec.host)
                msg = "{} {} on host '{}'".format(
                    'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                if not code:
                    self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
                else:
                    what = 'reconfigure' if reconfig else 'deploy'
                    self.mgr.events.for_daemon(
                        daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
                return msg
            except OrchestratorError:
                redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
                if not reconfig and not redeploy:
                    # we have to clean up the daemon. E.g. keyrings.
                    service_type = daemon_type_to_service(daemon_spec.daemon_type)
                    dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
                    self.mgr.cephadm_services[service_type].post_remove(dd)
                raise

    def _remove_daemon(self, name: str, host: str) -> str:
        """
        Remove a daemon.
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)

            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instance's data.
            args = ['--name', name, '--force']
            self.log.info('Removing daemon %s from %s' % (name, host))
            out, err, code = self._run_cephadm(
                host, name, 'rm-daemon', args)
            if not code:
                # remove item from cache
                self.mgr.cache.rm_daemon(host, name)
            self.mgr.cache.invalidate_host_daemons(host)

            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon)

            return "Removed {} from host '{}'".format(name, host)
1113 | |
1114 | def _run_cephadm_json(self, | |
1115 | host: str, | |
1116 | entity: Union[CephadmNoImage, str], | |
1117 | command: str, | |
1118 | args: List[str], | |
1119 | no_fsid: Optional[bool] = False, | |
1120 | image: Optional[str] = "", | |
1121 | ) -> Any: | |
1122 | try: | |
f67539c2 | 1123 | out, err, code = self._run_cephadm( |
adb31ebb TL |
1124 | host, entity, command, args, no_fsid=no_fsid, image=image) |
1125 | if code: | |
1126 | raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}') | |
1127 | except Exception as e: | |
1128 | raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}') | |
1129 | try: | |
1130 | return json.loads(''.join(out)) | |
1131 | except (ValueError, KeyError): | |
1132 | msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON' | |
1133 | self.log.exception(f'{msg}: {"".join(out)}') | |
1134 | raise OrchestratorError(msg) | |
1135 | ||
f67539c2 TL |
1136 | def _run_cephadm(self, |
1137 | host: str, | |
1138 | entity: Union[CephadmNoImage, str], | |
1139 | command: str, | |
1140 | args: List[str], | |
1141 | addr: Optional[str] = "", | |
1142 | stdin: Optional[str] = "", | |
1143 | no_fsid: Optional[bool] = False, | |
1144 | error_ok: Optional[bool] = False, | |
1145 | image: Optional[str] = "", | |
1146 | env_vars: Optional[List[str]] = None, | |
1147 | ) -> Tuple[List[str], List[str], int]: | |
1148 | """ | |
1149 | Run cephadm on the remote host with the given command + args | |
1150 | ||
1151 | Important: You probably don't want to run _run_cephadm from CLI handlers | |
1152 | ||
1153 | :env_vars: in format -> [KEY=VALUE, ..] | |
1154 | """ | |
1155 | self.log.debug(f"_run_cephadm : command = {command}") | |
1156 | self.log.debug(f"_run_cephadm : args = {args}") | |
1157 | ||
1158 | bypass_image = ('cephadm-exporter',) | |
1159 | ||
1160 | with self._remote_connection(host, addr) as tpl: | |
1161 | conn, connr = tpl | |
1162 | assert image or entity | |
1163 | # Skip the image check for daemons deployed that are not ceph containers | |
1164 | if not str(entity).startswith(bypass_image): | |
1165 | if not image and entity is not cephadmNoImage: | |
1166 | image = self.mgr._get_container_image(entity) | |
1167 | ||
1168 | final_args = [] | |
1169 | ||
1170 | # global args | |
1171 | if env_vars: | |
1172 | for env_var_pair in env_vars: | |
1173 | final_args.extend(['--env', env_var_pair]) | |
1174 | ||
1175 | if image: | |
1176 | final_args.extend(['--image', image]) | |
1177 | ||
1178 | if not self.mgr.container_init: | |
1179 | final_args += ['--no-container-init'] | |
1180 | ||
1181 | # subcommand | |
1182 | final_args.append(command) | |
1183 | ||
1184 | # subcommand args | |
1185 | if not no_fsid: | |
1186 | final_args += ['--fsid', self.mgr._cluster_fsid] | |
1187 | ||
1188 | final_args += args | |
1189 | ||
1190 | # exec | |
1191 | self.log.debug('args: %s' % (' '.join(final_args))) | |
1192 | if self.mgr.mode == 'root': | |
1193 | if stdin: | |
1194 | self.log.debug('stdin: %s' % stdin) | |
1195 | ||
1196 | python = connr.choose_python() | |
1197 | if not python: | |
1198 | raise RuntimeError( | |
1199 | 'unable to find python on %s (tried %s in %s)' % ( | |
1200 | host, remotes.PYTHONS, remotes.PATH)) | |
1201 | try: | |
1202 | out, err, code = remoto.process.check( | |
1203 | conn, | |
1204 | [python, self.mgr.cephadm_binary_path] + final_args, | |
1205 | stdin=stdin.encode('utf-8') if stdin is not None else None) | |
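                    # An exit code of 2 may indicate the cephadm binary is
                    # missing on the host: confirm with ls, push the binary,
                    # and retry the command once.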
                    if code == 2:
                        out_ls, err_ls, code_ls = remoto.process.check(
                            conn, ['ls', self.mgr.cephadm_binary_path])
                        if code_ls == 2:
                            self._deploy_cephadm_binary_conn(conn, host)
                            out, err, code = remoto.process.check(
                                conn,
                                [python, self.mgr.cephadm_binary_path] + final_args,
                                stdin=stdin.encode('utf-8') if stdin is not None else None)

                except RuntimeError as e:
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise

            elif self.mgr.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
        # pick a random host...
        host = None
        for host_name in self.mgr.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
            self._registry_login(host, self.mgr.registry_url,
                                 self.mgr.registry_username, self.mgr.registry_password)

        j = self._run_cephadm_json(host, '', 'pull', [], image=image_name, no_fsid=True)

        r = ContainerInspectInfo(
            j['image_id'],
            j.get('ceph_version'),
            j.get('repo_digests')
        )
        self.log.debug(f'image {image_name} -> {r}')
        return r

    # Log a single host into a custom registry.
    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through normal list of args
        args_str = json.dumps({
            'url': url,
            'username': username,
            'password': password,
        })
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return None

    def _deploy_cephadm_binary(self, host: str) -> None:
        # Use tee (from coreutils) to create a copy of cephadm on the target machine
        self.log.info(f"Deploying cephadm binary to {host}")
        with self._remote_connection(host) as tpl:
            conn, _connr = tpl
            return self._deploy_cephadm_binary_conn(conn, host)

    def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None:
        _out, _err, code = remoto.process.check(
            conn,
            ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}'])
        if code:
            msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
            self.log.warning(msg)
            raise OrchestratorError(msg)
        _out, _err, code = remoto.process.check(
            conn,
            ['tee', '-', self.mgr.cephadm_binary_path],
            stdin=self.mgr._cephadm.encode('utf-8'))
        if code:
            msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
            self.log.warning(msg)
            raise OrchestratorError(msg)

    def _write_remote_file(self,
                           host: str,
                           path: str,
                           content: bytes,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        with self._remote_connection(host) as tpl:
            conn, connr = tpl
            try:
                errmsg = connr.write_file(path, content, mode, uid, gid)
                if errmsg is not None:
                    raise OrchestratorError(errmsg)
            except Exception as e:
                msg = f"Unable to write {host}:{path}: {e}"
                self.log.warning(msg)
                raise OrchestratorError(msg)

    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.mgr.inventory:
            addr = self.mgr.inventory.get_addr(host)

        self.mgr.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self.mgr._get_connection(addr)
            except OSError as e:
                self.mgr._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.mgr.offline_hosts.add(host)
            self.mgr._reset_con(host)

            user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
            if str(e).startswith("Can't communicate"):
                msg = str(e)
            else:
                msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{addr}

To check that the host is reachable open a new shell with the --no-hosts flag:
> cephadm shell --no-hosts

Then run the following:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise