]>
Commit | Line | Data |
---|---|---|
b3b6e05e | 1 | import hashlib |
f91f0fd5 TL |
2 | import json |
3 | import logging | |
b3b6e05e | 4 | import uuid |
f91f0fd5 | 5 | from collections import defaultdict |
20effc67 TL |
6 | from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \ |
7 | DefaultDict | |
f91f0fd5 TL |
8 | |
9 | from ceph.deployment import inventory | |
10 | from ceph.deployment.drive_group import DriveGroupSpec | |
b3b6e05e | 11 | from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec |
20effc67 | 12 | from ceph.utils import datetime_now |
f91f0fd5 TL |
13 | |
14 | import orchestrator | |
f67539c2 TL |
15 | from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \ |
16 | DaemonDescriptionStatus, daemon_type_to_service | |
17 | from cephadm.services.cephadmservice import CephadmDaemonDeploySpec | |
f91f0fd5 | 18 | from cephadm.schedule import HostAssignment |
b3b6e05e | 19 | from cephadm.autotune import MemoryAutotuner |
f67539c2 TL |
20 | from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \ |
21 | CephadmNoImage, CEPH_TYPES, ContainerInspectInfo | |
22 | from mgr_module import MonCommandFailed | |
b3b6e05e | 23 | from mgr_util import format_bytes |
f67539c2 TL |
24 | |
25 | from . import utils | |
f91f0fd5 TL |
26 | |
27 | if TYPE_CHECKING: | |
f67539c2 | 28 | from cephadm.module import CephadmOrchestrator |
f91f0fd5 TL |
29 | |
# Module-level logger shared by CephadmServe (assigned to self.log in __init__).
logger = logging.getLogger(__name__)

# Daemon types that require a follow-up action after (re)deployment.
# NOTE(review): the consumers of this list are not visible in this chunk —
# confirm the exact post-action semantics against the rest of the module.
REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    Please see the `Note regarding network calls from CLI handlers`
    chapter in the cephadm developer guide.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        # Keep a reference to the orchestrator module; reuse the module-level
        # logger for all serve-thread activity.
        self.mgr: "CephadmOrchestrator" = mgr
        self.log = logger
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.

        Each `continue` below skips the sleep at the bottom of the loop, so
        after a state-changing step (migration, spec application, upgrade,
        agent toggle) the next pass starts immediately with fresh state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:
            self.log.debug("serve loop start")

            try:

                # resolve image tags to repo digests so deployments use a
                # stable image reference
                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                # one-shot dashboard <-> RGW credential sync; the flag is
                # cleared before the attempt, so a failure is not retried
                # on every iteration
                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                if not self.mgr.paused:
                    self.mgr.to_remove_osds.process_removal_queue()

                self.mgr.migration.migrate()
                if self.mgr.migration.is_migration_ongoing():
                    # don't touch daemons/specs while a migration is in flight
                    continue

                if self._apply_all_services():
                    continue  # did something, refresh

                self._check_daemons()

                self._purge_deleted_services()

                self._check_for_moved_osds()

                if self.mgr.agent_helpers._handle_use_agent_setting():
                    continue

                if self.mgr.upgrade.continue_upgrade():
                    continue

            except OrchestratorError as e:
                # surface the error as an event on its subject; the serve
                # loop itself keeps running
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self.log.debug("serve loop sleep")
            self._serve_sleep()
            self.log.debug("serve loop wake")
        self.log.debug("serve exit")
adb31ebb | 113 | def _serve_sleep(self) -> None: |
f67539c2 TL |
114 | sleep_interval = max( |
115 | 30, | |
116 | min( | |
117 | self.mgr.host_check_interval, | |
118 | self.mgr.facts_cache_timeout, | |
119 | self.mgr.daemon_cache_timeout, | |
120 | self.mgr.device_cache_timeout, | |
121 | ) | |
122 | ) | |
f91f0fd5 | 123 | self.log.debug('Sleeping for %d seconds', sleep_interval) |
f67539c2 | 124 | self.mgr.event.wait(sleep_interval) |
f91f0fd5 TL |
125 | self.mgr.event.clear() |
126 | ||
adb31ebb | 127 | def _update_paused_health(self) -> None: |
20effc67 | 128 | self.log.debug('_update_paused_health') |
f91f0fd5 | 129 | if self.mgr.paused: |
20effc67 TL |
130 | self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [ |
131 | "'ceph orch resume' to resume"]) | |
f91f0fd5 | 132 | else: |
a4b75251 | 133 | self.mgr.remove_health_warning('CEPHADM_PAUSED') |
f91f0fd5 | 134 | |
    def _autotune_host_memory(self, host: str) -> None:
        """Autotune osd_memory_target for all OSDs on `host`.

        Uses the host's total memory (scaled by autotune_memory_target_ratio)
        to compute a per-OSD target via MemoryAutotuner, then sets a single
        host-masked config option (who=osd/host:<host>) and clears any
        per-daemon overrides that would shadow it. If the host's memory size
        is unknown, any previous host-masked target is removed instead.
        """
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            # no facts yet -> cannot tune; fall through to the removal branch
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                # drop per-daemon overrides that differ from the new target so
                # the host-masked option below takes effect
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    # best-effort: log and carry on, the next autotune retries
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': f'osd/host:{host}',
                'name': 'osd_memory_target',
            })
        # record that this host has been autotuned (resets the needs-autotune timer)
        self.mgr.cache.update_autotune(host)
    def _refresh_hosts_and_daemons(self) -> None:
        """Refresh per-host state (daemons, facts, networks, devices, ...).

        Runs the inner `refresh` closure over every host in the cache via
        @forall_hosts (NOTE(review): presumably in parallel — confirm;
        bad_hosts/failures/agents_down are appended to from those calls),
        then updates the CEPHADM_HOST_CHECK_FAILED / CEPHADM_REFRESH_FAILED
        health warnings from the collected errors.
        """
        self.log.debug('_refresh_hosts_and_daemons')
        bad_hosts = []   # error strings from host checks / registry logins
        failures = []    # error strings from daemon/facts/network/device refreshes

        # client keyring/ceph.conf distribution is computed once, then written
        # per host inside refresh()
        if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
            client_files = self._calc_client_files()
        else:
            client_files = {}

        agents_down: List[str] = []

        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.use_agent:
                if self.mgr.agent_helpers._check_agent(host):
                    agents_down.append(host)

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)

            # Without an agent (or when the agent is down / the host is
            # draining), the mgr itself gathers metadata from the host.
            if (
                    not self.mgr.use_agent
                    or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
                    or host in agents_down
            ):
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_facts_refresh(host):
                    self.log.debug(('Refreshing %s facts' % host))
                    r = self._refresh_facts(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_network_refresh(host):
                    self.log.debug(('Refreshing %s networks' % host))
                    r = self._refresh_host_networks(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_device_refresh(host):
                    self.log.debug('refreshing %s devices' % host)
                    r = self._refresh_host_devices(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True
            elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
                # agent mode is on but no agent daemon exists on this host
                # yet: fall back to a direct daemon refresh only
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self.mgr.wait_async(self._registry_login(
                    host, json.loads(str(self.mgr.get_store('registry_credentials')))))
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            # the '_no_autotune_memory' host label opts a host out of autotuning
            if (
                    self.mgr.cache.host_needs_autotune_memory(host)
                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

            self._write_client_files(client_files, host)

        refresh(self.mgr.cache.get_hosts())

        self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)

        self.mgr.config_checker.run_checks()

        # clear first so resolved problems disappear, then re-raise from the
        # freshly collected errors
        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_REFRESH_FAILED',
        ]:
            self.mgr.remove_health_warning(k)
        if bad_hosts:
            self.mgr.set_health_warning(
                'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
        if failures:
            self.mgr.set_health_warning(
                'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
            self.mgr.update_failed_daemon_health_check()
adb31ebb | 286 | def _check_host(self, host: str) -> Optional[str]: |
f91f0fd5 | 287 | if host not in self.mgr.inventory: |
adb31ebb | 288 | return None |
f91f0fd5 TL |
289 | self.log.debug(' checking %s' % host) |
290 | try: | |
522d829b | 291 | addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host |
20effc67 | 292 | out, err, code = self.mgr.wait_async(self._run_cephadm( |
f91f0fd5 | 293 | host, cephadmNoImage, 'check-host', [], |
20effc67 | 294 | error_ok=True, no_fsid=True)) |
f91f0fd5 TL |
295 | self.mgr.cache.update_last_host_check(host) |
296 | self.mgr.cache.save_host(host) | |
297 | if code: | |
522d829b | 298 | self.log.debug(' host %s (%s) failed check' % (host, addr)) |
f91f0fd5 | 299 | if self.mgr.warn_on_failed_host_check: |
522d829b | 300 | return 'host %s (%s) failed check: %s' % (host, addr, err) |
f91f0fd5 | 301 | else: |
522d829b | 302 | self.log.debug(' host %s (%s) ok' % (host, addr)) |
f91f0fd5 | 303 | except Exception as e: |
522d829b TL |
304 | self.log.debug(' host %s (%s) failed check' % (host, addr)) |
305 | return 'host %s (%s) failed check: %s' % (host, addr, e) | |
adb31ebb | 306 | return None |
f91f0fd5 | 307 | |
adb31ebb | 308 | def _refresh_host_daemons(self, host: str) -> Optional[str]: |
f91f0fd5 | 309 | try: |
20effc67 | 310 | ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)) |
adb31ebb TL |
311 | except OrchestratorError as e: |
312 | return str(e) | |
20effc67 | 313 | self.mgr._process_ls_output(host, ls) |
f91f0fd5 TL |
314 | return None |
315 | ||
adb31ebb | 316 | def _refresh_facts(self, host: str) -> Optional[str]: |
f91f0fd5 | 317 | try: |
20effc67 TL |
318 | val = self.mgr.wait_async(self._run_cephadm_json( |
319 | host, cephadmNoImage, 'gather-facts', [], no_fsid=True)) | |
adb31ebb TL |
320 | except OrchestratorError as e: |
321 | return str(e) | |
322 | ||
323 | self.mgr.cache.update_host_facts(host, val) | |
324 | ||
325 | return None | |
326 | ||
327 | def _refresh_host_devices(self, host: str) -> Optional[str]: | |
20effc67 | 328 | with_lsm = self.mgr.device_enhanced_scan |
f67539c2 | 329 | inventory_args = ['--', 'inventory', |
a4b75251 | 330 | '--format=json-pretty', |
f67539c2 TL |
331 | '--filter-for-batch'] |
332 | if with_lsm: | |
333 | inventory_args.insert(-1, "--with-lsm") | |
334 | ||
f91f0fd5 | 335 | try: |
adb31ebb | 336 | try: |
20effc67 TL |
337 | devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume', |
338 | inventory_args)) | |
adb31ebb TL |
339 | except OrchestratorError as e: |
340 | if 'unrecognized arguments: --filter-for-batch' in str(e): | |
f67539c2 TL |
341 | rerun_args = inventory_args.copy() |
342 | rerun_args.remove('--filter-for-batch') | |
20effc67 TL |
343 | devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume', |
344 | rerun_args)) | |
adb31ebb TL |
345 | else: |
346 | raise | |
347 | ||
adb31ebb TL |
348 | except OrchestratorError as e: |
349 | return str(e) | |
350 | ||
20effc67 TL |
351 | self.log.debug('Refreshed host %s devices (%d)' % ( |
352 | host, len(devices))) | |
adb31ebb | 353 | ret = inventory.Devices.from_json(devices) |
20effc67 | 354 | self.mgr.cache.update_host_devices(host, ret.devices) |
f91f0fd5 TL |
355 | self.update_osdspec_previews(host) |
356 | self.mgr.cache.save_host(host) | |
357 | return None | |
358 | ||
20effc67 TL |
359 | def _refresh_host_networks(self, host: str) -> Optional[str]: |
360 | try: | |
361 | networks = self.mgr.wait_async(self._run_cephadm_json( | |
362 | host, 'mon', 'list-networks', [], no_fsid=True)) | |
363 | except OrchestratorError as e: | |
364 | return str(e) | |
365 | ||
366 | self.log.debug('Refreshed host %s networks (%s)' % ( | |
367 | host, len(networks))) | |
368 | self.mgr.cache.update_host_networks(host, networks) | |
369 | self.mgr.cache.save_host(host) | |
370 | return None | |
371 | ||
    def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
        """Regenerate the cached OSDSpec previews for `host` and persist the
        host cache. Always returns None (the Optional[str] return matches the
        other per-host refresh helpers aggregated by the caller)."""
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return None
adb31ebb | 378 | def update_osdspec_previews(self, search_host: str = '') -> None: |
f91f0fd5 TL |
379 | # Set global 'pending' flag for host |
380 | self.mgr.cache.loading_osdspec_preview.add(search_host) | |
381 | previews = [] | |
382 | # query OSDSpecs for host <search host> and generate/get the preview | |
383 | # There can be multiple previews for one host due to multiple OSDSpecs. | |
384 | previews.extend(self.mgr.osd_service.get_previews(search_host)) | |
f67539c2 | 385 | self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>') |
f91f0fd5 TL |
386 | self.mgr.cache.osdspec_previews[search_host] = previews |
387 | # Unset global 'pending' flag for host | |
388 | self.mgr.cache.loading_osdspec_preview.remove(search_host) | |
389 | ||
    def _check_for_strays(self) -> None:
        """Warn about daemons/hosts known to the mon but not managed by cephadm.

        Compares the mon's server list (`list_servers`) against the daemon
        names in the host cache and raises CEPHADM_STRAY_HOST /
        CEPHADM_STRAY_DAEMON health warnings for anything unknown. The
        warnings are cleared up front so resolved issues disappear.
        """
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            self.log.debug(ls)
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    # these types register with the mon under an id that
                    # differs from the cephadm daemon name; translate via
                    # the daemon's metadata
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )
                    if s.get('type') == 'tcmu-runner':
                        # because we don't track tcmu-runner daemons in the host cache
                        # and don't have a way to check if the daemon is part of iscsi service
                        # we assume that all tcmu-runner daemons are managed by cephadm
                        managed.append(name)
                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
    def _check_for_moved_osds(self) -> None:
        """Clean up duplicate OSD entries after a disk has moved between hosts.

        When the same OSD id is cached on more than one host, and exactly one
        instance is running while the cluster reports the OSD as up, the
        leftover error-state entries on the old host(s) are removed.
        """
        self.log.debug('_check_for_moved_osds')
        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in self.mgr.cache.get_daemons_by_type('osd'):
            assert dd.daemon_id
            all_osds[int(dd.daemon_id)].append(dd)
        for osd_id, dds in all_osds.items():
            if len(dds) <= 1:
                continue
            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
            logger.info(msg)
            # only act when the situation is unambiguous: exactly one running
            # instance and the OSD is actually up in the cluster
            if len(running) != 1:
                continue
            osd = self.mgr.get_osd_by_id(osd_id)
            if not osd or not osd['up']:
                continue
            for e in error:
                assert e.hostname
                try:
                    # no_post_remove: skip post-remove actions, this is just
                    # dropping a stale record of the moved daemon
                    self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
                    self.mgr.events.for_daemon(
                        e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
                except OrchestratorError as ex:
                    self.mgr.events.from_orch_error(ex)
                    logger.exception(f'failed to remove duplicated daemon {e}')
    def _apply_all_services(self) -> bool:
        """Apply all active service specs.

        Returns True if anything was (possibly) changed, which makes the
        serve loop skip its sleep and start a fresh iteration.
        """
        self.log.debug('_apply_all_services')
        r = False
        specs = []  # type: List[ServiceSpec]
        # if metadata is not up to date, we still need to apply spec for agent
        # since the agent is the one who gather the metadata. If we don't we
        # end up stuck between wanting metadata to be up to date to apply specs
        # and needing to apply the agent spec to get up to date metadata
        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
            self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
            try:
                specs.append(self.mgr.spec_store['agent'].spec)
            except Exception as e:
                # no agent spec stored yet: create/apply one and bail out
                self.log.debug(f'Failed to find agent spec: {e}')
                self.mgr.agent_helpers._apply_agent()
                return r
        else:
            for sn, spec in self.mgr.spec_store.active_specs.items():
                specs.append(spec)
        # clear and rebuild spec-related health warnings from this run only
        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            self.mgr.remove_health_warning(name)
        self.mgr.apply_spec_fails = []
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
                self.log.exception(msg)
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
                warnings = []
                for x in self.mgr.apply_spec_fails:
                    warnings.append(f'{x[0]}: {x[1]}')
                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                            len(self.mgr.apply_spec_fails),
                                            warnings)

        return r
    def _apply_service_config(self, spec: ServiceSpec) -> None:
        """Push the spec's `config` section into the mon config database.

        Each key is written under the service's config section (e.g.
        client.rgw.<id>) only when its current value differs. Unknown option
        names and failed `config set` calls are collected and surfaced as
        CEPHADM_INVALID_CONFIG_OPTION / CEPHADM_FAILED_SET_OPTION health
        warnings; both warnings are cleared first so stale ones disappear.
        """
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    # KeyError here means the option name itself is unknown
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
                    invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
                    options_failed_to_set), options_failed_to_set)
558 | def _apply_service(self, spec: ServiceSpec) -> bool: | |
559 | """ | |
560 | Schedule a service. Deploy new daemons or remove old ones, depending | |
561 | on the target label and count specified in the placement. | |
562 | """ | |
563 | self.mgr.migration.verify_no_migration() | |
564 | ||
f67539c2 | 565 | service_type = spec.service_type |
f91f0fd5 TL |
566 | service_name = spec.service_name() |
567 | if spec.unmanaged: | |
568 | self.log.debug('Skipping unmanaged service %s' % service_name) | |
569 | return False | |
570 | if spec.preview_only: | |
571 | self.log.debug('Skipping preview_only service %s' % service_name) | |
572 | return False | |
573 | self.log.debug('Applying service %s spec' % service_name) | |
574 | ||
20effc67 TL |
575 | if service_type == 'agent': |
576 | try: | |
577 | assert self.mgr.cherrypy_thread | |
578 | assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert() | |
579 | except Exception: | |
580 | self.log.info( | |
581 | 'Delaying applying agent spec until cephadm endpoint root cert created') | |
582 | return False | |
583 | ||
f67539c2 | 584 | self._apply_service_config(spec) |
f91f0fd5 | 585 | |
f67539c2 | 586 | if service_type == 'osd': |
f91f0fd5 TL |
587 | self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec)) |
588 | # TODO: return True would result in a busy loop | |
589 | # can't know if daemon count changed; create_from_spec doesn't | |
590 | # return a solid indication | |
591 | return False | |
592 | ||
f67539c2 | 593 | svc = self.mgr.cephadm_services[service_type] |
f91f0fd5 TL |
594 | daemons = self.mgr.cache.get_daemons_by_service(service_name) |
595 | ||
b3b6e05e | 596 | public_networks: List[str] = [] |
f67539c2 TL |
597 | if service_type == 'mon': |
598 | out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network')) | |
f91f0fd5 | 599 | if '/' in out: |
b3b6e05e TL |
600 | public_networks = [x.strip() for x in out.split(',')] |
601 | self.log.debug('mon public_network(s) is %s' % public_networks) | |
f91f0fd5 TL |
602 | |
603 | def matches_network(host): | |
604 | # type: (str) -> bool | |
b3b6e05e | 605 | # make sure we have 1 or more IPs for any of those networks on that |
f91f0fd5 | 606 | # host |
b3b6e05e TL |
607 | for network in public_networks: |
608 | if len(self.mgr.cache.networks[host].get(network, [])) > 0: | |
609 | return True | |
610 | self.log.info( | |
611 | f"Filtered out host {host}: does not belong to mon public_network" | |
612 | f" ({','.join(public_networks)})" | |
613 | ) | |
614 | return False | |
f91f0fd5 | 615 | |
b3b6e05e TL |
616 | rank_map = None |
617 | if svc.ranked(): | |
618 | rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {} | |
f91f0fd5 TL |
619 | ha = HostAssignment( |
620 | spec=spec, | |
20effc67 TL |
621 | hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name( |
622 | ) == 'agent' else self.mgr.cache.get_schedulable_hosts(), | |
623 | unreachable_hosts=self.mgr.cache.get_unreachable_hosts(), | |
f67539c2 TL |
624 | daemons=daemons, |
625 | networks=self.mgr.cache.networks, | |
626 | filter_new_host=( | |
627 | matches_network if service_type == 'mon' | |
628 | else None | |
629 | ), | |
630 | allow_colo=svc.allow_colo(), | |
631 | primary_daemon_type=svc.primary_daemon_type(), | |
632 | per_host_daemon_type=svc.per_host_daemon_type(), | |
b3b6e05e | 633 | rank_map=rank_map, |
f91f0fd5 TL |
634 | ) |
635 | ||
f67539c2 TL |
636 | try: |
637 | all_slots, slots_to_add, daemons_to_remove = ha.place() | |
b3b6e05e | 638 | daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get( |
522d829b | 639 | 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)] |
f67539c2 TL |
640 | self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove)) |
641 | except OrchestratorError as e: | |
a4b75251 TL |
642 | msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}' |
643 | self.log.error(msg) | |
f67539c2 | 644 | self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) |
a4b75251 TL |
645 | self.mgr.apply_spec_fails.append((spec.service_name(), str(e))) |
646 | warnings = [] | |
647 | for x in self.mgr.apply_spec_fails: | |
648 | warnings.append(f'{x[0]}: {x[1]}') | |
649 | self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL', | |
650 | f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}", | |
651 | len(self.mgr.apply_spec_fails), | |
652 | warnings) | |
f67539c2 | 653 | return False |
f91f0fd5 TL |
654 | |
655 | r = None | |
656 | ||
657 | # sanity check | |
f67539c2 TL |
658 | final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove) |
659 | if service_type in ['mon', 'mgr'] and final_count < 1: | |
660 | self.log.debug('cannot scale mon|mgr below 1)') | |
f91f0fd5 TL |
661 | return False |
662 | ||
b3b6e05e TL |
663 | # progress |
664 | progress_id = str(uuid.uuid4()) | |
665 | delta: List[str] = [] | |
666 | if slots_to_add: | |
667 | delta += [f'+{len(slots_to_add)}'] | |
668 | if daemons_to_remove: | |
669 | delta += [f'-{len(daemons_to_remove)}'] | |
670 | progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})' | |
671 | progress_total = len(slots_to_add) + len(daemons_to_remove) | |
672 | progress_done = 0 | |
673 | ||
674 | def update_progress() -> None: | |
675 | self.mgr.remote( | |
676 | 'progress', 'update', progress_id, | |
677 | ev_msg=progress_title, | |
678 | ev_progress=(progress_done / progress_total), | |
679 | add_to_ceph_s=True, | |
680 | ) | |
681 | ||
682 | if progress_total: | |
683 | update_progress() | |
684 | ||
f91f0fd5 TL |
685 | # add any? |
686 | did_config = False | |
687 | ||
f67539c2 TL |
688 | self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add) |
689 | self.log.debug('Daemons that will be removed: %s' % daemons_to_remove) | |
f91f0fd5 | 690 | |
20effc67 TL |
691 | hosts_altered: Set[str] = set() |
692 | ||
522d829b TL |
693 | try: |
694 | # assign names | |
695 | for i in range(len(slots_to_add)): | |
696 | slot = slots_to_add[i] | |
697 | slot = slot.assign_name(self.mgr.get_unique_name( | |
698 | slot.daemon_type, | |
699 | slot.hostname, | |
700 | daemons, | |
701 | prefix=spec.service_id, | |
702 | forcename=slot.name, | |
703 | rank=slot.rank, | |
704 | rank_generation=slot.rank_generation, | |
705 | )) | |
706 | slots_to_add[i] = slot | |
707 | if rank_map is not None: | |
708 | assert slot.rank is not None | |
709 | assert slot.rank_generation is not None | |
710 | assert rank_map[slot.rank][slot.rank_generation] is None | |
711 | rank_map[slot.rank][slot.rank_generation] = slot.name | |
712 | ||
713 | if rank_map: | |
714 | # record the rank_map before we make changes so that if we fail the | |
715 | # next mgr will clean up. | |
716 | self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map) | |
717 | ||
718 | # remove daemons now, since we are going to fence them anyway | |
f67539c2 | 719 | for d in daemons_to_remove: |
522d829b | 720 | assert d.hostname is not None |
f67539c2 | 721 | self._remove_daemon(d.name(), d.hostname) |
522d829b TL |
722 | daemons_to_remove = [] |
723 | ||
724 | # fence them | |
725 | svc.fence_old_ranks(spec, rank_map, len(all_slots)) | |
726 | ||
727 | # create daemons | |
a4b75251 | 728 | daemon_place_fails = [] |
522d829b TL |
729 | for slot in slots_to_add: |
730 | # first remove daemon on conflicting port? | |
731 | if slot.ports: | |
732 | for d in daemons_to_remove: | |
733 | if d.hostname != slot.hostname: | |
734 | continue | |
735 | if not (set(d.ports or []) & set(slot.ports)): | |
736 | continue | |
737 | if d.ip and slot.ip and d.ip != slot.ip: | |
738 | continue | |
739 | self.log.info( | |
740 | f'Removing {d.name()} before deploying to {slot} to avoid a port conflict' | |
741 | ) | |
742 | # NOTE: we don't check ok-to-stop here to avoid starvation if | |
743 | # there is only 1 gateway. | |
744 | self._remove_daemon(d.name(), d.hostname) | |
745 | daemons_to_remove.remove(d) | |
746 | progress_done += 1 | |
20effc67 | 747 | hosts_altered.add(d.hostname) |
522d829b TL |
748 | break |
749 | ||
750 | # deploy new daemon | |
751 | daemon_id = slot.name | |
752 | if not did_config: | |
753 | svc.config(spec) | |
754 | did_config = True | |
755 | ||
756 | daemon_spec = svc.make_daemon_spec( | |
757 | slot.hostname, daemon_id, slot.network, spec, | |
758 | daemon_type=slot.daemon_type, | |
759 | ports=slot.ports, | |
760 | ip=slot.ip, | |
761 | rank=slot.rank, | |
762 | rank_generation=slot.rank_generation, | |
763 | ) | |
764 | self.log.debug('Placing %s.%s on host %s' % ( | |
765 | slot.daemon_type, daemon_id, slot.hostname)) | |
766 | ||
767 | try: | |
768 | daemon_spec = svc.prepare_create(daemon_spec) | |
20effc67 | 769 | self.mgr.wait_async(self._create_daemon(daemon_spec)) |
522d829b | 770 | r = True |
b3b6e05e | 771 | progress_done += 1 |
522d829b | 772 | update_progress() |
20effc67 | 773 | hosts_altered.add(daemon_spec.host) |
522d829b TL |
774 | except (RuntimeError, OrchestratorError) as e: |
775 | msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} " | |
776 | f"on {slot.hostname}: {e}") | |
777 | self.mgr.events.for_service(spec, 'ERROR', msg) | |
778 | self.mgr.log.error(msg) | |
a4b75251 | 779 | daemon_place_fails.append(msg) |
522d829b TL |
780 | # only return "no change" if no one else has already succeeded. |
781 | # later successes will also change to True | |
782 | if r is None: | |
783 | r = False | |
784 | progress_done += 1 | |
785 | update_progress() | |
786 | continue | |
f91f0fd5 | 787 | |
522d829b TL |
788 | # add to daemon list so next name(s) will also be unique |
789 | sd = orchestrator.DaemonDescription( | |
790 | hostname=slot.hostname, | |
791 | daemon_type=slot.daemon_type, | |
792 | daemon_id=daemon_id, | |
793 | ) | |
794 | daemons.append(sd) | |
795 | ||
a4b75251 | 796 | if daemon_place_fails: |
20effc67 TL |
797 | self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len( |
798 | daemon_place_fails), daemon_place_fails) | |
a4b75251 | 799 | |
522d829b TL |
800 | # remove any? |
801 | def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool: | |
802 | daemon_ids = [d.daemon_id for d in remove_daemons] | |
803 | assert None not in daemon_ids | |
804 | # setting force flag retains previous behavior | |
805 | r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True) | |
806 | return not r.retval | |
807 | ||
808 | while daemons_to_remove and not _ok_to_stop(daemons_to_remove): | |
809 | # let's find a subset that is ok-to-stop | |
810 | daemons_to_remove.pop() | |
811 | for d in daemons_to_remove: | |
f91f0fd5 | 812 | r = True |
522d829b TL |
813 | assert d.hostname is not None |
814 | self._remove_daemon(d.name(), d.hostname) | |
815 | ||
b3b6e05e TL |
816 | progress_done += 1 |
817 | update_progress() | |
20effc67 | 818 | hosts_altered.add(d.hostname) |
f91f0fd5 | 819 | |
522d829b TL |
820 | self.mgr.remote('progress', 'complete', progress_id) |
821 | except Exception as e: | |
822 | self.mgr.remote('progress', 'fail', progress_id, str(e)) | |
823 | raise | |
20effc67 TL |
824 | finally: |
825 | if self.mgr.use_agent: | |
826 | # can only send ack to agents if we know for sure port they bound to | |
827 | hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [ | |
828 | h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])]) | |
829 | self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True) | |
b3b6e05e | 830 | |
f91f0fd5 TL |
831 | if r is None: |
832 | r = False | |
833 | return r | |
834 | ||
    def _check_daemons(self) -> None:
        """Reconcile every cached daemon against its service spec.

        For each daemon this removes orphans (daemons with no active spec),
        skips unmanaged/deleted services, health-checks agents, decides
        whether a 'reconfig' or 'redeploy' action is needed (config/deps/
        monmap changes, extra container args), applies that action, and
        finally runs per-daemon-type post actions (grafana, iscsi, ...).
        """
        self.log.debug('_check_daemons')
        daemons = self.mgr.cache.get_daemons()
        # daemon_type -> daemons that may need a post-create action this pass
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            if dd.daemon_type == 'agent':
                # agents have their own liveness check; best-effort only
                try:
                    self.mgr.agent_helpers._check_agent(dd.hostname)
                except Exception as e:
                    self.log.debug(
                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            # mark whether this daemon is the service's active one
            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            # start from any action a user/operator already scheduled
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                # container cli args can only change via full redeploy
                self.log.debug(
                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
                self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
                dd.extra_container_args = spec.extra_container_args
                action = 'redeploy'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                # a scheduled redeploy trumps a computed reconfig
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    # clear the scheduled action now that it has been applied
                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
                        self.mgr.cache.save_host(dd.hostname)
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    # the action failed; skip post actions for this type
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)
938 | ||
939 | def _purge_deleted_services(self) -> None: | |
20effc67 | 940 | self.log.debug('_purge_deleted_services') |
f67539c2 TL |
941 | existing_services = self.mgr.spec_store.all_specs.items() |
942 | for service_name, spec in list(existing_services): | |
943 | if service_name not in self.mgr.spec_store.spec_deleted: | |
944 | continue | |
945 | if self.mgr.cache.get_daemons_by_service(service_name): | |
946 | continue | |
947 | if spec.service_type in ['mon', 'mgr']: | |
948 | continue | |
949 | ||
950 | logger.info(f'Purge service {service_name}') | |
951 | ||
952 | self.mgr.cephadm_services[spec.service_type].purge(service_name) | |
953 | self.mgr.spec_store.finally_rm(service_name) | |
f91f0fd5 | 954 | |
adb31ebb | 955 | def convert_tags_to_repo_digest(self) -> None: |
f91f0fd5 TL |
956 | if not self.mgr.use_repo_digest: |
957 | return | |
958 | settings = self.mgr.upgrade.get_distinct_container_image_settings() | |
959 | digests: Dict[str, ContainerInspectInfo] = {} | |
960 | for container_image_ref in set(settings.values()): | |
961 | if not is_repo_digest(container_image_ref): | |
20effc67 TL |
962 | image_info = self.mgr.wait_async( |
963 | self._get_container_image_info(container_image_ref)) | |
f67539c2 TL |
964 | if image_info.repo_digests: |
965 | # FIXME: we assume the first digest here is the best | |
966 | assert is_repo_digest(image_info.repo_digests[0]), image_info | |
f91f0fd5 TL |
967 | digests[container_image_ref] = image_info |
968 | ||
969 | for entity, container_image_ref in settings.items(): | |
970 | if not is_repo_digest(container_image_ref): | |
971 | image_info = digests[container_image_ref] | |
f67539c2 TL |
972 | if image_info.repo_digests: |
973 | # FIXME: we assume the first digest here is the best | |
974 | self.mgr.set_container_image(entity, image_info.repo_digests[0]) | |
975 | ||
20effc67 TL |
976 | def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]: |
977 | # host -> path -> (mode, uid, gid, content, digest) | |
978 | client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {} | |
979 | ||
980 | # ceph.conf | |
981 | config = self.mgr.get_minimal_ceph_conf().encode('utf-8') | |
982 | config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest()) | |
983 | ||
984 | if self.mgr.manage_etc_ceph_ceph_conf: | |
985 | try: | |
986 | pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts) | |
987 | ha = HostAssignment( | |
988 | spec=ServiceSpec('mon', placement=pspec), | |
989 | hosts=self.mgr.cache.get_schedulable_hosts(), | |
990 | unreachable_hosts=self.mgr.cache.get_unreachable_hosts(), | |
991 | daemons=[], | |
992 | networks=self.mgr.cache.networks, | |
993 | ) | |
994 | all_slots, _, _ = ha.place() | |
995 | for host in {s.hostname for s in all_slots}: | |
996 | if host not in client_files: | |
997 | client_files[host] = {} | |
998 | client_files[host]['/etc/ceph/ceph.conf'] = ( | |
999 | 0o644, 0, 0, bytes(config), str(config_digest) | |
1000 | ) | |
1001 | except Exception as e: | |
1002 | self.mgr.log.warning( | |
1003 | f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}') | |
1004 | ||
1005 | # client keyrings | |
1006 | for ks in self.mgr.keys.keys.values(): | |
1007 | try: | |
1008 | ret, keyring, err = self.mgr.mon_command({ | |
1009 | 'prefix': 'auth get', | |
1010 | 'entity': ks.entity, | |
1011 | }) | |
1012 | if ret: | |
1013 | self.log.warning(f'unable to fetch keyring for {ks.entity}') | |
1014 | continue | |
1015 | digest = ''.join('%02x' % c for c in hashlib.sha256( | |
1016 | keyring.encode('utf-8')).digest()) | |
1017 | ha = HostAssignment( | |
1018 | spec=ServiceSpec('mon', placement=ks.placement), | |
1019 | hosts=self.mgr.cache.get_schedulable_hosts(), | |
1020 | unreachable_hosts=self.mgr.cache.get_unreachable_hosts(), | |
1021 | daemons=[], | |
1022 | networks=self.mgr.cache.networks, | |
1023 | ) | |
1024 | all_slots, _, _ = ha.place() | |
1025 | for host in {s.hostname for s in all_slots}: | |
1026 | if host not in client_files: | |
1027 | client_files[host] = {} | |
1028 | client_files[host]['/etc/ceph/ceph.conf'] = ( | |
1029 | 0o644, 0, 0, bytes(config), str(config_digest) | |
1030 | ) | |
1031 | client_files[host][ks.path] = ( | |
1032 | ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest | |
1033 | ) | |
1034 | except Exception as e: | |
1035 | self.log.warning( | |
1036 | f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}') | |
1037 | return client_files | |
1038 | ||
1039 | def _write_client_files(self, | |
1040 | client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]], | |
1041 | host: str) -> None: | |
1042 | updated_files = False | |
1043 | old_files = self.mgr.cache.get_host_client_files(host).copy() | |
1044 | for path, m in client_files.get(host, {}).items(): | |
1045 | mode, uid, gid, content, digest = m | |
1046 | if path in old_files: | |
1047 | match = old_files[path] == (digest, mode, uid, gid) | |
1048 | del old_files[path] | |
1049 | if match: | |
1050 | continue | |
1051 | self.log.info(f'Updating {host}:{path}') | |
1052 | self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid) | |
1053 | self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid) | |
1054 | updated_files = True | |
1055 | for path in old_files.keys(): | |
1056 | self.log.info(f'Removing {host}:{path}') | |
1057 | cmd = ['rm', '-f', path] | |
1058 | self.mgr.ssh.check_execute_command(host, cmd) | |
1059 | updated_files = True | |
1060 | self.mgr.cache.removed_client_file(host, path) | |
1061 | if updated_files: | |
1062 | self.mgr.cache.save_host(host) | |
1063 | ||
1064 | async def _create_daemon(self, | |
1065 | daemon_spec: CephadmDaemonDeploySpec, | |
1066 | reconfig: bool = False, | |
1067 | osd_uuid_map: Optional[Dict[str, Any]] = None, | |
1068 | ) -> str: | |
f67539c2 TL |
1069 | |
1070 | with set_exception_subject('service', orchestrator.DaemonDescription( | |
1071 | daemon_type=daemon_spec.daemon_type, | |
1072 | daemon_id=daemon_spec.daemon_id, | |
1073 | hostname=daemon_spec.host, | |
1074 | ).service_id(), overwrite=True): | |
1075 | ||
1076 | try: | |
1077 | image = '' | |
1078 | start_time = datetime_now() | |
1079 | ports: List[int] = daemon_spec.ports if daemon_spec.ports else [] | |
1080 | ||
1081 | if daemon_spec.daemon_type == 'container': | |
1082 | spec = cast(CustomContainerSpec, | |
1083 | self.mgr.spec_store[daemon_spec.service_name].spec) | |
1084 | image = spec.image | |
1085 | if spec.ports: | |
1086 | ports.extend(spec.ports) | |
1087 | ||
f67539c2 TL |
1088 | # TCP port to open in the host firewall |
1089 | if len(ports) > 0: | |
1090 | daemon_spec.extra_args.extend([ | |
1091 | '--tcp-ports', ' '.join(map(str, ports)) | |
1092 | ]) | |
1093 | ||
1094 | # osd deployments needs an --osd-uuid arg | |
1095 | if daemon_spec.daemon_type == 'osd': | |
1096 | if not osd_uuid_map: | |
1097 | osd_uuid_map = self.mgr.get_osd_uuid_map() | |
1098 | osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id) | |
1099 | if not osd_uuid: | |
1100 | raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id) | |
1101 | daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid]) | |
1102 | ||
1103 | if reconfig: | |
1104 | daemon_spec.extra_args.append('--reconfig') | |
1105 | if self.mgr.allow_ptrace: | |
1106 | daemon_spec.extra_args.append('--allow-ptrace') | |
1107 | ||
20effc67 TL |
1108 | try: |
1109 | eca = daemon_spec.extra_container_args | |
1110 | if eca: | |
1111 | for a in eca: | |
1112 | daemon_spec.extra_args.append(f'--extra-container-args={a}') | |
1113 | except AttributeError: | |
1114 | eca = None | |
1115 | ||
f67539c2 | 1116 | if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url: |
20effc67 | 1117 | await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials')))) |
f67539c2 TL |
1118 | |
1119 | self.log.info('%s daemon %s on %s' % ( | |
1120 | 'Reconfiguring' if reconfig else 'Deploying', | |
1121 | daemon_spec.name(), daemon_spec.host)) | |
1122 | ||
20effc67 | 1123 | out, err, code = await self._run_cephadm( |
f67539c2 TL |
1124 | daemon_spec.host, daemon_spec.name(), 'deploy', |
1125 | [ | |
1126 | '--name', daemon_spec.name(), | |
1127 | '--meta-json', json.dumps({ | |
1128 | 'service_name': daemon_spec.service_name, | |
1129 | 'ports': daemon_spec.ports, | |
1130 | 'ip': daemon_spec.ip, | |
1131 | 'deployed_by': self.mgr.get_active_mgr_digests(), | |
b3b6e05e TL |
1132 | 'rank': daemon_spec.rank, |
1133 | 'rank_generation': daemon_spec.rank_generation, | |
20effc67 | 1134 | 'extra_container_args': eca |
f67539c2 TL |
1135 | }), |
1136 | '--config-json', '-', | |
1137 | ] + daemon_spec.extra_args, | |
1138 | stdin=json.dumps(daemon_spec.final_config), | |
20effc67 TL |
1139 | image=image, |
1140 | ) | |
1141 | ||
1142 | if daemon_spec.daemon_type == 'agent': | |
1143 | self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now() | |
1144 | self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1 | |
f67539c2 TL |
1145 | |
1146 | # refresh daemon state? (ceph daemon reconfig does not need it) | |
1147 | if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES: | |
1148 | if not code and daemon_spec.host in self.mgr.cache.daemons: | |
1149 | # prime cached service state with what we (should have) | |
1150 | # just created | |
1151 | sd = daemon_spec.to_daemon_description( | |
20effc67 | 1152 | DaemonDescriptionStatus.starting, 'starting') |
f67539c2 | 1153 | self.mgr.cache.add_daemon(daemon_spec.host, sd) |
522d829b | 1154 | if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS: |
a4b75251 | 1155 | self.mgr.requires_post_actions.add(daemon_spec.name()) |
f67539c2 TL |
1156 | self.mgr.cache.invalidate_host_daemons(daemon_spec.host) |
1157 | ||
20effc67 TL |
1158 | if daemon_spec.daemon_type != 'agent': |
1159 | self.mgr.cache.update_daemon_config_deps( | |
1160 | daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time) | |
1161 | self.mgr.cache.save_host(daemon_spec.host) | |
1162 | else: | |
1163 | self.mgr.agent_cache.update_agent_config_deps( | |
1164 | daemon_spec.host, daemon_spec.deps, start_time) | |
1165 | self.mgr.agent_cache.save_agent(daemon_spec.host) | |
f67539c2 TL |
1166 | msg = "{} {} on host '{}'".format( |
1167 | 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host) | |
1168 | if not code: | |
1169 | self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg) | |
1170 | else: | |
1171 | what = 'reconfigure' if reconfig else 'deploy' | |
1172 | self.mgr.events.for_daemon( | |
1173 | daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}') | |
1174 | return msg | |
1175 | except OrchestratorError: | |
1176 | redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names() | |
1177 | if not reconfig and not redeploy: | |
1178 | # we have to clean up the daemon. E.g. keyrings. | |
1179 | servict_type = daemon_type_to_service(daemon_spec.daemon_type) | |
1180 | dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed') | |
a4b75251 | 1181 | self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True) |
f67539c2 TL |
1182 | raise |
1183 | ||
20effc67 | 1184 | def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str: |
f67539c2 TL |
1185 | """ |
1186 | Remove a daemon | |
1187 | """ | |
1188 | (daemon_type, daemon_id) = name.split('.', 1) | |
1189 | daemon = orchestrator.DaemonDescription( | |
1190 | daemon_type=daemon_type, | |
1191 | daemon_id=daemon_id, | |
1192 | hostname=host) | |
1193 | ||
1194 | with set_exception_subject('service', daemon.service_id(), overwrite=True): | |
1195 | ||
1196 | self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon) | |
1197 | ||
1198 | # NOTE: we are passing the 'force' flag here, which means | |
1199 | # we can delete a mon instances data. | |
1200 | args = ['--name', name, '--force'] | |
1201 | self.log.info('Removing daemon %s from %s' % (name, host)) | |
20effc67 TL |
1202 | out, err, code = self.mgr.wait_async(self._run_cephadm( |
1203 | host, name, 'rm-daemon', args)) | |
f67539c2 TL |
1204 | if not code: |
1205 | # remove item from cache | |
1206 | self.mgr.cache.rm_daemon(host, name) | |
1207 | self.mgr.cache.invalidate_host_daemons(host) | |
1208 | ||
20effc67 TL |
1209 | if not no_post_remove: |
1210 | self.mgr.cephadm_services[daemon_type_to_service( | |
1211 | daemon_type)].post_remove(daemon, is_failed_deploy=False) | |
f67539c2 TL |
1212 | |
1213 | return "Removed {} from host '{}'".format(name, host) | |
adb31ebb | 1214 | |
20effc67 TL |
    async def _run_cephadm_json(self,
                                host: str,
                                entity: Union[CephadmNoImage, str],
                                command: str,
                                args: List[str],
                                no_fsid: Optional[bool] = False,
                                image: Optional[str] = "",
                                ) -> Any:
        """Run cephadm on *host* and parse its stdout as JSON.

        Raises OrchestratorError on a non-zero exit code, on any transport
        error, or if stdout is not valid JSON.
        """
        try:
            out, err, code = await self._run_cephadm(
                host, entity, command, args, no_fsid=no_fsid, image=image)
            if code:
                raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
        except Exception as e:
            # NOTE: this deliberately also catches (and re-wraps) the
            # OrchestratorError raised just above for a non-zero exit code,
            # so every failure surfaces with the same "failed" message shape.
            raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
        try:
            return json.loads(''.join(out))
        except (ValueError, KeyError):
            msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
            self.log.exception(f'{msg}: {"".join(out)}')
            raise OrchestratorError(msg)
1236 | ||
20effc67 TL |
    async def _run_cephadm(self,
                           host: str,
                           entity: Union[CephadmNoImage, str],
                           command: str,
                           args: List[str],
                           addr: Optional[str] = "",
                           stdin: Optional[str] = "",
                           no_fsid: Optional[bool] = False,
                           error_ok: Optional[bool] = False,
                           image: Optional[str] = "",
                           env_vars: Optional[List[str]] = None,
                           ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]
        """

        await self.mgr.ssh._remote_connection(host, addr)

        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # NOTE: the parentheses do not make this a tuple -- it is a plain
        # string used as a prefix for str.startswith() below.
        bypass_image = ('agent')

        assert image or entity
        # Skip the image check for daemons deployed that are not ceph containers
        if not str(entity).startswith(bypass_image):
            if not image and entity is not cephadmNoImage:
                image = self.mgr._get_container_image(entity)

        final_args = []

        # global args
        if env_vars:
            for env_var_pair in env_vars:
                final_args.extend(['--env', env_var_pair])

        if image:
            final_args.extend(['--image', image])

        if not self.mgr.container_init:
            final_args += ['--no-container-init']

        # subcommand
        final_args.append(command)

        # subcommand args
        if not no_fsid:
            final_args += ['--fsid', self.mgr._cluster_fsid]

        final_args += args

        # exec
        self.log.debug('args: %s' % (' '.join(final_args)))
        if self.mgr.mode == 'root':
            # agent has cephadm binary as an extra file which is
            # therefore passed over stdin. Even for debug logs it's too much
            if stdin and 'agent' not in str(entity):
                self.log.debug('stdin: %s' % stdin)

            cmd = ['which', 'python3']
            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
            cmd = [python, self.mgr.cephadm_binary_path] + final_args

            try:
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
                # exit code 2 may indicate the cephadm binary is missing on
                # the host: verify with `ls`, deploy it, and retry once
                if code == 2:
                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
                    if code_ls == 2:
                        await self._deploy_cephadm_binary(host, addr)
                        out, err, code = await self.mgr.ssh._execute_command(
                            host, cmd, stdin=stdin, addr=addr)
                        # if there is an agent on this host, make sure it is using the most recent
                        # version of cephadm binary
                        if host in self.mgr.inventory:
                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')

            except Exception as e:
                # connection may be wedged; drop it so the next call reconnects
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise

        elif self.mgr.mode == 'cephadm-package':
            try:
                cmd = ['/usr/bin/cephadm'] + final_args
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise
        else:
            assert False, 'unsupported mode'

        self.log.debug(f'code: {code}')
        if out:
            self.log.debug(f'out: {out}')
        if err:
            self.log.debug(f'err: {err}')
        if code and not error_ok:
            raise OrchestratorError(
                f'cephadm exited with an error code: {code}, stderr: {err}')
        return [out], [err], code
1348 | ||
1349 | async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo: | |
f67539c2 TL |
1350 | # pick a random host... |
1351 | host = None | |
1352 | for host_name in self.mgr.inventory.keys(): | |
1353 | host = host_name | |
1354 | break | |
1355 | if not host: | |
1356 | raise OrchestratorError('no hosts defined') | |
1357 | if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url: | |
20effc67 | 1358 | await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials')))) |
f67539c2 | 1359 | |
a4b75251 TL |
1360 | pullargs: List[str] = [] |
1361 | if self.mgr.registry_insecure: | |
1362 | pullargs.append("--insecure") | |
1363 | ||
20effc67 | 1364 | j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True) |
f67539c2 TL |
1365 | |
1366 | r = ContainerInspectInfo( | |
1367 | j['image_id'], | |
1368 | j.get('ceph_version'), | |
1369 | j.get('repo_digests') | |
1370 | ) | |
1371 | self.log.debug(f'image {image_name} -> {r}') | |
1372 | return r | |
1373 | ||
1374 | # function responsible for logging single host into custom registry | |
20effc67 TL |
1375 | async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]: |
1376 | self.log.debug( | |
1377 | f"Attempting to log host {host} into custom registry @ {registry_json['url']}") | |
f67539c2 | 1378 | # want to pass info over stdin rather than through normal list of args |
20effc67 | 1379 | out, err, code = await self._run_cephadm( |
f67539c2 | 1380 | host, 'mon', 'registry-login', |
20effc67 | 1381 | ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True) |
f67539c2 | 1382 | if code: |
20effc67 | 1383 | return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password" |
f67539c2 TL |
1384 | return None |
1385 | ||
    async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
        """Copy the mgr's embedded cephadm script onto *host*.

        Writes ``self.mgr._cephadm`` (the cephadm source) to
        ``self.mgr.cephadm_binary_path`` on the remote host over SSH.
        """
        # Use tee (from coreutils) to create a copy of cephadm on the target machine
        self.log.info(f"Deploying cephadm binary to {host}")
        await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
                                              self.mgr._cephadm.encode('utf-8'), addr=addr)