]>
Commit | Line | Data |
---|---|---|
b3b6e05e | 1 | import hashlib |
f91f0fd5 TL |
2 | import json |
3 | import logging | |
b3b6e05e | 4 | import uuid |
f91f0fd5 | 5 | from collections import defaultdict |
f67539c2 TL |
6 | from contextlib import contextmanager |
7 | from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator | |
8 | ||
9 | from cephadm import remotes | |
f91f0fd5 TL |
10 | |
11 | try: | |
12 | import remoto | |
f67539c2 | 13 | import execnet.gateway_bootstrap |
f91f0fd5 TL |
14 | except ImportError: |
15 | remoto = None | |
16 | ||
17 | from ceph.deployment import inventory | |
18 | from ceph.deployment.drive_group import DriveGroupSpec | |
b3b6e05e | 19 | from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec |
adb31ebb | 20 | from ceph.utils import str_to_datetime, datetime_now |
f91f0fd5 TL |
21 | |
22 | import orchestrator | |
f67539c2 TL |
23 | from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \ |
24 | DaemonDescriptionStatus, daemon_type_to_service | |
25 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec | |
f91f0fd5 | 26 | from cephadm.schedule import HostAssignment |
b3b6e05e | 27 | from cephadm.autotune import MemoryAutotuner |
f67539c2 TL |
28 | from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \ |
29 | CephadmNoImage, CEPH_TYPES, ContainerInspectInfo | |
30 | from mgr_module import MonCommandFailed | |
b3b6e05e | 31 | from mgr_util import format_bytes |
f67539c2 TL |
32 | |
33 | from . import utils | |
f91f0fd5 TL |
34 | |
35 | if TYPE_CHECKING: | |
f67539c2 TL |
36 | from cephadm.module import CephadmOrchestrator |
37 | from remoto.backends import BaseConnection | |
f91f0fd5 TL |
38 | |
39 | logger = logging.getLogger(__name__) | |
40 | ||
522d829b TL |
41 | REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw'] |
42 | ||
f91f0fd5 TL |
43 | |
44 | class CephadmServe: | |
45 | """ | |
46 | This module contains functions that are executed in the | |
47 | serve() thread. Thus they don't block the CLI. | |
48 | ||
f67539c2 TL |
49 | Please see the `Note regarding network calls from CLI handlers` |
50 | chapter in the cephadm developer guide. | |
51 | ||
f91f0fd5 TL |
52 | On the other hand, These function should *not* be called form |
53 | CLI handlers, to avoid blocking the CLI | |
54 | """ | |
55 | ||
56 | def __init__(self, mgr: "CephadmOrchestrator"): | |
57 | self.mgr: "CephadmOrchestrator" = mgr | |
58 | self.log = logger | |
59 | ||
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:

            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                # One-shot hook: re-sync dashboard <-> RGW credentials when the
                # flag was raised elsewhere and the dashboard module is enabled.
                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                # All state-mutating work below is skipped while cephadm is paused.
                if not self.mgr.paused:
                    self.mgr.to_remove_osds.process_removal_queue()

                    self.mgr.migration.migrate()
                    if self.mgr.migration.is_migration_ongoing():
                        # don't apply specs mid-migration; restart the loop
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    self._purge_deleted_services()

                    if self.mgr.upgrade.continue_upgrade():
                        # upgrade made progress; loop again without sleeping
                        continue

            except OrchestratorError as e:
                # record the error against its subject (if any); the loop keeps running
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self._serve_sleep()
        self.log.debug("serve exit")
113 | ||
adb31ebb | 114 | def _serve_sleep(self) -> None: |
f67539c2 TL |
115 | sleep_interval = max( |
116 | 30, | |
117 | min( | |
118 | self.mgr.host_check_interval, | |
119 | self.mgr.facts_cache_timeout, | |
120 | self.mgr.daemon_cache_timeout, | |
121 | self.mgr.device_cache_timeout, | |
122 | ) | |
123 | ) | |
f91f0fd5 | 124 | self.log.debug('Sleeping for %d seconds', sleep_interval) |
f67539c2 | 125 | self.mgr.event.wait(sleep_interval) |
f91f0fd5 TL |
126 | self.mgr.event.clear() |
127 | ||
adb31ebb | 128 | def _update_paused_health(self) -> None: |
f91f0fd5 | 129 | if self.mgr.paused: |
a4b75251 | 130 | self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, ["'ceph orch resume' to resume"]) |
f91f0fd5 | 131 | else: |
a4b75251 | 132 | self.mgr.remove_health_warning('CEPHADM_PAUSED') |
f91f0fd5 | 133 | |
b3b6e05e TL |
    def _autotune_host_memory(self, host: str) -> None:
        """Recompute and publish a per-host osd_memory_target for ``host``.

        Derives a memory budget from the host's cached facts, lets
        MemoryAutotuner split it across the daemons on the host, and then
        records the result as a ``osd/host:<host>`` config option (or removes
        it when no budget can be computed).
        """
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            # no memory facts available -> clear any previously-set target below
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    # drop any per-daemon override so the host-level setting
                    # below takes effect
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    # best-effort: log and continue rather than fail the refresh
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': f'osd/host:{host}',
                'name': 'osd_memory_target',
            })
        self.mgr.cache.update_autotune(host)
178 | ||
f91f0fd5 TL |
    def _refresh_hosts_and_daemons(self) -> None:
        """Refresh per-host state (checks, daemons, devices, facts, client files)
        for every managed host, then update the related health warnings.

        The per-host work runs via the ``forall_hosts`` decorator; ``bad_hosts``
        and ``failures`` are closed over and appended to from those workers.
        """
        bad_hosts = []
        failures = []

        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
            config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
            config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())

            if self.mgr.manage_etc_ceph_ceph_conf:
                try:
                    # distribute /etc/ceph/ceph.conf to the hosts matching the
                    # configured placement
                    pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                    ha = HostAssignment(
                        spec=ServiceSpec('mon', placement=pspec),
                        hosts=self.mgr._schedulable_hosts(),
                        unreachable_hosts=self.mgr._unreachable_hosts(),
                        daemons=[],
                        networks=self.mgr.cache.networks,
                    )
                    all_slots, _, _ = ha.place()
                    for host in {s.hostname for s in all_slots}:
                        if host not in client_files:
                            client_files[host] = {}
                        client_files[host]['/etc/ceph/ceph.conf'] = (
                            0o644, 0, 0, bytes(config), str(config_digest)
                        )
                except Exception as e:
                    self.mgr.log.warning(
                        f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

            # client keyrings
            for ks in self.mgr.keys.keys.values():
                assert config
                assert config_digest
                try:
                    ret, keyring, err = self.mgr.mon_command({
                        'prefix': 'auth get',
                        'entity': ks.entity,
                    })
                    if ret:
                        self.log.warning(f'unable to fetch keyring for {ks.entity}')
                        continue
                    digest = ''.join('%02x' % c for c in hashlib.sha256(
                        keyring.encode('utf-8')).digest())
                    ha = HostAssignment(
                        spec=ServiceSpec('mon', placement=ks.placement),
                        hosts=self.mgr._schedulable_hosts(),
                        unreachable_hosts=self.mgr._unreachable_hosts(),
                        daemons=[],
                        networks=self.mgr.cache.networks,
                    )
                    all_slots, _, _ = ha.place()
                    for host in {s.hostname for s in all_slots}:
                        if host not in client_files:
                            client_files[host] = {}
                        # a keyring host also gets ceph.conf
                        client_files[host]['/etc/ceph/ceph.conf'] = (
                            0o644, 0, 0, bytes(config), str(config_digest)
                        )
                        client_files[host][ks.path] = (
                            ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
                        )
                except Exception as e:
                    self.log.warning(
                        f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')

        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)
            if self.mgr.cache.host_needs_daemon_refresh(host):
                self.log.debug('refreshing %s daemons' % host)
                r = self._refresh_host_daemons(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self._registry_login(host, self.mgr.registry_url,
                                         self.mgr.registry_username, self.mgr.registry_password)
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_device_refresh(host):
                self.log.debug('refreshing %s devices' % host)
                r = self._refresh_host_devices(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_facts_refresh(host):
                self.log.debug(('Refreshing %s facts' % host))
                r = self._refresh_facts(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if (
                    self.mgr.cache.host_needs_autotune_memory(host)
                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

            # client files
            updated_files = False
            old_files = self.mgr.cache.get_host_client_files(host).copy()
            for path, m in client_files.get(host, {}).items():
                mode, uid, gid, content, digest = m
                if path in old_files:
                    # unchanged file? (cache stores (digest, mode, uid, gid))
                    match = old_files[path] == (digest, mode, uid, gid)
                    del old_files[path]
                    if match:
                        continue
                self.log.info(f'Updating {host}:{path}')
                self._write_remote_file(host, path, content, mode, uid, gid)
                self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
                updated_files = True
            # anything left in old_files is no longer wanted -> remove remotely
            for path in old_files.keys():
                self.log.info(f'Removing {host}:{path}')
                with self._remote_connection(host) as tpl:
                    conn, connr = tpl
                    out, err, code = remoto.process.check(
                        conn,
                        ['rm', '-f', path])
                updated_files = True
                self.mgr.cache.removed_client_file(host, path)
            if updated_files:
                self.mgr.cache.save_host(host)

        refresh(self.mgr.cache.get_hosts())

        self.mgr.config_checker.run_checks()

        # re-raise the warnings below from scratch based on this pass
        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_FAILED_DAEMON',
                'CEPHADM_REFRESH_FAILED',
        ]:
            self.mgr.remove_health_warning(k)
        if bad_hosts:
            self.mgr.set_health_warning('CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
        if failures:
            self.mgr.set_health_warning('CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
        failed_daemons = []
        for dd in self.mgr.cache.get_daemons():
            if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        if failed_daemons:
            self.mgr.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(failed_daemons), failed_daemons)
f91f0fd5 | 344 | |
adb31ebb | 345 | def _check_host(self, host: str) -> Optional[str]: |
f91f0fd5 | 346 | if host not in self.mgr.inventory: |
adb31ebb | 347 | return None |
f91f0fd5 TL |
348 | self.log.debug(' checking %s' % host) |
349 | try: | |
522d829b | 350 | addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host |
f67539c2 | 351 | out, err, code = self._run_cephadm( |
f91f0fd5 TL |
352 | host, cephadmNoImage, 'check-host', [], |
353 | error_ok=True, no_fsid=True) | |
354 | self.mgr.cache.update_last_host_check(host) | |
355 | self.mgr.cache.save_host(host) | |
356 | if code: | |
522d829b | 357 | self.log.debug(' host %s (%s) failed check' % (host, addr)) |
f91f0fd5 | 358 | if self.mgr.warn_on_failed_host_check: |
522d829b | 359 | return 'host %s (%s) failed check: %s' % (host, addr, err) |
f91f0fd5 | 360 | else: |
522d829b | 361 | self.log.debug(' host %s (%s) ok' % (host, addr)) |
f91f0fd5 | 362 | except Exception as e: |
522d829b TL |
363 | self.log.debug(' host %s (%s) failed check' % (host, addr)) |
364 | return 'host %s (%s) failed check: %s' % (host, addr, e) | |
adb31ebb | 365 | return None |
f91f0fd5 | 366 | |
    def _refresh_host_daemons(self, host: str) -> Optional[str]:
        """Rebuild the cached DaemonDescriptions for ``host`` from ``cephadm ls``.

        Returns an error string on failure, None on success.
        """
        try:
            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)
        dm = {}
        for d in ls:
            # only daemons deployed by cephadm, for *this* cluster,
            # with a proper <type>.<id> name
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self.mgr._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime_now()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, str_to_datetime(d[k]))
            sd.daemon_type = d['name'].split('.')[0]
            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
                continue

            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.container_image_digests = d.get('container_image_digests')
            sd.memory_usage = d.get('memory_usage')
            sd.memory_request = d.get('memory_request')
            sd.memory_limit = d.get('memory_limit')
            sd._service_name = d.get('service_name')
            sd.deployed_by = d.get('deployed_by')
            sd.version = d.get('version')
            sd.ports = d.get('ports')
            sd.ip = d.get('ip')
            sd.rank = int(d['rank']) if d.get('rank') is not None else None
            sd.rank_generation = int(d['rank_generation']) if d.get(
                'rank_generation') is not None else None
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                sd.status_desc = d['state']
                # map cephadm's state string onto the orchestrator enum;
                # anything 'unknown' is treated as an error
                sd.status = {
                    'running': DaemonDescriptionStatus.running,
                    'stopped': DaemonDescriptionStatus.stopped,
                    'error': DaemonDescriptionStatus.error,
                    'unknown': DaemonDescriptionStatus.error,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.mgr.cache.update_host_daemons(host, dm)
        self.mgr.cache.save_host(host)
        return None
429 | ||
adb31ebb | 430 | def _refresh_facts(self, host: str) -> Optional[str]: |
f91f0fd5 | 431 | try: |
adb31ebb TL |
432 | val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True) |
433 | except OrchestratorError as e: | |
434 | return str(e) | |
435 | ||
436 | self.mgr.cache.update_host_facts(host, val) | |
437 | ||
438 | return None | |
439 | ||
440 | def _refresh_host_devices(self, host: str) -> Optional[str]: | |
f67539c2 TL |
441 | |
442 | with_lsm = self.mgr.get_module_option('device_enhanced_scan') | |
443 | inventory_args = ['--', 'inventory', | |
a4b75251 | 444 | '--format=json-pretty', |
f67539c2 TL |
445 | '--filter-for-batch'] |
446 | if with_lsm: | |
447 | inventory_args.insert(-1, "--with-lsm") | |
448 | ||
f91f0fd5 | 449 | try: |
adb31ebb TL |
450 | try: |
451 | devices = self._run_cephadm_json(host, 'osd', 'ceph-volume', | |
f67539c2 | 452 | inventory_args) |
adb31ebb TL |
453 | except OrchestratorError as e: |
454 | if 'unrecognized arguments: --filter-for-batch' in str(e): | |
f67539c2 TL |
455 | rerun_args = inventory_args.copy() |
456 | rerun_args.remove('--filter-for-batch') | |
adb31ebb | 457 | devices = self._run_cephadm_json(host, 'osd', 'ceph-volume', |
f67539c2 | 458 | rerun_args) |
adb31ebb TL |
459 | else: |
460 | raise | |
461 | ||
462 | networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True) | |
463 | except OrchestratorError as e: | |
464 | return str(e) | |
465 | ||
f91f0fd5 TL |
466 | self.log.debug('Refreshed host %s devices (%d) networks (%s)' % ( |
467 | host, len(devices), len(networks))) | |
adb31ebb TL |
468 | ret = inventory.Devices.from_json(devices) |
469 | self.mgr.cache.update_host_devices_networks(host, ret.devices, networks) | |
f91f0fd5 TL |
470 | self.update_osdspec_previews(host) |
471 | self.mgr.cache.save_host(host) | |
472 | return None | |
473 | ||
adb31ebb | 474 | def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]: |
f91f0fd5 TL |
475 | self.update_osdspec_previews(host) |
476 | self.mgr.cache.save_host(host) | |
477 | self.log.debug(f'Refreshed OSDSpec previews for host <{host}>') | |
adb31ebb | 478 | return None |
f91f0fd5 | 479 | |
adb31ebb | 480 | def update_osdspec_previews(self, search_host: str = '') -> None: |
f91f0fd5 TL |
481 | # Set global 'pending' flag for host |
482 | self.mgr.cache.loading_osdspec_preview.add(search_host) | |
483 | previews = [] | |
484 | # query OSDSpecs for host <search host> and generate/get the preview | |
485 | # There can be multiple previews for one host due to multiple OSDSpecs. | |
486 | previews.extend(self.mgr.osd_service.get_previews(search_host)) | |
f67539c2 | 487 | self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>') |
f91f0fd5 TL |
488 | self.mgr.cache.osdspec_previews[search_host] = previews |
489 | # Unset global 'pending' flag for host | |
490 | self.mgr.cache.loading_osdspec_preview.remove(search_host) | |
491 | ||
f91f0fd5 TL |
    def _check_for_strays(self) -> None:
        """Warn about hosts/daemons the cluster reports but cephadm does not manage."""
        self.log.debug('_check_for_strays')
        # clear both warnings first; they are re-raised below as appropriate
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            self.log.debug(ls)
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        # these report a different id in the server list than
                        # the daemon name cephadm uses; translate via metadata
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )

                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
f91f0fd5 TL |
545 | |
546 | def _apply_all_services(self) -> bool: | |
547 | r = False | |
548 | specs = [] # type: List[ServiceSpec] | |
f67539c2 | 549 | for sn, spec in self.mgr.spec_store.active_specs.items(): |
f91f0fd5 | 550 | specs.append(spec) |
a4b75251 TL |
551 | for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']: |
552 | self.mgr.remove_health_warning(name) | |
553 | self.mgr.apply_spec_fails = [] | |
f91f0fd5 TL |
554 | for spec in specs: |
555 | try: | |
556 | if self._apply_service(spec): | |
557 | r = True | |
558 | except Exception as e: | |
a4b75251 TL |
559 | msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}' |
560 | self.log.exception(msg) | |
f91f0fd5 | 561 | self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) |
a4b75251 TL |
562 | self.mgr.apply_spec_fails.append((spec.service_name(), str(e))) |
563 | warnings = [] | |
564 | for x in self.mgr.apply_spec_fails: | |
565 | warnings.append(f'{x[0]}: {x[1]}') | |
566 | self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL', | |
567 | f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}", | |
568 | len(self.mgr.apply_spec_fails), | |
569 | warnings) | |
f91f0fd5 TL |
570 | |
571 | return r | |
572 | ||
f67539c2 TL |
    def _apply_service_config(self, spec: ServiceSpec) -> None:
        """Push any config options embedded in ``spec.config`` to the mon config
        store under the service's config section, raising health warnings for
        options that are invalid or fail to apply.
        """
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            # clear both warnings; re-raised below from this pass only
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    # KeyError here means the option name is unknown to ceph
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(options_failed_to_set), options_failed_to_set)
f91f0fd5 TL |
609 | |
610 | def _apply_service(self, spec: ServiceSpec) -> bool: | |
611 | """ | |
612 | Schedule a service. Deploy new daemons or remove old ones, depending | |
613 | on the target label and count specified in the placement. | |
614 | """ | |
615 | self.mgr.migration.verify_no_migration() | |
616 | ||
f67539c2 | 617 | service_type = spec.service_type |
f91f0fd5 TL |
618 | service_name = spec.service_name() |
619 | if spec.unmanaged: | |
620 | self.log.debug('Skipping unmanaged service %s' % service_name) | |
621 | return False | |
622 | if spec.preview_only: | |
623 | self.log.debug('Skipping preview_only service %s' % service_name) | |
624 | return False | |
625 | self.log.debug('Applying service %s spec' % service_name) | |
626 | ||
f67539c2 | 627 | self._apply_service_config(spec) |
f91f0fd5 | 628 | |
f67539c2 | 629 | if service_type == 'osd': |
f91f0fd5 TL |
630 | self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec)) |
631 | # TODO: return True would result in a busy loop | |
632 | # can't know if daemon count changed; create_from_spec doesn't | |
633 | # return a solid indication | |
634 | return False | |
635 | ||
f67539c2 | 636 | svc = self.mgr.cephadm_services[service_type] |
f91f0fd5 TL |
637 | daemons = self.mgr.cache.get_daemons_by_service(service_name) |
638 | ||
b3b6e05e | 639 | public_networks: List[str] = [] |
f67539c2 TL |
640 | if service_type == 'mon': |
641 | out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network')) | |
f91f0fd5 | 642 | if '/' in out: |
b3b6e05e TL |
643 | public_networks = [x.strip() for x in out.split(',')] |
644 | self.log.debug('mon public_network(s) is %s' % public_networks) | |
f91f0fd5 TL |
645 | |
646 | def matches_network(host): | |
647 | # type: (str) -> bool | |
b3b6e05e | 648 | # make sure we have 1 or more IPs for any of those networks on that |
f91f0fd5 | 649 | # host |
b3b6e05e TL |
650 | for network in public_networks: |
651 | if len(self.mgr.cache.networks[host].get(network, [])) > 0: | |
652 | return True | |
653 | self.log.info( | |
654 | f"Filtered out host {host}: does not belong to mon public_network" | |
655 | f" ({','.join(public_networks)})" | |
656 | ) | |
657 | return False | |
f91f0fd5 | 658 | |
b3b6e05e TL |
659 | rank_map = None |
660 | if svc.ranked(): | |
661 | rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {} | |
f91f0fd5 TL |
662 | ha = HostAssignment( |
663 | spec=spec, | |
b3b6e05e | 664 | hosts=self.mgr._schedulable_hosts(), |
522d829b | 665 | unreachable_hosts=self.mgr._unreachable_hosts(), |
f67539c2 TL |
666 | daemons=daemons, |
667 | networks=self.mgr.cache.networks, | |
668 | filter_new_host=( | |
669 | matches_network if service_type == 'mon' | |
670 | else None | |
671 | ), | |
672 | allow_colo=svc.allow_colo(), | |
673 | primary_daemon_type=svc.primary_daemon_type(), | |
674 | per_host_daemon_type=svc.per_host_daemon_type(), | |
b3b6e05e | 675 | rank_map=rank_map, |
f91f0fd5 TL |
676 | ) |
677 | ||
f67539c2 TL |
678 | try: |
679 | all_slots, slots_to_add, daemons_to_remove = ha.place() | |
b3b6e05e | 680 | daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get( |
522d829b | 681 | 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)] |
f67539c2 TL |
682 | self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove)) |
683 | except OrchestratorError as e: | |
a4b75251 TL |
684 | msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}' |
685 | self.log.error(msg) | |
f67539c2 | 686 | self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) |
a4b75251 TL |
687 | self.mgr.apply_spec_fails.append((spec.service_name(), str(e))) |
688 | warnings = [] | |
689 | for x in self.mgr.apply_spec_fails: | |
690 | warnings.append(f'{x[0]}: {x[1]}') | |
691 | self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL', | |
692 | f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}", | |
693 | len(self.mgr.apply_spec_fails), | |
694 | warnings) | |
f67539c2 | 695 | return False |
f91f0fd5 TL |
696 | |
697 | r = None | |
698 | ||
699 | # sanity check | |
f67539c2 TL |
700 | final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove) |
701 | if service_type in ['mon', 'mgr'] and final_count < 1: | |
702 | self.log.debug('cannot scale mon|mgr below 1)') | |
f91f0fd5 TL |
703 | return False |
704 | ||
b3b6e05e TL |
705 | # progress |
706 | progress_id = str(uuid.uuid4()) | |
707 | delta: List[str] = [] | |
708 | if slots_to_add: | |
709 | delta += [f'+{len(slots_to_add)}'] | |
710 | if daemons_to_remove: | |
711 | delta += [f'-{len(daemons_to_remove)}'] | |
712 | progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})' | |
713 | progress_total = len(slots_to_add) + len(daemons_to_remove) | |
714 | progress_done = 0 | |
715 | ||
716 | def update_progress() -> None: | |
717 | self.mgr.remote( | |
718 | 'progress', 'update', progress_id, | |
719 | ev_msg=progress_title, | |
720 | ev_progress=(progress_done / progress_total), | |
721 | add_to_ceph_s=True, | |
722 | ) | |
723 | ||
724 | if progress_total: | |
725 | update_progress() | |
726 | ||
f91f0fd5 TL |
727 | # add any? |
728 | did_config = False | |
729 | ||
f67539c2 TL |
730 | self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add) |
731 | self.log.debug('Daemons that will be removed: %s' % daemons_to_remove) | |
f91f0fd5 | 732 | |
522d829b TL |
733 | try: |
734 | # assign names | |
735 | for i in range(len(slots_to_add)): | |
736 | slot = slots_to_add[i] | |
737 | slot = slot.assign_name(self.mgr.get_unique_name( | |
738 | slot.daemon_type, | |
739 | slot.hostname, | |
740 | daemons, | |
741 | prefix=spec.service_id, | |
742 | forcename=slot.name, | |
743 | rank=slot.rank, | |
744 | rank_generation=slot.rank_generation, | |
745 | )) | |
746 | slots_to_add[i] = slot | |
747 | if rank_map is not None: | |
748 | assert slot.rank is not None | |
749 | assert slot.rank_generation is not None | |
750 | assert rank_map[slot.rank][slot.rank_generation] is None | |
751 | rank_map[slot.rank][slot.rank_generation] = slot.name | |
752 | ||
753 | if rank_map: | |
754 | # record the rank_map before we make changes so that if we fail the | |
755 | # next mgr will clean up. | |
756 | self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map) | |
757 | ||
758 | # remove daemons now, since we are going to fence them anyway | |
f67539c2 | 759 | for d in daemons_to_remove: |
522d829b | 760 | assert d.hostname is not None |
f67539c2 | 761 | self._remove_daemon(d.name(), d.hostname) |
522d829b TL |
762 | daemons_to_remove = [] |
763 | ||
764 | # fence them | |
765 | svc.fence_old_ranks(spec, rank_map, len(all_slots)) | |
766 | ||
767 | # create daemons | |
a4b75251 | 768 | daemon_place_fails = [] |
522d829b TL |
769 | for slot in slots_to_add: |
770 | # first remove daemon on conflicting port? | |
771 | if slot.ports: | |
772 | for d in daemons_to_remove: | |
773 | if d.hostname != slot.hostname: | |
774 | continue | |
775 | if not (set(d.ports or []) & set(slot.ports)): | |
776 | continue | |
777 | if d.ip and slot.ip and d.ip != slot.ip: | |
778 | continue | |
779 | self.log.info( | |
780 | f'Removing {d.name()} before deploying to {slot} to avoid a port conflict' | |
781 | ) | |
782 | # NOTE: we don't check ok-to-stop here to avoid starvation if | |
783 | # there is only 1 gateway. | |
784 | self._remove_daemon(d.name(), d.hostname) | |
785 | daemons_to_remove.remove(d) | |
786 | progress_done += 1 | |
787 | break | |
788 | ||
789 | # deploy new daemon | |
790 | daemon_id = slot.name | |
791 | if not did_config: | |
792 | svc.config(spec) | |
793 | did_config = True | |
794 | ||
795 | daemon_spec = svc.make_daemon_spec( | |
796 | slot.hostname, daemon_id, slot.network, spec, | |
797 | daemon_type=slot.daemon_type, | |
798 | ports=slot.ports, | |
799 | ip=slot.ip, | |
800 | rank=slot.rank, | |
801 | rank_generation=slot.rank_generation, | |
802 | ) | |
803 | self.log.debug('Placing %s.%s on host %s' % ( | |
804 | slot.daemon_type, daemon_id, slot.hostname)) | |
805 | ||
806 | try: | |
807 | daemon_spec = svc.prepare_create(daemon_spec) | |
808 | self._create_daemon(daemon_spec) | |
809 | r = True | |
b3b6e05e | 810 | progress_done += 1 |
522d829b TL |
811 | update_progress() |
812 | except (RuntimeError, OrchestratorError) as e: | |
813 | msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} " | |
814 | f"on {slot.hostname}: {e}") | |
815 | self.mgr.events.for_service(spec, 'ERROR', msg) | |
816 | self.mgr.log.error(msg) | |
a4b75251 | 817 | daemon_place_fails.append(msg) |
522d829b TL |
818 | # only return "no change" if no one else has already succeeded. |
819 | # later successes will also change to True | |
820 | if r is None: | |
821 | r = False | |
822 | progress_done += 1 | |
823 | update_progress() | |
824 | continue | |
f91f0fd5 | 825 | |
522d829b TL |
826 | # add to daemon list so next name(s) will also be unique |
827 | sd = orchestrator.DaemonDescription( | |
828 | hostname=slot.hostname, | |
829 | daemon_type=slot.daemon_type, | |
830 | daemon_id=daemon_id, | |
831 | ) | |
832 | daemons.append(sd) | |
833 | ||
a4b75251 TL |
834 | if daemon_place_fails: |
835 | self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(daemon_place_fails), daemon_place_fails) | |
836 | ||
522d829b TL |
837 | # remove any? |
838 | def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool: | |
839 | daemon_ids = [d.daemon_id for d in remove_daemons] | |
840 | assert None not in daemon_ids | |
841 | # setting force flag retains previous behavior | |
842 | r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True) | |
843 | return not r.retval | |
844 | ||
845 | while daemons_to_remove and not _ok_to_stop(daemons_to_remove): | |
846 | # let's find a subset that is ok-to-stop | |
847 | daemons_to_remove.pop() | |
848 | for d in daemons_to_remove: | |
f91f0fd5 | 849 | r = True |
522d829b TL |
850 | assert d.hostname is not None |
851 | self._remove_daemon(d.name(), d.hostname) | |
852 | ||
b3b6e05e TL |
853 | progress_done += 1 |
854 | update_progress() | |
f91f0fd5 | 855 | |
522d829b TL |
856 | self.mgr.remote('progress', 'complete', progress_id) |
857 | except Exception as e: | |
858 | self.mgr.remote('progress', 'fail', progress_id, str(e)) | |
859 | raise | |
b3b6e05e | 860 | |
f91f0fd5 TL |
861 | if r is None: |
862 | r = False | |
863 | return r | |
864 | ||
    def _check_daemons(self) -> None:
        """Reconcile running daemons against their service specs.

        For every cached daemon: remove orphans (daemons with no active
        spec), skip unmanaged/deleted services, recompute dependencies and
        apply a 'reconfig' or 'redeploy' action when config/deps/monmap
        have changed, and finally run post-creation checks for daemon
        types that require them (REQUIRES_POST_ACTIONS).
        """
        daemons = self.mgr.cache.get_daemons()
        # daemon_type -> daemons that may need a post-action this cycle
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            # mark whether this daemon is the service's active daemon
            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            # start from any explicitly scheduled action; the checks below
            # may upgrade it to a (re)config action
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                # a scheduled redeploy takes precedence over a derived reconfig
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    # on failure, skip post actions for this daemon type this cycle
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)
953 | ||
954 | def _purge_deleted_services(self) -> None: | |
955 | existing_services = self.mgr.spec_store.all_specs.items() | |
956 | for service_name, spec in list(existing_services): | |
957 | if service_name not in self.mgr.spec_store.spec_deleted: | |
958 | continue | |
959 | if self.mgr.cache.get_daemons_by_service(service_name): | |
960 | continue | |
961 | if spec.service_type in ['mon', 'mgr']: | |
962 | continue | |
963 | ||
964 | logger.info(f'Purge service {service_name}') | |
965 | ||
966 | self.mgr.cephadm_services[spec.service_type].purge(service_name) | |
967 | self.mgr.spec_store.finally_rm(service_name) | |
f91f0fd5 | 968 | |
adb31ebb | 969 | def convert_tags_to_repo_digest(self) -> None: |
f91f0fd5 TL |
970 | if not self.mgr.use_repo_digest: |
971 | return | |
972 | settings = self.mgr.upgrade.get_distinct_container_image_settings() | |
973 | digests: Dict[str, ContainerInspectInfo] = {} | |
974 | for container_image_ref in set(settings.values()): | |
975 | if not is_repo_digest(container_image_ref): | |
f67539c2 TL |
976 | image_info = self._get_container_image_info(container_image_ref) |
977 | if image_info.repo_digests: | |
978 | # FIXME: we assume the first digest here is the best | |
979 | assert is_repo_digest(image_info.repo_digests[0]), image_info | |
f91f0fd5 TL |
980 | digests[container_image_ref] = image_info |
981 | ||
982 | for entity, container_image_ref in settings.items(): | |
983 | if not is_repo_digest(container_image_ref): | |
984 | image_info = digests[container_image_ref] | |
f67539c2 TL |
985 | if image_info.repo_digests: |
986 | # FIXME: we assume the first digest here is the best | |
987 | self.mgr.set_container_image(entity, image_info.repo_digests[0]) | |
988 | ||
989 | def _create_daemon(self, | |
990 | daemon_spec: CephadmDaemonDeploySpec, | |
991 | reconfig: bool = False, | |
992 | osd_uuid_map: Optional[Dict[str, Any]] = None, | |
993 | ) -> str: | |
994 | ||
995 | with set_exception_subject('service', orchestrator.DaemonDescription( | |
996 | daemon_type=daemon_spec.daemon_type, | |
997 | daemon_id=daemon_spec.daemon_id, | |
998 | hostname=daemon_spec.host, | |
999 | ).service_id(), overwrite=True): | |
1000 | ||
1001 | try: | |
1002 | image = '' | |
1003 | start_time = datetime_now() | |
1004 | ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] | |
1005 | ||
1006 | if daemon_spec.daemon_type == 'container': | |
1007 | spec = cast(CustomContainerSpec, | |
1008 | self.mgr.spec_store[daemon_spec.service_name].spec) | |
1009 | image = spec.image | |
1010 | if spec.ports: | |
1011 | ports.extend(spec.ports) | |
1012 | ||
1013 | if daemon_spec.daemon_type == 'cephadm-exporter': | |
1014 | if not reconfig: | |
1015 | assert daemon_spec.host | |
1016 | self._deploy_cephadm_binary(daemon_spec.host) | |
1017 | ||
f67539c2 TL |
1018 | # TCP port to open in the host firewall |
1019 | if len(ports) > 0: | |
1020 | daemon_spec.extra_args.extend([ | |
1021 | '--tcp-ports', ' '.join(map(str, ports)) | |
1022 | ]) | |
1023 | ||
1024 | # osd deployments needs an --osd-uuid arg | |
1025 | if daemon_spec.daemon_type == 'osd': | |
1026 | if not osd_uuid_map: | |
1027 | osd_uuid_map = self.mgr.get_osd_uuid_map() | |
1028 | osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) | |
1029 | if not osd_uuid: | |
1030 | raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) | |
1031 | daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) | |
1032 | ||
1033 | if reconfig: | |
1034 | daemon_spec.extra_args.append('--reconfig') | |
1035 | if self.mgr.allow_ptrace: | |
1036 | daemon_spec.extra_args.append('--allow-ptrace') | |
1037 | ||
1038 | if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url: | |
1039 | self._registry_login(daemon_spec.host, self.mgr.registry_url, | |
1040 | self.mgr.registry_username, self.mgr.registry_password) | |
1041 | ||
1042 | self.log.info('%s daemon %s on %s' % ( | |
1043 | 'Reconfiguring' if reconfig else 'Deploying', | |
1044 | daemon_spec.name(), daemon_spec.host)) | |
1045 | ||
1046 | out, err, code = self._run_cephadm( | |
1047 | daemon_spec.host, daemon_spec.name(), 'deploy', | |
1048 | [ | |
1049 | '--name', daemon_spec.name(), | |
1050 | '--meta-json', json.dumps({ | |
1051 | 'service_name': daemon_spec.service_name, | |
1052 | 'ports': daemon_spec.ports, | |
1053 | 'ip': daemon_spec.ip, | |
1054 | 'deployed_by': self.mgr.get_active_mgr_digests(), | |
b3b6e05e TL |
1055 | 'rank': daemon_spec.rank, |
1056 | 'rank_generation': daemon_spec.rank_generation, | |
f67539c2 TL |
1057 | }), |
1058 | '--config-json', '-', | |
1059 | ] + daemon_spec.extra_args, | |
1060 | stdin=json.dumps(daemon_spec.final_config), | |
1061 | image=image) | |
1062 | ||
1063 | # refresh daemon state? (ceph daemon reconfig does not need it) | |
1064 | if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES: | |
1065 | if not code and daemon_spec.host in self.mgr.cache.daemons: | |
1066 | # prime cached service state with what we (should have) | |
1067 | # just created | |
1068 | sd = daemon_spec.to_daemon_description( | |
1069 | DaemonDescriptionStatus.running, 'starting') | |
1070 | self.mgr.cache.add_daemon(daemon_spec.host, sd) | |
522d829b | 1071 | if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS: |
a4b75251 | 1072 | self.mgr.requires_post_actions.add(daemon_spec.name()) |
f67539c2 TL |
1073 | self.mgr.cache.invalidate_host_daemons(daemon_spec.host) |
1074 | ||
1075 | self.mgr.cache.update_daemon_config_deps( | |
1076 | daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time) | |
1077 | self.mgr.cache.save_host(daemon_spec.host) | |
1078 | msg = "{} {} on host '{}'".format( | |
1079 | 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) | |
1080 | if not code: | |
1081 | self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) | |
1082 | else: | |
1083 | what = 'reconfigure' if reconfig else 'deploy' | |
1084 | self.mgr.events.for_daemon( | |
1085 | daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') | |
1086 | return msg | |
1087 | except OrchestratorError: | |
1088 | redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names() | |
1089 | if not reconfig and not redeploy: | |
1090 | # we have to clean up the daemon. E.g. keyrings. | |
1091 | servict_type = daemon_type_to_service(daemon_spec.daemon_type) | |
1092 | dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed') | |
a4b75251 | 1093 | self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True) |
f67539c2 TL |
1094 | raise |
1095 | ||
1096 | def _remove_daemon(self, name: str, host: str) -> str: | |
1097 | """ | |
1098 | Remove a daemon | |
1099 | """ | |
1100 | (daemon_type, daemon_id) = name.split('.', 1) | |
1101 | daemon = orchestrator.DaemonDescription( | |
1102 | daemon_type=daemon_type, | |
1103 | daemon_id=daemon_id, | |
1104 | hostname=host) | |
1105 | ||
1106 | with set_exception_subject('service', daemon.service_id(), overwrite=True): | |
1107 | ||
1108 | self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon) | |
1109 | ||
1110 | # NOTE: we are passing the 'force' flag here, which means | |
1111 | # we can delete a mon instances data. | |
1112 | args = ['--name', name, '--force'] | |
1113 | self.log.info('Removing daemon %s from %s' % (name, host)) | |
1114 | out, err, code = self._run_cephadm( | |
1115 | host, name, 'rm-daemon', args) | |
1116 | if not code: | |
1117 | # remove item from cache | |
1118 | self.mgr.cache.rm_daemon(host, name) | |
1119 | self.mgr.cache.invalidate_host_daemons(host) | |
1120 | ||
a4b75251 TL |
1121 | self.mgr.cephadm_services[daemon_type_to_service( |
1122 | daemon_type)].post_remove(daemon, is_failed_deploy=False) | |
f67539c2 TL |
1123 | |
1124 | return "Removed {} from host '{}'".format(name, host) | |
adb31ebb TL |
1125 | |
1126 | def _run_cephadm_json(self, | |
1127 | host: str, | |
1128 | entity: Union[CephadmNoImage, str], | |
1129 | command: str, | |
1130 | args: List[str], | |
1131 | no_fsid: Optional[bool] = False, | |
1132 | image: Optional[str] = "", | |
1133 | ) -> Any: | |
1134 | try: | |
f67539c2 | 1135 | out, err, code = self._run_cephadm( |
adb31ebb TL |
1136 | host, entity, command, args, no_fsid=no_fsid, image=image) |
1137 | if code: | |
1138 | raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}') | |
1139 | except Exception as e: | |
1140 | raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}') | |
1141 | try: | |
1142 | return json.loads(''.join(out)) | |
1143 | except (ValueError, KeyError): | |
1144 | msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON' | |
1145 | self.log.exception(f'{msg}: {"".join(out)}') | |
1146 | raise OrchestratorError(msg) | |
1147 | ||
f67539c2 TL |
    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]

        Returns (stdout lines, stderr lines, exit code). With error_ok a
        connection-level RuntimeError is reported as ([], [str(e)], 1) and a
        non-zero exit code is returned instead of raising OrchestratorError.
        """
        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # daemon name prefixes that are not ceph containers and therefore
        # must not trigger a ceph container-image lookup
        bypass_image = ('cephadm-exporter',)

        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            # Skip the image check for daemons deployed that are not ceph containers
            if not str(entity).startswith(bypass_image):
                if not image and entity is not cephadmNoImage:
                    image = self.mgr._get_container_image(entity)

            final_args = []

            # global args
            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])

            if not self.mgr.container_init:
                final_args += ['--no-container-init']

            # subcommand
            final_args.append(command)

            # subcommand args
            if not no_fsid:
                final_args += ['--fsid', self.mgr._cluster_fsid]

            final_args += args

            # exec
            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mgr.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)

                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, self.mgr.cephadm_binary_path] + final_args,
                        stdin=stdin.encode('utf-8') if stdin is not None else None)
                    # exit code 2 may mean the cephadm binary is missing on
                    # the host; if `ls` confirms that, deploy it and retry once
                    if code == 2:
                        out_ls, err_ls, code_ls = remoto.process.check(
                            conn, ['ls', self.mgr.cephadm_binary_path])
                        if code_ls == 2:
                            self._deploy_cephadm_binary_conn(conn, host)
                            out, err, code = remoto.process.check(
                                conn,
                                [python, self.mgr.cephadm_binary_path] + final_args,
                                stdin=stdin.encode('utf-8') if stdin is not None else None)

                except RuntimeError as e:
                    # connection-level failure: drop the cached connection
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise

            elif self.mgr.mode == 'cephadm-package':
                # packaged cephadm: run the installed binary via sudo
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code
1258 | ||
1259 | def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: | |
1260 | # pick a random host... | |
1261 | host = None | |
1262 | for host_name in self.mgr.inventory.keys(): | |
1263 | host = host_name | |
1264 | break | |
1265 | if not host: | |
1266 | raise OrchestratorError('no hosts defined') | |
1267 | if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url: | |
1268 | self._registry_login(host, self.mgr.registry_url, | |
1269 | self.mgr.registry_username, self.mgr.registry_password) | |
1270 | ||
a4b75251 TL |
1271 | pullargs: List[str] = [] |
1272 | if self.mgr.registry_insecure: | |
1273 | pullargs.append("--insecure") | |
1274 | ||
1275 | j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True) | |
f67539c2 TL |
1276 | |
1277 | r = ContainerInspectInfo( | |
1278 | j['image_id'], | |
1279 | j.get('ceph_version'), | |
1280 | j.get('repo_digests') | |
1281 | ) | |
1282 | self.log.debug(f'image {image_name} -> {r}') | |
1283 | return r | |
1284 | ||
1285 | # function responsible for logging single host into custom registry | |
1286 | def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]: | |
1287 | self.log.debug(f"Attempting to log host {host} into custom registry @ {url}") | |
1288 | # want to pass info over stdin rather than through normal list of args | |
1289 | args_str = json.dumps({ | |
1290 | 'url': url, | |
1291 | 'username': username, | |
1292 | 'password': password, | |
1293 | }) | |
1294 | out, err, code = self._run_cephadm( | |
1295 | host, 'mon', 'registry-login', | |
1296 | ['--registry-json', '-'], stdin=args_str, error_ok=True) | |
1297 | if code: | |
1298 | return f"Host {host} failed to login to {url} as {username} with given password" | |
1299 | return None | |
1300 | ||
1301 | def _deploy_cephadm_binary(self, host: str) -> None: | |
1302 | # Use tee (from coreutils) to create a copy of cephadm on the target machine | |
1303 | self.log.info(f"Deploying cephadm binary to {host}") | |
1304 | with self._remote_connection(host) as tpl: | |
1305 | conn, _connr = tpl | |
1306 | return self._deploy_cephadm_binary_conn(conn, host) | |
1307 | ||
1308 | def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None: | |
1309 | _out, _err, code = remoto.process.check( | |
1310 | conn, | |
1311 | ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}']) | |
1312 | if code: | |
1313 | msg = f"Unable to deploy the cephadm binary to {host}: {_err}" | |
1314 | self.log.warning(msg) | |
1315 | raise OrchestratorError(msg) | |
1316 | _out, _err, code = remoto.process.check( | |
1317 | conn, | |
1318 | ['tee', '-', self.mgr.cephadm_binary_path], | |
1319 | stdin=self.mgr._cephadm.encode('utf-8')) | |
1320 | if code: | |
1321 | msg = f"Unable to deploy the cephadm binary to {host}: {_err}" | |
1322 | self.log.warning(msg) | |
1323 | raise OrchestratorError(msg) | |
1324 | ||
b3b6e05e TL |
1325 | def _write_remote_file(self, |
1326 | host: str, | |
1327 | path: str, | |
1328 | content: bytes, | |
1329 | mode: int, | |
1330 | uid: int, | |
1331 | gid: int) -> None: | |
1332 | with self._remote_connection(host) as tpl: | |
1333 | conn, connr = tpl | |
1334 | try: | |
1335 | errmsg = connr.write_file(path, content, mode, uid, gid) | |
1336 | if errmsg is not None: | |
1337 | raise OrchestratorError(errmsg) | |
1338 | except Exception as e: | |
1339 | msg = f"Unable to write {host}:{path}: {e}" | |
1340 | self.log.warning(msg) | |
1341 | raise OrchestratorError(msg) | |
1342 | ||
f67539c2 TL |
    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        """Yield a (connection, remote-module) pair for `host`.

        Resolves the address from the inventory when not given, clears any
        previous offline mark before trying, and translates connection
        failures into an OrchestratorError carrying remediation hints.
        """
        if not addr and host in self.mgr.inventory:
            addr = self.mgr.inventory.get_addr(host)

        # we are (re)trying the host now; clear any stale offline mark
        self.mgr.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self.mgr._get_connection(addr)
            except OSError as e:
                self.mgr._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                # re-raise as HostNotFound so the handler below produces
                # the user-facing error message
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.mgr.offline_hosts.add(host)
            self.mgr._reset_con(host)

            user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
            if str(e).startswith("Can't communicate"):
                msg = str(e)
            else:
                msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{addr}

To check that the host is reachable open a new shell with the --no-hosts flag:
> cephadm shell --no-hosts

Then run the following:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            # unexpected failure: log with traceback and propagate
            self.log.exception(ex)
            raise