]>
Commit | Line | Data |
---|---|---|
2a845540 | 1 | import ipaddress |
b3b6e05e | 2 | import hashlib |
f91f0fd5 TL |
3 | import json |
4 | import logging | |
b3b6e05e | 5 | import uuid |
33c7a0ef | 6 | import os |
f91f0fd5 | 7 | from collections import defaultdict |
20effc67 TL |
8 | from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \ |
9 | DefaultDict | |
f91f0fd5 TL |
10 | |
11 | from ceph.deployment import inventory | |
12 | from ceph.deployment.drive_group import DriveGroupSpec | |
b3b6e05e | 13 | from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec |
20effc67 | 14 | from ceph.utils import datetime_now |
f91f0fd5 TL |
15 | |
16 | import orchestrator | |
f67539c2 TL |
17 | from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \ |
18 | DaemonDescriptionStatus, daemon_type_to_service | |
19 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec | |
f91f0fd5 | 20 | from cephadm.schedule import HostAssignment |
b3b6e05e | 21 | from cephadm.autotune import MemoryAutotuner |
f67539c2 TL |
22 | from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \ |
23 | CephadmNoImage, CEPH_TYPES, ContainerInspectInfo | |
24 | from mgr_module import MonCommandFailed | |
b3b6e05e | 25 | from mgr_util import format_bytes |
f67539c2 TL |
26 | |
27 | from . import utils | |
f91f0fd5 TL |
28 | |
29 | if TYPE_CHECKING: | |
f67539c2 | 30 | from cephadm.module import CephadmOrchestrator |
f91f0fd5 TL |
31 | |
32 | logger = logging.getLogger(__name__) | |
33 | ||
522d829b TL |
34 | REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw'] |
35 | ||
f91f0fd5 TL |
36 | |
37 | class CephadmServe: | |
38 | """ | |
39 | This module contains functions that are executed in the | |
40 | serve() thread. Thus they don't block the CLI. | |
41 | ||
f67539c2 TL |
42 | Please see the `Note regarding network calls from CLI handlers` |
43 | chapter in the cephadm developer guide. | |
44 | ||
f91f0fd5 TL |
45 | On the other hand, These function should *not* be called form |
46 | CLI handlers, to avoid blocking the CLI | |
47 | """ | |
48 | ||
49 | def __init__(self, mgr: "CephadmOrchestrator"): | |
50 | self.mgr: "CephadmOrchestrator" = mgr | |
51 | self.log = logger | |
52 | ||
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:
            self.log.debug("serve loop start")

            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                # one-shot sync of dashboard <-> RGW credentials; the flag is
                # cleared before the remote call so a failure is not retried
                # every loop iteration
                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                if not self.mgr.paused:
                    self.mgr.to_remove_osds.process_removal_queue()

                # while a migration is ongoing, skip everything below and
                # restart the loop immediately (no sleep)
                self.mgr.migration.migrate()
                if self.mgr.migration.is_migration_ongoing():
                    continue

                if self._apply_all_services():
                    continue  # did something, refresh

                self._check_daemons()

                self._purge_deleted_services()

                self._check_for_moved_osds()

                if self.mgr.agent_helpers._handle_use_agent_setting():
                    continue

                if self.mgr.upgrade.continue_upgrade():
                    continue

            except OrchestratorError as e:
                # record the failure as an event when it names a subject;
                # either way the serve loop keeps running
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self.log.debug("serve loop sleep")
            self._serve_sleep()
            self.log.debug("serve loop wake")
        self.log.debug("serve exit")
114 | ||
adb31ebb | 115 | def _serve_sleep(self) -> None: |
f67539c2 TL |
116 | sleep_interval = max( |
117 | 30, | |
118 | min( | |
119 | self.mgr.host_check_interval, | |
120 | self.mgr.facts_cache_timeout, | |
121 | self.mgr.daemon_cache_timeout, | |
122 | self.mgr.device_cache_timeout, | |
123 | ) | |
124 | ) | |
f91f0fd5 | 125 | self.log.debug('Sleeping for %d seconds', sleep_interval) |
f67539c2 | 126 | self.mgr.event.wait(sleep_interval) |
f91f0fd5 TL |
127 | self.mgr.event.clear() |
128 | ||
adb31ebb | 129 | def _update_paused_health(self) -> None: |
20effc67 | 130 | self.log.debug('_update_paused_health') |
f91f0fd5 | 131 | if self.mgr.paused: |
20effc67 TL |
132 | self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [ |
133 | "'ceph orch resume' to resume"]) | |
f91f0fd5 | 134 | else: |
a4b75251 | 135 | self.mgr.remove_health_warning('CEPHADM_PAUSED') |
f91f0fd5 | 136 | |
b3b6e05e TL |
    def _autotune_host_memory(self, host: str) -> None:
        """Tune osd_memory_target for all OSDs on *host*.

        Uses the host's cached memory facts scaled by
        ``autotune_memory_target_ratio`` and MemoryAutotuner to compute a
        target.  Per-daemon overrides are removed (they would mask the
        host-level value), then a single host-masked option is set.  If no
        target could be computed, the host-level option is removed instead —
        but only when autotuning is globally on, to avoid clobbering a
        user-set value.
        """
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            # no memory facts yet for this host: nothing to tune
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                # drop per-daemon osd_memory_target overrides that differ from
                # the computed value so the host-level mask below takes effect
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                # host-masked option: applies to all OSDs on this host
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            # if osd memory autotuning is off, we don't want to remove these config
            # options as users may be using them. Since there is no way to set autotuning
            # on/off at a host level, best we can do is check if it is globally on.
            if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
                self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                })
        # remember that this host has been autotuned (resets the cache timer)
        self.mgr.cache.update_autotune(host)
185 | ||
f91f0fd5 | 186 | def _refresh_hosts_and_daemons(self) -> None: |
20effc67 | 187 | self.log.debug('_refresh_hosts_and_daemons') |
f91f0fd5 TL |
188 | bad_hosts = [] |
189 | failures = [] | |
190 | ||
b3b6e05e | 191 | if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys: |
20effc67 TL |
192 | client_files = self._calc_client_files() |
193 | else: | |
194 | client_files = {} | |
b3b6e05e | 195 | |
20effc67 | 196 | agents_down: List[str] = [] |
b3b6e05e | 197 | |
f91f0fd5 | 198 | @forall_hosts |
adb31ebb TL |
199 | def refresh(host: str) -> None: |
200 | ||
f67539c2 TL |
201 | # skip hosts that are in maintenance - they could be powered off |
202 | if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance": | |
203 | return | |
204 | ||
20effc67 TL |
205 | if self.mgr.use_agent: |
206 | if self.mgr.agent_helpers._check_agent(host): | |
207 | agents_down.append(host) | |
208 | ||
f91f0fd5 TL |
209 | if self.mgr.cache.host_needs_check(host): |
210 | r = self._check_host(host) | |
211 | if r is not None: | |
212 | bad_hosts.append(r) | |
f91f0fd5 | 213 | |
20effc67 TL |
214 | if ( |
215 | not self.mgr.use_agent | |
216 | or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()] | |
217 | or host in agents_down | |
218 | ): | |
219 | if self.mgr.cache.host_needs_daemon_refresh(host): | |
220 | self.log.debug('refreshing %s daemons' % host) | |
221 | r = self._refresh_host_daemons(host) | |
222 | if r: | |
223 | failures.append(r) | |
224 | ||
225 | if self.mgr.cache.host_needs_facts_refresh(host): | |
226 | self.log.debug(('Refreshing %s facts' % host)) | |
227 | r = self._refresh_facts(host) | |
228 | if r: | |
229 | failures.append(r) | |
230 | ||
231 | if self.mgr.cache.host_needs_network_refresh(host): | |
232 | self.log.debug(('Refreshing %s networks' % host)) | |
233 | r = self._refresh_host_networks(host) | |
234 | if r: | |
235 | failures.append(r) | |
236 | ||
237 | if self.mgr.cache.host_needs_device_refresh(host): | |
238 | self.log.debug('refreshing %s devices' % host) | |
239 | r = self._refresh_host_devices(host) | |
240 | if r: | |
241 | failures.append(r) | |
242 | self.mgr.cache.metadata_up_to_date[host] = True | |
243 | elif not self.mgr.cache.get_daemons_by_type('agent', host=host): | |
244 | if self.mgr.cache.host_needs_daemon_refresh(host): | |
245 | self.log.debug('refreshing %s daemons' % host) | |
246 | r = self._refresh_host_daemons(host) | |
247 | if r: | |
248 | failures.append(r) | |
249 | self.mgr.cache.metadata_up_to_date[host] = True | |
250 | ||
251 | if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'): | |
f91f0fd5 | 252 | self.log.debug(f"Logging `{host}` into custom registry") |
20effc67 TL |
253 | r = self.mgr.wait_async(self._registry_login( |
254 | host, json.loads(str(self.mgr.get_store('registry_credentials'))))) | |
f91f0fd5 TL |
255 | if r: |
256 | bad_hosts.append(r) | |
257 | ||
f91f0fd5 TL |
258 | if self.mgr.cache.host_needs_osdspec_preview_refresh(host): |
259 | self.log.debug(f"refreshing OSDSpec previews for {host}") | |
260 | r = self._refresh_host_osdspec_previews(host) | |
261 | if r: | |
262 | failures.append(r) | |
263 | ||
b3b6e05e TL |
264 | if ( |
265 | self.mgr.cache.host_needs_autotune_memory(host) | |
266 | and not self.mgr.inventory.has_label(host, '_no_autotune_memory') | |
267 | ): | |
268 | self.log.debug(f"autotuning memory for {host}") | |
269 | self._autotune_host_memory(host) | |
270 | ||
20effc67 | 271 | self._write_client_files(client_files, host) |
f91f0fd5 TL |
272 | |
273 | refresh(self.mgr.cache.get_hosts()) | |
274 | ||
20effc67 TL |
275 | self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down) |
276 | ||
f67539c2 TL |
277 | self.mgr.config_checker.run_checks() |
278 | ||
adb31ebb TL |
279 | for k in [ |
280 | 'CEPHADM_HOST_CHECK_FAILED', | |
adb31ebb TL |
281 | 'CEPHADM_REFRESH_FAILED', |
282 | ]: | |
a4b75251 | 283 | self.mgr.remove_health_warning(k) |
f91f0fd5 | 284 | if bad_hosts: |
20effc67 TL |
285 | self.mgr.set_health_warning( |
286 | 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts) | |
f91f0fd5 | 287 | if failures: |
20effc67 TL |
288 | self.mgr.set_health_warning( |
289 | 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures) | |
290 | self.mgr.update_failed_daemon_health_check() | |
f91f0fd5 | 291 | |
adb31ebb | 292 | def _check_host(self, host: str) -> Optional[str]: |
f91f0fd5 | 293 | if host not in self.mgr.inventory: |
adb31ebb | 294 | return None |
f91f0fd5 TL |
295 | self.log.debug(' checking %s' % host) |
296 | try: | |
522d829b | 297 | addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host |
20effc67 | 298 | out, err, code = self.mgr.wait_async(self._run_cephadm( |
f91f0fd5 | 299 | host, cephadmNoImage, 'check-host', [], |
20effc67 | 300 | error_ok=True, no_fsid=True)) |
f91f0fd5 TL |
301 | self.mgr.cache.update_last_host_check(host) |
302 | self.mgr.cache.save_host(host) | |
303 | if code: | |
522d829b | 304 | self.log.debug(' host %s (%s) failed check' % (host, addr)) |
f91f0fd5 | 305 | if self.mgr.warn_on_failed_host_check: |
522d829b | 306 | return 'host %s (%s) failed check: %s' % (host, addr, err) |
f91f0fd5 | 307 | else: |
522d829b | 308 | self.log.debug(' host %s (%s) ok' % (host, addr)) |
f91f0fd5 | 309 | except Exception as e: |
522d829b TL |
310 | self.log.debug(' host %s (%s) failed check' % (host, addr)) |
311 | return 'host %s (%s) failed check: %s' % (host, addr, e) | |
adb31ebb | 312 | return None |
f91f0fd5 | 313 | |
adb31ebb | 314 | def _refresh_host_daemons(self, host: str) -> Optional[str]: |
f91f0fd5 | 315 | try: |
20effc67 | 316 | ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)) |
adb31ebb TL |
317 | except OrchestratorError as e: |
318 | return str(e) | |
20effc67 | 319 | self.mgr._process_ls_output(host, ls) |
f91f0fd5 TL |
320 | return None |
321 | ||
adb31ebb | 322 | def _refresh_facts(self, host: str) -> Optional[str]: |
f91f0fd5 | 323 | try: |
20effc67 TL |
324 | val = self.mgr.wait_async(self._run_cephadm_json( |
325 | host, cephadmNoImage, 'gather-facts', [], no_fsid=True)) | |
adb31ebb TL |
326 | except OrchestratorError as e: |
327 | return str(e) | |
328 | ||
329 | self.mgr.cache.update_host_facts(host, val) | |
330 | ||
331 | return None | |
332 | ||
    def _refresh_host_devices(self, host: str) -> Optional[str]:
        """Re-run ``ceph-volume inventory`` on *host* and update cached devices.

        Returns an error string on failure, or None on success.  Falls back
        to re-running without ``--filter-for-batch`` when the remote
        ceph-volume is too old to know that flag.
        """
        with_lsm = self.mgr.device_enhanced_scan
        inventory_args = ['--', 'inventory',
                          '--format=json-pretty',
                          '--filter-for-batch']
        if with_lsm:
            # insert before the last element so --filter-for-batch stays last
            inventory_args.insert(-1, "--with-lsm")

        try:
            try:
                devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                                     inventory_args))
            except OrchestratorError as e:
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    # older ceph-volume: retry without the unsupported flag
                    rerun_args = inventory_args.copy()
                    rerun_args.remove('--filter-for-batch')
                    devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                                         rerun_args))
                else:
                    raise

        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d)' % (
            host, len(devices)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices(host, ret.devices)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None
364 | ||
20effc67 TL |
365 | def _refresh_host_networks(self, host: str) -> Optional[str]: |
366 | try: | |
367 | networks = self.mgr.wait_async(self._run_cephadm_json( | |
368 | host, 'mon', 'list-networks', [], no_fsid=True)) | |
369 | except OrchestratorError as e: | |
370 | return str(e) | |
371 | ||
372 | self.log.debug('Refreshed host %s networks (%s)' % ( | |
373 | host, len(networks))) | |
374 | self.mgr.cache.update_host_networks(host, networks) | |
375 | self.mgr.cache.save_host(host) | |
376 | return None | |
377 | ||
adb31ebb | 378 | def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]: |
f91f0fd5 TL |
379 | self.update_osdspec_previews(host) |
380 | self.mgr.cache.save_host(host) | |
381 | self.log.debug(f'Refreshed OSDSpec previews for host <{host}>') | |
adb31ebb | 382 | return None |
f91f0fd5 | 383 | |
adb31ebb | 384 | def update_osdspec_previews(self, search_host: str = '') -> None: |
f91f0fd5 TL |
385 | # Set global 'pending' flag for host |
386 | self.mgr.cache.loading_osdspec_preview.add(search_host) | |
387 | previews = [] | |
388 | # query OSDSpecs for host <search host> and generate/get the preview | |
389 | # There can be multiple previews for one host due to multiple OSDSpecs. | |
390 | previews.extend(self.mgr.osd_service.get_previews(search_host)) | |
f67539c2 | 391 | self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>') |
f91f0fd5 TL |
392 | self.mgr.cache.osdspec_previews[search_host] = previews |
393 | # Unset global 'pending' flag for host | |
394 | self.mgr.cache.loading_osdspec_preview.remove(search_host) | |
395 | ||
f91f0fd5 TL |
    def _check_for_strays(self) -> None:
        """Detect hosts/daemons known to the cluster but not managed by cephadm.

        Compares the mon's server list against cephadm's daemon cache and
        inventory, and raises CEPHADM_STRAY_HOST / CEPHADM_STRAY_DAEMON
        health warnings accordingly.  Both warnings are cleared first so
        they reflect only this pass's findings.
        """
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            self.log.debug(ls)
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    # these daemon types register with a mangled id; look up the
                    # real daemon name from their metadata instead
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )
                    if s.get('type') == 'tcmu-runner':
                        # because we don't track tcmu-runner daemons in the host cache
                        # and don't have a way to check if the daemon is part of iscsi service
                        # we assume that all tcmu-runner daemons are managed by cephadm
                        managed.append(name)
                    if host not in self.mgr.inventory:
                        # daemon lives on a host cephadm doesn't know about
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
455 | ||
    def _check_for_moved_osds(self) -> None:
        """Remove duplicate OSD daemon entries left behind after an OSD moves.

        When the same OSD id appears on multiple hosts, and exactly one of
        those daemons is running while the cluster reports the OSD as up,
        the entries stuck in error state are removed.
        """
        self.log.debug('_check_for_moved_osds')
        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in self.mgr.cache.get_daemons_by_type('osd'):
            assert dd.daemon_id
            all_osds[int(dd.daemon_id)].append(dd)
        for osd_id, dds in all_osds.items():
            if len(dds) <= 1:
                continue
            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
            logger.info(msg)
            # only act when there is exactly one healthy copy to keep
            if len(running) != 1:
                continue
            osd = self.mgr.get_osd_by_id(osd_id)
            if not osd or not osd['up']:
                continue
            for e in error:
                assert e.hostname
                try:
                    # skip post-remove actions: the OSD itself is still alive
                    # on the surviving host
                    self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
                    self.mgr.events.for_daemon(
                        e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
                except OrchestratorError as ex:
                    self.mgr.events.from_orch_error(ex)
                    logger.exception(f'failed to remove duplicated daemon {e}')
f91f0fd5 TL |
483 | |
    def _apply_all_services(self) -> bool:
        """Apply every active service spec.

        Returns True when any spec actually changed something, which tells
        serve() to restart its loop immediately instead of sleeping.
        Per-spec failures are accumulated into the CEPHADM_APPLY_SPEC_FAIL
        health warning rather than aborting the pass.
        """
        self.log.debug('_apply_all_services')
        r = False
        specs = []  # type: List[ServiceSpec]
        # if metadata is not up to date, we still need to apply spec for agent
        # since the agent is the one who gather the metadata. If we don't we
        # end up stuck between wanting metadata to be up to date to apply specs
        # and needing to apply the agent spec to get up to date metadata
        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
            self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
            try:
                specs.append(self.mgr.spec_store['agent'].spec)
            except Exception as e:
                self.log.debug(f'Failed to find agent spec: {e}')
            self.mgr.agent_helpers._apply_agent()
            return r
        else:
            for sn, spec in self.mgr.spec_store.active_specs.items():
                specs.append(spec)
        # clear last pass's warnings; they are rebuilt from this pass's failures
        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            self.mgr.remove_health_warning(name)
        self.mgr.apply_spec_fails = []
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
                self.log.exception(msg)
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
                warnings = []
                for x in self.mgr.apply_spec_fails:
                    warnings.append(f'{x[0]}: {x[1]}')
                # re-raise the warning after every failure so it always
                # reflects the full list accumulated so far
                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                            len(self.mgr.apply_spec_fails),
                                            warnings)
        self.mgr.update_watched_hosts()
        self.mgr.tuned_profile_utils._write_all_tuned_profiles()
        return r
525 | ||
f67539c2 TL |
    def _apply_service_config(self, spec: ServiceSpec) -> None:
        """Push the spec's ``config`` key/value pairs into the mon config db.

        Option names that the mon does not recognize are skipped; options
        that fail to set are collected.  Each category is surfaced as its
        own health warning (CEPHADM_INVALID_CONFIG_OPTION /
        CEPHADM_FAILED_SET_OPTION).  Options already at the desired value
        are left untouched.
        """
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            # clear last pass's warnings before rebuilding them
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    # KeyError here means the option name itself is unknown
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
                    invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
                    options_failed_to_set), options_failed_to_set)
f91f0fd5 TL |
564 | |
565 | def _apply_service(self, spec: ServiceSpec) -> bool: | |
566 | """ | |
567 | Schedule a service. Deploy new daemons or remove old ones, depending | |
568 | on the target label and count specified in the placement. | |
569 | """ | |
570 | self.mgr.migration.verify_no_migration() | |
571 | ||
f67539c2 | 572 | service_type = spec.service_type |
f91f0fd5 TL |
573 | service_name = spec.service_name() |
574 | if spec.unmanaged: | |
575 | self.log.debug('Skipping unmanaged service %s' % service_name) | |
576 | return False | |
577 | if spec.preview_only: | |
578 | self.log.debug('Skipping preview_only service %s' % service_name) | |
579 | return False | |
580 | self.log.debug('Applying service %s spec' % service_name) | |
581 | ||
20effc67 TL |
582 | if service_type == 'agent': |
583 | try: | |
584 | assert self.mgr.cherrypy_thread | |
585 | assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert() | |
586 | except Exception: | |
587 | self.log.info( | |
588 | 'Delaying applying agent spec until cephadm endpoint root cert created') | |
589 | return False | |
590 | ||
f67539c2 | 591 | self._apply_service_config(spec) |
f91f0fd5 | 592 | |
f67539c2 | 593 | if service_type == 'osd': |
f91f0fd5 TL |
594 | self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec)) |
595 | # TODO: return True would result in a busy loop | |
596 | # can't know if daemon count changed; create_from_spec doesn't | |
597 | # return a solid indication | |
598 | return False | |
599 | ||
f67539c2 | 600 | svc = self.mgr.cephadm_services[service_type] |
f91f0fd5 TL |
601 | daemons = self.mgr.cache.get_daemons_by_service(service_name) |
602 | ||
b3b6e05e | 603 | public_networks: List[str] = [] |
f67539c2 TL |
604 | if service_type == 'mon': |
605 | out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network')) | |
f91f0fd5 | 606 | if '/' in out: |
b3b6e05e TL |
607 | public_networks = [x.strip() for x in out.split(',')] |
608 | self.log.debug('mon public_network(s) is %s' % public_networks) | |
f91f0fd5 TL |
609 | |
610 | def matches_network(host): | |
611 | # type: (str) -> bool | |
2a845540 TL |
612 | # make sure the host has at least one network that belongs to some configured public network(s) |
613 | for pn in public_networks: | |
614 | public_network = ipaddress.ip_network(pn) | |
615 | for hn in self.mgr.cache.networks[host]: | |
616 | host_network = ipaddress.ip_network(hn) | |
617 | if host_network.overlaps(public_network): | |
618 | return True | |
619 | ||
620 | host_networks = ','.join(self.mgr.cache.networks[host]) | |
621 | pub_networks = ','.join(public_networks) | |
b3b6e05e | 622 | self.log.info( |
2a845540 TL |
623 | f"Filtered out host {host}: does not belong to mon public_network(s): " |
624 | f" {pub_networks}, host network(s): {host_networks}" | |
b3b6e05e TL |
625 | ) |
626 | return False | |
f91f0fd5 | 627 | |
b3b6e05e TL |
628 | rank_map = None |
629 | if svc.ranked(): | |
630 | rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {} | |
f91f0fd5 TL |
631 | ha = HostAssignment( |
632 | spec=spec, | |
20effc67 TL |
633 | hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name( |
634 | ) == 'agent' else self.mgr.cache.get_schedulable_hosts(), | |
635 | unreachable_hosts=self.mgr.cache.get_unreachable_hosts(), | |
2a845540 | 636 | draining_hosts=self.mgr.cache.get_draining_hosts(), |
f67539c2 TL |
637 | daemons=daemons, |
638 | networks=self.mgr.cache.networks, | |
639 | filter_new_host=( | |
640 | matches_network if service_type == 'mon' | |
641 | else None | |
642 | ), | |
643 | allow_colo=svc.allow_colo(), | |
644 | primary_daemon_type=svc.primary_daemon_type(), | |
645 | per_host_daemon_type=svc.per_host_daemon_type(), | |
b3b6e05e | 646 | rank_map=rank_map, |
f91f0fd5 TL |
647 | ) |
648 | ||
f67539c2 TL |
649 | try: |
650 | all_slots, slots_to_add, daemons_to_remove = ha.place() | |
b3b6e05e | 651 | daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get( |
522d829b | 652 | 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)] |
f67539c2 TL |
653 | self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove)) |
654 | except OrchestratorError as e: | |
a4b75251 TL |
655 | msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}' |
656 | self.log.error(msg) | |
f67539c2 | 657 | self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) |
a4b75251 TL |
658 | self.mgr.apply_spec_fails.append((spec.service_name(), str(e))) |
659 | warnings = [] | |
660 | for x in self.mgr.apply_spec_fails: | |
661 | warnings.append(f'{x[0]}: {x[1]}') | |
662 | self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL', | |
663 | f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}", | |
664 | len(self.mgr.apply_spec_fails), | |
665 | warnings) | |
f67539c2 | 666 | return False |
f91f0fd5 TL |
667 | |
668 | r = None | |
669 | ||
670 | # sanity check | |
f67539c2 TL |
671 | final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove) |
672 | if service_type in ['mon', 'mgr'] and final_count < 1: | |
673 | self.log.debug('cannot scale mon|mgr below 1)') | |
f91f0fd5 TL |
674 | return False |
675 | ||
b3b6e05e TL |
676 | # progress |
677 | progress_id = str(uuid.uuid4()) | |
678 | delta: List[str] = [] | |
679 | if slots_to_add: | |
680 | delta += [f'+{len(slots_to_add)}'] | |
681 | if daemons_to_remove: | |
682 | delta += [f'-{len(daemons_to_remove)}'] | |
683 | progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})' | |
684 | progress_total = len(slots_to_add) + len(daemons_to_remove) | |
685 | progress_done = 0 | |
686 | ||
687 | def update_progress() -> None: | |
688 | self.mgr.remote( | |
689 | 'progress', 'update', progress_id, | |
690 | ev_msg=progress_title, | |
691 | ev_progress=(progress_done / progress_total), | |
692 | add_to_ceph_s=True, | |
693 | ) | |
694 | ||
695 | if progress_total: | |
696 | update_progress() | |
697 | ||
f91f0fd5 TL |
698 | # add any? |
699 | did_config = False | |
700 | ||
f67539c2 TL |
701 | self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add) |
702 | self.log.debug('Daemons that will be removed: %s' % daemons_to_remove) | |
f91f0fd5 | 703 | |
20effc67 TL |
704 | hosts_altered: Set[str] = set() |
705 | ||
522d829b TL |
706 | try: |
707 | # assign names | |
708 | for i in range(len(slots_to_add)): | |
709 | slot = slots_to_add[i] | |
710 | slot = slot.assign_name(self.mgr.get_unique_name( | |
711 | slot.daemon_type, | |
712 | slot.hostname, | |
33c7a0ef | 713 | [d for d in daemons if d not in daemons_to_remove], |
522d829b TL |
714 | prefix=spec.service_id, |
715 | forcename=slot.name, | |
716 | rank=slot.rank, | |
717 | rank_generation=slot.rank_generation, | |
718 | )) | |
719 | slots_to_add[i] = slot | |
720 | if rank_map is not None: | |
721 | assert slot.rank is not None | |
722 | assert slot.rank_generation is not None | |
723 | assert rank_map[slot.rank][slot.rank_generation] is None | |
724 | rank_map[slot.rank][slot.rank_generation] = slot.name | |
725 | ||
726 | if rank_map: | |
727 | # record the rank_map before we make changes so that if we fail the | |
728 | # next mgr will clean up. | |
729 | self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map) | |
730 | ||
731 | # remove daemons now, since we are going to fence them anyway | |
f67539c2 | 732 | for d in daemons_to_remove: |
522d829b | 733 | assert d.hostname is not None |
f67539c2 | 734 | self._remove_daemon(d.name(), d.hostname) |
522d829b TL |
735 | daemons_to_remove = [] |
736 | ||
737 | # fence them | |
738 | svc.fence_old_ranks(spec, rank_map, len(all_slots)) | |
739 | ||
740 | # create daemons | |
a4b75251 | 741 | daemon_place_fails = [] |
522d829b | 742 | for slot in slots_to_add: |
33c7a0ef TL |
743 | # first remove daemon with conflicting port or name? |
744 | if slot.ports or slot.name in [d.name() for d in daemons_to_remove]: | |
522d829b | 745 | for d in daemons_to_remove: |
33c7a0ef TL |
746 | if ( |
747 | d.hostname != slot.hostname | |
748 | or not (set(d.ports or []) & set(slot.ports)) | |
749 | or (d.ip and slot.ip and d.ip != slot.ip) | |
750 | and d.name() != slot.name | |
751 | ): | |
522d829b | 752 | continue |
33c7a0ef TL |
753 | if d.name() != slot.name: |
754 | self.log.info( | |
755 | f'Removing {d.name()} before deploying to {slot} to avoid a port or conflict' | |
756 | ) | |
522d829b TL |
757 | # NOTE: we don't check ok-to-stop here to avoid starvation if |
758 | # there is only 1 gateway. | |
759 | self._remove_daemon(d.name(), d.hostname) | |
760 | daemons_to_remove.remove(d) | |
761 | progress_done += 1 | |
20effc67 | 762 | hosts_altered.add(d.hostname) |
522d829b TL |
763 | break |
764 | ||
765 | # deploy new daemon | |
766 | daemon_id = slot.name | |
767 | if not did_config: | |
768 | svc.config(spec) | |
769 | did_config = True | |
770 | ||
771 | daemon_spec = svc.make_daemon_spec( | |
772 | slot.hostname, daemon_id, slot.network, spec, | |
773 | daemon_type=slot.daemon_type, | |
774 | ports=slot.ports, | |
775 | ip=slot.ip, | |
776 | rank=slot.rank, | |
777 | rank_generation=slot.rank_generation, | |
778 | ) | |
779 | self.log.debug('Placing %s.%s on host %s' % ( | |
780 | slot.daemon_type, daemon_id, slot.hostname)) | |
781 | ||
782 | try: | |
783 | daemon_spec = svc.prepare_create(daemon_spec) | |
20effc67 | 784 | self.mgr.wait_async(self._create_daemon(daemon_spec)) |
522d829b | 785 | r = True |
b3b6e05e | 786 | progress_done += 1 |
522d829b | 787 | update_progress() |
20effc67 | 788 | hosts_altered.add(daemon_spec.host) |
522d829b TL |
789 | except (RuntimeError, OrchestratorError) as e: |
790 | msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} " | |
791 | f"on {slot.hostname}: {e}") | |
792 | self.mgr.events.for_service(spec, 'ERROR', msg) | |
793 | self.mgr.log.error(msg) | |
a4b75251 | 794 | daemon_place_fails.append(msg) |
522d829b TL |
795 | # only return "no change" if no one else has already succeeded. |
796 | # later successes will also change to True | |
797 | if r is None: | |
798 | r = False | |
799 | progress_done += 1 | |
800 | update_progress() | |
801 | continue | |
f91f0fd5 | 802 | |
522d829b TL |
803 | # add to daemon list so next name(s) will also be unique |
804 | sd = orchestrator.DaemonDescription( | |
805 | hostname=slot.hostname, | |
806 | daemon_type=slot.daemon_type, | |
807 | daemon_id=daemon_id, | |
808 | ) | |
809 | daemons.append(sd) | |
810 | ||
a4b75251 | 811 | if daemon_place_fails: |
20effc67 TL |
812 | self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len( |
813 | daemon_place_fails), daemon_place_fails) | |
a4b75251 | 814 | |
33c7a0ef TL |
815 | if service_type == 'mgr': |
816 | active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr')) | |
817 | if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]: | |
818 | # We can't just remove the active mgr like any other daemon. | |
819 | # Need to fail over later so it can be removed on next pass. | |
820 | # This can be accomplished by scheduling a restart of the active mgr. | |
821 | self.mgr._schedule_daemon_action(active_mgr.name(), 'restart') | |
822 | ||
522d829b TL |
823 | # remove any? |
824 | def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool: | |
825 | daemon_ids = [d.daemon_id for d in remove_daemons] | |
826 | assert None not in daemon_ids | |
827 | # setting force flag retains previous behavior | |
828 | r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True) | |
829 | return not r.retval | |
830 | ||
831 | while daemons_to_remove and not _ok_to_stop(daemons_to_remove): | |
832 | # let's find a subset that is ok-to-stop | |
833 | daemons_to_remove.pop() | |
834 | for d in daemons_to_remove: | |
f91f0fd5 | 835 | r = True |
522d829b TL |
836 | assert d.hostname is not None |
837 | self._remove_daemon(d.name(), d.hostname) | |
838 | ||
b3b6e05e TL |
839 | progress_done += 1 |
840 | update_progress() | |
20effc67 | 841 | hosts_altered.add(d.hostname) |
f91f0fd5 | 842 | |
522d829b TL |
843 | self.mgr.remote('progress', 'complete', progress_id) |
844 | except Exception as e: | |
845 | self.mgr.remote('progress', 'fail', progress_id, str(e)) | |
846 | raise | |
20effc67 TL |
847 | finally: |
848 | if self.mgr.use_agent: | |
849 | # can only send ack to agents if we know for sure port they bound to | |
850 | hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [ | |
851 | h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])]) | |
852 | self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True) | |
b3b6e05e | 853 | |
f91f0fd5 TL |
854 | if r is None: |
855 | r = False | |
856 | return r | |
857 | ||
    def _check_daemons(self) -> None:
        """Reconcile every cached daemon against its service spec.

        For each daemon this pass: removes orphans (daemons with no active
        spec, other than mon/mgr/osd), skips unmanaged and deleted services,
        health-checks agents, marks the active daemon of each service, and
        schedules a 'reconfig' or 'redeploy' action when the daemon's config,
        dependencies, monmap, extra ceph.conf, or extra container args have
        changed.  Finally runs per-daemon-type post actions (grafana, iscsi,
        prometheus, alertmanager, rgw -- see REQUIRES_POST_ACTIONS).
        """
        self.log.debug('_check_daemons')
        daemons = self.mgr.cache.get_daemons()
        # daemon_type -> daemons that may need a post-deploy action this pass
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            if dd.daemon_type == 'agent':
                # agents have their own health-check path; never fall through
                # to the generic reconfig logic below
                try:
                    self.mgr.agent_helpers._check_agent(dd.hostname)
                except Exception as e:
                    self.log.debug(
                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            # start from any action a previous pass already scheduled
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                # container CLI args cannot be changed in place -> redeploy
                self.log.debug(
                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
                self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
                dd.extra_container_args = spec.extra_container_args
                action = 'redeploy'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                # a scheduled 'redeploy' always wins over a derived 'reconfig'
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    # clear the scheduled action now that it has been applied
                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
                        self.mgr.cache.save_host(dd.hostname)
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    # skip post actions for this type after a failed action
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            # only run if at least one of these daemons was flagged as
            # requiring a post action (set at deploy time)
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)
961 | ||
962 | def _purge_deleted_services(self) -> None: | |
20effc67 | 963 | self.log.debug('_purge_deleted_services') |
f67539c2 TL |
964 | existing_services = self.mgr.spec_store.all_specs.items() |
965 | for service_name, spec in list(existing_services): | |
966 | if service_name not in self.mgr.spec_store.spec_deleted: | |
967 | continue | |
968 | if self.mgr.cache.get_daemons_by_service(service_name): | |
969 | continue | |
970 | if spec.service_type in ['mon', 'mgr']: | |
971 | continue | |
972 | ||
973 | logger.info(f'Purge service {service_name}') | |
974 | ||
975 | self.mgr.cephadm_services[spec.service_type].purge(service_name) | |
976 | self.mgr.spec_store.finally_rm(service_name) | |
f91f0fd5 | 977 | |
adb31ebb | 978 | def convert_tags_to_repo_digest(self) -> None: |
f91f0fd5 TL |
979 | if not self.mgr.use_repo_digest: |
980 | return | |
981 | settings = self.mgr.upgrade.get_distinct_container_image_settings() | |
982 | digests: Dict[str, ContainerInspectInfo] = {} | |
983 | for container_image_ref in set(settings.values()): | |
984 | if not is_repo_digest(container_image_ref): | |
20effc67 TL |
985 | image_info = self.mgr.wait_async( |
986 | self._get_container_image_info(container_image_ref)) | |
f67539c2 TL |
987 | if image_info.repo_digests: |
988 | # FIXME: we assume the first digest here is the best | |
989 | assert is_repo_digest(image_info.repo_digests[0]), image_info | |
f91f0fd5 TL |
990 | digests[container_image_ref] = image_info |
991 | ||
992 | for entity, container_image_ref in settings.items(): | |
993 | if not is_repo_digest(container_image_ref): | |
994 | image_info = digests[container_image_ref] | |
f67539c2 TL |
995 | if image_info.repo_digests: |
996 | # FIXME: we assume the first digest here is the best | |
997 | self.mgr.set_container_image(entity, image_info.repo_digests[0]) | |
998 | ||
20effc67 TL |
    def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
        """Compute the client config files each host should carry.

        Covers two sources: a minimal ceph.conf distributed to the hosts
        selected by ``manage_etc_ceph_ceph_conf_hosts``, and the client
        keyrings registered in ``self.mgr.keys`` (fetched via ``auth get``),
        each placed per its own PlacementSpec.  Failures in either source are
        logged and skipped so one bad spec cannot block the rest.

        :return: host -> path -> (mode, uid, gid, content, digest), where
                 digest is the hex sha256 of the file content.
        """
        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
        # hex-encode the sha256 digest (equivalent to hexdigest())
        config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
        cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'

        if self.mgr.manage_etc_ceph_ceph_conf:
            try:
                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                # reuse the scheduler to resolve the placement to hosts;
                # 'mon' here is only a carrier spec for the placement
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=pspec),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    # ceph.conf goes both to /etc/ceph and the cluster dir
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
            except Exception as e:
                self.mgr.log.warning(
                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

        # client keyrings
        for ks in self.mgr.keys.keys.values():
            try:
                ret, keyring, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': ks.entity,
                })
                if ret:
                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
                    continue
                digest = ''.join('%02x' % c for c in hashlib.sha256(
                    keyring.encode('utf-8')).digest())
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=ks.placement),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    # hosts holding a client keyring also get ceph.conf
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
                    # keyring at its configured path plus the cluster dir
                    ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
                    client_files[host][ks.path] = ceph_admin_key
                    client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
            except Exception as e:
                self.log.warning(
                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
        return client_files
1064 | ||
    def _write_client_files(self,
                            client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
                            host: str) -> None:
        """Sync the computed client files onto one host.

        Writes files whose (digest, mode, uid, gid) differ from the cached
        state, removes previously-managed files no longer desired (except
        /etc/ceph/ceph.conf, which is never deleted here), and persists the
        cache only if anything actually changed.  Offline hosts are skipped.

        :param client_files: host -> path -> (mode, uid, gid, content,
                             digest), as produced by _calc_client_files().
        :param host: host to sync.
        """
        updated_files = False
        if host in self.mgr.offline_hosts:
            return
        # copy so we can delete entries as we match them; leftovers are stale
        old_files = self.mgr.cache.get_host_client_files(host).copy()
        for path, m in client_files.get(host, {}).items():
            mode, uid, gid, content, digest = m
            if path in old_files:
                # cached tuple is (digest, mode, uid, gid); equal -> skip write
                match = old_files[path] == (digest, mode, uid, gid)
                del old_files[path]
                if match:
                    continue
            self.log.info(f'Updating {host}:{path}')
            self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
            self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
            updated_files = True
        for path in old_files.keys():
            # never remove the main conf file, even if unmanaged now
            if path == '/etc/ceph/ceph.conf':
                continue
            self.log.info(f'Removing {host}:{path}')
            cmd = ['rm', '-f', path]
            self.mgr.ssh.check_execute_command(host, cmd)
            updated_files = True
            self.mgr.cache.removed_client_file(host, path)
        if updated_files:
            self.mgr.cache.save_host(host)
1093 | ||
1094 | async def _create_daemon(self, | |
1095 | daemon_spec: CephadmDaemonDeploySpec, | |
1096 | reconfig: bool = False, | |
1097 | osd_uuid_map: Optional[Dict[str, Any]] = None, | |
1098 | ) -> str: | |
f67539c2 TL |
1099 | |
1100 | with set_exception_subject('service', orchestrator.DaemonDescription( | |
1101 | daemon_type=daemon_spec.daemon_type, | |
1102 | daemon_id=daemon_spec.daemon_id, | |
1103 | hostname=daemon_spec.host, | |
1104 | ).service_id(), overwrite=True): | |
1105 | ||
1106 | try: | |
1107 | image = '' | |
1108 | start_time = datetime_now() | |
1109 | ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] | |
1110 | ||
1111 | if daemon_spec.daemon_type == 'container': | |
1112 | spec = cast(CustomContainerSpec, | |
1113 | self.mgr.spec_store[daemon_spec.service_name].spec) | |
1114 | image = spec.image | |
1115 | if spec.ports: | |
1116 | ports.extend(spec.ports) | |
1117 | ||
f67539c2 TL |
1118 | # TCP port to open in the host firewall |
1119 | if len(ports) > 0: | |
1120 | daemon_spec.extra_args.extend([ | |
1121 | '--tcp-ports', ' '.join(map(str, ports)) | |
1122 | ]) | |
1123 | ||
1124 | # osd deployments needs an --osd-uuid arg | |
1125 | if daemon_spec.daemon_type == 'osd': | |
1126 | if not osd_uuid_map: | |
1127 | osd_uuid_map = self.mgr.get_osd_uuid_map() | |
1128 | osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) | |
1129 | if not osd_uuid: | |
1130 | raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) | |
1131 | daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) | |
1132 | ||
1133 | if reconfig: | |
1134 | daemon_spec.extra_args.append('--reconfig') | |
1135 | if self.mgr.allow_ptrace: | |
1136 | daemon_spec.extra_args.append('--allow-ptrace') | |
1137 | ||
20effc67 TL |
1138 | try: |
1139 | eca = daemon_spec.extra_container_args | |
1140 | if eca: | |
1141 | for a in eca: | |
1142 | daemon_spec.extra_args.append(f'--extra-container-args={a}') | |
1143 | except AttributeError: | |
1144 | eca = None | |
1145 | ||
2a845540 TL |
1146 | if daemon_spec.service_name in self.mgr.spec_store: |
1147 | configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs | |
1148 | if configs is not None: | |
1149 | daemon_spec.final_config.update( | |
1150 | {'custom_config_files': [c.to_json() for c in configs]}) | |
1151 | ||
f67539c2 | 1152 | if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url: |
20effc67 | 1153 | await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials')))) |
f67539c2 TL |
1154 | |
1155 | self.log.info('%s daemon %s on %s' % ( | |
1156 | 'Reconfiguring' if reconfig else 'Deploying', | |
1157 | daemon_spec.name(), daemon_spec.host)) | |
1158 | ||
20effc67 | 1159 | out, err, code = await self._run_cephadm( |
f67539c2 TL |
1160 | daemon_spec.host, daemon_spec.name(), 'deploy', |
1161 | [ | |
1162 | '--name', daemon_spec.name(), | |
1163 | '--meta-json', json.dumps({ | |
1164 | 'service_name': daemon_spec.service_name, | |
1165 | 'ports': daemon_spec.ports, | |
1166 | 'ip': daemon_spec.ip, | |
1167 | 'deployed_by': self.mgr.get_active_mgr_digests(), | |
b3b6e05e TL |
1168 | 'rank': daemon_spec.rank, |
1169 | 'rank_generation': daemon_spec.rank_generation, | |
20effc67 | 1170 | 'extra_container_args': eca |
f67539c2 TL |
1171 | }), |
1172 | '--config-json', '-', | |
1173 | ] + daemon_spec.extra_args, | |
1174 | stdin=json.dumps(daemon_spec.final_config), | |
20effc67 TL |
1175 | image=image, |
1176 | ) | |
1177 | ||
1178 | if daemon_spec.daemon_type == 'agent': | |
1179 | self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now() | |
1180 | self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1 | |
f67539c2 TL |
1181 | |
1182 | # refresh daemon state? (ceph daemon reconfig does not need it) | |
1183 | if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES: | |
1184 | if not code and daemon_spec.host in self.mgr.cache.daemons: | |
1185 | # prime cached service state with what we (should have) | |
1186 | # just created | |
1187 | sd = daemon_spec.to_daemon_description( | |
20effc67 | 1188 | DaemonDescriptionStatus.starting, 'starting') |
f67539c2 | 1189 | self.mgr.cache.add_daemon(daemon_spec.host, sd) |
522d829b | 1190 | if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS: |
a4b75251 | 1191 | self.mgr.requires_post_actions.add(daemon_spec.name()) |
f67539c2 TL |
1192 | self.mgr.cache.invalidate_host_daemons(daemon_spec.host) |
1193 | ||
20effc67 TL |
1194 | if daemon_spec.daemon_type != 'agent': |
1195 | self.mgr.cache.update_daemon_config_deps( | |
1196 | daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time) | |
1197 | self.mgr.cache.save_host(daemon_spec.host) | |
1198 | else: | |
1199 | self.mgr.agent_cache.update_agent_config_deps( | |
1200 | daemon_spec.host, daemon_spec.deps, start_time) | |
1201 | self.mgr.agent_cache.save_agent(daemon_spec.host) | |
f67539c2 TL |
1202 | msg = "{} {} on host '{}'".format( |
1203 | 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) | |
1204 | if not code: | |
1205 | self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) | |
1206 | else: | |
1207 | what = 'reconfigure' if reconfig else 'deploy' | |
1208 | self.mgr.events.for_daemon( | |
1209 | daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') | |
1210 | return msg | |
1211 | except OrchestratorError: | |
1212 | redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names() | |
1213 | if not reconfig and not redeploy: | |
1214 | # we have to clean up the daemon. E.g. keyrings. | |
1215 | servict_type = daemon_type_to_service(daemon_spec.daemon_type) | |
1216 | dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed') | |
a4b75251 | 1217 | self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True) |
f67539c2 TL |
1218 | raise |
1219 | ||
20effc67 | 1220 | def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str: |
f67539c2 TL |
1221 | """ |
1222 | Remove a daemon | |
1223 | """ | |
1224 | (daemon_type, daemon_id) = name.split('.', 1) | |
1225 | daemon = orchestrator.DaemonDescription( | |
1226 | daemon_type=daemon_type, | |
1227 | daemon_id=daemon_id, | |
1228 | hostname=host) | |
1229 | ||
1230 | with set_exception_subject('service', daemon.service_id(), overwrite=True): | |
1231 | ||
1232 | self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon) | |
f67539c2 TL |
1233 | # NOTE: we are passing the 'force' flag here, which means |
1234 | # we can delete a mon instances data. | |
33c7a0ef TL |
1235 | dd = self.mgr.cache.get_daemon(daemon.daemon_name) |
1236 | if dd.ports: | |
1237 | args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))] | |
1238 | else: | |
1239 | args = ['--name', name, '--force'] | |
1240 | ||
1241 | self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports)) | |
20effc67 TL |
1242 | out, err, code = self.mgr.wait_async(self._run_cephadm( |
1243 | host, name, 'rm-daemon', args)) | |
f67539c2 TL |
1244 | if not code: |
1245 | # remove item from cache | |
1246 | self.mgr.cache.rm_daemon(host, name) | |
1247 | self.mgr.cache.invalidate_host_daemons(host) | |
1248 | ||
20effc67 TL |
1249 | if not no_post_remove: |
1250 | self.mgr.cephadm_services[daemon_type_to_service( | |
1251 | daemon_type)].post_remove(daemon, is_failed_deploy=False) | |
f67539c2 TL |
1252 | |
1253 | return "Removed {} from host '{}'".format(name, host) | |
adb31ebb | 1254 | |
20effc67 TL |
1255 | async def _run_cephadm_json(self, |
1256 | host: str, | |
1257 | entity: Union[CephadmNoImage, str], | |
1258 | command: str, | |
1259 | args: List[str], | |
1260 | no_fsid: Optional[bool] = False, | |
1261 | image: Optional[str] = "", | |
1262 | ) -> Any: | |
adb31ebb | 1263 | try: |
20effc67 | 1264 | out, err, code = await self._run_cephadm( |
adb31ebb TL |
1265 | host, entity, command, args, no_fsid=no_fsid, image=image) |
1266 | if code: | |
1267 | raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}') | |
1268 | except Exception as e: | |
1269 | raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}') | |
1270 | try: | |
1271 | return json.loads(''.join(out)) | |
1272 | except (ValueError, KeyError): | |
1273 | msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON' | |
1274 | self.log.exception(f'{msg}: {"".join(out)}') | |
1275 | raise OrchestratorError(msg) | |
1276 | ||
20effc67 TL |
    async def _run_cephadm(self,
                           host: str,
                           entity: Union[CephadmNoImage, str],
                           command: str,
                           args: List[str],
                           addr: Optional[str] = "",
                           stdin: Optional[str] = "",
                           no_fsid: Optional[bool] = False,
                           error_ok: Optional[bool] = False,
                           image: Optional[str] = "",
                           env_vars: Optional[List[str]] = None,
                           ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]

        Resolves the container image from ``entity`` unless given, assembles
        the global/subcommand argument list, then executes the cephadm binary
        over SSH ('root' mode: ship our own binary; 'cephadm-package' mode:
        use the packaged /usr/bin/cephadm).  Returns (out, err, exit_code)
        with out/err wrapped in single-element lists.  With ``error_ok`` a
        failure is returned as ([], [error], 1) instead of raising.
        """

        await self.mgr.ssh._remote_connection(host, addr)

        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # NOTE: a plain string, used as a startswith() prefix below
        # (entities whose name starts with 'agent' skip the image lookup)
        bypass_image = ('agent')

        assert image or entity
        # Skip the image check for daemons deployed that are not ceph containers
        if not str(entity).startswith(bypass_image):
            if not image and entity is not cephadmNoImage:
                image = self.mgr._get_container_image(entity)

        final_args = []

        # global args
        if env_vars:
            for env_var_pair in env_vars:
                final_args.extend(['--env', env_var_pair])

        if image:
            final_args.extend(['--image', image])

        if not self.mgr.container_init:
            final_args += ['--no-container-init']

        # subcommand
        final_args.append(command)

        # subcommand args
        if not no_fsid:
            final_args += ['--fsid', self.mgr._cluster_fsid]

        final_args += args

        # exec
        self.log.debug('args: %s' % (' '.join(final_args)))
        if self.mgr.mode == 'root':
            # agent has cephadm binary as an extra file which is
            # therefore passed over stdin. Even for debug logs it's too much
            if stdin and 'agent' not in str(entity):
                self.log.debug('stdin: %s' % stdin)

            cmd = ['which', 'python3']
            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
            cmd = [python, self.mgr.cephadm_binary_path] + final_args

            try:
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
                # exit code 2: presumably "file not found" -- check whether our
                # cephadm binary is missing on the host and redeploy it if so
                if code == 2:
                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
                    if code_ls == 2:
                        await self._deploy_cephadm_binary(host, addr)
                        out, err, code = await self.mgr.ssh._execute_command(
                            host, cmd, stdin=stdin, addr=addr)
                        # if there is an agent on this host, make sure it is using the most recent
                        # version of cephadm binary
                        if host in self.mgr.inventory:
                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')

            except Exception as e:
                # drop the cached connection so the next attempt reconnects
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise

        elif self.mgr.mode == 'cephadm-package':
            try:
                cmd = ['/usr/bin/cephadm'] + final_args
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise
        else:
            assert False, 'unsupported mode'

        self.log.debug(f'code: {code}')
        if out:
            self.log.debug(f'out: {out}')
        if err:
            self.log.debug(f'err: {err}')
        if code and not error_ok:
            raise OrchestratorError(
                f'cephadm exited with an error code: {code}, stderr: {err}')
        return [out], [err], code
1388 | ||
1389 | async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: | |
f67539c2 TL |
1390 | # pick a random host... |
1391 | host = None | |
1392 | for host_name in self.mgr.inventory.keys(): | |
1393 | host = host_name | |
1394 | break | |
1395 | if not host: | |
1396 | raise OrchestratorError('no hosts defined') | |
1397 | if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url: | |
20effc67 | 1398 | await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials')))) |
f67539c2 | 1399 | |
a4b75251 TL |
1400 | pullargs: List[str] = [] |
1401 | if self.mgr.registry_insecure: | |
1402 | pullargs.append("--insecure") | |
1403 | ||
20effc67 | 1404 | j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True) |
f67539c2 TL |
1405 | |
1406 | r = ContainerInspectInfo( | |
1407 | j['image_id'], | |
1408 | j.get('ceph_version'), | |
1409 | j.get('repo_digests') | |
1410 | ) | |
1411 | self.log.debug(f'image {image_name} -> {r}') | |
1412 | return r | |
1413 | ||
1414 | # function responsible for logging single host into custom registry | |
20effc67 TL |
1415 | async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]: |
1416 | self.log.debug( | |
1417 | f"Attempting to log host {host} into custom registry @ {registry_json['url']}") | |
f67539c2 | 1418 | # want to pass info over stdin rather than through normal list of args |
20effc67 | 1419 | out, err, code = await self._run_cephadm( |
f67539c2 | 1420 | host, 'mon', 'registry-login', |
20effc67 | 1421 | ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True) |
f67539c2 | 1422 | if code: |
20effc67 | 1423 | return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password" |
f67539c2 TL |
1424 | return None |
1425 | ||
20effc67 | 1426 | async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None: |
f67539c2 TL |
1427 | # Use tee (from coreutils) to create a copy of cephadm on the target machine |
1428 | self.log.info(f"Deploying cephadm binary to {host}") | |
20effc67 TL |
1429 | await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path, |
1430 | self.mgr._cephadm.encode('utf-8'), addr=addr) |