ceph/src/pybind/mgr/cephadm/serve.py (ceph 16.2.6)
1 import hashlib
2 import json
3 import logging
4 import uuid
5 from collections import defaultdict
6 from contextlib import contextmanager
7 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator
8
9 from cephadm import remotes
10
11 try:
12 import remoto
13 import execnet.gateway_bootstrap
14 except ImportError:
15 remoto = None
16
17 from ceph.deployment import inventory
18 from ceph.deployment.drive_group import DriveGroupSpec
19 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
20 from ceph.utils import str_to_datetime, datetime_now
21
22 import orchestrator
23 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
24 DaemonDescriptionStatus, daemon_type_to_service
25 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
26 from cephadm.schedule import HostAssignment
27 from cephadm.autotune import MemoryAutotuner
28 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
29 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
30 from mgr_module import MonCommandFailed
31 from mgr_util import format_bytes
32
33 from . import utils
34
35 if TYPE_CHECKING:
36 from cephadm.module import CephadmOrchestrator
37 from remoto.backends import BaseConnection
38
39 logger = logging.getLogger(__name__)
40
41 REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
42
43
44 class CephadmServe:
45 """
46 This module contains functions that are executed in the
47 serve() thread. Thus they don't block the CLI.
48
49 Please see the `Note regarding network calls from CLI handlers`
50 chapter in the cephadm developer guide.
51
52 On the other hand, these functions should *not* be called from
53 CLI handlers, to avoid blocking the CLI.
54 """
55
56 def __init__(self, mgr: "CephadmOrchestrator"):
57 self.mgr: "CephadmOrchestrator" = mgr
58 self.log = logger
59
60 def serve(self) -> None:
61 """
62 The main loop of cephadm.
63
64 A command handler will typically change the declarative state
65 of cephadm. This loop will then attempt to apply this new state.
66 """
67 self.log.debug("serve starting")
68 self.mgr.config_checker.load_network_config()
69
70 while self.mgr.run:
71
72 try:
73
74 self.convert_tags_to_repo_digest()
75
76 # refresh daemons
77 self.log.debug('refreshing hosts and daemons')
78 self._refresh_hosts_and_daemons()
79
80 self._check_for_strays()
81
82 self._update_paused_health()
83
84 if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
85 self.mgr.need_connect_dashboard_rgw = False
86 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
87 self.log.info('Checking dashboard <-> RGW credentials')
88 self.mgr.remote('dashboard', 'set_rgw_credentials')
89
90 if not self.mgr.paused:
91 self.mgr.to_remove_osds.process_removal_queue()
92
93 self.mgr.migration.migrate()
94 if self.mgr.migration.is_migration_ongoing():
95 continue
96
97 if self._apply_all_services():
98 continue # did something, refresh
99
100 self._check_daemons()
101
102 self._purge_deleted_services()
103
104 if self.mgr.upgrade.continue_upgrade():
105 continue
106
107 except OrchestratorError as e:
108 if e.event_subject:
109 self.mgr.events.from_orch_error(e)
110
111 self._serve_sleep()
112 self.log.debug("serve exit")
113
114 def _serve_sleep(self) -> None:
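# Sleep for the shortest of the configured check/cache intervals, but never less than
# 30 seconds; waiting on mgr.event lets other threads wake the serve loop early by
# setting the event (e.g. after a spec change).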
115 sleep_interval = max(
116 30,
117 min(
118 self.mgr.host_check_interval,
119 self.mgr.facts_cache_timeout,
120 self.mgr.daemon_cache_timeout,
121 self.mgr.device_cache_timeout,
122 )
123 )
124 self.log.debug('Sleeping for %d seconds', sleep_interval)
125 self.mgr.event.wait(sleep_interval)
126 self.mgr.event.clear()
127
128 def _update_paused_health(self) -> None:
129 if self.mgr.paused:
130 self.mgr.health_checks['CEPHADM_PAUSED'] = {
131 'severity': 'warning',
132 'summary': 'cephadm background work is paused',
133 'count': 1,
134 'detail': ["'ceph orch resume' to resume"],
135 }
136 self.mgr.set_health_checks(self.mgr.health_checks)
137 else:
138 if 'CEPHADM_PAUSED' in self.mgr.health_checks:
139 del self.mgr.health_checks['CEPHADM_PAUSED']
140 self.mgr.set_health_checks(self.mgr.health_checks)
141
142 def _autotune_host_memory(self, host: str) -> None:
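# Scale the host's total memory (from gathered facts) by autotune_memory_target_ratio,
# let MemoryAutotuner compute a per-OSD osd_memory_target, drop any differing per-daemon
# overrides, and then set (or remove) a single host-masked osd/host:<host> option.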
143 total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
144 if not total_mem:
145 val = None
146 else:
147 total_mem *= 1024 # kb -> bytes
148 total_mem *= self.mgr.autotune_memory_target_ratio
149 a = MemoryAutotuner(
150 daemons=self.mgr.cache.get_daemons_by_host(host),
151 config_get=self.mgr.get_foreign_ceph_option,
152 total_mem=total_mem,
153 )
154 val, osds = a.tune()
155 any_changed = False
156 for o in osds:
157 if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
158 self.mgr.check_mon_command({
159 'prefix': 'config rm',
160 'who': o,
161 'name': 'osd_memory_target',
162 })
163 any_changed = True
164 if val is not None:
165 if any_changed:
166 self.mgr.log.info(
167 f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
168 )
169 ret, out, err = self.mgr.mon_command({
170 'prefix': 'config set',
171 'who': f'osd/host:{host}',
172 'name': 'osd_memory_target',
173 'value': str(val),
174 })
175 if ret:
176 self.log.warning(
177 f'Unable to set osd_memory_target on {host} to {val}: {err}'
178 )
179 else:
180 self.mgr.check_mon_command({
181 'prefix': 'config rm',
182 'who': f'osd/host:{host}',
183 'name': 'osd_memory_target',
184 })
185 self.mgr.cache.update_autotune(host)
186
187 def _refresh_hosts_and_daemons(self) -> None:
188 bad_hosts = []
189 failures = []
190
191 # host -> path -> (mode, uid, gid, content, digest)
192 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
193
194 # ceph.conf
195 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
196 config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
197 config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
198
199 if self.mgr.manage_etc_ceph_ceph_conf:
200 try:
201 pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
202 ha = HostAssignment(
203 spec=ServiceSpec('mon', placement=pspec),
204 hosts=self.mgr._schedulable_hosts(),
205 unreachable_hosts=self.mgr._unreachable_hosts(),
206 daemons=[],
207 networks=self.mgr.cache.networks,
208 )
209 all_slots, _, _ = ha.place()
210 for host in {s.hostname for s in all_slots}:
211 if host not in client_files:
212 client_files[host] = {}
213 client_files[host]['/etc/ceph/ceph.conf'] = (
214 0o644, 0, 0, bytes(config), str(config_digest)
215 )
216 except Exception as e:
217 self.mgr.log.warning(
218 f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
219
220 # client keyrings
221 for ks in self.mgr.keys.keys.values():
222 assert config
223 assert config_digest
224 try:
225 ret, keyring, err = self.mgr.mon_command({
226 'prefix': 'auth get',
227 'entity': ks.entity,
228 })
229 if ret:
230 self.log.warning(f'unable to fetch keyring for {ks.entity}')
231 continue
232 digest = ''.join('%02x' % c for c in hashlib.sha256(
233 keyring.encode('utf-8')).digest())
234 ha = HostAssignment(
235 spec=ServiceSpec('mon', placement=ks.placement),
236 hosts=self.mgr._schedulable_hosts(),
237 unreachable_hosts=self.mgr._unreachable_hosts(),
238 daemons=[],
239 networks=self.mgr.cache.networks,
240 )
241 all_slots, _, _ = ha.place()
242 for host in {s.hostname for s in all_slots}:
243 if host not in client_files:
244 client_files[host] = {}
245 client_files[host]['/etc/ceph/ceph.conf'] = (
246 0o644, 0, 0, bytes(config), str(config_digest)
247 )
248 client_files[host][ks.path] = (
249 ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
250 )
251 except Exception as e:
252 self.log.warning(
253 f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
254
255 @forall_hosts
256 def refresh(host: str) -> None:
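# Per-host refresh pass; the @forall_hosts decorator lets the single refresh(...) call
# further below apply this body to every host returned by get_hosts(). Each pass checks
# host health, daemons, registry login, devices, facts, OSDSpec previews, memory
# autotuning and managed client files (ceph.conf / keyrings).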
257
258 # skip hosts that are in maintenance - they could be powered off
259 if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
260 return
261
262 if self.mgr.cache.host_needs_check(host):
263 r = self._check_host(host)
264 if r is not None:
265 bad_hosts.append(r)
266 if self.mgr.cache.host_needs_daemon_refresh(host):
267 self.log.debug('refreshing %s daemons' % host)
268 r = self._refresh_host_daemons(host)
269 if r:
270 failures.append(r)
271
272 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
273 self.log.debug(f"Logging `{host}` into custom registry")
274 r = self._registry_login(host, self.mgr.registry_url,
275 self.mgr.registry_username, self.mgr.registry_password)
276 if r:
277 bad_hosts.append(r)
278
279 if self.mgr.cache.host_needs_device_refresh(host):
280 self.log.debug('refreshing %s devices' % host)
281 r = self._refresh_host_devices(host)
282 if r:
283 failures.append(r)
284
285 if self.mgr.cache.host_needs_facts_refresh(host):
286 self.log.debug('Refreshing %s facts' % host)
287 r = self._refresh_facts(host)
288 if r:
289 failures.append(r)
290
291 if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
292 self.log.debug(f"refreshing OSDSpec previews for {host}")
293 r = self._refresh_host_osdspec_previews(host)
294 if r:
295 failures.append(r)
296
297 if (
298 self.mgr.cache.host_needs_autotune_memory(host)
299 and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
300 ):
301 self.log.debug(f"autotuning memory for {host}")
302 self._autotune_host_memory(host)
303
304 # client files
305 updated_files = False
306 old_files = self.mgr.cache.get_host_client_files(host).copy()
307 for path, m in client_files.get(host, {}).items():
308 mode, uid, gid, content, digest = m
309 if path in old_files:
310 match = old_files[path] == (digest, mode, uid, gid)
311 del old_files[path]
312 if match:
313 continue
314 self.log.info(f'Updating {host}:{path}')
315 self._write_remote_file(host, path, content, mode, uid, gid)
316 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
317 updated_files = True
318 for path in old_files.keys():
319 self.log.info(f'Removing {host}:{path}')
320 with self._remote_connection(host) as tpl:
321 conn, connr = tpl
322 out, err, code = remoto.process.check(
323 conn,
324 ['rm', '-f', path])
325 updated_files = True
326 self.mgr.cache.removed_client_file(host, path)
327 if updated_files:
328 self.mgr.cache.save_host(host)
329
330 refresh(self.mgr.cache.get_hosts())
331
332 self.mgr.config_checker.run_checks()
333
334 health_changed = False
335 for k in [
336 'CEPHADM_HOST_CHECK_FAILED',
337 'CEPHADM_FAILED_DAEMON',
338 'CEPHADM_REFRESH_FAILED',
339 ]:
340 if k in self.mgr.health_checks:
341 del self.mgr.health_checks[k]
342 health_changed = True
343 if bad_hosts:
344 self.mgr.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
345 'severity': 'warning',
346 'summary': '%d hosts fail cephadm check' % len(bad_hosts),
347 'count': len(bad_hosts),
348 'detail': bad_hosts,
349 }
350 health_changed = True
351 if failures:
352 self.mgr.health_checks['CEPHADM_REFRESH_FAILED'] = {
353 'severity': 'warning',
354 'summary': 'failed to probe daemons or devices',
355 'count': len(failures),
356 'detail': failures,
357 }
358 health_changed = True
359 failed_daemons = []
360 for dd in self.mgr.cache.get_daemons():
361 if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
362 failed_daemons.append('daemon %s on %s is in %s state' % (
363 dd.name(), dd.hostname, dd.status_desc
364 ))
365 if failed_daemons:
366 self.mgr.health_checks['CEPHADM_FAILED_DAEMON'] = {
367 'severity': 'warning',
368 'summary': '%d failed cephadm daemon(s)' % len(failed_daemons),
369 'count': len(failed_daemons),
370 'detail': failed_daemons,
371 }
372 health_changed = True
373 if health_changed:
374 self.mgr.set_health_checks(self.mgr.health_checks)
375
376 def _check_host(self, host: str) -> Optional[str]:
377 if host not in self.mgr.inventory:
378 return None
379 self.log.debug(' checking %s' % host)
380 try:
381 addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
382 out, err, code = self._run_cephadm(
383 host, cephadmNoImage, 'check-host', [],
384 error_ok=True, no_fsid=True)
385 self.mgr.cache.update_last_host_check(host)
386 self.mgr.cache.save_host(host)
387 if code:
388 self.log.debug(' host %s (%s) failed check' % (host, addr))
389 if self.mgr.warn_on_failed_host_check:
390 return 'host %s (%s) failed check: %s' % (host, addr, err)
391 else:
392 self.log.debug(' host %s (%s) ok' % (host, addr))
393 except Exception as e:
394 self.log.debug(' host %s (%s) failed check' % (host, addr))
395 return 'host %s (%s) failed check: %s' % (host, addr, e)
396 return None
397
398 def _refresh_host_daemons(self, host: str) -> Optional[str]:
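# Run 'cephadm ls' on the host, keep only cephadm-style entries belonging to this
# cluster's fsid, convert each into an orchestrator.DaemonDescription, and replace the
# host's cached daemon list.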
399 try:
400 ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
401 except OrchestratorError as e:
402 return str(e)
403 dm = {}
404 for d in ls:
405 if not d['style'].startswith('cephadm'):
406 continue
407 if d['fsid'] != self.mgr._cluster_fsid:
408 continue
409 if '.' not in d['name']:
410 continue
411 sd = orchestrator.DaemonDescription()
412 sd.last_refresh = datetime_now()
413 for k in ['created', 'started', 'last_configured', 'last_deployed']:
414 v = d.get(k, None)
415 if v:
416 setattr(sd, k, str_to_datetime(d[k]))
417 sd.daemon_type = d['name'].split('.')[0]
418 if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
419 logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
420 continue
421
422 sd.daemon_id = '.'.join(d['name'].split('.')[1:])
423 sd.hostname = host
424 sd.container_id = d.get('container_id')
425 if sd.container_id:
426 # shorten the hash
427 sd.container_id = sd.container_id[0:12]
428 sd.container_image_name = d.get('container_image_name')
429 sd.container_image_id = d.get('container_image_id')
430 sd.container_image_digests = d.get('container_image_digests')
431 sd.memory_usage = d.get('memory_usage')
432 sd.memory_request = d.get('memory_request')
433 sd.memory_limit = d.get('memory_limit')
434 sd._service_name = d.get('service_name')
435 sd.deployed_by = d.get('deployed_by')
436 sd.version = d.get('version')
437 sd.ports = d.get('ports')
438 sd.ip = d.get('ip')
439 sd.rank = int(d['rank']) if d.get('rank') is not None else None
440 sd.rank_generation = int(d['rank_generation']) if d.get(
441 'rank_generation') is not None else None
442 if sd.daemon_type == 'osd':
443 sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
444 if 'state' in d:
445 sd.status_desc = d['state']
446 sd.status = {
447 'running': DaemonDescriptionStatus.running,
448 'stopped': DaemonDescriptionStatus.stopped,
449 'error': DaemonDescriptionStatus.error,
450 'unknown': DaemonDescriptionStatus.error,
451 }[d['state']]
452 else:
453 sd.status_desc = 'unknown'
454 sd.status = None
455 dm[sd.name()] = sd
456 self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
457 self.mgr.cache.update_host_daemons(host, dm)
458 self.mgr.cache.save_host(host)
459 return None
460
461 def _refresh_facts(self, host: str) -> Optional[str]:
462 try:
463 val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
464 except OrchestratorError as e:
465 return str(e)
466
467 self.mgr.cache.update_host_facts(host, val)
468
469 return None
470
471 def _refresh_host_devices(self, host: str) -> Optional[str]:
472
473 with_lsm = self.mgr.get_module_option('device_enhanced_scan')
474 inventory_args = ['--', 'inventory',
475 '--format=json',
476 '--filter-for-batch']
477 if with_lsm:
478 inventory_args.insert(-1, "--with-lsm")
479
480 try:
481 try:
482 devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
483 inventory_args)
484 except OrchestratorError as e:
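# the cephadm on the host is presumably too old to know --filter-for-batch;
# retry the inventory call without that flag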
485 if 'unrecognized arguments: --filter-for-batch' in str(e):
486 rerun_args = inventory_args.copy()
487 rerun_args.remove('--filter-for-batch')
488 devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
489 rerun_args)
490 else:
491 raise
492
493 networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
494 except OrchestratorError as e:
495 return str(e)
496
497 self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
498 host, len(devices), len(networks)))
499 ret = inventory.Devices.from_json(devices)
500 self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
501 self.update_osdspec_previews(host)
502 self.mgr.cache.save_host(host)
503 return None
504
505 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
506 self.update_osdspec_previews(host)
507 self.mgr.cache.save_host(host)
508 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
509 return None
510
511 def update_osdspec_previews(self, search_host: str = '') -> None:
512 # Set global 'pending' flag for host
513 self.mgr.cache.loading_osdspec_preview.add(search_host)
514 previews = []
515 # query OSDSpecs for host <search host> and generate/get the preview
516 # There can be multiple previews for one host due to multiple OSDSpecs.
517 previews.extend(self.mgr.osd_service.get_previews(search_host))
518 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
519 self.mgr.cache.osdspec_previews[search_host] = previews
520 # Unset global 'pending' flag for host
521 self.mgr.cache.loading_osdspec_preview.remove(search_host)
522
523 def _check_for_strays(self) -> None:
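# Compare the daemons the mgr reports via list_servers() against cephadm's own cache:
# daemons on hosts outside the inventory raise CEPHADM_STRAY_HOST, daemons cephadm does
# not manage raise CEPHADM_STRAY_DAEMON (both subject to the warn_on_stray_* options).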
524 self.log.debug('_check_for_strays')
525 for k in ['CEPHADM_STRAY_HOST',
526 'CEPHADM_STRAY_DAEMON']:
527 if k in self.mgr.health_checks:
528 del self.mgr.health_checks[k]
529 if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
530 ls = self.mgr.list_servers()
531 managed = self.mgr.cache.get_daemon_names()
532 host_detail = [] # type: List[str]
533 host_num_daemons = 0
534 daemon_detail = [] # type: List[str]
535 for item in ls:
536 host = item.get('hostname')
537 assert isinstance(host, str)
538 daemons = item.get('services') # misnomer!
539 assert isinstance(daemons, list)
540 missing_names = []
541 for s in daemons:
542 daemon_id = s.get('id')
543 assert daemon_id
544 name = '%s.%s' % (s.get('type'), daemon_id)
545 if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
546 metadata = self.mgr.get_metadata(
547 cast(str, s.get('type')), daemon_id, {})
548 assert metadata is not None
549 try:
550 if s.get('type') == 'rgw-nfs':
551 # https://tracker.ceph.com/issues/49573
552 name = metadata['id'][:-4]
553 else:
554 name = '%s.%s' % (s.get('type'), metadata['id'])
555 except (KeyError, TypeError):
556 self.log.debug(
557 "Failed to find daemon id for %s service %s" % (
558 s.get('type'), s.get('id')
559 )
560 )
561
562 if host not in self.mgr.inventory:
563 missing_names.append(name)
564 host_num_daemons += 1
565 if name not in managed:
566 daemon_detail.append(
567 'stray daemon %s on host %s not managed by cephadm' % (name, host))
568 if missing_names:
569 host_detail.append(
570 'stray host %s has %d stray daemons: %s' % (
571 host, len(missing_names), missing_names))
572 if self.mgr.warn_on_stray_hosts and host_detail:
573 self.mgr.health_checks['CEPHADM_STRAY_HOST'] = {
574 'severity': 'warning',
575 'summary': '%d stray host(s) with %s daemon(s) '
576 'not managed by cephadm' % (
577 len(host_detail), host_num_daemons),
578 'count': len(host_detail),
579 'detail': host_detail,
580 }
581 if self.mgr.warn_on_stray_daemons and daemon_detail:
582 self.mgr.health_checks['CEPHADM_STRAY_DAEMON'] = {
583 'severity': 'warning',
584 'summary': '%d stray daemon(s) not managed by cephadm' % (
585 len(daemon_detail)),
586 'count': len(daemon_detail),
587 'detail': daemon_detail,
588 }
589 self.mgr.set_health_checks(self.mgr.health_checks)
590
591 def _apply_all_services(self) -> bool:
592 r = False
593 specs = [] # type: List[ServiceSpec]
594 for sn, spec in self.mgr.spec_store.active_specs.items():
595 specs.append(spec)
596 for spec in specs:
597 try:
598 if self._apply_service(spec):
599 r = True
600 except Exception as e:
601 self.log.exception('Failed to apply %s spec %s: %s' % (
602 spec.service_name(), spec, e))
603 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
604
605 return r
606
607 def _apply_service_config(self, spec: ServiceSpec) -> None:
608 if spec.config:
609 section = utils.name_to_config_section(spec.service_name())
610 for k, v in spec.config.items():
611 try:
612 current = self.mgr.get_foreign_ceph_option(section, k)
613 except KeyError:
614 self.log.warning(
615 f'Ignoring invalid {spec.service_name()} config option {k}'
616 )
617 self.mgr.events.for_service(
618 spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
619 )
620 continue
621 if current != v:
622 self.log.debug(f'setting [{section}] {k} = {v}')
623 try:
624 self.mgr.check_mon_command({
625 'prefix': 'config set',
626 'name': k,
627 'value': str(v),
628 'who': section,
629 })
630 except MonCommandFailed as e:
631 self.log.warning(
632 f'Failed to set {spec.service_name()} option {k}: {e}'
633 )
634
635 def _apply_service(self, spec: ServiceSpec) -> bool:
636 """
637 Schedule a service. Deploy new daemons or remove old ones, depending
638 on the target label and count specified in the placement.
639 """
640 self.mgr.migration.verify_no_migration()
641
642 service_type = spec.service_type
643 service_name = spec.service_name()
644 if spec.unmanaged:
645 self.log.debug('Skipping unmanaged service %s' % service_name)
646 return False
647 if spec.preview_only:
648 self.log.debug('Skipping preview_only service %s' % service_name)
649 return False
650 self.log.debug('Applying service %s spec' % service_name)
651
652 self._apply_service_config(spec)
653
654 if service_type == 'osd':
655 self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
656 # TODO: return True would result in a busy loop
657 # can't know if daemon count changed; create_from_spec doesn't
658 # return a solid indication
659 return False
660
661 svc = self.mgr.cephadm_services[service_type]
662 daemons = self.mgr.cache.get_daemons_by_service(service_name)
663
664 public_networks: List[str] = []
665 if service_type == 'mon':
666 out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
667 if '/' in out:
668 public_networks = [x.strip() for x in out.split(',')]
669 self.log.debug('mon public_network(s) is %s' % public_networks)
670
671 def matches_network(host):
672 # type: (str) -> bool
673 # make sure we have 1 or more IPs for any of those networks on that
674 # host
675 for network in public_networks:
676 if len(self.mgr.cache.networks[host].get(network, [])) > 0:
677 return True
678 self.log.info(
679 f"Filtered out host {host}: does not belong to mon public_network"
680 f" ({','.join(public_networks)})"
681 )
682 return False
683
684 rank_map = None
685 if svc.ranked():
686 rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
687 ha = HostAssignment(
688 spec=spec,
689 hosts=self.mgr._schedulable_hosts(),
690 unreachable_hosts=self.mgr._unreachable_hosts(),
691 daemons=daemons,
692 networks=self.mgr.cache.networks,
693 filter_new_host=(
694 matches_network if service_type == 'mon'
695 else None
696 ),
697 allow_colo=svc.allow_colo(),
698 primary_daemon_type=svc.primary_daemon_type(),
699 per_host_daemon_type=svc.per_host_daemon_type(),
700 rank_map=rank_map,
701 )
702
703 try:
704 all_slots, slots_to_add, daemons_to_remove = ha.place()
705 daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
706 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
707 self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
708 except OrchestratorError as e:
709 self.log.error('Failed to apply %s spec %s: %s' % (
710 spec.service_name(), spec, e))
711 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
712 return False
713
714 r = None
715
716 # sanity check
717 final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
718 if service_type in ['mon', 'mgr'] and final_count < 1:
719 self.log.debug('cannot scale mon|mgr below 1')
720 return False
721
722 # progress
723 progress_id = str(uuid.uuid4())
724 delta: List[str] = []
725 if slots_to_add:
726 delta += [f'+{len(slots_to_add)}']
727 if daemons_to_remove:
728 delta += [f'-{len(daemons_to_remove)}']
729 progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
730 progress_total = len(slots_to_add) + len(daemons_to_remove)
731 progress_done = 0
732
733 def update_progress() -> None:
734 self.mgr.remote(
735 'progress', 'update', progress_id,
736 ev_msg=progress_title,
737 ev_progress=(progress_done / progress_total),
738 add_to_ceph_s=True,
739 )
740
741 if progress_total:
742 update_progress()
743
744 # add any?
745 did_config = False
746
747 self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
748 self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
749
750 try:
751 # assign names
752 for i in range(len(slots_to_add)):
753 slot = slots_to_add[i]
754 slot = slot.assign_name(self.mgr.get_unique_name(
755 slot.daemon_type,
756 slot.hostname,
757 daemons,
758 prefix=spec.service_id,
759 forcename=slot.name,
760 rank=slot.rank,
761 rank_generation=slot.rank_generation,
762 ))
763 slots_to_add[i] = slot
764 if rank_map is not None:
765 assert slot.rank is not None
766 assert slot.rank_generation is not None
767 assert rank_map[slot.rank][slot.rank_generation] is None
768 rank_map[slot.rank][slot.rank_generation] = slot.name
769
770 if rank_map:
771 # record the rank_map before we make changes so that if we fail the
772 # next mgr will clean up.
773 self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
774
775 # remove daemons now, since we are going to fence them anyway
776 for d in daemons_to_remove:
777 assert d.hostname is not None
778 self._remove_daemon(d.name(), d.hostname)
779 daemons_to_remove = []
780
781 # fence them
782 svc.fence_old_ranks(spec, rank_map, len(all_slots))
783
784 # create daemons
785 for slot in slots_to_add:
786 # first remove daemon on conflicting port?
787 if slot.ports:
788 for d in daemons_to_remove:
789 if d.hostname != slot.hostname:
790 continue
791 if not (set(d.ports or []) & set(slot.ports)):
792 continue
793 if d.ip and slot.ip and d.ip != slot.ip:
794 continue
795 self.log.info(
796 f'Removing {d.name()} before deploying to {slot} to avoid a port conflict'
797 )
798 # NOTE: we don't check ok-to-stop here to avoid starvation if
799 # there is only 1 gateway.
800 self._remove_daemon(d.name(), d.hostname)
801 daemons_to_remove.remove(d)
802 progress_done += 1
803 break
804
805 # deploy new daemon
806 daemon_id = slot.name
807 if not did_config:
808 svc.config(spec)
809 did_config = True
810
811 daemon_spec = svc.make_daemon_spec(
812 slot.hostname, daemon_id, slot.network, spec,
813 daemon_type=slot.daemon_type,
814 ports=slot.ports,
815 ip=slot.ip,
816 rank=slot.rank,
817 rank_generation=slot.rank_generation,
818 )
819 self.log.debug('Placing %s.%s on host %s' % (
820 slot.daemon_type, daemon_id, slot.hostname))
821
822 try:
823 daemon_spec = svc.prepare_create(daemon_spec)
824 self._create_daemon(daemon_spec)
825 r = True
826 progress_done += 1
827 update_progress()
828 except (RuntimeError, OrchestratorError) as e:
829 msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
830 f"on {slot.hostname}: {e}")
831 self.mgr.events.for_service(spec, 'ERROR', msg)
832 self.mgr.log.error(msg)
833 # only return "no change" if no other slot has already succeeded;
834 # a later success will still set r to True
835 if r is None:
836 r = False
837 progress_done += 1
838 update_progress()
839 continue
840
841 # add to daemon list so next name(s) will also be unique
842 sd = orchestrator.DaemonDescription(
843 hostname=slot.hostname,
844 daemon_type=slot.daemon_type,
845 daemon_id=daemon_id,
846 )
847 daemons.append(sd)
848
849 # remove any?
850 def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
851 daemon_ids = [d.daemon_id for d in remove_daemons]
852 assert None not in daemon_ids
853 # setting force flag retains previous behavior
854 r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
855 return not r.retval
856
857 while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
858 # let's find a subset that is ok-to-stop
859 daemons_to_remove.pop()
860 for d in daemons_to_remove:
861 r = True
862 assert d.hostname is not None
863 self._remove_daemon(d.name(), d.hostname)
864
865 progress_done += 1
866 update_progress()
867
868 self.mgr.remote('progress', 'complete', progress_id)
869 except Exception as e:
870 self.mgr.remote('progress', 'fail', progress_id, str(e))
871 raise
872
873 if r is None:
874 r = False
875 return r
876
877 def _check_daemons(self) -> None:
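# Walk every cached daemon: remove orphans whose spec is gone (mon/mgr/osd excepted),
# skip unmanaged or deleted services, mark the active daemon per service, and reconfigure
# or redeploy daemons whose dependencies, monmap, or extra ceph.conf changed since their
# last recorded config; post actions are queued for the REQUIRES_POST_ACTIONS types.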
878
879 daemons = self.mgr.cache.get_daemons()
880 daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
881 for dd in daemons:
882 # orphan?
883 spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
884 assert dd.hostname is not None
885 assert dd.daemon_type is not None
886 assert dd.daemon_id is not None
887 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
888 # (mon and mgr specs should always exist; osds aren't matched
889 # to a service spec)
890 self.log.info('Removing orphan daemon %s...' % dd.name())
891 self._remove_daemon(dd.name(), dd.hostname)
892
893 # ignore unmanaged services
894 if spec and spec.unmanaged:
895 continue
896
897 # ignore daemons for deleted services
898 if dd.service_name() in self.mgr.spec_store.spec_deleted:
899 continue
900
901 # These daemon types require additional configs after creation
902 if dd.daemon_type in REQUIRES_POST_ACTIONS:
903 daemons_post[dd.daemon_type].append(dd)
904
905 if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
906 self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
907 dd.is_active = True
908 else:
909 dd.is_active = False
910
911 deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
912 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
913 dd.hostname, dd.name())
914 if last_deps is None:
915 last_deps = []
916 action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
917 if not last_config:
918 self.log.info('Reconfiguring %s (unknown last config time)...' % (
919 dd.name()))
920 action = 'reconfig'
921 elif last_deps != deps:
922 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
923 deps))
924 self.log.info('Reconfiguring %s (dependencies changed)...' % (
925 dd.name()))
926 action = 'reconfig'
927 elif self.mgr.last_monmap and \
928 self.mgr.last_monmap > last_config and \
929 dd.daemon_type in CEPH_TYPES:
930 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
931 action = 'reconfig'
932 elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
933 dd.daemon_type in CEPH_TYPES:
934 self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
935 action = 'reconfig'
936 if action:
937 if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
938 and action == 'reconfig':
939 action = 'redeploy'
940 try:
941 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
942 self.mgr._daemon_action(daemon_spec, action=action)
943 self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
944 except OrchestratorError as e:
945 self.mgr.events.from_orch_error(e)
946 if dd.daemon_type in daemons_post:
947 del daemons_post[dd.daemon_type]
948 # continue...
949 except Exception as e:
950 self.mgr.events.for_daemon_from_exception(dd.name(), e)
951 if dd.daemon_type in daemons_post:
952 del daemons_post[dd.daemon_type]
953 # continue...
954
955 # do daemon post actions
956 for daemon_type, daemon_descs in daemons_post.items():
957 if daemon_type in self.mgr.requires_post_actions:
958 self.mgr.requires_post_actions.remove(daemon_type)
959 self.mgr._get_cephadm_service(daemon_type_to_service(
960 daemon_type)).daemon_check_post(daemon_descs)
961
962 def _purge_deleted_services(self) -> None:
963 existing_services = self.mgr.spec_store.all_specs.items()
964 for service_name, spec in list(existing_services):
965 if service_name not in self.mgr.spec_store.spec_deleted:
966 continue
967 if self.mgr.cache.get_daemons_by_service(service_name):
968 continue
969 if spec.service_type in ['mon', 'mgr']:
970 continue
971
972 logger.info(f'Purge service {service_name}')
973
974 self.mgr.cephadm_services[spec.service_type].purge(service_name)
975 self.mgr.spec_store.finally_rm(service_name)
976
977 def convert_tags_to_repo_digest(self) -> None:
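# When use_repo_digest is enabled, rewrite container_image settings that are plain tags
# into repo digests (image@sha256:...) as reported by pulling the image on one host;
# digests are immutable, so daemons keep referring to the same image even if a tag moves.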
978 if not self.mgr.use_repo_digest:
979 return
980 settings = self.mgr.upgrade.get_distinct_container_image_settings()
981 digests: Dict[str, ContainerInspectInfo] = {}
982 for container_image_ref in set(settings.values()):
983 if not is_repo_digest(container_image_ref):
984 image_info = self._get_container_image_info(container_image_ref)
985 if image_info.repo_digests:
986 # FIXME: we assume the first digest here is the best
987 assert is_repo_digest(image_info.repo_digests[0]), image_info
988 digests[container_image_ref] = image_info
989
990 for entity, container_image_ref in settings.items():
991 if not is_repo_digest(container_image_ref):
992 image_info = digests[container_image_ref]
993 if image_info.repo_digests:
994 # FIXME: we assume the first digest here is the best
995 self.mgr.set_container_image(entity, image_info.repo_digests[0])
996
997 def _create_daemon(self,
998 daemon_spec: CephadmDaemonDeploySpec,
999 reconfig: bool = False,
1000 osd_uuid_map: Optional[Dict[str, Any]] = None,
1001 ) -> str:
1002
1003 with set_exception_subject('service', orchestrator.DaemonDescription(
1004 daemon_type=daemon_spec.daemon_type,
1005 daemon_id=daemon_spec.daemon_id,
1006 hostname=daemon_spec.host,
1007 ).service_id(), overwrite=True):
1008
1009 try:
1010 image = ''
1011 start_time = datetime_now()
1012 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1013
1014 if daemon_spec.daemon_type == 'container':
1015 spec = cast(CustomContainerSpec,
1016 self.mgr.spec_store[daemon_spec.service_name].spec)
1017 image = spec.image
1018 if spec.ports:
1019 ports.extend(spec.ports)
1020
1021 if daemon_spec.daemon_type == 'cephadm-exporter':
1022 if not reconfig:
1023 assert daemon_spec.host
1024 self._deploy_cephadm_binary(daemon_spec.host)
1025
1026 # TCP ports to open in the host firewall
1027 if len(ports) > 0:
1028 daemon_spec.extra_args.extend([
1029 '--tcp-ports', ' '.join(map(str, ports))
1030 ])
1031
1032 # osd deployments need an --osd-fsid arg
1033 if daemon_spec.daemon_type == 'osd':
1034 if not osd_uuid_map:
1035 osd_uuid_map = self.mgr.get_osd_uuid_map()
1036 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1037 if not osd_uuid:
1038 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1039 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1040
1041 if reconfig:
1042 daemon_spec.extra_args.append('--reconfig')
1043 if self.mgr.allow_ptrace:
1044 daemon_spec.extra_args.append('--allow-ptrace')
1045
1046 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
1047 self._registry_login(daemon_spec.host, self.mgr.registry_url,
1048 self.mgr.registry_username, self.mgr.registry_password)
1049
1050 self.log.info('%s daemon %s on %s' % (
1051 'Reconfiguring' if reconfig else 'Deploying',
1052 daemon_spec.name(), daemon_spec.host))
1053
1054 out, err, code = self._run_cephadm(
1055 daemon_spec.host, daemon_spec.name(), 'deploy',
1056 [
1057 '--name', daemon_spec.name(),
1058 '--meta-json', json.dumps({
1059 'service_name': daemon_spec.service_name,
1060 'ports': daemon_spec.ports,
1061 'ip': daemon_spec.ip,
1062 'deployed_by': self.mgr.get_active_mgr_digests(),
1063 'rank': daemon_spec.rank,
1064 'rank_generation': daemon_spec.rank_generation,
1065 }),
1066 '--config-json', '-',
1067 ] + daemon_spec.extra_args,
1068 stdin=json.dumps(daemon_spec.final_config),
1069 image=image)
1070
1071 # refresh daemon state? (ceph daemon reconfig does not need it)
1072 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
1073 if not code and daemon_spec.host in self.mgr.cache.daemons:
1074 # prime cached service state with what we (should have)
1075 # just created
1076 sd = daemon_spec.to_daemon_description(
1077 DaemonDescriptionStatus.running, 'starting')
1078 self.mgr.cache.add_daemon(daemon_spec.host, sd)
1079 if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
1080 self.mgr.requires_post_actions.add(daemon_spec.daemon_type)
1081 self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
1082
1083 self.mgr.cache.update_daemon_config_deps(
1084 daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
1085 self.mgr.cache.save_host(daemon_spec.host)
1086 msg = "{} {} on host '{}'".format(
1087 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1088 if not code:
1089 self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1090 else:
1091 what = 'reconfigure' if reconfig else 'deploy'
1092 self.mgr.events.for_daemon(
1093 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1094 return msg
1095 except OrchestratorError:
1096 redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
1097 if not reconfig and not redeploy:
1098 # we have to clean up the daemon. E.g. keyrings.
1099 service_type = daemon_type_to_service(daemon_spec.daemon_type)
1100 dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
1101 self.mgr.cephadm_services[service_type].post_remove(dd)
1102 raise
1103
1104 def _remove_daemon(self, name: str, host: str) -> str:
1105 """
1106 Remove a daemon
1107 """
1108 (daemon_type, daemon_id) = name.split('.', 1)
1109 daemon = orchestrator.DaemonDescription(
1110 daemon_type=daemon_type,
1111 daemon_id=daemon_id,
1112 hostname=host)
1113
1114 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1115
1116 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
1117
1118 # NOTE: we are passing the 'force' flag here, which means
1119 # we can delete a mon instance's data.
1120 args = ['--name', name, '--force']
1121 self.log.info('Removing daemon %s from %s' % (name, host))
1122 out, err, code = self._run_cephadm(
1123 host, name, 'rm-daemon', args)
1124 if not code:
1125 # remove item from cache
1126 self.mgr.cache.rm_daemon(host, name)
1127 self.mgr.cache.invalidate_host_daemons(host)
1128
1129 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].post_remove(daemon)
1130
1131 return "Removed {} from host '{}'".format(name, host)
1132
1133 def _run_cephadm_json(self,
1134 host: str,
1135 entity: Union[CephadmNoImage, str],
1136 command: str,
1137 args: List[str],
1138 no_fsid: Optional[bool] = False,
1139 image: Optional[str] = "",
1140 ) -> Any:
1141 try:
1142 out, err, code = self._run_cephadm(
1143 host, entity, command, args, no_fsid=no_fsid, image=image)
1144 if code:
1145 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
1146 except Exception as e:
1147 raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
1148 try:
1149 return json.loads(''.join(out))
1150 except (ValueError, KeyError):
1151 msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
1152 self.log.exception(f'{msg}: {"".join(out)}')
1153 raise OrchestratorError(msg)
1154
1155 def _run_cephadm(self,
1156 host: str,
1157 entity: Union[CephadmNoImage, str],
1158 command: str,
1159 args: List[str],
1160 addr: Optional[str] = "",
1161 stdin: Optional[str] = "",
1162 no_fsid: Optional[bool] = False,
1163 error_ok: Optional[bool] = False,
1164 image: Optional[str] = "",
1165 env_vars: Optional[List[str]] = None,
1166 ) -> Tuple[List[str], List[str], int]:
1167 """
1168 Run cephadm on the remote host with the given command + args
1169
1170 Important: You probably don't want to run _run_cephadm from CLI handlers
1171
1172 :env_vars: in format -> [KEY=VALUE, ..]
1173 """
1174 self.log.debug(f"_run_cephadm : command = {command}")
1175 self.log.debug(f"_run_cephadm : args = {args}")
1176
1177 bypass_image = ('cephadm-exporter',)
1178
1179 with self._remote_connection(host, addr) as tpl:
1180 conn, connr = tpl
1181 assert image or entity
1182 # Skip the image check for deployed daemons that are not ceph containers
1183 if not str(entity).startswith(bypass_image):
1184 if not image and entity is not cephadmNoImage:
1185 image = self.mgr._get_container_image(entity)
1186
1187 final_args = []
1188
1189 # global args
1190 if env_vars:
1191 for env_var_pair in env_vars:
1192 final_args.extend(['--env', env_var_pair])
1193
1194 if image:
1195 final_args.extend(['--image', image])
1196
1197 if not self.mgr.container_init:
1198 final_args += ['--no-container-init']
1199
1200 # subcommand
1201 final_args.append(command)
1202
1203 # subcommand args
1204 if not no_fsid:
1205 final_args += ['--fsid', self.mgr._cluster_fsid]
1206
1207 final_args += args
1208
1209 # exec
1210 self.log.debug('args: %s' % (' '.join(final_args)))
1211 if self.mgr.mode == 'root':
1212 if stdin:
1213 self.log.debug('stdin: %s' % stdin)
1214
1215 python = connr.choose_python()
1216 if not python:
1217 raise RuntimeError(
1218 'unable to find python on %s (tried %s in %s)' % (
1219 host, remotes.PYTHONS, remotes.PATH))
1220 try:
1221 out, err, code = remoto.process.check(
1222 conn,
1223 [python, self.mgr.cephadm_binary_path] + final_args,
1224 stdin=stdin.encode('utf-8') if stdin is not None else None)
1225 if code == 2:
1226 out_ls, err_ls, code_ls = remoto.process.check(
1227 conn, ['ls', self.mgr.cephadm_binary_path])
1228 if code_ls == 2:
1229 self._deploy_cephadm_binary_conn(conn, host)
1230 out, err, code = remoto.process.check(
1231 conn,
1232 [python, self.mgr.cephadm_binary_path] + final_args,
1233 stdin=stdin.encode('utf-8') if stdin is not None else None)
1234
1235 except RuntimeError as e:
1236 self.mgr._reset_con(host)
1237 if error_ok:
1238 return [], [str(e)], 1
1239 raise
1240
1241 elif self.mgr.mode == 'cephadm-package':
1242 try:
1243 out, err, code = remoto.process.check(
1244 conn,
1245 ['sudo', '/usr/bin/cephadm'] + final_args,
1246 stdin=stdin)
1247 except RuntimeError as e:
1248 self.mgr._reset_con(host)
1249 if error_ok:
1250 return [], [str(e)], 1
1251 raise
1252 else:
1253 assert False, 'unsupported mode'
1254
1255 self.log.debug('code: %d' % code)
1256 if out:
1257 self.log.debug('out: %s' % '\n'.join(out))
1258 if err:
1259 self.log.debug('err: %s' % '\n'.join(err))
1260 if code and not error_ok:
1261 raise OrchestratorError(
1262 'cephadm exited with an error code: %d, stderr:%s' % (
1263 code, '\n'.join(err)))
1264 return out, err, code
1265
1266 def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
1267 # pick a random host...
1268 host = None
1269 for host_name in self.mgr.inventory.keys():
1270 host = host_name
1271 break
1272 if not host:
1273 raise OrchestratorError('no hosts defined')
1274 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
1275 self._registry_login(host, self.mgr.registry_url,
1276 self.mgr.registry_username, self.mgr.registry_password)
1277
1278 j = self._run_cephadm_json(host, '', 'pull', [], image=image_name, no_fsid=True)
1279
1280 r = ContainerInspectInfo(
1281 j['image_id'],
1282 j.get('ceph_version'),
1283 j.get('repo_digests')
1284 )
1285 self.log.debug(f'image {image_name} -> {r}')
1286 return r
1287
1288 # function responsible for logging a single host into a custom registry
1289 def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
1290 self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
1291 # want to pass info over stdin rather than through normal list of args
1292 args_str = json.dumps({
1293 'url': url,
1294 'username': username,
1295 'password': password,
1296 })
1297 out, err, code = self._run_cephadm(
1298 host, 'mon', 'registry-login',
1299 ['--registry-json', '-'], stdin=args_str, error_ok=True)
1300 if code:
1301 return f"Host {host} failed to login to {url} as {username} with given password"
1302 return None
1303
1304 def _deploy_cephadm_binary(self, host: str) -> None:
1305 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1306 self.log.info(f"Deploying cephadm binary to {host}")
1307 with self._remote_connection(host) as tpl:
1308 conn, _connr = tpl
1309 return self._deploy_cephadm_binary_conn(conn, host)
1310
1311 def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None:
1312 _out, _err, code = remoto.process.check(
1313 conn,
1314 ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}'])
1315 if code:
1316 msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
1317 self.log.warning(msg)
1318 raise OrchestratorError(msg)
1319 _out, _err, code = remoto.process.check(
1320 conn,
1321 ['tee', '-', self.mgr.cephadm_binary_path],
1322 stdin=self.mgr._cephadm.encode('utf-8'))
1323 if code:
1324 msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
1325 self.log.warning(msg)
1326 raise OrchestratorError(msg)
1327
1328 def _write_remote_file(self,
1329 host: str,
1330 path: str,
1331 content: bytes,
1332 mode: int,
1333 uid: int,
1334 gid: int) -> None:
1335 with self._remote_connection(host) as tpl:
1336 conn, connr = tpl
1337 try:
1338 errmsg = connr.write_file(path, content, mode, uid, gid)
1339 if errmsg is not None:
1340 raise OrchestratorError(errmsg)
1341 except Exception as e:
1342 msg = f"Unable to write {host}:{path}: {e}"
1343 self.log.warning(msg)
1344 raise OrchestratorError(msg)
1345
1346 @contextmanager
1347 def _remote_connection(self,
1348 host: str,
1349 addr: Optional[str] = None,
1350 ) -> Iterator[Tuple["BaseConnection", Any]]:
1351 if not addr and host in self.mgr.inventory:
1352 addr = self.mgr.inventory.get_addr(host)
1353
1354 self.mgr.offline_hosts_remove(host)
1355
1356 try:
1357 try:
1358 if not addr:
1359 raise OrchestratorError("host address is empty")
1360 conn, connr = self.mgr._get_connection(addr)
1361 except OSError as e:
1362 self.mgr._reset_con(host)
1363 msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
1364 raise execnet.gateway_bootstrap.HostNotFound(msg)
1365
1366 yield (conn, connr)
1367
1368 except execnet.gateway_bootstrap.HostNotFound as e:
1369 # this is a misleading exception as it seems to be thrown for
1370 # any sort of connection failure, even those having nothing to
1371 # do with "host not found" (e.g., ssh key permission denied).
1372 self.mgr.offline_hosts.add(host)
1373 self.mgr._reset_con(host)
1374
1375 user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
1376 if str(e).startswith("Can't communicate"):
1377 msg = str(e)
1378 else:
1379 msg = f'''Failed to connect to {host} ({addr}).
1380 Please make sure that the host is reachable and accepts connections using the cephadm SSH key
1381
1382 To add the cephadm SSH key to the host:
1383 > ceph cephadm get-pub-key > ~/ceph.pub
1384 > ssh-copy-id -f -i ~/ceph.pub {user}@{addr}
1385
1386 To check that the host is reachable open a new shell with the --no-hosts flag:
1387 > cephadm shell --no-hosts
1388
1389 Then run the following:
1390 > ceph cephadm get-ssh-config > ssh_config
1391 > ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
1392 > chmod 0600 ~/cephadm_private_key
1393 > ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
1394 raise OrchestratorError(msg) from e
1395 except Exception as ex:
1396 self.log.exception(ex)
1397 raise