import ipaddress
import hashlib
import json
import logging
import uuid
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
    DefaultDict

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
from ceph.utils import datetime_now

import orchestrator
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
    DaemonDescriptionStatus, daemon_type_to_service
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.schedule import HostAssignment
from cephadm.autotune import MemoryAutotuner
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
    CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
from mgr_module import MonCommandFailed
from mgr_util import format_bytes

from . import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

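# Daemon types that need a follow-up action after being (re)deployed; see
# daemons_post in _check_daemons() and mgr.requires_post_actions in
# _create_daemon().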
REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']


class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    Please see the `Note regarding network calls from CLI handlers`
    chapter in the cephadm developer guide.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.log = logger

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:
            self.log.debug("serve loop start")

            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                if not self.mgr.paused:
                    self._run_async_actions()

                    self.mgr.to_remove_osds.process_removal_queue()

                    self.mgr.migration.migrate()
                    if self.mgr.migration.is_migration_ongoing():
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    self._purge_deleted_services()

                    self._check_for_moved_osds()

                    if self.mgr.agent_helpers._handle_use_agent_setting():
                        continue

                    if self.mgr.upgrade.continue_upgrade():
                        continue

            except OrchestratorError as e:
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self.log.debug("serve loop sleep")
            self._serve_sleep()
            self.log.debug("serve loop wake")
        self.log.debug("serve exit")

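    # _serve_sleep() blocks on self.mgr.event, so anything that sets the
    # event (e.g. CephadmOrchestrator._kick_serve_loop()) wakes the loop
    # early; otherwise we sleep for the shortest configured check/cache
    # interval, clamped to at least 30 seconds.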
    def _serve_sleep(self) -> None:
        sleep_interval = max(
            30,
            min(
                self.mgr.host_check_interval,
                self.mgr.facts_cache_timeout,
                self.mgr.daemon_cache_timeout,
                self.mgr.device_cache_timeout,
            )
        )
        self.log.debug('Sleeping for %d seconds', sleep_interval)
        self.mgr.event.wait(sleep_interval)
        self.mgr.event.clear()

    def _update_paused_health(self) -> None:
        self.log.debug('_update_paused_health')
        if self.mgr.paused:
            self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
                "'ceph orch resume' to resume"])
        else:
            self.mgr.remove_health_warning('CEPHADM_PAUSED')

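    # Memory autotuning sketch: the host's total memory, scaled by
    # autotune_memory_target_ratio, is passed to MemoryAutotuner along with
    # the daemons on the host; it is expected to return a per-OSD
    # osd_memory_target (or None if tuning does not apply), which is then
    # persisted under the osd/host:<hostname> config mask below.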
    def _autotune_host_memory(self, host: str) -> None:
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            # if osd memory autotuning is off, we don't want to remove these config
            # options as users may be using them. Since there is no way to set autotuning
            # on/off at a host level, best we can do is check if it is globally on.
            if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
                self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                })
        self.mgr.cache.update_autotune(host)

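    # One refresh pass over all hosts, parallelized via @forall_hosts. Each
    # worker skips hosts in maintenance, then re-pulls whichever pieces of
    # metadata (daemons, facts, networks, devices) have gone stale.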
    def _refresh_hosts_and_daemons(self) -> None:
        self.log.debug('_refresh_hosts_and_daemons')
        bad_hosts = []
        failures = []
        agents_down: List[str] = []

        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.use_agent:
                if self.mgr.agent_helpers._check_agent(host):
                    agents_down.append(host)

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)

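            # When the agent is in use it pushes most host metadata to the
            # mgr itself, so we only pull metadata here if the agent on this
            # host is down or the host is not scheduled to run an agent.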
            if (
                not self.mgr.use_agent
                or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
                or host in agents_down
            ):
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_facts_refresh(host):
                    self.log.debug('Refreshing %s facts' % host)
                    r = self._refresh_facts(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_network_refresh(host):
                    self.log.debug('Refreshing %s networks' % host)
                    r = self._refresh_host_networks(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_device_refresh(host):
                    self.log.debug('refreshing %s devices' % host)
                    r = self._refresh_host_devices(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True
            elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self.mgr.wait_async(self._registry_login(
                    host, json.loads(str(self.mgr.get_store('registry_credentials')))))
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if (
                self.mgr.cache.host_needs_autotune_memory(host)
                and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

        refresh(self.mgr.cache.get_hosts())

        self._write_all_client_files()

        self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)

        self.mgr.config_checker.run_checks()

        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_REFRESH_FAILED',
        ]:
            self.mgr.remove_health_warning(k)
        if bad_hosts:
            self.mgr.set_health_warning(
                'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
        if failures:
            self.mgr.set_health_warning(
                'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
        self.mgr.update_failed_daemon_health_check()

    def _check_host(self, host: str) -> Optional[str]:
        if host not in self.mgr.inventory:
            return None
        self.log.debug(' checking %s' % host)
        try:
            addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
            out, err, code = self.mgr.wait_async(self._run_cephadm(
                host, cephadmNoImage, 'check-host', [],
                error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
            self.mgr.cache.update_last_host_check(host)
            self.mgr.cache.save_host(host)
            if code:
                self.log.debug(' host %s (%s) failed check' % (host, addr))
                if self.mgr.warn_on_failed_host_check:
                    return 'host %s (%s) failed check: %s' % (host, addr, err)
            else:
                self.log.debug(' host %s (%s) ok' % (host, addr))
        except Exception as e:
            self.log.debug(' host %s (%s) failed check' % (host, addr))
            return 'host %s (%s) failed check: %s' % (host, addr, e)
        return None

    def _refresh_host_daemons(self, host: str) -> Optional[str]:
        try:
            ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [],
                                                            no_fsid=True, log_output=self.mgr.log_refresh_metadata))
        except OrchestratorError as e:
            return str(e)
        self.mgr._process_ls_output(host, ls)
        return None

    def _refresh_facts(self, host: str) -> Optional[str]:
        try:
            val = self.mgr.wait_async(self._run_cephadm_json(
                host, cephadmNoImage, 'gather-facts', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
        except OrchestratorError as e:
            return str(e)

        self.mgr.cache.update_host_facts(host, val)

        return None

    def _refresh_host_devices(self, host: str) -> Optional[str]:
        with_lsm = self.mgr.device_enhanced_scan
        inventory_args = ['--', 'inventory',
                          '--format=json-pretty',
                          '--filter-for-batch']
        if with_lsm:
            inventory_args.insert(-1, "--with-lsm")

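        # Older cephadm binaries do not understand --filter-for-batch; if the
        # call fails with 'unrecognized arguments', retry once without it.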
        try:
            try:
                devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                                     inventory_args, log_output=self.mgr.log_refresh_metadata))
            except OrchestratorError as e:
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    rerun_args = inventory_args.copy()
                    rerun_args.remove('--filter-for-batch')
                    devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                                         rerun_args, log_output=self.mgr.log_refresh_metadata))
                else:
                    raise

        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d)' % (
            host, len(devices)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices(host, ret.devices)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_host_networks(self, host: str) -> Optional[str]:
        try:
            networks = self.mgr.wait_async(self._run_cephadm_json(
                host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s networks (%s)' % (
            host, len(networks)))
        self.mgr.cache.update_host_networks(host, networks)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return None

    def update_osdspec_previews(self, search_host: str = '') -> None:
        # Set global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.add(search_host)
        previews = []
        # query OSDSpecs for host <search host> and generate/get the preview
        # There can be multiple previews for one host due to multiple OSDSpecs.
        previews.extend(self.mgr.osd_service.get_previews(search_host))
        self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
        self.mgr.cache.osdspec_previews[search_host] = previews
        # Unset global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.remove(search_host)

    def _run_async_actions(self) -> None:
        while self.mgr.scheduled_async_actions:
            (self.mgr.scheduled_async_actions.pop(0))()

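    # A 'stray' is a host or daemon that the cluster reports via
    # list_servers() but that cephadm itself does not manage; these are
    # surfaced as health warnings rather than acted upon.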
    def _check_for_strays(self) -> None:
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            self.log.debug(ls)
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )
                    if s.get('type') == 'tcmu-runner':
                        # because we don't track tcmu-runner daemons in the host cache
                        # and don't have a way to check if the daemon is part of iscsi service
                        # we assume that all tcmu-runner daemons are managed by cephadm
                        managed.append(name)
                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)

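    # After an OSD is moved to a new host, the old error-state daemon entry
    # can linger. If exactly one copy of an OSD id is running and the osdmap
    # agrees the OSD is up, the duplicates in error state are removed.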
    def _check_for_moved_osds(self) -> None:
        self.log.debug('_check_for_moved_osds')
        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in self.mgr.cache.get_daemons_by_type('osd'):
            assert dd.daemon_id
            all_osds[int(dd.daemon_id)].append(dd)
        for osd_id, dds in all_osds.items():
            if len(dds) <= 1:
                continue
            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
            logger.info(msg)
            if len(running) != 1:
                continue
            osd = self.mgr.get_osd_by_id(osd_id)
            if not osd or not osd['up']:
                continue
            for e in error:
                assert e.hostname
                try:
                    self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
                    self.mgr.events.for_daemon(
                        e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
                except OrchestratorError as ex:
                    self.mgr.events.from_orch_error(ex)
                    logger.exception(f'failed to remove duplicated daemon {e}')

    def _apply_all_services(self) -> bool:
        self.log.debug('_apply_all_services')
        r = False
        specs = []  # type: List[ServiceSpec]
        # if metadata is not up to date, we still need to apply the spec for the
        # agent, since the agent is what gathers the metadata. If we don't, we
        # end up stuck between wanting metadata to be up to date to apply specs
        # and needing to apply the agent spec to get up-to-date metadata
        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
            self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
            try:
                specs.append(self.mgr.spec_store['agent'].spec)
            except Exception as e:
                self.log.debug(f'Failed to find agent spec: {e}')
            self.mgr.agent_helpers._apply_agent()
            return r
        else:
            for sn, spec in self.mgr.spec_store.active_specs.items():
                specs.append(spec)
        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            self.mgr.remove_health_warning(name)
        self.mgr.apply_spec_fails = []
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
                self.log.exception(msg)
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
                warnings = []
                for x in self.mgr.apply_spec_fails:
                    warnings.append(f'{x[0]}: {x[1]}')
                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                            len(self.mgr.apply_spec_fails),
                                            warnings)
        self.mgr.update_watched_hosts()
        self.mgr.tuned_profile_utils._write_all_tuned_profiles()
        return r

    def _apply_service_config(self, spec: ServiceSpec) -> None:
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
                    invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
                    options_failed_to_set), options_failed_to_set)

    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.
        """
        self.mgr.migration.verify_no_migration()

        service_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        if service_type == 'agent':
            try:
                assert self.mgr.cherrypy_thread
                assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            except Exception:
                self.log.info(
                    'Delaying applying agent spec until cephadm endpoint root cert created')
                return False

        self._apply_service_config(spec)

        if service_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            # can't know if daemon count changed; create_from_spec doesn't
            # return a solid indication
            return False

        svc = self.mgr.cephadm_services[service_type]
        daemons = self.mgr.cache.get_daemons_by_service(service_name)

        public_networks: List[str] = []
        if service_type == 'mon':
            out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
            if '/' in out:
                public_networks = [x.strip() for x in out.split(',')]
                self.log.debug('mon public_network(s) is %s' % public_networks)

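        # New mon hosts must be reachable on the mon public network(s); the
        # filter below rejects candidates with no overlapping subnet.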
        def matches_network(host):
            # type: (str) -> bool
            # make sure the host has at least one network that belongs to some configured public network(s)
            for pn in public_networks:
                public_network = ipaddress.ip_network(pn)
                for hn in self.mgr.cache.networks[host]:
                    host_network = ipaddress.ip_network(hn)
                    if host_network.overlaps(public_network):
                        return True

            host_networks = ','.join(self.mgr.cache.networks[host])
            pub_networks = ','.join(public_networks)
            self.log.info(
                f"Filtered out host {host}: does not belong to mon public_network(s): "
                f" {pub_networks}, host network(s): {host_networks}"
            )
            return False

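        # HostAssignment computes the declarative placement for this service:
        # place() returns (all_slots, slots_to_add, daemons_to_remove). For
        # ranked services (e.g. nfs) rank_map records which daemon holds each
        # rank/generation so that stale ranks can be fenced later.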
        rank_map = None
        if svc.ranked():
            rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
        ha = HostAssignment(
            spec=spec,
            hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
            ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
            unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
            draining_hosts=self.mgr.cache.get_draining_hosts(),
            daemons=daemons,
            networks=self.mgr.cache.networks,
            filter_new_host=(
                matches_network if service_type == 'mon'
                else None
            ),
            allow_colo=svc.allow_colo(),
            primary_daemon_type=svc.primary_daemon_type(),
            per_host_daemon_type=svc.per_host_daemon_type(),
            rank_map=rank_map,
        )

        try:
            all_slots, slots_to_add, daemons_to_remove = ha.place()
            daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
                'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
            self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
        except OrchestratorError as e:
            msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
            self.log.error(msg)
            self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
            self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
            warnings = []
            for x in self.mgr.apply_spec_fails:
                warnings.append(f'{x[0]}: {x[1]}')
            self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                        f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                        len(self.mgr.apply_spec_fails),
                                        warnings)
            return False

        r = None

        # sanity check
        final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
        if service_type in ['mon', 'mgr'] and final_count < 1:
            self.log.debug('cannot scale mon|mgr below 1')
            return False

        # progress
        progress_id = str(uuid.uuid4())
        delta: List[str] = []
        if slots_to_add:
            delta += [f'+{len(slots_to_add)}']
        if daemons_to_remove:
            delta += [f'-{len(daemons_to_remove)}']
        progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
        progress_total = len(slots_to_add) + len(daemons_to_remove)
        progress_done = 0

        def update_progress() -> None:
            self.mgr.remote(
                'progress', 'update', progress_id,
                ev_msg=progress_title,
                ev_progress=(progress_done / progress_total),
                add_to_ceph_s=True,
            )

        if progress_total:
            update_progress()

        # add any?
        did_config = False

        self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
        self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)

        hosts_altered: Set[str] = set()

        try:
            # assign names
            for i in range(len(slots_to_add)):
                slot = slots_to_add[i]
                slot = slot.assign_name(self.mgr.get_unique_name(
                    slot.daemon_type,
                    slot.hostname,
                    [d for d in daemons if d not in daemons_to_remove],
                    prefix=spec.service_id,
                    forcename=slot.name,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                ))
                slots_to_add[i] = slot
                if rank_map is not None:
                    assert slot.rank is not None
                    assert slot.rank_generation is not None
                    assert rank_map[slot.rank][slot.rank_generation] is None
                    rank_map[slot.rank][slot.rank_generation] = slot.name

            if rank_map:
                # record the rank_map before we make changes so that if we fail the
                # next mgr will clean up.
                self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)

            # remove daemons now, since we are going to fence them anyway
            for d in daemons_to_remove:
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)
            daemons_to_remove = []

            # fence them
            svc.fence_old_ranks(spec, rank_map, len(all_slots))

            # create daemons
            daemon_place_fails = []
            for slot in slots_to_add:
                # first remove daemon with conflicting port or name?
                if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
                    for d in daemons_to_remove:
                        if (
                            d.hostname != slot.hostname
                            or not (set(d.ports or []) & set(slot.ports))
                            or (d.ip and slot.ip and d.ip != slot.ip)
                            and d.name() != slot.name
                        ):
                            continue
                        if d.name() != slot.name:
                            self.log.info(
                                f'Removing {d.name()} before deploying to {slot} to avoid a port or name conflict'
                            )
                        # NOTE: we don't check ok-to-stop here to avoid starvation if
                        # there is only 1 gateway.
                        self._remove_daemon(d.name(), d.hostname)
                        daemons_to_remove.remove(d)
                        progress_done += 1
                        hosts_altered.add(d.hostname)
                        break

                # deploy new daemon
                daemon_id = slot.name
                if not did_config:
                    svc.config(spec)
                    did_config = True

                daemon_spec = svc.make_daemon_spec(
                    slot.hostname, daemon_id, slot.network, spec,
                    daemon_type=slot.daemon_type,
                    ports=slot.ports,
                    ip=slot.ip,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                )
                self.log.debug('Placing %s.%s on host %s' % (
                    slot.daemon_type, daemon_id, slot.hostname))

                try:
                    daemon_spec = svc.prepare_create(daemon_spec)
                    self.mgr.wait_async(self._create_daemon(daemon_spec))
                    r = True
                    progress_done += 1
                    update_progress()
                    hosts_altered.add(daemon_spec.host)
                except (RuntimeError, OrchestratorError) as e:
                    msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                           f"on {slot.hostname}: {e}")
                    self.mgr.events.for_service(spec, 'ERROR', msg)
                    self.mgr.log.error(msg)
                    daemon_place_fails.append(msg)
                    # only return "no change" if no one else has already succeeded.
                    # later successes will also change to True
                    if r is None:
                        r = False
                    progress_done += 1
                    update_progress()
                    continue

                # add to daemon list so next name(s) will also be unique
                sd = orchestrator.DaemonDescription(
                    hostname=slot.hostname,
                    daemon_type=slot.daemon_type,
                    daemon_id=daemon_id,
                )
                daemons.append(sd)

            if daemon_place_fails:
                self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
                    daemon_place_fails), daemon_place_fails)

            if service_type == 'mgr':
                active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
                if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
                    # We can't just remove the active mgr like any other daemon.
                    # Need to fail over later so it can be removed on next pass.
                    # This can be accomplished by scheduling a restart of the active mgr.
                    self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')

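            # If the remaining set is not ok-to-stop as a whole, shrink it
            # from the end until it is; daemons dropped here remain in place
            # and are retried on a later serve() pass.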
            # remove any?
            def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
                daemon_ids = [d.daemon_id for d in remove_daemons]
                assert None not in daemon_ids
                # setting force flag retains previous behavior
                r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
                return not r.retval

            while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
                # let's find a subset that is ok-to-stop
                daemons_to_remove.pop()
            for d in daemons_to_remove:
                r = True
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)

                progress_done += 1
                update_progress()
                hosts_altered.add(d.hostname)

            self.mgr.remote('progress', 'complete', progress_id)
        except Exception as e:
            self.mgr.remote('progress', 'fail', progress_id, str(e))
            raise
        finally:
            if self.mgr.use_agent:
                # can only send ack to agents if we know for sure the port they bound to
                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
                    h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
                self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)

        if r is None:
            r = False
        return r

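    # Reconcile running daemons against their specs. Roughly: 'reconfig'
    # regenerates a daemon's configuration in place, while 'redeploy'
    # recreates the daemon's container (e.g. when extra container or
    # entrypoint args change).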
    def _check_daemons(self) -> None:
        self.log.debug('_check_daemons')
        daemons = self.mgr.cache.get_daemons()
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None

            # any action we can try will fail for a daemon on an offline host,
            # including removing the daemon
            if dd.hostname in self.mgr.offline_hosts:
                continue

            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            if dd.daemon_type == 'agent':
                try:
                    self.mgr.agent_helpers._check_agent(dd.hostname)
                except Exception as e:
                    self.log.debug(
                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                self.log.debug(
                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
                self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
                dd.extra_container_args = spec.extra_container_args
                action = 'redeploy'
            elif spec is not None and hasattr(spec, 'extra_entrypoint_args') and dd.extra_entrypoint_args != spec.extra_entrypoint_args:
                self.log.info(f'Redeploying {dd.name()}, (entrypoint args changed) . . .')
                self.log.debug(
                    f'{dd.name()} daemon entrypoint args {dd.extra_entrypoint_args} -> {spec.extra_entrypoint_args}')
                dd.extra_entrypoint_args = spec.extra_entrypoint_args
                action = 'redeploy'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
                        self.mgr.cache.save_host(dd.hostname)
                except OrchestratorError as e:
                    self.log.exception(e)
                    self.mgr.events.from_orch_error(e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.log.exception(e)
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)

    def _purge_deleted_services(self) -> None:
        self.log.debug('_purge_deleted_services')
        existing_services = self.mgr.spec_store.all_specs.items()
        for service_name, spec in list(existing_services):
            if service_name not in self.mgr.spec_store.spec_deleted:
                continue
            if self.mgr.cache.get_daemons_by_service(service_name):
                continue
            if spec.service_type in ['mon', 'mgr']:
                continue

            logger.info(f'Purge service {service_name}')

            self.mgr.cephadm_services[spec.service_type].purge(service_name)
            self.mgr.spec_store.finally_rm(service_name)

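    # Pin container images by repo digest rather than tag so that all hosts
    # resolve the same image even if the tag is later re-pointed (only done
    # when use_repo_digest is enabled).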
    def convert_tags_to_repo_digest(self) -> None:
        if not self.mgr.use_repo_digest:
            return
        settings = self.mgr.upgrade.get_distinct_container_image_settings()
        digests: Dict[str, ContainerInspectInfo] = {}
        for container_image_ref in set(settings.values()):
            if not is_repo_digest(container_image_ref):
                image_info = self.mgr.wait_async(
                    self._get_container_image_info(container_image_ref))
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    assert is_repo_digest(image_info.repo_digests[0]), image_info
                digests[container_image_ref] = image_info

        for entity, container_image_ref in settings.items():
            if not is_repo_digest(container_image_ref):
                image_info = digests[container_image_ref]
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    self.mgr.set_container_image(entity, image_info.repo_digests[0])

    def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
        config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
        cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'

        if self.mgr.manage_etc_ceph_ceph_conf:
            try:
                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=pspec),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
            except Exception as e:
                self.mgr.log.warning(
                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

        # client keyrings
        for ks in self.mgr.keys.keys.values():
            try:
                ret, keyring, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': ks.entity,
                })
                if ret:
                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
                    continue
                digest = ''.join('%02x' % c for c in hashlib.sha256(
                    keyring.encode('utf-8')).digest())
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=ks.placement),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
                    ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
                    client_files[host][ks.path] = ceph_admin_key
                    client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
            except Exception as e:
                self.log.warning(
                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
        return client_files

    def _write_all_client_files(self) -> None:
        if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
            client_files = self._calc_client_files()
        else:
            client_files = {}

        @forall_hosts
        def _write_files(host: str) -> None:
            self._write_client_files(client_files, host)

        _write_files(self.mgr.cache.get_hosts())

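    # Per-host sync of client files: only entries whose (digest, mode, uid,
    # gid) changed since the last pass are rewritten, and files we previously
    # wrote but no longer want are removed (ceph.conf itself is left alone).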
    def _write_client_files(self,
                            client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
                            host: str) -> None:
        updated_files = False
        if host in self.mgr.offline_hosts:
            return
        old_files = self.mgr.cache.get_host_client_files(host).copy()
        for path, m in client_files.get(host, {}).items():
            mode, uid, gid, content, digest = m
            if path in old_files:
                match = old_files[path] == (digest, mode, uid, gid)
                del old_files[path]
                if match:
                    continue
            self.log.info(f'Updating {host}:{path}')
            self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
            self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
            updated_files = True
        for path in old_files.keys():
            if path == '/etc/ceph/ceph.conf':
                continue
            self.log.info(f'Removing {host}:{path}')
            cmd = ['rm', '-f', path]
            self.mgr.ssh.check_execute_command(host, cmd)
            updated_files = True
            self.mgr.cache.removed_client_file(host, path)
        if updated_files:
            self.mgr.cache.save_host(host)

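    # _create_daemon() and _run_cephadm() are coroutines; callers on the
    # serve thread drive them via self.mgr.wait_async().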
    async def _create_daemon(self,
                             daemon_spec: CephadmDaemonDeploySpec,
                             reconfig: bool = False,
                             osd_uuid_map: Optional[Dict[str, Any]] = None,
                             ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            try:
                image = ''
                start_time = datetime_now()
                ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

                if daemon_spec.daemon_type == 'container':
                    spec = cast(CustomContainerSpec,
                                self.mgr.spec_store[daemon_spec.service_name].spec)
                    image = spec.image
                    if spec.ports:
                        ports.extend(spec.ports)

                # TCP port to open in the host firewall
                if len(ports) > 0:
                    daemon_spec.extra_args.extend([
                        '--tcp-ports', ' '.join(map(str, ports))
                    ])

                # osd deployments needs an --osd-uuid arg
                if daemon_spec.daemon_type == 'osd':
                    if not osd_uuid_map:
                        osd_uuid_map = self.mgr.get_osd_uuid_map()
                    osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                    if not osd_uuid:
                        raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

                if reconfig:
                    daemon_spec.extra_args.append('--reconfig')
                if self.mgr.allow_ptrace:
                    daemon_spec.extra_args.append('--allow-ptrace')

                daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec)

                if daemon_spec.service_name in self.mgr.spec_store:
                    configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
                    if configs is not None:
                        daemon_spec.final_config.update(
                            {'custom_config_files': [c.to_json() for c in configs]})

                if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
                    await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))

                self.log.info('%s daemon %s on %s' % (
                    'Reconfiguring' if reconfig else 'Deploying',
                    daemon_spec.name(), daemon_spec.host))

                out, err, code = await self._run_cephadm(
                    daemon_spec.host, daemon_spec.name(), 'deploy',
                    [
                        '--name', daemon_spec.name(),
                        '--meta-json', json.dumps({
                            'service_name': daemon_spec.service_name,
                            'ports': daemon_spec.ports,
                            'ip': daemon_spec.ip,
                            'deployed_by': self.mgr.get_active_mgr_digests(),
                            'rank': daemon_spec.rank,
                            'rank_generation': daemon_spec.rank_generation,
                            'extra_container_args': extra_container_args,
                            'extra_entrypoint_args': extra_entrypoint_args
                        }),
                        '--config-json', '-',
                    ] + daemon_spec.extra_args,
                    stdin=json.dumps(daemon_spec.final_config),
                    image=image,
                )

                if daemon_spec.daemon_type == 'agent':
                    self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
                    self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1

                # refresh daemon state?  (ceph daemon reconfig does not need it)
                if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                    if not code and daemon_spec.host in self.mgr.cache.daemons:
                        # prime cached service state with what we (should have)
                        # just created
                        sd = daemon_spec.to_daemon_description(
                            DaemonDescriptionStatus.starting, 'starting')
                        self.mgr.cache.add_daemon(daemon_spec.host, sd)
                        if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
                            self.mgr.requires_post_actions.add(daemon_spec.name())
                    self.mgr.cache.invalidate_host_daemons(daemon_spec.host)

                if daemon_spec.daemon_type != 'agent':
                    self.mgr.cache.update_daemon_config_deps(
                        daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
                    self.mgr.cache.save_host(daemon_spec.host)
                else:
                    self.mgr.agent_cache.update_agent_config_deps(
                        daemon_spec.host, daemon_spec.deps, start_time)
                    self.mgr.agent_cache.save_agent(daemon_spec.host)
                msg = "{} {} on host '{}'".format(
                    'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                if not code:
                    self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
                else:
                    what = 'reconfigure' if reconfig else 'deploy'
                    self.mgr.events.for_daemon(
                        daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
                return msg
            except OrchestratorError:
                redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
                if not reconfig and not redeploy:
                    # we have to clean up the daemon. E.g. keyrings.
                    service_type = daemon_type_to_service(daemon_spec.daemon_type)
                    dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
                    self.mgr.cephadm_services[service_type].post_remove(dd, is_failed_deploy=True)
                raise

    def _setup_extra_deployment_args(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[CephadmDaemonDeploySpec, Optional[List[str]], Optional[List[str]]]:
        # Handle any user-specified (in the service spec) extra runtime or
        # entrypoint args for the daemon we are about to deploy. This appends
        # a set of extra args for the cephadm binary to indicate that the
        # daemon needs them. Returns the modified daemon spec as well as the
        # args that were added (as those are included in the unit.meta file).
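        # For example (illustrative): an extra_container_args entry of
        # '--cpus 2' is split on spaces and appended to extra_args as
        # '--extra-container-args=--cpus' and '--extra-container-args=2'.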
        try:
            eca = daemon_spec.extra_container_args
            if eca:
                for a in eca:
                    # args with spaces need to be split into multiple args
                    # in order to work properly
                    args = a.split(' ')
                    for arg in args:
                        if arg:
                            daemon_spec.extra_args.append(f'--extra-container-args={arg}')
        except AttributeError:
            eca = None
        try:
            eea = daemon_spec.extra_entrypoint_args
            if eea:
                for a in eea:
                    # args with spaces need to be split into multiple args
                    # in order to work properly
                    args = a.split(' ')
                    for arg in args:
                        if arg:
                            daemon_spec.extra_args.append(f'--extra-entrypoint-args={arg}')
        except AttributeError:
            eea = None
        return daemon_spec, eca, eea

    def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instance's data.
            dd = self.mgr.cache.get_daemon(daemon.daemon_name)
            if dd.ports:
                args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
            else:
                args = ['--name', name, '--force']

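            # e.g. (hypothetical) for dd.ports == [9095, 9093] this yields
            # ['--name', name, '--force', '--tcp-ports', '9095 9093']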
            self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
            out, err, code = self.mgr.wait_async(self._run_cephadm(
                host, name, 'rm-daemon', args))
            if not code:
                # remove item from cache
                self.mgr.cache.rm_daemon(host, name)
            self.mgr.cache.invalidate_host_daemons(host)

            if not no_post_remove:
                if daemon_type not in ['iscsi']:
                    self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_type)].post_remove(daemon, is_failed_deploy=False)
                else:
                    self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_type)].post_remove(daemon, is_failed_deploy=False))
                    self.mgr._kick_serve_loop()

            return "Removed {} from host '{}'".format(name, host)

    async def _run_cephadm_json(self,
                                host: str,
                                entity: Union[CephadmNoImage, str],
                                command: str,
                                args: List[str],
                                no_fsid: Optional[bool] = False,
                                error_ok: Optional[bool] = False,
                                image: Optional[str] = "",
                                log_output: Optional[bool] = True,
                                ) -> Any:
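        # Thin wrapper around _run_cephadm that decodes stdout as JSON, e.g.
        # (illustrative) `await self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)`
        # would return the parsed daemon list reported by `cephadm ls` on that host.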
        try:
            out, err, code = await self._run_cephadm(
                host, entity, command, args, no_fsid=no_fsid, error_ok=error_ok,
                image=image, log_output=log_output)
            if code:
                raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
        except Exception as e:
            raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
        try:
            return json.loads(''.join(out))
        except (ValueError, KeyError):
            msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
            self.log.exception(f'{msg}: {"".join(out)}')
            raise OrchestratorError(msg)

    async def _run_cephadm(self,
                           host: str,
                           entity: Union[CephadmNoImage, str],
                           command: str,
                           args: List[str],
                           addr: Optional[str] = "",
                           stdin: Optional[str] = "",
                           no_fsid: Optional[bool] = False,
                           error_ok: Optional[bool] = False,
                           image: Optional[str] = "",
                           env_vars: Optional[List[str]] = None,
                           log_output: Optional[bool] = True,
                           ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args.

        Important: you probably don't want to run _run_cephadm from CLI handlers.

        :param env_vars: in format -> [KEY=VALUE, ..]
        """
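        # env_vars example (illustrative): ['HTTPS_PROXY=https://proxy.example.com:8080',
        # 'NO_PROXY=localhost'] -- each pair is passed to cephadm via '--env' below.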

        await self.mgr.ssh._remote_connection(host, addr)

        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # note: ('agent') is a plain string, not a tuple; str.startswith()
        # accepts either
        bypass_image = ('agent')

        assert image or entity
        # Skip the image check for daemons that are not deployed as ceph containers
        if not str(entity).startswith(bypass_image):
            if not image and entity is not cephadmNoImage:
                image = self.mgr._get_container_image(entity)

        final_args = []

        # global args
        if env_vars:
            for env_var_pair in env_vars:
                final_args.extend(['--env', env_var_pair])

        if image:
            final_args.extend(['--image', image])

        if not self.mgr.container_init:
            final_args += ['--no-container-init']

        if not self.mgr.cgroups_split:
            final_args += ['--no-cgroups-split']

        # subcommand
        final_args.append(command)

        # subcommand args
        if not no_fsid:
            final_args += ['--fsid', self.mgr._cluster_fsid]

        final_args += args
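        # The assembled remote invocation is shaped like (illustrative):
        #   cephadm --env KEY=VALUE --image <image> <command> --fsid <fsid> <args...>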

        # exec
        self.log.debug('args: %s' % (' '.join(final_args)))
        if self.mgr.mode == 'root':
            # the agent receives the cephadm binary as an extra file passed
            # over stdin; even for debug logs that is too much
            if stdin and 'agent' not in str(entity):
                self.log.debug('stdin: %s' % stdin)

            cmd = ['which', 'python3']
            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
            cmd = [python, self.mgr.cephadm_binary_path] + final_args

            try:
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
                if code == 2:
                    # an exit code of 2 may mean the cephadm binary is missing
                    # on the host; check for it and (re)deploy it if needed
                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr,
                                                                                  log_command=log_output)
                    if code_ls == 2:
                        await self._deploy_cephadm_binary(host, addr)
                        out, err, code = await self.mgr.ssh._execute_command(
                            host, cmd, stdin=stdin, addr=addr)
                        # if there is an agent on this host, make sure it is using the most recent
                        # version of the cephadm binary
                        if host in self.mgr.inventory:
                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')

            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise

        elif self.mgr.mode == 'cephadm-package':
            try:
                cmd = ['/usr/bin/cephadm'] + final_args
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise
        else:
            assert False, 'unsupported mode'

        if log_output:
            self.log.debug(f'code: {code}')
            if out:
                self.log.debug(f'out: {out}')
            if err:
                self.log.debug(f'err: {err}')
        if code and not error_ok:
            raise OrchestratorError(
                f'cephadm exited with an error code: {code}, stderr: {err}')
        return [out], [err], code

    async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
        # pick an arbitrary host (the first one in the inventory)...
        host = None
        for host_name in self.mgr.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
            await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))

        j = None
        if not self.mgr.use_repo_digest:
            try:
                j = await self._run_cephadm_json(host, '', 'inspect-image', [],
                                                 image=image_name, no_fsid=True,
                                                 error_ok=True)
            except OrchestratorError:
                pass

        if not j:
            pullargs: List[str] = []
            if self.mgr.registry_insecure:
                pullargs.append("--insecure")

            j = await self._run_cephadm_json(host, '', 'pull', pullargs,
                                             image=image_name, no_fsid=True)
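        # j should carry at least 'image_id', plus optionally 'ceph_version'
        # and 'repo_digests', e.g. (illustrative):
        #   {'image_id': '2fa9...', 'ceph_version': 'ceph version 17.2.0 (...)',
        #    'repo_digests': ['quay.io/ceph/ceph@sha256:...']}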
        r = ContainerInspectInfo(
            j['image_id'],
            j.get('ceph_version'),
            j.get('repo_digests')
        )
        self.log.debug(f'image {image_name} -> {r}')
        return r

    # function responsible for logging a single host into a custom registry
    async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
        self.log.debug(
            f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
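        # registry_json is expected to look like (illustrative values):
        #   {'url': 'registry.example.com', 'username': 'myuser', 'password': 'mypass'}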
        # want to pass info over stdin rather than through normal list of args
        out, err, code = await self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
        if code:
            return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
        return None

    async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
        # Use tee (from coreutils) to create a copy of cephadm on the target machine
        self.log.info(f"Deploying cephadm binary to {host}")
        await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
                                              self.mgr._cephadm.encode('utf-8'), addr=addr)