[ceph.git] ceph/src/pybind/mgr/cephadm/serve.py (Ceph Pacific 16.2.5)
1 import hashlib
2 import json
3 import logging
4 import uuid
5 from collections import defaultdict
6 from contextlib import contextmanager
7 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
8
9 from cephadm import remotes
10
11 try:
12 import remoto
13 import execnet.gateway_bootstrap
14 except ImportError:
15 remoto = None
16
17 from ceph.deployment import inventory
18 from ceph.deployment.drive_group import DriveGroupSpec
19 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
20 from ceph.utils import str_to_datetime, datetime_now
21
22 import orchestrator
23 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
24 DaemonDescriptionStatus, daemon_type_to_service
25 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
26 from cephadm.schedule import HostAssignment
27 from cephadm.autotune import MemoryAutotuner
28 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
29 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
30 from mgr_module import MonCommandFailed
31 from mgr_util import format_bytes
32
33 from . import utils
34
35 if TYPE_CHECKING:
36 from cephadm.module import CephadmOrchestrator
37 from remoto.backends import BaseConnection
38
39 logger = logging.getLogger(__name__)
40
41
42 class CephadmServe:
43 """
44 This module contains functions that are executed in the
45 serve() thread. Thus they don't block the CLI.
46
47 Please see the `Note regarding network calls from CLI handlers`
48 chapter in the cephadm developer guide.
49
50 On the other hand, these functions should *not* be called from
51 CLI handlers, to avoid blocking the CLI.
52 """
53
54 def __init__(self, mgr: "CephadmOrchestrator"):
55 self.mgr: "CephadmOrchestrator" = mgr
56 self.log = logger
57
58 def serve(self) -> None:
59 """
60 The main loop of cephadm.
61
62 A command handler will typically change the declarative state
63 of cephadm. This loop will then attempt to apply this new state.
64 """
65 self.log.debug("serve starting")
66 self.mgr.config_checker.load_network_config()
67
68 while self.mgr.run:
69
70 try:
71
72 self.convert_tags_to_repo_digest()
73
74 # refresh daemons
75 self.log.debug('refreshing hosts and daemons')
76 self._refresh_hosts_and_daemons()
77
78 self._check_for_strays()
79
80 self._update_paused_health()
81
82 if not self.mgr.paused:
83 self.mgr.to_remove_osds.process_removal_queue()
84
85 self.mgr.migration.migrate()
86 if self.mgr.migration.is_migration_ongoing():
87 continue
88
89 if self._apply_all_services():
90 continue # did something, refresh
91
92 self._check_daemons()
93
94 self._purge_deleted_services()
95
96 if self.mgr.upgrade.continue_upgrade():
97 continue
98
99 except OrchestratorError as e:
100 if e.event_subject:
101 self.mgr.events.from_orch_error(e)
102
103 self._serve_sleep()
104 self.log.debug("serve exit")
105
106 def _serve_sleep(self) -> None:
107 sleep_interval = max(
108 30,
109 min(
110 self.mgr.host_check_interval,
111 self.mgr.facts_cache_timeout,
112 self.mgr.daemon_cache_timeout,
113 self.mgr.device_cache_timeout,
114 )
115 )
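# Illustrative example with hypothetical option values: host_check_interval=600,
# facts_cache_timeout=60, daemon_cache_timeout=600, device_cache_timeout=1800
# gives min(...) == 60, so the loop sleeps 60s between iterations; the 30s
# floor only applies when the smallest of these intervals is set below 30.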
116 self.log.debug('Sleeping for %d seconds', sleep_interval)
117 self.mgr.event.wait(sleep_interval)
118 self.mgr.event.clear()
119
120 def _update_paused_health(self) -> None:
121 if self.mgr.paused:
122 self.mgr.health_checks['CEPHADM_PAUSED'] = {
123 'severity': 'warning',
124 'summary': 'cephadm background work is paused',
125 'count': 1,
126 'detail': ["'ceph orch resume' to resume"],
127 }
128 self.mgr.set_health_checks(self.mgr.health_checks)
129 else:
130 if 'CEPHADM_PAUSED' in self.mgr.health_checks:
131 del self.mgr.health_checks['CEPHADM_PAUSED']
132 self.mgr.set_health_checks(self.mgr.health_checks)
133
134 def _autotune_host_memory(self, host: str) -> None:
135 total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
136 if not total_mem:
137 val = None
138 else:
139 total_mem *= 1024 # kb -> bytes
140 total_mem *= self.mgr.autotune_memory_target_ratio
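# Illustrative: a host reporting memory_total_kb=33554432 (32 GiB) with a
# hypothetical autotune_memory_target_ratio of 0.7 leaves roughly
# 0.7 * 32 GiB ~= 22.4 GiB for MemoryAutotuner to split across the host's daemons.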
141 a = MemoryAutotuner(
142 daemons=self.mgr.cache.get_daemons_by_host(host),
143 config_get=self.mgr.get_foreign_ceph_option,
144 total_mem=total_mem,
145 )
146 val, osds = a.tune()
147 any_changed = False
148 for o in osds:
149 if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
150 self.mgr.check_mon_command({
151 'prefix': 'config rm',
152 'who': o,
153 'name': 'osd_memory_target',
154 })
155 any_changed = True
156 if val is not None:
157 if any_changed:
158 self.mgr.log.info(
159 f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
160 )
161 ret, out, err = self.mgr.mon_command({
162 'prefix': 'config set',
163 'who': f'osd/host:{host}',
164 'name': 'osd_memory_target',
165 'value': str(val),
166 })
167 if ret:
168 self.log.warning(
169 f'Unable to set osd_memory_target on {host} to {val}: {err}'
170 )
171 else:
172 self.mgr.check_mon_command({
173 'prefix': 'config rm',
174 'who': f'osd/host:{host}',
175 'name': 'osd_memory_target',
176 })
177 self.mgr.cache.update_autotune(host)
178
179 def _refresh_hosts_and_daemons(self) -> None:
180 bad_hosts = []
181 failures = []
182
183 # host -> path -> (mode, uid, gid, content, digest)
184 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
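# Illustrative entry: client_files['host1']['/etc/ceph/ceph.conf'] =
#     (0o644, 0, 0, b'[global]\n\tfsid = ...\n', '<sha256 hex digest>')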
185
186 # ceph.conf
187 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
188 config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
189 config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
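# the join over '%02x' is equivalent to hashlib.sha256(config).hexdigest(); the
# hex string is what gets compared against the digests cached per host below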
190
191 if self.mgr.manage_etc_ceph_ceph_conf:
192 try:
193 pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
194 ha = HostAssignment(
195 spec=ServiceSpec('mon', placement=pspec),
196 hosts=self.mgr._schedulable_hosts(),
197 daemons=[],
198 networks=self.mgr.cache.networks,
199 )
200 all_slots, _, _ = ha.place()
201 for host in {s.hostname for s in all_slots}:
202 if host not in client_files:
203 client_files[host] = {}
204 client_files[host]['/etc/ceph/ceph.conf'] = (
205 0o644, 0, 0, bytes(config), str(config_digest)
206 )
207 except Exception as e:
208 self.mgr.log.warning(
209 f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
210
211 # client keyrings
212 for ks in self.mgr.keys.keys.values():
213 assert config
214 assert config_digest
215 try:
216 ret, keyring, err = self.mgr.mon_command({
217 'prefix': 'auth get',
218 'entity': ks.entity,
219 })
220 if ret:
221 self.log.warning(f'unable to fetch keyring for {ks.entity}')
222 continue
223 digest = ''.join('%02x' % c for c in hashlib.sha256(
224 keyring.encode('utf-8')).digest())
225 ha = HostAssignment(
226 spec=ServiceSpec('mon', placement=ks.placement),
227 hosts=self.mgr._schedulable_hosts(),
228 daemons=[],
229 networks=self.mgr.cache.networks,
230 )
231 all_slots, _, _ = ha.place()
232 for host in {s.hostname for s in all_slots}:
233 if host not in client_files:
234 client_files[host] = {}
235 client_files[host]['/etc/ceph/ceph.conf'] = (
236 0o644, 0, 0, bytes(config), str(config_digest)
237 )
238 client_files[host][ks.path] = (
239 ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
240 )
241 except Exception as e:
242 self.log.warning(
243 f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
244
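# Note: the @forall_hosts decorator (cephadm.utils) maps the decorated helper
# over the host list passed at the call site below, typically running the
# per-host work in parallel via the orchestrator's worker thread pool.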
245 @forall_hosts
246 def refresh(host: str) -> None:
247
248 # skip hosts that are in maintenance - they could be powered off
249 if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
250 return
251
252 if self.mgr.cache.host_needs_check(host):
253 r = self._check_host(host)
254 if r is not None:
255 bad_hosts.append(r)
256 if self.mgr.cache.host_needs_daemon_refresh(host):
257 self.log.debug('refreshing %s daemons' % host)
258 r = self._refresh_host_daemons(host)
259 if r:
260 failures.append(r)
261
262 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
263 self.log.debug(f"Logging `{host}` into custom registry")
264 r = self._registry_login(host, self.mgr.registry_url,
265 self.mgr.registry_username, self.mgr.registry_password)
266 if r:
267 bad_hosts.append(r)
268
269 if self.mgr.cache.host_needs_device_refresh(host):
270 self.log.debug('refreshing %s devices' % host)
271 r = self._refresh_host_devices(host)
272 if r:
273 failures.append(r)
274
275 if self.mgr.cache.host_needs_facts_refresh(host):
276 self.log.debug('refreshing %s facts' % host)
277 r = self._refresh_facts(host)
278 if r:
279 failures.append(r)
280
281 if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
282 self.log.debug(f"refreshing OSDSpec previews for {host}")
283 r = self._refresh_host_osdspec_previews(host)
284 if r:
285 failures.append(r)
286
287 if (
288 self.mgr.cache.host_needs_autotune_memory(host)
289 and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
290 ):
291 self.log.debug(f"autotuning memory for {host}")
292 self._autotune_host_memory(host)
293
294 # client files
295 updated_files = False
296 old_files = self.mgr.cache.get_host_client_files(host).copy()
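# get_host_client_files() yields path -> (digest, mode, uid, gid), i.e. the
# same tuple layout that update_client_file() records further down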
297 for path, m in client_files.get(host, {}).items():
298 mode, uid, gid, content, digest = m
299 if path in old_files:
300 match = old_files[path] == (digest, mode, uid, gid)
301 del old_files[path]
302 if match:
303 continue
304 self.log.info(f'Updating {host}:{path}')
305 self._write_remote_file(host, path, content, mode, uid, gid)
306 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
307 updated_files = True
308 for path in old_files.keys():
309 self.log.info(f'Removing {host}:{path}')
310 with self._remote_connection(host) as tpl:
311 conn, connr = tpl
312 out, err, code = remoto.process.check(
313 conn,
314 ['rm', '-f', path])
315 updated_files = True
316 self.mgr.cache.removed_client_file(host, path)
317 if updated_files:
318 self.mgr.cache.save_host(host)
319
320 refresh(self.mgr.cache.get_hosts())
321
322 self.mgr.config_checker.run_checks()
323
324 health_changed = False
325 for k in [
326 'CEPHADM_HOST_CHECK_FAILED',
327 'CEPHADM_FAILED_DAEMON',
328 'CEPHADM_REFRESH_FAILED',
329 ]:
330 if k in self.mgr.health_checks:
331 del self.mgr.health_checks[k]
332 health_changed = True
333 if bad_hosts:
334 self.mgr.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
335 'severity': 'warning',
336 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
337 'count': len(bad_hosts),
338 'detail': bad_hosts,
339 }
340 health_changed = True
341 if failures:
342 self.mgr.health_checks['CEPHADM_REFRESH_FAILED'] = {
343 'severity': 'warning',
344 'summary': 'failed to probe daemons or devices',
345 'count': len(failures),
346 'detail': failures,
347 }
348 health_changed = True
349 failed_daemons = []
350 for dd in self.mgr.cache.get_daemons():
351 if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
352 failed_daemons.append('daemon %s on %s is in %s state' % (
353 dd.name(), dd.hostname, dd.status_desc
354 ))
355 if failed_daemons:
356 self.mgr.health_checks['CEPHADM_FAILED_DAEMON'] = {
357 'severity': 'warning',
358 'summary': '%d failed cephadm daemon(s)' % len(failed_daemons),
359 'count': len(failed_daemons),
360 'detail': failed_daemons,
361 }
362 health_changed = True
363 if health_changed:
364 self.mgr.set_health_checks(self.mgr.health_checks)
365
366 def _check_host(self, host: str) -> Optional[str]:
367 if host not in self.mgr.inventory:
368 return None
369 self.log.debug(' checking %s' % host)
370 try:
371 out, err, code = self._run_cephadm(
372 host, cephadmNoImage, 'check-host', [],
373 error_ok=True, no_fsid=True)
374 self.mgr.cache.update_last_host_check(host)
375 self.mgr.cache.save_host(host)
376 if code:
377 self.log.debug(' host %s failed check' % host)
378 if self.mgr.warn_on_failed_host_check:
379 return 'host %s failed check: %s' % (host, err)
380 else:
381 self.log.debug(' host %s ok' % host)
382 except Exception as e:
383 self.log.debug(' host %s failed check' % host)
384 return 'host %s failed check: %s' % (host, e)
385 return None
386
387 def _refresh_host_daemons(self, host: str) -> Optional[str]:
388 try:
389 ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
390 except OrchestratorError as e:
391 return str(e)
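# Each `cephadm ls` entry is a dict roughly of the form (abridged, illustrative):
#   {'style': 'cephadm:v1', 'name': 'osd.3', 'fsid': '<cluster fsid>',
#    'state': 'running', 'container_id': '...', 'version': '16.2.5', ...}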
392 dm = {}
393 for d in ls:
394 if not d['style'].startswith('cephadm'):
395 continue
396 if d['fsid'] != self.mgr._cluster_fsid:
397 continue
398 if '.' not in d['name']:
399 continue
400 sd = orchestrator.DaemonDescription()
401 sd.last_refresh = datetime_now()
402 for k in ['created', 'started', 'last_configured', 'last_deployed']:
403 v = d.get(k, None)
404 if v:
405 setattr(sd, k, str_to_datetime(d[k]))
406 sd.daemon_type = d['name'].split('.')[0]
407 sd.daemon_id = '.'.join(d['name'].split('.')[1:])
408 sd.hostname = host
409 sd.container_id = d.get('container_id')
410 if sd.container_id:
411 # shorten the hash
412 sd.container_id = sd.container_id[0:12]
413 sd.container_image_name = d.get('container_image_name')
414 sd.container_image_id = d.get('container_image_id')
415 sd.container_image_digests = d.get('container_image_digests')
416 sd.memory_usage = d.get('memory_usage')
417 sd.memory_request = d.get('memory_request')
418 sd.memory_limit = d.get('memory_limit')
419 sd._service_name = d.get('service_name')
420 sd.deployed_by = d.get('deployed_by')
421 sd.version = d.get('version')
422 sd.ports = d.get('ports')
423 sd.ip = d.get('ip')
424 sd.rank = int(d['rank']) if d.get('rank') is not None else None
425 sd.rank_generation = int(d['rank_generation']) if d.get('rank_generation') is not None else None
426 if sd.daemon_type == 'osd':
427 sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
428 if 'state' in d:
429 sd.status_desc = d['state']
430 sd.status = {
431 'running': DaemonDescriptionStatus.running,
432 'stopped': DaemonDescriptionStatus.stopped,
433 'error': DaemonDescriptionStatus.error,
434 'unknown': DaemonDescriptionStatus.error,
435 }[d['state']]
436 else:
437 sd.status_desc = 'unknown'
438 sd.status = None
439 dm[sd.name()] = sd
440 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
441 self.mgr.cache.update_host_daemons(host, dm)
442 self.mgr.cache.save_host(host)
443 return None
444
445 def _refresh_facts(self, host: str) -> Optional[str]:
446 try:
447 val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
448 except OrchestratorError as e:
449 return str(e)
450
451 self.mgr.cache.update_host_facts(host, val)
452
453 return None
454
455 def _refresh_host_devices(self, host: str) -> Optional[str]:
456
457 with_lsm = self.mgr.get_module_option('device_enhanced_scan')
458 inventory_args = ['--', 'inventory',
459 '--format=json',
460 '--filter-for-batch']
461 if with_lsm:
462 inventory_args.insert(-1, "--with-lsm")
463
464 try:
465 try:
466 devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
467 inventory_args)
468 except OrchestratorError as e:
469 if 'unrecognized arguments: --filter-for-batch' in str(e):
470 rerun_args = inventory_args.copy()
471 rerun_args.remove('--filter-for-batch')
472 devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
473 rerun_args)
474 else:
475 raise
476
477 networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
478 except OrchestratorError as e:
479 return str(e)
480
481 self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
482 host, len(devices), len(networks)))
483 ret = inventory.Devices.from_json(devices)
484 self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
485 self.update_osdspec_previews(host)
486 self.mgr.cache.save_host(host)
487 return None
488
489 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
490 self.update_osdspec_previews(host)
491 self.mgr.cache.save_host(host)
492 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
493 return None
494
495 def update_osdspec_previews(self, search_host: str = '') -> None:
496 # Set global 'pending' flag for host
497 self.mgr.cache.loading_osdspec_preview.add(search_host)
498 previews = []
499 # query OSDSpecs for host <search host> and generate/get the preview
500 # There can be multiple previews for one host due to multiple OSDSpecs.
501 previews.extend(self.mgr.osd_service.get_previews(search_host))
502 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
503 self.mgr.cache.osdspec_previews[search_host] = previews
504 # Unset global 'pending' flag for host
505 self.mgr.cache.loading_osdspec_preview.remove(search_host)
506
507 def _check_for_strays(self) -> None:
508 self.log.debug('_check_for_strays')
509 for k in ['CEPHADM_STRAY_HOST',
510 'CEPHADM_STRAY_DAEMON']:
511 if k in self.mgr.health_checks:
512 del self.mgr.health_checks[k]
513 if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
514 ls = self.mgr.list_servers()
515 managed = self.mgr.cache.get_daemon_names()
516 host_detail = [] # type: List[str]
517 host_num_daemons = 0
518 daemon_detail = [] # type: List[str]
519 for item in ls:
520 host = item.get('hostname')
521 assert isinstance(host, str)
522 daemons = item.get('services') # misnomer!
523 assert isinstance(daemons, list)
524 missing_names = []
525 for s in daemons:
526 daemon_id = s.get('id')
527 assert daemon_id
528 name = '%s.%s' % (s.get('type'), daemon_id)
529 if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
530 metadata = self.mgr.get_metadata(
531 cast(str, s.get('type')), daemon_id, {})
532 assert metadata is not None
533 try:
534 if s.get('type') == 'rgw-nfs':
535 # https://tracker.ceph.com/issues/49573
536 name = metadata['id'][:-4]
537 else:
538 name = '%s.%s' % (s.get('type'), metadata['id'])
539 except (KeyError, TypeError):
540 self.log.debug(
541 "Failed to find daemon id for %s service %s" % (
542 s.get('type'), s.get('id')
543 )
544 )
545
546 if host not in self.mgr.inventory:
547 missing_names.append(name)
548 host_num_daemons += 1
549 if name not in managed:
550 daemon_detail.append(
551 'stray daemon %s on host %s not managed by cephadm' % (name, host))
552 if missing_names:
553 host_detail.append(
554 'stray host %s has %d stray daemons: %s' % (
555 host, len(missing_names), missing_names))
556 if self.mgr.warn_on_stray_hosts and host_detail:
557 self.mgr.health_checks['CEPHADM_STRAY_HOST'] = {
558 'severity': 'warning',
559 'summary': '%d stray host(s) with %s daemon(s) '
560 'not managed by cephadm' % (
561 len(host_detail), host_num_daemons),
562 'count': len(host_detail),
563 'detail': host_detail,
564 }
565 if self.mgr.warn_on_stray_daemons and daemon_detail:
566 self.mgr.health_checks['CEPHADM_STRAY_DAEMON'] = {
567 'severity': 'warning',
568 'summary': '%d stray daemon(s) not managed by cephadm' % (
569 len(daemon_detail)),
570 'count': len(daemon_detail),
571 'detail': daemon_detail,
572 }
573 self.mgr.set_health_checks(self.mgr.health_checks)
574
575 def _apply_all_services(self) -> bool:
576 r = False
577 specs = [] # type: List[ServiceSpec]
578 for sn, spec in self.mgr.spec_store.active_specs.items():
579 specs.append(spec)
580 for spec in specs:
581 try:
582 if self._apply_service(spec):
583 r = True
584 except Exception as e:
585 self.log.exception('Failed to apply %s spec %s: %s' % (
586 spec.service_name(), spec, e))
587 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
588
589 return r
590
591 def _apply_service_config(self, spec: ServiceSpec) -> None:
592 if spec.config:
593 section = utils.name_to_config_section(spec.service_name())
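# Illustrative sketch: a spec carrying `config: {osd_max_backfills: '3'}` results
# in `ceph config set <section> osd_max_backfills 3`, where <section> comes from
# name_to_config_section() applied to the service name.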
594 for k, v in spec.config.items():
595 try:
596 current = self.mgr.get_foreign_ceph_option(section, k)
597 except KeyError:
598 self.log.warning(
599 f'Ignoring invalid {spec.service_name()} config option {k}'
600 )
601 self.mgr.events.for_service(
602 spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
603 )
604 continue
605 if current != v:
606 self.log.debug(f'setting [{section}] {k} = {v}')
607 try:
608 self.mgr.check_mon_command({
609 'prefix': 'config set',
610 'name': k,
611 'value': str(v),
612 'who': section,
613 })
614 except MonCommandFailed as e:
615 self.log.warning(
616 f'Failed to set {spec.service_name()} option {k}: {e}'
617 )
618
619 def _apply_service(self, spec: ServiceSpec) -> bool:
620 """
621 Schedule a service. Deploy new daemons or remove old ones, depending
622 on the target label and count specified in the placement.
623 """
624 self.mgr.migration.verify_no_migration()
625
626 service_type = spec.service_type
627 service_name = spec.service_name()
628 if spec.unmanaged:
629 self.log.debug('Skipping unmanaged service %s' % service_name)
630 return False
631 if spec.preview_only:
632 self.log.debug('Skipping preview_only service %s' % service_name)
633 return False
634 self.log.debug('Applying service %s spec' % service_name)
635
636 self._apply_service_config(spec)
637
638 if service_type == 'osd':
639 self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
640 # TODO: return True would result in a busy loop
641 # can't know if daemon count changed; create_from_spec doesn't
642 # return a solid indication
643 return False
644
645 svc = self.mgr.cephadm_services[service_type]
646 daemons = self.mgr.cache.get_daemons_by_service(service_name)
647
648 public_networks: List[str] = []
649 if service_type == 'mon':
650 out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
651 if '/' in out:
652 public_networks = [x.strip() for x in out.split(',')]
653 self.log.debug('mon public_network(s) is %s' % public_networks)
654
655 def matches_network(host):
656 # type: (str) -> bool
657 # make sure we have 1 or more IPs for any of those networks on that
658 # host
659 for network in public_networks:
660 if len(self.mgr.cache.networks[host].get(network, [])) > 0:
661 return True
662 self.log.info(
663 f"Filtered out host {host}: does not belong to mon public_network"
664 f" ({','.join(public_networks)})"
665 )
666 return False
667
668 rank_map = None
669 if svc.ranked():
670 rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
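# rank_map has the shape {rank: {generation: daemon name or None}}, e.g.
# (illustrative) {0: {0: 'foo.host1.abc'}, 1: {0: None}} while rank 1 is
# still waiting for a daemon to be assigned.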
671 ha = HostAssignment(
672 spec=spec,
673 hosts=self.mgr._schedulable_hosts(),
674 daemons=daemons,
675 networks=self.mgr.cache.networks,
676 filter_new_host=(
677 matches_network if service_type == 'mon'
678 else None
679 ),
680 allow_colo=svc.allow_colo(),
681 primary_daemon_type=svc.primary_daemon_type(),
682 per_host_daemon_type=svc.per_host_daemon_type(),
683 rank_map=rank_map,
684 )
685
686 try:
687 all_slots, slots_to_add, daemons_to_remove = ha.place()
688 daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
689 'status', '').lower() not in ['maintenance', 'offline'])]
690 self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
691 except OrchestratorError as e:
692 self.log.error('Failed to apply %s spec %s: %s' % (
693 spec.service_name(), spec, e))
694 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
695 return False
696
697 r = None
698
699 # sanity check
700 final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
701 if service_type in ['mon', 'mgr'] and final_count < 1:
702 self.log.debug('cannot scale mon|mgr below 1')
703 return False
704
705 # progress
706 progress_id = str(uuid.uuid4())
707 delta: List[str] = []
708 if slots_to_add:
709 delta += [f'+{len(slots_to_add)}']
710 if daemons_to_remove:
711 delta += [f'-{len(daemons_to_remove)}']
712 progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
713 progress_total = len(slots_to_add) + len(daemons_to_remove)
714 progress_done = 0
715
716 def update_progress() -> None:
717 self.mgr.remote(
718 'progress', 'update', progress_id,
719 ev_msg=progress_title,
720 ev_progress=(progress_done / progress_total),
721 add_to_ceph_s=True,
722 )
723
724 if progress_total:
725 update_progress()
726
727 # add any?
728 did_config = False
729
730 self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
731 self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
732
733 # assign names
734 for i in range(len(slots_to_add)):
735 slot = slots_to_add[i]
736 slot = slot.assign_name(self.mgr.get_unique_name(
737 slot.daemon_type,
738 slot.hostname,
739 daemons,
740 prefix=spec.service_id,
741 forcename=slot.name,
742 rank=slot.rank,
743 rank_generation=slot.rank_generation,
744 ))
745 slots_to_add[i] = slot
746 if rank_map is not None:
747 assert slot.rank is not None
748 assert slot.rank_generation is not None
749 assert rank_map[slot.rank][slot.rank_generation] is None
750 rank_map[slot.rank][slot.rank_generation] = slot.name
751
752 if rank_map:
753 # record the rank_map before we make changes so that, if we fail, the
754 # next mgr will clean up.
755 self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
756
757 # remove daemons now, since we are going to fence them anyway
758 for d in daemons_to_remove:
759 assert d.hostname is not None
760 self._remove_daemon(d.name(), d.hostname)
761 daemons_to_remove = []
762
763 # fence them
764 svc.fence_old_ranks(spec, rank_map, len(all_slots))
765
766 # create daemons
767 for slot in slots_to_add:
768 # first remove daemon on conflicting port?
769 if slot.ports:
770 for d in daemons_to_remove:
771 if d.hostname != slot.hostname:
772 continue
773 if not (set(d.ports or []) & set(slot.ports)):
774 continue
775 if d.ip and slot.ip and d.ip != slot.ip:
776 continue
777 self.log.info(
778 f'Removing {d.name()} before deploying to {slot} to avoid a port conflict'
779 )
780 # NOTE: we don't check ok-to-stop here to avoid starvation if
781 # there is only 1 gateway.
782 self._remove_daemon(d.name(), d.hostname)
783 daemons_to_remove.remove(d)
784 progress_done += 1
785 break
786
787 # deploy new daemon
788 daemon_id = slot.name
789 if not did_config:
790 svc.config(spec, daemon_id)
791 did_config = True
792
793 daemon_spec = svc.make_daemon_spec(
794 slot.hostname, daemon_id, slot.network, spec,
795 daemon_type=slot.daemon_type,
796 ports=slot.ports,
797 ip=slot.ip,
798 rank=slot.rank,
799 rank_generation=slot.rank_generation,
800 )
801 self.log.debug('Placing %s.%s on host %s' % (
802 slot.daemon_type, daemon_id, slot.hostname))
803
804 try:
805 daemon_spec = svc.prepare_create(daemon_spec)
806 self._create_daemon(daemon_spec)
807 r = True
808 progress_done += 1
809 update_progress()
810 except (RuntimeError, OrchestratorError) as e:
811 msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
812 f"on {slot.hostname}: {e}")
813 self.mgr.events.for_service(spec, 'ERROR', msg)
814 self.mgr.log.error(msg)
815 # only record "no change" (False) if nothing has succeeded yet;
816 # a later success will still set r to True
817 if r is None:
818 r = False
819 progress_done += 1
820 update_progress()
821 continue
822
823 # add to daemon list so next name(s) will also be unique
824 sd = orchestrator.DaemonDescription(
825 hostname=slot.hostname,
826 daemon_type=slot.daemon_type,
827 daemon_id=daemon_id,
828 )
829 daemons.append(sd)
830
831 # remove any?
832 def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
833 daemon_ids = [d.daemon_id for d in remove_daemons]
834 assert None not in daemon_ids
835 # setting force flag retains previous behavior
836 r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
837 return not r.retval
838
839 while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
840 # let's find a subset that is ok-to-stop
841 daemons_to_remove.pop()
842 for d in daemons_to_remove:
843 r = True
844 assert d.hostname is not None
845 self._remove_daemon(d.name(), d.hostname)
846
847 progress_done += 1
848 update_progress()
849
850 self.mgr.remote('progress', 'complete', progress_id)
851
852 if r is None:
853 r = False
854 return r
855
856 def _check_daemons(self) -> None:
857
858 daemons = self.mgr.cache.get_daemons()
859 daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
860 for dd in daemons:
861 # orphan?
862 spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
863 assert dd.hostname is not None
864 assert dd.daemon_type is not None
865 assert dd.daemon_id is not None
866 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
867 # (mon and mgr specs should always exist; osds aren't matched
868 # to a service spec)
869 self.log.info('Removing orphan daemon %s...' % dd.name())
870 self._remove_daemon(dd.name(), dd.hostname)
871
872 # ignore unmanaged services
873 if spec and spec.unmanaged:
874 continue
875
876 # ignore daemons for deleted services
877 if dd.service_name() in self.mgr.spec_store.spec_deleted:
878 continue
879
880 # These daemon types require additional configs after creation
881 if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
882 daemons_post[dd.daemon_type].append(dd)
883
884 if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
885 self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
886 dd.is_active = True
887 else:
888 dd.is_active = False
889
890 deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
891 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
892 dd.hostname, dd.name())
893 if last_deps is None:
894 last_deps = []
895 action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
896 if not last_config:
897 self.log.info('Reconfiguring %s (unknown last config time)...' % (
898 dd.name()))
899 action = 'reconfig'
900 elif last_deps != deps:
901 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
902 deps))
903 self.log.info('Reconfiguring %s (dependencies changed)...' % (
904 dd.name()))
905 action = 'reconfig'
906 elif self.mgr.last_monmap and \
907 self.mgr.last_monmap > last_config and \
908 dd.daemon_type in CEPH_TYPES:
909 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
910 action = 'reconfig'
911 elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
912 dd.daemon_type in CEPH_TYPES:
913 self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
914 action = 'reconfig'
915 if action:
916 if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
917 and action == 'reconfig':
918 action = 'redeploy'
919 try:
920 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
921 self.mgr._daemon_action(daemon_spec, action=action)
922 self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
923 except OrchestratorError as e:
924 self.mgr.events.from_orch_error(e)
925 if dd.daemon_type in daemons_post:
926 del daemons_post[dd.daemon_type]
927 # continue...
928 except Exception as e:
929 self.mgr.events.for_daemon_from_exception(dd.name(), e)
930 if dd.daemon_type in daemons_post:
931 del daemons_post[dd.daemon_type]
932 # continue...
933
934 # do daemon post actions
935 for daemon_type, daemon_descs in daemons_post.items():
936 if daemon_type in self.mgr.requires_post_actions:
937 self.mgr.requires_post_actions.remove(daemon_type)
938 self.mgr._get_cephadm_service(daemon_type_to_service(
939 daemon_type)).daemon_check_post(daemon_descs)
940
941 def _purge_deleted_services(self) -> None:
942 existing_services = self.mgr.spec_store.all_specs.items()
943 for service_name, spec in list(existing_services):
944 if service_name not in self.mgr.spec_store.spec_deleted:
945 continue
946 if self.mgr.cache.get_daemons_by_service(service_name):
947 continue
948 if spec.service_type in ['mon', 'mgr']:
949 continue
950
951 logger.info(f'Purge service {service_name}')
952
953 self.mgr.cephadm_services[spec.service_type].purge(service_name)
954 self.mgr.spec_store.finally_rm(service_name)
955
956 def convert_tags_to_repo_digest(self) -> None:
957 if not self.mgr.use_repo_digest:
958 return
959 settings = self.mgr.upgrade.get_distinct_container_image_settings()
960 digests: Dict[str, ContainerInspectInfo] = {}
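# e.g. (illustrative) a tag reference like 'quay.io/ceph/ceph:v16.2.5' is
# resolved to a repo digest such as 'quay.io/ceph/ceph@sha256:<64 hex chars>',
# so hosts keep pulling the identical image even if the tag later moves.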
961 for container_image_ref in set(settings.values()):
962 if not is_repo_digest(container_image_ref):
963 image_info = self._get_container_image_info(container_image_ref)
964 if image_info.repo_digests:
965 # FIXME: we assume the first digest here is the best
966 assert is_repo_digest(image_info.repo_digests[0]), image_info
967 digests[container_image_ref] = image_info
968
969 for entity, container_image_ref in settings.items():
970 if not is_repo_digest(container_image_ref):
971 image_info = digests[container_image_ref]
972 if image_info.repo_digests:
973 # FIXME: we assume the first digest here is the best
974 self.mgr.set_container_image(entity, image_info.repo_digests[0])
975
976 def _create_daemon(self,
977 daemon_spec: CephadmDaemonDeploySpec,
978 reconfig: bool = False,
979 osd_uuid_map: Optional[Dict[str, Any]] = None,
980 ) -> str:
981
982 with set_exception_subject('service', orchestrator.DaemonDescription(
983 daemon_type=daemon_spec.daemon_type,
984 daemon_id=daemon_spec.daemon_id,
985 hostname=daemon_spec.host,
986 ).service_id(), overwrite=True):
987
988 try:
989 image = ''
990 start_time = datetime_now()
991 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
992
993 if daemon_spec.daemon_type == 'container':
994 spec = cast(CustomContainerSpec,
995 self.mgr.spec_store[daemon_spec.service_name].spec)
996 image = spec.image
997 if spec.ports:
998 ports.extend(spec.ports)
999
1000 if daemon_spec.daemon_type == 'cephadm-exporter':
1001 if not reconfig:
1002 assert daemon_spec.host
1003 self._deploy_cephadm_binary(daemon_spec.host)
1004
1005 # TCP port to open in the host firewall
1006 if len(ports) > 0:
1007 daemon_spec.extra_args.extend([
1008 '--tcp-ports', ' '.join(map(str, ports))
1009 ])
1010
1011 # osd deployments need an --osd-fsid arg
1012 if daemon_spec.daemon_type == 'osd':
1013 if not osd_uuid_map:
1014 osd_uuid_map = self.mgr.get_osd_uuid_map()
1015 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1016 if not osd_uuid:
1017 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1018 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1019
1020 if reconfig:
1021 daemon_spec.extra_args.append('--reconfig')
1022 if self.mgr.allow_ptrace:
1023 daemon_spec.extra_args.append('--allow-ptrace')
1024
1025 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
1026 self._registry_login(daemon_spec.host, self.mgr.registry_url,
1027 self.mgr.registry_username, self.mgr.registry_password)
1028
1029 self.log.info('%s daemon %s on %s' % (
1030 'Reconfiguring' if reconfig else 'Deploying',
1031 daemon_spec.name(), daemon_spec.host))
1032
1033 out, err, code = self._run_cephadm(
1034 daemon_spec.host, daemon_spec.name(), 'deploy',
1035 [
1036 '--name', daemon_spec.name(),
1037 '--meta-json', json.dumps({
1038 'service_name': daemon_spec.service_name,
1039 'ports': daemon_spec.ports,
1040 'ip': daemon_spec.ip,
1041 'deployed_by': self.mgr.get_active_mgr_digests(),
1042 'rank': daemon_spec.rank,
1043 'rank_generation': daemon_spec.rank_generation,
1044 }),
1045 '--config-json', '-',
1046 ] + daemon_spec.extra_args,
1047 stdin=json.dumps(daemon_spec.final_config),
1048 image=image)
1049
1050 # refresh daemon state? (ceph daemon reconfig does not need it)
1051 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
1052 if not code and daemon_spec.host in self.mgr.cache.daemons:
1053 # prime cached service state with what we (should have)
1054 # just created
1055 sd = daemon_spec.to_daemon_description(
1056 DaemonDescriptionStatus.running, 'starting')
1057 self.mgr.cache.add_daemon(daemon_spec.host, sd)
1058 if daemon_spec.daemon_type in [
1059 'grafana', 'iscsi', 'prometheus', 'alertmanager'
1060 ]:
1061 self.mgr.requires_post_actions.add(daemon_spec.daemon_type)
1062 self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
1063
1064 self.mgr.cache.update_daemon_config_deps(
1065 daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
1066 self.mgr.cache.save_host(daemon_spec.host)
1067 msg = "{} {} on host '{}'".format(
1068 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1069 if not code:
1070 self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1071 else:
1072 what = 'reconfigure' if reconfig else 'deploy'
1073 self.mgr.events.for_daemon(
1074 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1075 return msg
1076 except OrchestratorError:
1077 redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
1078 if not reconfig and not redeploy:
1079 # we have to clean up the daemon. E.g. keyrings.
1080 service_type = daemon_type_to_service(daemon_spec.daemon_type)
1081 dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
1082 self.mgr.cephadm_services[service_type].post_remove(dd)
1083 raise
1084
1085 def _remove_daemon(self, name: str, host: str) -> str:
1086 """
1087 Remove a daemon
1088 """
1089 (daemon_type, daemon_id) = name.split('.', 1)
1090 daemon = orchestrator.DaemonDescription(
1091 daemon_type=daemon_type,
1092 daemon_id=daemon_id,
1093 hostname=host)
1094
1095 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1096
1097 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
1098
1099 # NOTE: we are passing the 'force' flag here, which means
1100 # we can delete a mon instance's data.
1101 args = ['--name', name, '--force']
1102 self.log.info('Removing daemon %s from %s' % (name, host))
1103 out, err, code = self._run_cephadm(
1104 host, name, 'rm-daemon', args)
1105 if not code:
1106 # remove item from cache
1107 self.mgr.cache.rm_daemon(host, name)
1108 self.mgr.cache.invalidate_host_daemons(host)
1109
1110 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon)
1111
1112 return "Removed {} from host '{}'".format(name, host)
1113
1114 def _run_cephadm_json(self,
1115 host: str,
1116 entity: Union[CephadmNoImage, str],
1117 command: str,
1118 args: List[str],
1119 no_fsid: Optional[bool] = False,
1120 image: Optional[str] = "",
1121 ) -> Any:
1122 try:
1123 out, err, code = self._run_cephadm(
1124 host, entity, command, args, no_fsid=no_fsid, image=image)
1125 if code:
1126 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
1127 except Exception as e:
1128 raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
1129 try:
1130 return json.loads(''.join(out))
1131 except (ValueError, KeyError):
1132 msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
1133 self.log.exception(f'{msg}: {"".join(out)}')
1134 raise OrchestratorError(msg)
1135
1136 def _run_cephadm(self,
1137 host: str,
1138 entity: Union[CephadmNoImage, str],
1139 command: str,
1140 args: List[str],
1141 addr: Optional[str] = "",
1142 stdin: Optional[str] = "",
1143 no_fsid: Optional[bool] = False,
1144 error_ok: Optional[bool] = False,
1145 image: Optional[str] = "",
1146 env_vars: Optional[List[str]] = None,
1147 ) -> Tuple[List[str], List[str], int]:
1148 """
1149 Run cephadm on the remote host with the given command + args
1150
1151 Important: You probably don't want to run _run_cephadm from CLI handlers
1152
1153 :env_vars: in format -> [KEY=VALUE, ..]
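e.g. env_vars=['HTTPS_PROXY=https://proxy.example:8080'] (illustrative value)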
1154 """
1155 self.log.debug(f"_run_cephadm : command = {command}")
1156 self.log.debug(f"_run_cephadm : args = {args}")
1157
1158 bypass_image = ('cephadm-exporter',)
1159
1160 with self._remote_connection(host, addr) as tpl:
1161 conn, connr = tpl
1162 assert image or entity
1163 # Skip the image check for deployed daemons that are not ceph containers
1164 if not str(entity).startswith(bypass_image):
1165 if not image and entity is not cephadmNoImage:
1166 image = self.mgr._get_container_image(entity)
1167
1168 final_args = []
1169
1170 # global args
1171 if env_vars:
1172 for env_var_pair in env_vars:
1173 final_args.extend(['--env', env_var_pair])
1174
1175 if image:
1176 final_args.extend(['--image', image])
1177
1178 if not self.mgr.container_init:
1179 final_args += ['--no-container-init']
1180
1181 # subcommand
1182 final_args.append(command)
1183
1184 # subcommand args
1185 if not no_fsid:
1186 final_args += ['--fsid', self.mgr._cluster_fsid]
1187
1188 final_args += args
1189
1190 # exec
1191 self.log.debug('args: %s' % (' '.join(final_args)))
1192 if self.mgr.mode == 'root':
1193 if stdin:
1194 self.log.debug('stdin: %s' % stdin)
1195
1196 python = connr.choose_python()
1197 if not python:
1198 raise RuntimeError(
1199 'unable to find python on %s (tried %s in %s)' % (
1200 host, remotes.PYTHONS, remotes.PATH))
1201 try:
1202 out, err, code = remoto.process.check(
1203 conn,
1204 [python, self.mgr.cephadm_binary_path] + final_args,
1205 stdin=stdin.encode('utf-8') if stdin is not None else None)
1206 if code == 2:
1207 out_ls, err_ls, code_ls = remoto.process.check(
1208 conn, ['ls', self.mgr.cephadm_binary_path])
1209 if code_ls == 2:
1210 self._deploy_cephadm_binary_conn(conn, host)
1211 out, err, code = remoto.process.check(
1212 conn,
1213 [python, self.mgr.cephadm_binary_path] + final_args,
1214 stdin=stdin.encode('utf-8') if stdin is not None else None)
1215
1216 except RuntimeError as e:
1217 self.mgr._reset_con(host)
1218 if error_ok:
1219 return [], [str(e)], 1
1220 raise
1221
1222 elif self.mgr.mode == 'cephadm-package':
1223 try:
1224 out, err, code = remoto.process.check(
1225 conn,
1226 ['sudo', '/usr/bin/cephadm'] + final_args,
1227 stdin=stdin)
1228 except RuntimeError as e:
1229 self.mgr._reset_con(host)
1230 if error_ok:
1231 return [], [str(e)], 1
1232 raise
1233 else:
1234 assert False, 'unsupported mode'
1235
1236 self.log.debug('code: %d' % code)
1237 if out:
1238 self.log.debug('out: %s' % '\n'.join(out))
1239 if err:
1240 self.log.debug('err: %s' % '\n'.join(err))
1241 if code and not error_ok:
1242 raise OrchestratorError(
1243 'cephadm exited with an error code: %d, stderr:%s' % (
1244 code, '\n'.join(err)))
1245 return out, err, code
1246
1247 def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
1248 # pick an arbitrary host (the first in the inventory)...
1249 host = None
1250 for host_name in self.mgr.inventory.keys():
1251 host = host_name
1252 break
1253 if not host:
1254 raise OrchestratorError('no hosts defined')
1255 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
1256 self._registry_login(host, self.mgr.registry_url,
1257 self.mgr.registry_username, self.mgr.registry_password)
1258
1259 j = self._run_cephadm_json(host, '', 'pull', [], image=image_name, no_fsid=True)
1260
1261 r = ContainerInspectInfo(
1262 j['image_id'],
1263 j.get('ceph_version'),
1264 j.get('repo_digests')
1265 )
1266 self.log.debug(f'image {image_name} -> {r}')
1267 return r
1268
1269 # function responsible for logging a single host into a custom registry
1270 def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
1271 self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
1272 # want to pass info over stdin rather than through normal list of args
1273 args_str = json.dumps({
1274 'url': url,
1275 'username': username,
1276 'password': password,
1277 })
1278 out, err, code = self._run_cephadm(
1279 host, 'mon', 'registry-login',
1280 ['--registry-json', '-'], stdin=args_str, error_ok=True)
1281 if code:
1282 return f"Host {host} failed to login to {url} as {username} with given password"
1283 return None
1284
1285 def _deploy_cephadm_binary(self, host: str) -> None:
1286 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1287 self.log.info(f"Deploying cephadm binary to {host}")
1288 with self._remote_connection(host) as tpl:
1289 conn, _connr = tpl
1290 return self._deploy_cephadm_binary_conn(conn, host)
1291
1292 def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None:
1293 _out, _err, code = remoto.process.check(
1294 conn,
1295 ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}'])
1296 if code:
1297 msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
1298 self.log.warning(msg)
1299 raise OrchestratorError(msg)
1300 _out, _err, code = remoto.process.check(
1301 conn,
1302 ['tee', '-', self.mgr.cephadm_binary_path],
1303 stdin=self.mgr._cephadm.encode('utf-8'))
1304 if code:
1305 msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
1306 self.log.warning(msg)
1307 raise OrchestratorError(msg)
1308
1309 def _write_remote_file(self,
1310 host: str,
1311 path: str,
1312 content: bytes,
1313 mode: int,
1314 uid: int,
1315 gid: int) -> None:
1316 with self._remote_connection(host) as tpl:
1317 conn, connr = tpl
1318 try:
1319 errmsg = connr.write_file(path, content, mode, uid, gid)
1320 if errmsg is not None:
1321 raise OrchestratorError(errmsg)
1322 except Exception as e:
1323 msg = f"Unable to write {host}:{path}: {e}"
1324 self.log.warning(msg)
1325 raise OrchestratorError(msg)
1326
1327 @contextmanager
1328 def _remote_connection(self,
1329 host: str,
1330 addr: Optional[str] = None,
1331 ) -> Iterator[Tuple["BaseConnection", Any]]:
1332 if not addr and host in self.mgr.inventory:
1333 addr = self.mgr.inventory.get_addr(host)
1334
1335 self.mgr.offline_hosts_remove(host)
1336
1337 try:
1338 try:
1339 if not addr:
1340 raise OrchestratorError("host address is empty")
1341 conn, connr = self.mgr._get_connection(addr)
1342 except OSError as e:
1343 self.mgr._reset_con(host)
1344 msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1345 raise execnet.gateway_bootstrap.HostNotFound(msg)
1346
1347 yield (conn, connr)
1348
1349 except execnet.gateway_bootstrap.HostNotFound as e:
1350 # this is a misleading exception as it seems to be thrown for
1351 # any sort of connection failure, even those having nothing to
1352 # do with "host not found" (e.g., ssh key permission denied).
1353 self.mgr.offline_hosts.add(host)
1354 self.mgr._reset_con(host)
1355
1356 user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
1357 if str(e).startswith("Can't communicate"):
1358 msg = str(e)
1359 else:
1360 msg = f'''Failed to connect to {host} ({addr}).
1361 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1362
1363 To add the cephadm SSH key to the host:
1364 > ceph cephadm get-pub-key > ~/ceph.pub
1365 > ssh-copy-id -f -i ~/ceph.pub {user}@{addr}
1366
1367 To check that the host is reachable open a new shell with the --no-hosts flag:
1368 > cephadm shell --no-hosts
1369
1370 Then run the following:
1371 > ceph cephadm get-ssh-config > ssh_config
1372 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1373 > chmod 0600 ~/cephadm_private_key
1374 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
1375 raise OrchestratorError(msg) from e
1376 except Exception as ex:
1377 self.log.exception(ex)
1378 raise