import hashlib
import json
import logging
import uuid
from collections import defaultdict
from contextlib import contextmanager
from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Iterator

from cephadm import remotes

try:
    import remoto
    import execnet.gateway_bootstrap
except ImportError:
    remoto = None

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_now

import orchestrator
from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
    DaemonDescriptionStatus, daemon_type_to_service
from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
from cephadm.schedule import HostAssignment
from cephadm.autotune import MemoryAutotuner
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
    CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
from mgr_module import MonCommandFailed
from mgr_util import format_bytes

from . import utils

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator
    from remoto.backends import BaseConnection

logger = logging.getLogger(__name__)

REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']


class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    Please see the `Note regarding network calls from CLI handlers`
    chapter in the cephadm developer guide.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.log = logger

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:

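            # Note: each `continue` below skips the _serve_sleep() call
            # at the bottom of the loop body, so the loop re-runs
            # immediately after making progress.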
            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                if not self.mgr.paused:
                    self.mgr.to_remove_osds.process_removal_queue()

                self.mgr.migration.migrate()
                if self.mgr.migration.is_migration_ongoing():
                    continue

                if self._apply_all_services():
                    continue  # did something, refresh

                self._check_daemons()

                self._purge_deleted_services()

                if self.mgr.upgrade.continue_upgrade():
                    continue

            except OrchestratorError as e:
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self._serve_sleep()
        self.log.debug("serve exit")

    def _serve_sleep(self) -> None:
        sleep_interval = max(
            30,
            min(
                self.mgr.host_check_interval,
                self.mgr.facts_cache_timeout,
                self.mgr.daemon_cache_timeout,
                self.mgr.device_cache_timeout,
            )
        )
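        # event.wait() returns early if another thread sets mgr.event
        # (e.g. a handler changing declarative state), so the full
        # interval only elapses when nothing needs attention.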
        self.log.debug('Sleeping for %d seconds', sleep_interval)
        self.mgr.event.wait(sleep_interval)
        self.mgr.event.clear()

    def _update_paused_health(self) -> None:
        if self.mgr.paused:
            self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, ["'ceph orch resume' to resume"])
        else:
            self.mgr.remove_health_warning('CEPHADM_PAUSED')

    def _autotune_host_memory(self, host: str) -> None:
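        """
        Compute a host-wide osd_memory_target from the host's total
        memory (scaled by the autotune_memory_target_ratio setting) and
        apply it as a single osd/host:<host> config mask, removing any
        per-OSD overrides that would shadow it.
        """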
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            val = None
        else:
            total_mem *= 1024  # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            for o in osds:
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    'who': f'osd/host:{host}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            self.mgr.check_mon_command({
                'prefix': 'config rm',
                'who': f'osd/host:{host}',
                'name': 'osd_memory_target',
            })
        self.mgr.cache.update_autotune(host)

    def _refresh_hosts_and_daemons(self) -> None:
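        """
        One full refresh pass over all hosts: run host checks, refresh
        daemon/device/fact/OSDSpec-preview caches where stale, sync
        managed client files (ceph.conf and client keyrings), and
        update the related health warnings.
        """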
        bad_hosts = []
        failures = []

        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
            config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
            config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())

            if self.mgr.manage_etc_ceph_ceph_conf:
                try:
                    pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                    ha = HostAssignment(
                        spec=ServiceSpec('mon', placement=pspec),
                        hosts=self.mgr._schedulable_hosts(),
                        unreachable_hosts=self.mgr._unreachable_hosts(),
                        daemons=[],
                        networks=self.mgr.cache.networks,
                    )
                    all_slots, _, _ = ha.place()
                    for host in {s.hostname for s in all_slots}:
                        if host not in client_files:
                            client_files[host] = {}
                        client_files[host]['/etc/ceph/ceph.conf'] = (
                            0o644, 0, 0, bytes(config), str(config_digest)
                        )
                except Exception as e:
                    self.mgr.log.warning(
                        f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

            # client keyrings
            for ks in self.mgr.keys.keys.values():
                assert config
                assert config_digest
                try:
                    ret, keyring, err = self.mgr.mon_command({
                        'prefix': 'auth get',
                        'entity': ks.entity,
                    })
                    if ret:
                        self.log.warning(f'unable to fetch keyring for {ks.entity}')
                        continue
                    digest = ''.join('%02x' % c for c in hashlib.sha256(
                        keyring.encode('utf-8')).digest())
                    ha = HostAssignment(
                        spec=ServiceSpec('mon', placement=ks.placement),
                        hosts=self.mgr._schedulable_hosts(),
                        unreachable_hosts=self.mgr._unreachable_hosts(),
                        daemons=[],
                        networks=self.mgr.cache.networks,
                    )
                    all_slots, _, _ = ha.place()
                    for host in {s.hostname for s in all_slots}:
                        if host not in client_files:
                            client_files[host] = {}
                        client_files[host]['/etc/ceph/ceph.conf'] = (
                            0o644, 0, 0, bytes(config), str(config_digest)
                        )
                        client_files[host][ks.path] = (
                            ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
                        )
                except Exception as e:
                    self.log.warning(
                        f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')

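        # refresh() below runs once per host; the forall_hosts decorator
        # fans the calls out in parallel when it is invoked with the
        # full host list.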
        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)
            if self.mgr.cache.host_needs_daemon_refresh(host):
                self.log.debug('refreshing %s daemons' % host)
                r = self._refresh_host_daemons(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self._registry_login(host, self.mgr.registry_url,
                                         self.mgr.registry_username, self.mgr.registry_password)
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_device_refresh(host):
                self.log.debug('refreshing %s devices' % host)
                r = self._refresh_host_devices(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_facts_refresh(host):
                self.log.debug('Refreshing %s facts' % host)
                r = self._refresh_facts(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if (
                    self.mgr.cache.host_needs_autotune_memory(host)
                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

            # client files
            updated_files = False
            old_files = self.mgr.cache.get_host_client_files(host).copy()
            for path, m in client_files.get(host, {}).items():
                mode, uid, gid, content, digest = m
                if path in old_files:
                    match = old_files[path] == (digest, mode, uid, gid)
                    del old_files[path]
                    if match:
                        continue
                self.log.info(f'Updating {host}:{path}')
                self._write_remote_file(host, path, content, mode, uid, gid)
                self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
                updated_files = True
            for path in old_files.keys():
                self.log.info(f'Removing {host}:{path}')
                with self._remote_connection(host) as tpl:
                    conn, connr = tpl
                    out, err, code = remoto.process.check(
                        conn,
                        ['rm', '-f', path])
                updated_files = True
                self.mgr.cache.removed_client_file(host, path)
            if updated_files:
                self.mgr.cache.save_host(host)

        refresh(self.mgr.cache.get_hosts())

        self.mgr.config_checker.run_checks()

        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_FAILED_DAEMON',
                'CEPHADM_REFRESH_FAILED',
        ]:
            self.mgr.remove_health_warning(k)
        if bad_hosts:
            self.mgr.set_health_warning('CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
        if failures:
            self.mgr.set_health_warning('CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
        failed_daemons = []
        for dd in self.mgr.cache.get_daemons():
            if dd.status is not None and dd.status == DaemonDescriptionStatus.error:
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        if failed_daemons:
            self.mgr.set_health_warning('CEPHADM_FAILED_DAEMON', f'{len(failed_daemons)} failed cephadm daemon(s)', len(failed_daemons), failed_daemons)

    def _check_host(self, host: str) -> Optional[str]:
        if host not in self.mgr.inventory:
            return None
        self.log.debug(' checking %s' % host)
        try:
            addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
            out, err, code = self._run_cephadm(
                host, cephadmNoImage, 'check-host', [],
                error_ok=True, no_fsid=True)
            self.mgr.cache.update_last_host_check(host)
            self.mgr.cache.save_host(host)
            if code:
                self.log.debug(' host %s (%s) failed check' % (host, addr))
                if self.mgr.warn_on_failed_host_check:
                    return 'host %s (%s) failed check: %s' % (host, addr, err)
            else:
                self.log.debug(' host %s (%s) ok' % (host, addr))
        except Exception as e:
            self.log.debug(' host %s (%s) failed check' % (host, addr))
            return 'host %s (%s) failed check: %s' % (host, addr, e)
        return None

    def _refresh_host_daemons(self, host: str) -> Optional[str]:
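        """
        Run `cephadm ls` on the host and rebuild the cached
        DaemonDescriptions from the daemons it reports for this cluster.
        """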
        try:
            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)
        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self.mgr._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime_now()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, str_to_datetime(d[k]))
            sd.daemon_type = d['name'].split('.')[0]
            if sd.daemon_type not in orchestrator.KNOWN_DAEMON_TYPES:
                logger.warning(f"Found unknown daemon type {sd.daemon_type} on host {host}")
                continue

            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.container_image_digests = d.get('container_image_digests')
            sd.memory_usage = d.get('memory_usage')
            sd.memory_request = d.get('memory_request')
            sd.memory_limit = d.get('memory_limit')
            sd._service_name = d.get('service_name')
            sd.deployed_by = d.get('deployed_by')
            sd.version = d.get('version')
            sd.ports = d.get('ports')
            sd.ip = d.get('ip')
            sd.rank = int(d['rank']) if d.get('rank') is not None else None
            sd.rank_generation = int(d['rank_generation']) if d.get(
                'rank_generation') is not None else None
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                sd.status_desc = d['state']
                sd.status = {
                    'running': DaemonDescriptionStatus.running,
                    'stopped': DaemonDescriptionStatus.stopped,
                    'error': DaemonDescriptionStatus.error,
                    'unknown': DaemonDescriptionStatus.error,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.mgr.cache.update_host_daemons(host, dm)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_facts(self, host: str) -> Optional[str]:
        try:
            val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.mgr.cache.update_host_facts(host, val)

        return None

    def _refresh_host_devices(self, host: str) -> Optional[str]:

        with_lsm = self.mgr.get_module_option('device_enhanced_scan')
        inventory_args = ['--', 'inventory',
                          '--format=json-pretty',
                          '--filter-for-batch']
        if with_lsm:
            inventory_args.insert(-1, "--with-lsm")

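        # Older cephadm/ceph-volume versions may not understand
        # --filter-for-batch; the fallback below retries the inventory
        # without it.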
        try:
            try:
                devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                 inventory_args)
            except OrchestratorError as e:
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    rerun_args = inventory_args.copy()
                    rerun_args.remove('--filter-for-batch')
                    devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                     rerun_args)
                else:
                    raise

            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
            host, len(devices), len(networks)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None

    def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return None

    def update_osdspec_previews(self, search_host: str = '') -> None:
        # Set global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.add(search_host)
        previews = []
        # query OSDSpecs for host <search host> and generate/get the preview
        # There can be multiple previews for one host due to multiple OSDSpecs.
        previews.extend(self.mgr.osd_service.get_previews(search_host))
        self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
        self.mgr.cache.osdspec_previews[search_host] = previews
        # Unset global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.remove(search_host)

    def _check_for_strays(self) -> None:
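        """
        Compare what the mgr's server/service map (list_servers) reports
        against the daemons cephadm manages, raising CEPHADM_STRAY_HOST
        and CEPHADM_STRAY_DAEMON health warnings for anything unmanaged.
        """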
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            self.log.debug(ls)
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )

                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning('CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning('CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)

    def _apply_all_services(self) -> bool:
        r = False
        specs = []  # type: List[ServiceSpec]
        for sn, spec in self.mgr.spec_store.active_specs.items():
            specs.append(spec)
        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            self.mgr.remove_health_warning(name)
        self.mgr.apply_spec_fails = []
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
                self.log.exception(msg)
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
                warnings = []
                for x in self.mgr.apply_spec_fails:
                    warnings.append(f'{x[0]}: {x[1]}')
                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                            len(self.mgr.apply_spec_fails),
                                            warnings)

        return r

    def _apply_service_config(self, spec: ServiceSpec) -> None:
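        """
        Push the spec's `config` section into the mon config store under
        the service's own config section; invalid options are skipped
        (with a health warning) instead of failing the whole apply.
        """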
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(options_failed_to_set), options_failed_to_set)

    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.
        """
        self.mgr.migration.verify_no_migration()

        service_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        self._apply_service_config(spec)

        if service_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            # can't know if daemon count changed; create_from_spec doesn't
            # return a solid indication
            return False

        svc = self.mgr.cephadm_services[service_type]
        daemons = self.mgr.cache.get_daemons_by_service(service_name)

        public_networks: List[str] = []
        if service_type == 'mon':
            out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
            if '/' in out:
                public_networks = [x.strip() for x in out.split(',')]
                self.log.debug('mon public_network(s) is %s' % public_networks)

        def matches_network(host):
            # type: (str) -> bool
            # make sure we have 1 or more IPs for any of those networks on that
            # host
            for network in public_networks:
                if len(self.mgr.cache.networks[host].get(network, [])) > 0:
                    return True
            self.log.info(
                f"Filtered out host {host}: does not belong to mon public_network"
                f" ({','.join(public_networks)})"
            )
            return False

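        # Ranked services (svc.ranked(), e.g. nfs in this release) track
        # which daemon name holds each rank/generation in a persistent
        # rank_map, saved before changes so a takeover mgr can clean up.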
        rank_map = None
        if svc.ranked():
            rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
        ha = HostAssignment(
            spec=spec,
            hosts=self.mgr._schedulable_hosts(),
            unreachable_hosts=self.mgr._unreachable_hosts(),
            daemons=daemons,
            networks=self.mgr.cache.networks,
            filter_new_host=(
                matches_network if service_type == 'mon'
                else None
            ),
            allow_colo=svc.allow_colo(),
            primary_daemon_type=svc.primary_daemon_type(),
            per_host_daemon_type=svc.per_host_daemon_type(),
            rank_map=rank_map,
        )

        try:
            all_slots, slots_to_add, daemons_to_remove = ha.place()
            daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
                'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
            self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
        except OrchestratorError as e:
            msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
            self.log.error(msg)
            self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
            self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
            warnings = []
            for x in self.mgr.apply_spec_fails:
                warnings.append(f'{x[0]}: {x[1]}')
            self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                        f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                        len(self.mgr.apply_spec_fails),
                                        warnings)
            return False

        r = None

        # sanity check
        final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
        if service_type in ['mon', 'mgr'] and final_count < 1:
            self.log.debug('cannot scale mon|mgr below 1')
            return False

        # progress
        progress_id = str(uuid.uuid4())
        delta: List[str] = []
        if slots_to_add:
            delta += [f'+{len(slots_to_add)}']
        if daemons_to_remove:
            delta += [f'-{len(daemons_to_remove)}']
        progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
        progress_total = len(slots_to_add) + len(daemons_to_remove)
        progress_done = 0

        def update_progress() -> None:
            self.mgr.remote(
                'progress', 'update', progress_id,
                ev_msg=progress_title,
                ev_progress=(progress_done / progress_total),
                add_to_ceph_s=True,
            )

        if progress_total:
            update_progress()

        # add any?
        did_config = False

        self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
        self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)

        try:
            # assign names
            for i in range(len(slots_to_add)):
                slot = slots_to_add[i]
                slot = slot.assign_name(self.mgr.get_unique_name(
                    slot.daemon_type,
                    slot.hostname,
                    daemons,
                    prefix=spec.service_id,
                    forcename=slot.name,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                ))
                slots_to_add[i] = slot
                if rank_map is not None:
                    assert slot.rank is not None
                    assert slot.rank_generation is not None
                    assert rank_map[slot.rank][slot.rank_generation] is None
                    rank_map[slot.rank][slot.rank_generation] = slot.name

            if rank_map:
                # record the rank_map before we make changes so that if we fail the
                # next mgr will clean up.
                self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)

                # remove daemons now, since we are going to fence them anyway
                for d in daemons_to_remove:
                    assert d.hostname is not None
                    self._remove_daemon(d.name(), d.hostname)
                daemons_to_remove = []

                # fence them
                svc.fence_old_ranks(spec, rank_map, len(all_slots))

            # create daemons
            daemon_place_fails = []
            for slot in slots_to_add:
                # first remove daemon on conflicting port?
                if slot.ports:
                    for d in daemons_to_remove:
                        if d.hostname != slot.hostname:
                            continue
                        if not (set(d.ports or []) & set(slot.ports)):
                            continue
                        if d.ip and slot.ip and d.ip != slot.ip:
                            continue
                        self.log.info(
                            f'Removing {d.name()} before deploying to {slot} to avoid a port conflict'
                        )
                        # NOTE: we don't check ok-to-stop here to avoid starvation if
                        # there is only 1 gateway.
                        self._remove_daemon(d.name(), d.hostname)
                        daemons_to_remove.remove(d)
                        progress_done += 1
                        break

                # deploy new daemon
                daemon_id = slot.name
                if not did_config:
                    svc.config(spec)
                    did_config = True

                daemon_spec = svc.make_daemon_spec(
                    slot.hostname, daemon_id, slot.network, spec,
                    daemon_type=slot.daemon_type,
                    ports=slot.ports,
                    ip=slot.ip,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                )
                self.log.debug('Placing %s.%s on host %s' % (
                    slot.daemon_type, daemon_id, slot.hostname))

                try:
                    daemon_spec = svc.prepare_create(daemon_spec)
                    self._create_daemon(daemon_spec)
                    r = True
                    progress_done += 1
                    update_progress()
                except (RuntimeError, OrchestratorError) as e:
                    msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                           f"on {slot.hostname}: {e}")
                    self.mgr.events.for_service(spec, 'ERROR', msg)
                    self.mgr.log.error(msg)
                    daemon_place_fails.append(msg)
                    # only return "no change" if no one else has already succeeded.
                    # later successes will also change to True
                    if r is None:
                        r = False
                    progress_done += 1
                    update_progress()
                    continue

                # add to daemon list so next name(s) will also be unique
                sd = orchestrator.DaemonDescription(
                    hostname=slot.hostname,
                    daemon_type=slot.daemon_type,
                    daemon_id=daemon_id,
                )
                daemons.append(sd)

            if daemon_place_fails:
                self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(daemon_place_fails), daemon_place_fails)

            # remove any?
            def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
                daemon_ids = [d.daemon_id for d in remove_daemons]
                assert None not in daemon_ids
                # setting force flag retains previous behavior
                r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
                return not r.retval

            while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
                # let's find a subset that is ok-to-stop
                daemons_to_remove.pop()
            for d in daemons_to_remove:
                r = True
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)

                progress_done += 1
                update_progress()

            self.mgr.remote('progress', 'complete', progress_id)
        except Exception as e:
            self.mgr.remote('progress', 'fail', progress_id, str(e))
            raise

        if r is None:
            r = False
        return r

    def _check_daemons(self) -> None:

        daemons = self.mgr.cache.get_daemons()
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
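            # A scheduled 'redeploy' (a full re-deploy of the daemon)
            # subsumes the lighter 'reconfig' computed above, so it
            # takes precedence when both apply.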
            if action:
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                        # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                        # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)

    def _purge_deleted_services(self) -> None:
        existing_services = self.mgr.spec_store.all_specs.items()
        for service_name, spec in list(existing_services):
            if service_name not in self.mgr.spec_store.spec_deleted:
                continue
            if self.mgr.cache.get_daemons_by_service(service_name):
                continue
            if spec.service_type in ['mon', 'mgr']:
                continue

            logger.info(f'Purge service {service_name}')

            self.mgr.cephadm_services[spec.service_type].purge(service_name)
            self.mgr.spec_store.finally_rm(service_name)

    def convert_tags_to_repo_digest(self) -> None:
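        """
        Resolve configured container image tags to repo digests so all
        hosts deploy exactly the same image even if a tag later moves.
        """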
        if not self.mgr.use_repo_digest:
            return
        settings = self.mgr.upgrade.get_distinct_container_image_settings()
        digests: Dict[str, ContainerInspectInfo] = {}
        for container_image_ref in set(settings.values()):
            if not is_repo_digest(container_image_ref):
                image_info = self._get_container_image_info(container_image_ref)
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    assert is_repo_digest(image_info.repo_digests[0]), image_info
                digests[container_image_ref] = image_info

        for entity, container_image_ref in settings.items():
            if not is_repo_digest(container_image_ref):
                image_info = digests[container_image_ref]
                if image_info.repo_digests:
                    # FIXME: we assume the first digest here is the best
                    self.mgr.set_container_image(entity, image_info.repo_digests[0])

    def _create_daemon(self,
                       daemon_spec: CephadmDaemonDeploySpec,
                       reconfig: bool = False,
                       osd_uuid_map: Optional[Dict[str, Any]] = None,
                       ) -> str:

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            try:
                image = ''
                start_time = datetime_now()
                ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

                if daemon_spec.daemon_type == 'container':
                    spec = cast(CustomContainerSpec,
                                self.mgr.spec_store[daemon_spec.service_name].spec)
                    image = spec.image
                    if spec.ports:
                        ports.extend(spec.ports)

                if daemon_spec.daemon_type == 'cephadm-exporter':
                    if not reconfig:
                        assert daemon_spec.host
                        self._deploy_cephadm_binary(daemon_spec.host)

                # TCP port to open in the host firewall
                if len(ports) > 0:
                    daemon_spec.extra_args.extend([
                        '--tcp-ports', ' '.join(map(str, ports))
                    ])

                # osd deployments need an --osd-fsid arg
                if daemon_spec.daemon_type == 'osd':
                    if not osd_uuid_map:
                        osd_uuid_map = self.mgr.get_osd_uuid_map()
                    osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                    if not osd_uuid:
                        raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

                if reconfig:
                    daemon_spec.extra_args.append('--reconfig')
                if self.mgr.allow_ptrace:
                    daemon_spec.extra_args.append('--allow-ptrace')

                if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
                    self._registry_login(daemon_spec.host, self.mgr.registry_url,
                                         self.mgr.registry_username, self.mgr.registry_password)

                self.log.info('%s daemon %s on %s' % (
                    'Reconfiguring' if reconfig else 'Deploying',
                    daemon_spec.name(), daemon_spec.host))

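                # The generated config/keyring payload is passed via
                # stdin ('--config-json -') rather than as an argument,
                # keeping secrets off the remote command line.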
                out, err, code = self._run_cephadm(
                    daemon_spec.host, daemon_spec.name(), 'deploy',
                    [
                        '--name', daemon_spec.name(),
                        '--meta-json', json.dumps({
                            'service_name': daemon_spec.service_name,
                            'ports': daemon_spec.ports,
                            'ip': daemon_spec.ip,
                            'deployed_by': self.mgr.get_active_mgr_digests(),
                            'rank': daemon_spec.rank,
                            'rank_generation': daemon_spec.rank_generation,
                        }),
                        '--config-json', '-',
                    ] + daemon_spec.extra_args,
                    stdin=json.dumps(daemon_spec.final_config),
                    image=image)

                # refresh daemon state? (ceph daemon reconfig does not need it)
                if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                    if not code and daemon_spec.host in self.mgr.cache.daemons:
                        # prime cached service state with what we (should have)
                        # just created
                        sd = daemon_spec.to_daemon_description(
                            DaemonDescriptionStatus.running, 'starting')
                        self.mgr.cache.add_daemon(daemon_spec.host, sd)
                        if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
                            self.mgr.requires_post_actions.add(daemon_spec.name())
                    self.mgr.cache.invalidate_host_daemons(daemon_spec.host)

                self.mgr.cache.update_daemon_config_deps(
                    daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
                self.mgr.cache.save_host(daemon_spec.host)
                msg = "{} {} on host '{}'".format(
                    'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                if not code:
                    self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
                else:
                    what = 'reconfigure' if reconfig else 'deploy'
                    self.mgr.events.for_daemon(
                        daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
                return msg
            except OrchestratorError:
                redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
                if not reconfig and not redeploy:
                    # we have to clean up the daemon. E.g. keyrings.
                    service_type = daemon_type_to_service(daemon_spec.daemon_type)
                    dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
                    self.mgr.cephadm_services[service_type].post_remove(dd, is_failed_deploy=True)
                raise

    def _remove_daemon(self, name: str, host: str) -> str:
        """
        Remove a daemon
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)

            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instance's data.
            args = ['--name', name, '--force']
            self.log.info('Removing daemon %s from %s' % (name, host))
            out, err, code = self._run_cephadm(
                host, name, 'rm-daemon', args)
            if not code:
                # remove item from cache
                self.mgr.cache.rm_daemon(host, name)
            self.mgr.cache.invalidate_host_daemons(host)

            self.mgr.cephadm_services[daemon_type_to_service(
                daemon_type)].post_remove(daemon, is_failed_deploy=False)

            return "Removed {} from host '{}'".format(name, host)

    def _run_cephadm_json(self,
                          host: str,
                          entity: Union[CephadmNoImage, str],
                          command: str,
                          args: List[str],
                          no_fsid: Optional[bool] = False,
                          image: Optional[str] = "",
                          ) -> Any:
        try:
            out, err, code = self._run_cephadm(
                host, entity, command, args, no_fsid=no_fsid, image=image)
            if code:
                raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
        except Exception as e:
            raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
        try:
            return json.loads(''.join(out))
        except (ValueError, KeyError):
            msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
            self.log.exception(f'{msg}: {"".join(out)}')
            raise OrchestratorError(msg)

    def _run_cephadm(self,
                     host: str,
                     entity: Union[CephadmNoImage, str],
                     command: str,
                     args: List[str],
                     addr: Optional[str] = "",
                     stdin: Optional[str] = "",
                     no_fsid: Optional[bool] = False,
                     error_ok: Optional[bool] = False,
                     image: Optional[str] = "",
                     env_vars: Optional[List[str]] = None,
                     ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]
        """
        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        bypass_image = ('cephadm-exporter',)

        with self._remote_connection(host, addr) as tpl:
            conn, connr = tpl
            assert image or entity
            # Skip the image check for deployed daemons that are not ceph containers
            if not str(entity).startswith(bypass_image):
                if not image and entity is not cephadmNoImage:
                    image = self.mgr._get_container_image(entity)

            final_args = []

            # global args
            if env_vars:
                for env_var_pair in env_vars:
                    final_args.extend(['--env', env_var_pair])

            if image:
                final_args.extend(['--image', image])

            if not self.mgr.container_init:
                final_args += ['--no-container-init']

            # subcommand
            final_args.append(command)

            # subcommand args
            if not no_fsid:
                final_args += ['--fsid', self.mgr._cluster_fsid]

            final_args += args

            # exec
            self.log.debug('args: %s' % (' '.join(final_args)))
            if self.mgr.mode == 'root':
                if stdin:
                    self.log.debug('stdin: %s' % stdin)

                python = connr.choose_python()
                if not python:
                    raise RuntimeError(
                        'unable to find python on %s (tried %s in %s)' % (
                            host, remotes.PYTHONS, remotes.PATH))
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        [python, self.mgr.cephadm_binary_path] + final_args,
                        stdin=stdin.encode('utf-8') if stdin is not None else None)
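                    # Exit code 2 can indicate the cephadm binary is
                    # missing on the host: confirm with `ls`, deploy the
                    # binary, then retry the command once.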
                    if code == 2:
                        out_ls, err_ls, code_ls = remoto.process.check(
                            conn, ['ls', self.mgr.cephadm_binary_path])
                        if code_ls == 2:
                            self._deploy_cephadm_binary_conn(conn, host)
                            out, err, code = remoto.process.check(
                                conn,
                                [python, self.mgr.cephadm_binary_path] + final_args,
                                stdin=stdin.encode('utf-8') if stdin is not None else None)

                except RuntimeError as e:
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise

            elif self.mgr.mode == 'cephadm-package':
                try:
                    out, err, code = remoto.process.check(
                        conn,
                        ['sudo', '/usr/bin/cephadm'] + final_args,
                        stdin=stdin)
                except RuntimeError as e:
                    self.mgr._reset_con(host)
                    if error_ok:
                        return [], [str(e)], 1
                    raise
            else:
                assert False, 'unsupported mode'

            self.log.debug('code: %d' % code)
            if out:
                self.log.debug('out: %s' % '\n'.join(out))
            if err:
                self.log.debug('err: %s' % '\n'.join(err))
            if code and not error_ok:
                raise OrchestratorError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
            return out, err, code

    def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
        # pick an arbitrary host (the first the inventory yields) to pull on
        host = None
        for host_name in self.mgr.inventory.keys():
            host = host_name
            break
        if not host:
            raise OrchestratorError('no hosts defined')
        if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
            self._registry_login(host, self.mgr.registry_url,
                                 self.mgr.registry_username, self.mgr.registry_password)

        pullargs: List[str] = []
        if self.mgr.registry_insecure:
            pullargs.append("--insecure")

        j = self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)

        r = ContainerInspectInfo(
            j['image_id'],
            j.get('ceph_version'),
            j.get('repo_digests')
        )
        self.log.debug(f'image {image_name} -> {r}')
        return r

    # responsible for logging a single host into a custom registry
    def _registry_login(self, host: str, url: Optional[str], username: Optional[str], password: Optional[str]) -> Optional[str]:
        self.log.debug(f"Attempting to log host {host} into custom registry @ {url}")
        # want to pass info over stdin rather than through normal list of args
        args_str = json.dumps({
            'url': url,
            'username': username,
            'password': password,
        })
        out, err, code = self._run_cephadm(
            host, 'mon', 'registry-login',
            ['--registry-json', '-'], stdin=args_str, error_ok=True)
        if code:
            return f"Host {host} failed to login to {url} as {username} with given password"
        return None

    def _deploy_cephadm_binary(self, host: str) -> None:
        # Use tee (from coreutils) to create a copy of cephadm on the target machine
        self.log.info(f"Deploying cephadm binary to {host}")
        with self._remote_connection(host) as tpl:
            conn, _connr = tpl
            return self._deploy_cephadm_binary_conn(conn, host)

    def _deploy_cephadm_binary_conn(self, conn: "BaseConnection", host: str) -> None:
        _out, _err, code = remoto.process.check(
            conn,
            ['mkdir', '-p', f'/var/lib/ceph/{self.mgr._cluster_fsid}'])
        if code:
            msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
            self.log.warning(msg)
            raise OrchestratorError(msg)
        _out, _err, code = remoto.process.check(
            conn,
            ['tee', '-', self.mgr.cephadm_binary_path],
            stdin=self.mgr._cephadm.encode('utf-8'))
        if code:
            msg = f"Unable to deploy the cephadm binary to {host}: {_err}"
            self.log.warning(msg)
            raise OrchestratorError(msg)

    def _write_remote_file(self,
                           host: str,
                           path: str,
                           content: bytes,
                           mode: int,
                           uid: int,
                           gid: int) -> None:
        with self._remote_connection(host) as tpl:
            conn, connr = tpl
            try:
                errmsg = connr.write_file(path, content, mode, uid, gid)
                if errmsg is not None:
                    raise OrchestratorError(errmsg)
            except Exception as e:
                msg = f"Unable to write {host}:{path}: {e}"
                self.log.warning(msg)
                raise OrchestratorError(msg)

    @contextmanager
    def _remote_connection(self,
                           host: str,
                           addr: Optional[str] = None,
                           ) -> Iterator[Tuple["BaseConnection", Any]]:
        if not addr and host in self.mgr.inventory:
            addr = self.mgr.inventory.get_addr(host)

        self.mgr.offline_hosts_remove(host)

        try:
            try:
                if not addr:
                    raise OrchestratorError("host address is empty")
                conn, connr = self.mgr._get_connection(addr)
            except OSError as e:
                self.mgr._reset_con(host)
                msg = f"Can't communicate with remote host `{addr}`, possibly because python3 is not installed there: {str(e)}"
                raise execnet.gateway_bootstrap.HostNotFound(msg)

            yield (conn, connr)

        except execnet.gateway_bootstrap.HostNotFound as e:
            # this is a misleading exception as it seems to be thrown for
            # any sort of connection failure, even those having nothing to
            # do with "host not found" (e.g., ssh key permission denied).
            self.mgr.offline_hosts.add(host)
            self.mgr._reset_con(host)

            user = self.mgr.ssh_user if self.mgr.mode == 'root' else 'cephadm'
            if str(e).startswith("Can't communicate"):
                msg = str(e)
            else:
                msg = f'''Failed to connect to {host} ({addr}).
Please make sure that the host is reachable and accepts connections using the cephadm SSH key

To add the cephadm SSH key to the host:
> ceph cephadm get-pub-key > ~/ceph.pub
> ssh-copy-id -f -i ~/ceph.pub {user}@{addr}

To check that the host is reachable open a new shell with the --no-hosts flag:
> cephadm shell --no-hosts

Then run the following:
> ceph cephadm get-ssh-config > ssh_config
> ceph config-key get mgr/cephadm/ssh_identity_key > ~/cephadm_private_key
> chmod 0600 ~/cephadm_private_key
> ssh -F ssh_config -i ~/cephadm_private_key {user}@{addr}'''
            raise OrchestratorError(msg) from e
        except Exception as ex:
            self.log.exception(ex)
            raise