# ceph/src/pybind/mgr/cephadm/serve.py (quincy beta 17.1.0)
b3b6e05e 1import hashlib
f91f0fd5
TL
2import json
3import logging
b3b6e05e 4import uuid
f91f0fd5 5from collections import defaultdict
20effc67
TL
6from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
7 DefaultDict
f91f0fd5
TL
8
9from ceph.deployment import inventory
10from ceph.deployment.drive_group import DriveGroupSpec
b3b6e05e 11from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
20effc67 12from ceph.utils import datetime_now
f91f0fd5
TL
13
14import orchestrator
f67539c2
TL
15from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
16 DaemonDescriptionStatus, daemon_type_to_service
17from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
f91f0fd5 18from cephadm.schedule import HostAssignment
b3b6e05e 19from cephadm.autotune import MemoryAutotuner
f67539c2
TL
20from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
21 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
22from mgr_module import MonCommandFailed
b3b6e05e 23from mgr_util import format_bytes
f67539c2
TL
24
25from . import utils
f91f0fd5
TL
26
# import only for type annotations; avoids a runtime import cycle with cephadm.module
if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)

# daemon types that need extra configuration applied after they are deployed
REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
33
f91f0fd5
TL
34
35class CephadmServe:
36 """
37 This module contains functions that are executed in the
38 serve() thread. Thus they don't block the CLI.
39
f67539c2
TL
40 Please see the `Note regarding network calls from CLI handlers`
41 chapter in the cephadm developer guide.
42
f91f0fd5
TL
43 On the other hand, These function should *not* be called form
44 CLI handlers, to avoid blocking the CLI
45 """
46
47 def __init__(self, mgr: "CephadmOrchestrator"):
48 self.mgr: "CephadmOrchestrator" = mgr
49 self.log = logger
50
51 def serve(self) -> None:
52 """
53 The main loop of cephadm.
54
55 A command handler will typically change the declarative state
56 of cephadm. This loop will then attempt to apply this new state.
57 """
58 self.log.debug("serve starting")
f67539c2
TL
59 self.mgr.config_checker.load_network_config()
60
f91f0fd5 61 while self.mgr.run:
20effc67 62 self.log.debug("serve loop start")
f91f0fd5
TL
63
64 try:
65
66 self.convert_tags_to_repo_digest()
67
68 # refresh daemons
69 self.log.debug('refreshing hosts and daemons')
70 self._refresh_hosts_and_daemons()
71
72 self._check_for_strays()
73
74 self._update_paused_health()
75
522d829b
TL
76 if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
77 self.mgr.need_connect_dashboard_rgw = False
78 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
79 self.log.info('Checking dashboard <-> RGW credentials')
80 self.mgr.remote('dashboard', 'set_rgw_credentials')
81
f91f0fd5 82 if not self.mgr.paused:
adb31ebb 83 self.mgr.to_remove_osds.process_removal_queue()
f91f0fd5
TL
84
85 self.mgr.migration.migrate()
86 if self.mgr.migration.is_migration_ongoing():
87 continue
88
89 if self._apply_all_services():
90 continue # did something, refresh
91
92 self._check_daemons()
93
f67539c2
TL
94 self._purge_deleted_services()
95
20effc67
TL
96 self._check_for_moved_osds()
97
98 if self.mgr.agent_helpers._handle_use_agent_setting():
99 continue
100
f91f0fd5
TL
101 if self.mgr.upgrade.continue_upgrade():
102 continue
103
104 except OrchestratorError as e:
105 if e.event_subject:
106 self.mgr.events.from_orch_error(e)
107
20effc67 108 self.log.debug("serve loop sleep")
f91f0fd5 109 self._serve_sleep()
20effc67 110 self.log.debug("serve loop wake")
f91f0fd5
TL
111 self.log.debug("serve exit")
112
adb31ebb 113 def _serve_sleep(self) -> None:
f67539c2
TL
114 sleep_interval = max(
115 30,
116 min(
117 self.mgr.host_check_interval,
118 self.mgr.facts_cache_timeout,
119 self.mgr.daemon_cache_timeout,
120 self.mgr.device_cache_timeout,
121 )
122 )
f91f0fd5 123 self.log.debug('Sleeping for %d seconds', sleep_interval)
f67539c2 124 self.mgr.event.wait(sleep_interval)
f91f0fd5
TL
125 self.mgr.event.clear()
126
adb31ebb 127 def _update_paused_health(self) -> None:
20effc67 128 self.log.debug('_update_paused_health')
f91f0fd5 129 if self.mgr.paused:
20effc67
TL
130 self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
131 "'ceph orch resume' to resume"])
f91f0fd5 132 else:
a4b75251 133 self.mgr.remove_health_warning('CEPHADM_PAUSED')
f91f0fd5 134
b3b6e05e
TL
135 def _autotune_host_memory(self, host: str) -> None:
136 total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
137 if not total_mem:
138 val = None
139 else:
140 total_mem *= 1024 # kb -> bytes
141 total_mem *= self.mgr.autotune_memory_target_ratio
142 a = MemoryAutotuner(
143 daemons=self.mgr.cache.get_daemons_by_host(host),
144 config_get=self.mgr.get_foreign_ceph_option,
145 total_mem=total_mem,
146 )
147 val, osds = a.tune()
148 any_changed = False
149 for o in osds:
150 if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
151 self.mgr.check_mon_command({
152 'prefix': 'config rm',
153 'who': o,
154 'name': 'osd_memory_target',
155 })
156 any_changed = True
157 if val is not None:
158 if any_changed:
159 self.mgr.log.info(
160 f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
161 )
162 ret, out, err = self.mgr.mon_command({
163 'prefix': 'config set',
164 'who': f'osd/host:{host}',
165 'name': 'osd_memory_target',
166 'value': str(val),
167 })
168 if ret:
169 self.log.warning(
170 f'Unable to set osd_memory_target on {host} to {val}: {err}'
171 )
172 else:
173 self.mgr.check_mon_command({
174 'prefix': 'config rm',
175 'who': f'osd/host:{host}',
176 'name': 'osd_memory_target',
177 })
178 self.mgr.cache.update_autotune(host)
179
f91f0fd5 180 def _refresh_hosts_and_daemons(self) -> None:
20effc67 181 self.log.debug('_refresh_hosts_and_daemons')
f91f0fd5
TL
182 bad_hosts = []
183 failures = []
184
b3b6e05e 185 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
20effc67
TL
186 client_files = self._calc_client_files()
187 else:
188 client_files = {}
b3b6e05e 189
20effc67 190 agents_down: List[str] = []
b3b6e05e 191
f91f0fd5 192 @forall_hosts
adb31ebb
TL
193 def refresh(host: str) -> None:
194
f67539c2
TL
195 # skip hosts that are in maintenance - they could be powered off
196 if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
197 return
198
20effc67
TL
199 if self.mgr.use_agent:
200 if self.mgr.agent_helpers._check_agent(host):
201 agents_down.append(host)
202
f91f0fd5
TL
203 if self.mgr.cache.host_needs_check(host):
204 r = self._check_host(host)
205 if r is not None:
206 bad_hosts.append(r)
f91f0fd5 207
20effc67
TL
208 if (
209 not self.mgr.use_agent
210 or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
211 or host in agents_down
212 ):
213 if self.mgr.cache.host_needs_daemon_refresh(host):
214 self.log.debug('refreshing %s daemons' % host)
215 r = self._refresh_host_daemons(host)
216 if r:
217 failures.append(r)
218
219 if self.mgr.cache.host_needs_facts_refresh(host):
220 self.log.debug(('Refreshing %s facts' % host))
221 r = self._refresh_facts(host)
222 if r:
223 failures.append(r)
224
225 if self.mgr.cache.host_needs_network_refresh(host):
226 self.log.debug(('Refreshing %s networks' % host))
227 r = self._refresh_host_networks(host)
228 if r:
229 failures.append(r)
230
231 if self.mgr.cache.host_needs_device_refresh(host):
232 self.log.debug('refreshing %s devices' % host)
233 r = self._refresh_host_devices(host)
234 if r:
235 failures.append(r)
236 self.mgr.cache.metadata_up_to_date[host] = True
237 elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
238 if self.mgr.cache.host_needs_daemon_refresh(host):
239 self.log.debug('refreshing %s daemons' % host)
240 r = self._refresh_host_daemons(host)
241 if r:
242 failures.append(r)
243 self.mgr.cache.metadata_up_to_date[host] = True
244
245 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
f91f0fd5 246 self.log.debug(f"Logging `{host}` into custom registry")
20effc67
TL
247 r = self.mgr.wait_async(self._registry_login(
248 host, json.loads(str(self.mgr.get_store('registry_credentials')))))
f91f0fd5
TL
249 if r:
250 bad_hosts.append(r)
251
f91f0fd5
TL
252 if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
253 self.log.debug(f"refreshing OSDSpec previews for {host}")
254 r = self._refresh_host_osdspec_previews(host)
255 if r:
256 failures.append(r)
257
b3b6e05e
TL
258 if (
259 self.mgr.cache.host_needs_autotune_memory(host)
260 and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
261 ):
262 self.log.debug(f"autotuning memory for {host}")
263 self._autotune_host_memory(host)
264
20effc67 265 self._write_client_files(client_files, host)
f91f0fd5
TL
266
267 refresh(self.mgr.cache.get_hosts())
268
20effc67
TL
269 self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
270
f67539c2
TL
271 self.mgr.config_checker.run_checks()
272
adb31ebb
TL
273 for k in [
274 'CEPHADM_HOST_CHECK_FAILED',
adb31ebb
TL
275 'CEPHADM_REFRESH_FAILED',
276 ]:
a4b75251 277 self.mgr.remove_health_warning(k)
f91f0fd5 278 if bad_hosts:
20effc67
TL
279 self.mgr.set_health_warning(
280 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
f91f0fd5 281 if failures:
20effc67
TL
282 self.mgr.set_health_warning(
283 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
284 self.mgr.update_failed_daemon_health_check()
f91f0fd5 285
adb31ebb 286 def _check_host(self, host: str) -> Optional[str]:
f91f0fd5 287 if host not in self.mgr.inventory:
adb31ebb 288 return None
f91f0fd5
TL
289 self.log.debug(' checking %s' % host)
290 try:
522d829b 291 addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
20effc67 292 out, err, code = self.mgr.wait_async(self._run_cephadm(
f91f0fd5 293 host, cephadmNoImage, 'check-host', [],
20effc67 294 error_ok=True, no_fsid=True))
f91f0fd5
TL
295 self.mgr.cache.update_last_host_check(host)
296 self.mgr.cache.save_host(host)
297 if code:
522d829b 298 self.log.debug(' host %s (%s) failed check' % (host, addr))
f91f0fd5 299 if self.mgr.warn_on_failed_host_check:
522d829b 300 return 'host %s (%s) failed check: %s' % (host, addr, err)
f91f0fd5 301 else:
522d829b 302 self.log.debug(' host %s (%s) ok' % (host, addr))
f91f0fd5 303 except Exception as e:
522d829b
TL
304 self.log.debug(' host %s (%s) failed check' % (host, addr))
305 return 'host %s (%s) failed check: %s' % (host, addr, e)
adb31ebb 306 return None
f91f0fd5 307
adb31ebb 308 def _refresh_host_daemons(self, host: str) -> Optional[str]:
f91f0fd5 309 try:
20effc67 310 ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
adb31ebb
TL
311 except OrchestratorError as e:
312 return str(e)
20effc67 313 self.mgr._process_ls_output(host, ls)
f91f0fd5
TL
314 return None
315
adb31ebb 316 def _refresh_facts(self, host: str) -> Optional[str]:
f91f0fd5 317 try:
20effc67
TL
318 val = self.mgr.wait_async(self._run_cephadm_json(
319 host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
adb31ebb
TL
320 except OrchestratorError as e:
321 return str(e)
322
323 self.mgr.cache.update_host_facts(host, val)
324
325 return None
326
327 def _refresh_host_devices(self, host: str) -> Optional[str]:
20effc67 328 with_lsm = self.mgr.device_enhanced_scan
f67539c2 329 inventory_args = ['--', 'inventory',
a4b75251 330 '--format=json-pretty',
f67539c2
TL
331 '--filter-for-batch']
332 if with_lsm:
333 inventory_args.insert(-1, "--with-lsm")
334
f91f0fd5 335 try:
adb31ebb 336 try:
20effc67
TL
337 devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
338 inventory_args))
adb31ebb
TL
339 except OrchestratorError as e:
340 if 'unrecognized arguments: --filter-for-batch' in str(e):
f67539c2
TL
341 rerun_args = inventory_args.copy()
342 rerun_args.remove('--filter-for-batch')
20effc67
TL
343 devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
344 rerun_args))
adb31ebb
TL
345 else:
346 raise
347
adb31ebb
TL
348 except OrchestratorError as e:
349 return str(e)
350
20effc67
TL
351 self.log.debug('Refreshed host %s devices (%d)' % (
352 host, len(devices)))
adb31ebb 353 ret = inventory.Devices.from_json(devices)
20effc67 354 self.mgr.cache.update_host_devices(host, ret.devices)
f91f0fd5
TL
355 self.update_osdspec_previews(host)
356 self.mgr.cache.save_host(host)
357 return None
358
20effc67
TL
359 def _refresh_host_networks(self, host: str) -> Optional[str]:
360 try:
361 networks = self.mgr.wait_async(self._run_cephadm_json(
362 host, 'mon', 'list-networks', [], no_fsid=True))
363 except OrchestratorError as e:
364 return str(e)
365
366 self.log.debug('Refreshed host %s networks (%s)' % (
367 host, len(networks)))
368 self.mgr.cache.update_host_networks(host, networks)
369 self.mgr.cache.save_host(host)
370 return None
371
adb31ebb 372 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
f91f0fd5
TL
373 self.update_osdspec_previews(host)
374 self.mgr.cache.save_host(host)
375 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
adb31ebb 376 return None
f91f0fd5 377
adb31ebb 378 def update_osdspec_previews(self, search_host: str = '') -> None:
f91f0fd5
TL
379 # Set global 'pending' flag for host
380 self.mgr.cache.loading_osdspec_preview.add(search_host)
381 previews = []
382 # query OSDSpecs for host <search host> and generate/get the preview
383 # There can be multiple previews for one host due to multiple OSDSpecs.
384 previews.extend(self.mgr.osd_service.get_previews(search_host))
f67539c2 385 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
f91f0fd5
TL
386 self.mgr.cache.osdspec_previews[search_host] = previews
387 # Unset global 'pending' flag for host
388 self.mgr.cache.loading_osdspec_preview.remove(search_host)
389
f91f0fd5
TL
390 def _check_for_strays(self) -> None:
391 self.log.debug('_check_for_strays')
392 for k in ['CEPHADM_STRAY_HOST',
393 'CEPHADM_STRAY_DAEMON']:
a4b75251 394 self.mgr.remove_health_warning(k)
f91f0fd5
TL
395 if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
396 ls = self.mgr.list_servers()
a4b75251 397 self.log.debug(ls)
f91f0fd5
TL
398 managed = self.mgr.cache.get_daemon_names()
399 host_detail = [] # type: List[str]
400 host_num_daemons = 0
401 daemon_detail = [] # type: List[str]
402 for item in ls:
403 host = item.get('hostname')
f67539c2 404 assert isinstance(host, str)
f91f0fd5 405 daemons = item.get('services') # misnomer!
f67539c2 406 assert isinstance(daemons, list)
f91f0fd5
TL
407 missing_names = []
408 for s in daemons:
f67539c2
TL
409 daemon_id = s.get('id')
410 assert daemon_id
411 name = '%s.%s' % (s.get('type'), daemon_id)
412 if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
f91f0fd5 413 metadata = self.mgr.get_metadata(
f67539c2
TL
414 cast(str, s.get('type')), daemon_id, {})
415 assert metadata is not None
416 try:
417 if s.get('type') == 'rgw-nfs':
418 # https://tracker.ceph.com/issues/49573
419 name = metadata['id'][:-4]
420 else:
421 name = '%s.%s' % (s.get('type'), metadata['id'])
422 except (KeyError, TypeError):
f91f0fd5 423 self.log.debug(
f67539c2
TL
424 "Failed to find daemon id for %s service %s" % (
425 s.get('type'), s.get('id')
426 )
427 )
20effc67
TL
428 if s.get('type') == 'tcmu-runner':
429 # because we don't track tcmu-runner daemons in the host cache
430 # and don't have a way to check if the daemon is part of iscsi service
431 # we assume that all tcmu-runner daemons are managed by cephadm
432 managed.append(name)
f91f0fd5
TL
433 if host not in self.mgr.inventory:
434 missing_names.append(name)
435 host_num_daemons += 1
436 if name not in managed:
437 daemon_detail.append(
438 'stray daemon %s on host %s not managed by cephadm' % (name, host))
439 if missing_names:
440 host_detail.append(
441 'stray host %s has %d stray daemons: %s' % (
442 host, len(missing_names), missing_names))
443 if self.mgr.warn_on_stray_hosts and host_detail:
20effc67
TL
444 self.mgr.set_health_warning(
445 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
f91f0fd5 446 if self.mgr.warn_on_stray_daemons and daemon_detail:
20effc67
TL
447 self.mgr.set_health_warning(
448 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
449
450 def _check_for_moved_osds(self) -> None:
451 self.log.debug('_check_for_moved_osds')
452 all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
453 for dd in self.mgr.cache.get_daemons_by_type('osd'):
454 assert dd.daemon_id
455 all_osds[int(dd.daemon_id)].append(dd)
456 for osd_id, dds in all_osds.items():
457 if len(dds) <= 1:
458 continue
459 running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
460 error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
461 msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
462 logger.info(msg)
463 if len(running) != 1:
464 continue
465 osd = self.mgr.get_osd_by_id(osd_id)
466 if not osd or not osd['up']:
467 continue
468 for e in error:
469 assert e.hostname
470 try:
471 self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
472 self.mgr.events.for_daemon(
473 e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
474 except OrchestratorError as ex:
475 self.mgr.events.from_orch_error(ex)
476 logger.exception(f'failed to remove duplicated daemon {e}')
f91f0fd5
TL
477
478 def _apply_all_services(self) -> bool:
20effc67 479 self.log.debug('_apply_all_services')
f91f0fd5
TL
480 r = False
481 specs = [] # type: List[ServiceSpec]
20effc67
TL
482 # if metadata is not up to date, we still need to apply spec for agent
483 # since the agent is the one who gather the metadata. If we don't we
484 # end up stuck between wanting metadata to be up to date to apply specs
485 # and needing to apply the agent spec to get up to date metadata
486 if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
487 self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
488 try:
489 specs.append(self.mgr.spec_store['agent'].spec)
490 except Exception as e:
491 self.log.debug(f'Failed to find agent spec: {e}')
492 self.mgr.agent_helpers._apply_agent()
493 return r
494 else:
495 for sn, spec in self.mgr.spec_store.active_specs.items():
496 specs.append(spec)
a4b75251
TL
497 for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
498 self.mgr.remove_health_warning(name)
499 self.mgr.apply_spec_fails = []
f91f0fd5
TL
500 for spec in specs:
501 try:
502 if self._apply_service(spec):
503 r = True
504 except Exception as e:
a4b75251
TL
505 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
506 self.log.exception(msg)
f91f0fd5 507 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
a4b75251
TL
508 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
509 warnings = []
510 for x in self.mgr.apply_spec_fails:
511 warnings.append(f'{x[0]}: {x[1]}')
512 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
513 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
514 len(self.mgr.apply_spec_fails),
515 warnings)
f91f0fd5
TL
516
517 return r
518
f67539c2
TL
519 def _apply_service_config(self, spec: ServiceSpec) -> None:
520 if spec.config:
521 section = utils.name_to_config_section(spec.service_name())
a4b75251
TL
522 for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
523 self.mgr.remove_health_warning(name)
524 invalid_config_options = []
525 options_failed_to_set = []
f67539c2
TL
526 for k, v in spec.config.items():
527 try:
528 current = self.mgr.get_foreign_ceph_option(section, k)
529 except KeyError:
a4b75251
TL
530 msg = f'Ignoring invalid {spec.service_name()} config option {k}'
531 self.log.warning(msg)
f67539c2
TL
532 self.mgr.events.for_service(
533 spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
534 )
a4b75251 535 invalid_config_options.append(msg)
f67539c2
TL
536 continue
537 if current != v:
538 self.log.debug(f'setting [{section}] {k} = {v}')
539 try:
540 self.mgr.check_mon_command({
541 'prefix': 'config set',
542 'name': k,
543 'value': str(v),
544 'who': section,
545 })
546 except MonCommandFailed as e:
a4b75251
TL
547 msg = f'Failed to set {spec.service_name()} option {k}: {e}'
548 self.log.warning(msg)
549 options_failed_to_set.append(msg)
550
551 if invalid_config_options:
20effc67
TL
552 self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
553 invalid_config_options), invalid_config_options)
a4b75251 554 if options_failed_to_set:
20effc67
TL
555 self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
556 options_failed_to_set), options_failed_to_set)
f91f0fd5
TL
557
558 def _apply_service(self, spec: ServiceSpec) -> bool:
559 """
560 Schedule a service. Deploy new daemons or remove old ones, depending
561 on the target label and count specified in the placement.
562 """
563 self.mgr.migration.verify_no_migration()
564
f67539c2 565 service_type = spec.service_type
f91f0fd5
TL
566 service_name = spec.service_name()
567 if spec.unmanaged:
568 self.log.debug('Skipping unmanaged service %s' % service_name)
569 return False
570 if spec.preview_only:
571 self.log.debug('Skipping preview_only service %s' % service_name)
572 return False
573 self.log.debug('Applying service %s spec' % service_name)
574
20effc67
TL
575 if service_type == 'agent':
576 try:
577 assert self.mgr.cherrypy_thread
578 assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
579 except Exception:
580 self.log.info(
581 'Delaying applying agent spec until cephadm endpoint root cert created')
582 return False
583
f67539c2 584 self._apply_service_config(spec)
f91f0fd5 585
f67539c2 586 if service_type == 'osd':
f91f0fd5
TL
587 self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
588 # TODO: return True would result in a busy loop
589 # can't know if daemon count changed; create_from_spec doesn't
590 # return a solid indication
591 return False
592
f67539c2 593 svc = self.mgr.cephadm_services[service_type]
f91f0fd5
TL
594 daemons = self.mgr.cache.get_daemons_by_service(service_name)
595
b3b6e05e 596 public_networks: List[str] = []
f67539c2
TL
597 if service_type == 'mon':
598 out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
f91f0fd5 599 if '/' in out:
b3b6e05e
TL
600 public_networks = [x.strip() for x in out.split(',')]
601 self.log.debug('mon public_network(s) is %s' % public_networks)
f91f0fd5
TL
602
603 def matches_network(host):
604 # type: (str) -> bool
b3b6e05e 605 # make sure we have 1 or more IPs for any of those networks on that
f91f0fd5 606 # host
b3b6e05e
TL
607 for network in public_networks:
608 if len(self.mgr.cache.networks[host].get(network, [])) > 0:
609 return True
610 self.log.info(
611 f"Filtered out host {host}: does not belong to mon public_network"
612 f" ({','.join(public_networks)})"
613 )
614 return False
f91f0fd5 615
b3b6e05e
TL
616 rank_map = None
617 if svc.ranked():
618 rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
f91f0fd5
TL
619 ha = HostAssignment(
620 spec=spec,
20effc67
TL
621 hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
622 ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
623 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
f67539c2
TL
624 daemons=daemons,
625 networks=self.mgr.cache.networks,
626 filter_new_host=(
627 matches_network if service_type == 'mon'
628 else None
629 ),
630 allow_colo=svc.allow_colo(),
631 primary_daemon_type=svc.primary_daemon_type(),
632 per_host_daemon_type=svc.per_host_daemon_type(),
b3b6e05e 633 rank_map=rank_map,
f91f0fd5
TL
634 )
635
f67539c2
TL
636 try:
637 all_slots, slots_to_add, daemons_to_remove = ha.place()
b3b6e05e 638 daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
522d829b 639 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
f67539c2
TL
640 self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
641 except OrchestratorError as e:
a4b75251
TL
642 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
643 self.log.error(msg)
f67539c2 644 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
a4b75251
TL
645 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
646 warnings = []
647 for x in self.mgr.apply_spec_fails:
648 warnings.append(f'{x[0]}: {x[1]}')
649 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
650 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
651 len(self.mgr.apply_spec_fails),
652 warnings)
f67539c2 653 return False
f91f0fd5
TL
654
655 r = None
656
657 # sanity check
f67539c2
TL
658 final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
659 if service_type in ['mon', 'mgr'] and final_count < 1:
660 self.log.debug('cannot scale mon|mgr below 1)')
f91f0fd5
TL
661 return False
662
b3b6e05e
TL
663 # progress
664 progress_id = str(uuid.uuid4())
665 delta: List[str] = []
666 if slots_to_add:
667 delta += [f'+{len(slots_to_add)}']
668 if daemons_to_remove:
669 delta += [f'-{len(daemons_to_remove)}']
670 progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
671 progress_total = len(slots_to_add) + len(daemons_to_remove)
672 progress_done = 0
673
674 def update_progress() -> None:
675 self.mgr.remote(
676 'progress', 'update', progress_id,
677 ev_msg=progress_title,
678 ev_progress=(progress_done / progress_total),
679 add_to_ceph_s=True,
680 )
681
682 if progress_total:
683 update_progress()
684
f91f0fd5
TL
685 # add any?
686 did_config = False
687
f67539c2
TL
688 self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
689 self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
f91f0fd5 690
20effc67
TL
691 hosts_altered: Set[str] = set()
692
522d829b
TL
693 try:
694 # assign names
695 for i in range(len(slots_to_add)):
696 slot = slots_to_add[i]
697 slot = slot.assign_name(self.mgr.get_unique_name(
698 slot.daemon_type,
699 slot.hostname,
700 daemons,
701 prefix=spec.service_id,
702 forcename=slot.name,
703 rank=slot.rank,
704 rank_generation=slot.rank_generation,
705 ))
706 slots_to_add[i] = slot
707 if rank_map is not None:
708 assert slot.rank is not None
709 assert slot.rank_generation is not None
710 assert rank_map[slot.rank][slot.rank_generation] is None
711 rank_map[slot.rank][slot.rank_generation] = slot.name
712
713 if rank_map:
714 # record the rank_map before we make changes so that if we fail the
715 # next mgr will clean up.
716 self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
717
718 # remove daemons now, since we are going to fence them anyway
f67539c2 719 for d in daemons_to_remove:
522d829b 720 assert d.hostname is not None
f67539c2 721 self._remove_daemon(d.name(), d.hostname)
522d829b
TL
722 daemons_to_remove = []
723
724 # fence them
725 svc.fence_old_ranks(spec, rank_map, len(all_slots))
726
727 # create daemons
a4b75251 728 daemon_place_fails = []
522d829b
TL
729 for slot in slots_to_add:
730 # first remove daemon on conflicting port?
731 if slot.ports:
732 for d in daemons_to_remove:
733 if d.hostname != slot.hostname:
734 continue
735 if not (set(d.ports or []) & set(slot.ports)):
736 continue
737 if d.ip and slot.ip and d.ip != slot.ip:
738 continue
739 self.log.info(
740 f'Removing {d.name()} before deploying to {slot} to avoid a port conflict'
741 )
742 # NOTE: we don't check ok-to-stop here to avoid starvation if
743 # there is only 1 gateway.
744 self._remove_daemon(d.name(), d.hostname)
745 daemons_to_remove.remove(d)
746 progress_done += 1
20effc67 747 hosts_altered.add(d.hostname)
522d829b
TL
748 break
749
750 # deploy new daemon
751 daemon_id = slot.name
752 if not did_config:
753 svc.config(spec)
754 did_config = True
755
756 daemon_spec = svc.make_daemon_spec(
757 slot.hostname, daemon_id, slot.network, spec,
758 daemon_type=slot.daemon_type,
759 ports=slot.ports,
760 ip=slot.ip,
761 rank=slot.rank,
762 rank_generation=slot.rank_generation,
763 )
764 self.log.debug('Placing %s.%s on host %s' % (
765 slot.daemon_type, daemon_id, slot.hostname))
766
767 try:
768 daemon_spec = svc.prepare_create(daemon_spec)
20effc67 769 self.mgr.wait_async(self._create_daemon(daemon_spec))
522d829b 770 r = True
b3b6e05e 771 progress_done += 1
522d829b 772 update_progress()
20effc67 773 hosts_altered.add(daemon_spec.host)
522d829b
TL
774 except (RuntimeError, OrchestratorError) as e:
775 msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
776 f"on {slot.hostname}: {e}")
777 self.mgr.events.for_service(spec, 'ERROR', msg)
778 self.mgr.log.error(msg)
a4b75251 779 daemon_place_fails.append(msg)
522d829b
TL
780 # only return "no change" if no one else has already succeeded.
781 # later successes will also change to True
782 if r is None:
783 r = False
784 progress_done += 1
785 update_progress()
786 continue
f91f0fd5 787
522d829b
TL
788 # add to daemon list so next name(s) will also be unique
789 sd = orchestrator.DaemonDescription(
790 hostname=slot.hostname,
791 daemon_type=slot.daemon_type,
792 daemon_id=daemon_id,
793 )
794 daemons.append(sd)
795
a4b75251 796 if daemon_place_fails:
20effc67
TL
797 self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
798 daemon_place_fails), daemon_place_fails)
a4b75251 799
522d829b
TL
800 # remove any?
801 def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
802 daemon_ids = [d.daemon_id for d in remove_daemons]
803 assert None not in daemon_ids
804 # setting force flag retains previous behavior
805 r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
806 return not r.retval
807
808 while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
809 # let's find a subset that is ok-to-stop
810 daemons_to_remove.pop()
811 for d in daemons_to_remove:
f91f0fd5 812 r = True
522d829b
TL
813 assert d.hostname is not None
814 self._remove_daemon(d.name(), d.hostname)
815
b3b6e05e
TL
816 progress_done += 1
817 update_progress()
20effc67 818 hosts_altered.add(d.hostname)
f91f0fd5 819
522d829b
TL
820 self.mgr.remote('progress', 'complete', progress_id)
821 except Exception as e:
822 self.mgr.remote('progress', 'fail', progress_id, str(e))
823 raise
20effc67
TL
824 finally:
825 if self.mgr.use_agent:
826 # can only send ack to agents if we know for sure port they bound to
827 hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
828 h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
829 self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
b3b6e05e 830
f91f0fd5
TL
831 if r is None:
832 r = False
833 return r
834
    def _check_daemons(self) -> None:
        """Reconcile every cached daemon against its service spec.

        For each known daemon this:
          * removes orphans (daemons with no active spec),
          * skips unmanaged and deleted services,
          * health-checks agent daemons,
          * marks whether the daemon is its service's active instance,
          * schedules a 'reconfig' or 'redeploy' when its config, deps,
            monmap, extra ceph.conf or container CLI args changed,
          * finally runs per-service post actions (grafana, iscsi, ...).
        """
        self.log.debug('_check_daemons')
        daemons = self.mgr.cache.get_daemons()
        # daemon_type -> daemons that may need post-creation actions
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            if dd.daemon_type == 'agent':
                # agents are checked by a dedicated helper; a failed check
                # is only logged, never fatal for the reconcile loop
                try:
                    self.mgr.agent_helpers._check_agent(dd.hostname)
                except Exception as e:
                    self.log.debug(
                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            # flag whether this daemon is the service's active instance
            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            # start from any action already scheduled for this daemon
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                self.log.debug(
                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
                self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
                dd.extra_container_args = spec.extra_container_args
                action = 'redeploy'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                # never downgrade an already-scheduled redeploy to a reconfig
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
                        self.mgr.cache.save_host(dd.hostname)
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    # drop post actions for this type: the daemon is in a
                    # bad state, post hooks would likely fail too
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)
939 def _purge_deleted_services(self) -> None:
20effc67 940 self.log.debug('_purge_deleted_services')
f67539c2
TL
941 existing_services = self.mgr.spec_store.all_specs.items()
942 for service_name, spec in list(existing_services):
943 if service_name not in self.mgr.spec_store.spec_deleted:
944 continue
945 if self.mgr.cache.get_daemons_by_service(service_name):
946 continue
947 if spec.service_type in ['mon', 'mgr']:
948 continue
949
950 logger.info(f'Purge service {service_name}')
951
952 self.mgr.cephadm_services[spec.service_type].purge(service_name)
953 self.mgr.spec_store.finally_rm(service_name)
f91f0fd5 954
adb31ebb 955 def convert_tags_to_repo_digest(self) -> None:
f91f0fd5
TL
956 if not self.mgr.use_repo_digest:
957 return
958 settings = self.mgr.upgrade.get_distinct_container_image_settings()
959 digests: Dict[str, ContainerInspectInfo] = {}
960 for container_image_ref in set(settings.values()):
961 if not is_repo_digest(container_image_ref):
20effc67
TL
962 image_info = self.mgr.wait_async(
963 self._get_container_image_info(container_image_ref))
f67539c2
TL
964 if image_info.repo_digests:
965 # FIXME: we assume the first digest here is the best
966 assert is_repo_digest(image_info.repo_digests[0]), image_info
f91f0fd5
TL
967 digests[container_image_ref] = image_info
968
969 for entity, container_image_ref in settings.items():
970 if not is_repo_digest(container_image_ref):
971 image_info = digests[container_image_ref]
f67539c2
TL
972 if image_info.repo_digests:
973 # FIXME: we assume the first digest here is the best
974 self.mgr.set_container_image(entity, image_info.repo_digests[0])
975
20effc67
TL
    def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
        """Compute the client files every host should carry.

        Returns a mapping of host -> path -> (mode, uid, gid, content,
        digest): a managed /etc/ceph/ceph.conf (when enabled) plus each
        registered client keyring, placed per its PlacementSpec.
        """
        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
        config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())

        if self.mgr.manage_etc_ceph_ceph_conf:
            try:
                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                # piggy-back on the scheduler to expand the placement into a
                # concrete host list; the throwaway 'mon' spec only serves
                # as a carrier for the placement
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=pspec),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    client_files[host]['/etc/ceph/ceph.conf'] = (
                        0o644, 0, 0, bytes(config), str(config_digest)
                    )
            except Exception as e:
                self.mgr.log.warning(
                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

        # client keyrings
        for ks in self.mgr.keys.keys.values():
            try:
                ret, keyring, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': ks.entity,
                })
                if ret:
                    # keyring lookup failed; skip this spec, keep the rest
                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
                    continue
                digest = ''.join('%02x' % c for c in hashlib.sha256(
                    keyring.encode('utf-8')).digest())
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=ks.placement),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    # a host that gets a keyring also gets a ceph.conf
                    client_files[host]['/etc/ceph/ceph.conf'] = (
                        0o644, 0, 0, bytes(config), str(config_digest)
                    )
                    client_files[host][ks.path] = (
                        ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest
                    )
            except Exception as e:
                self.log.warning(
                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
        return client_files
1039 def _write_client_files(self,
1040 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
1041 host: str) -> None:
1042 updated_files = False
1043 old_files = self.mgr.cache.get_host_client_files(host).copy()
1044 for path, m in client_files.get(host, {}).items():
1045 mode, uid, gid, content, digest = m
1046 if path in old_files:
1047 match = old_files[path] == (digest, mode, uid, gid)
1048 del old_files[path]
1049 if match:
1050 continue
1051 self.log.info(f'Updating {host}:{path}')
1052 self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
1053 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
1054 updated_files = True
1055 for path in old_files.keys():
1056 self.log.info(f'Removing {host}:{path}')
1057 cmd = ['rm', '-f', path]
1058 self.mgr.ssh.check_execute_command(host, cmd)
1059 updated_files = True
1060 self.mgr.cache.removed_client_file(host, path)
1061 if updated_files:
1062 self.mgr.cache.save_host(host)
1063
    async def _create_daemon(self,
                             daemon_spec: CephadmDaemonDeploySpec,
                             reconfig: bool = False,
                             osd_uuid_map: Optional[Dict[str, Any]] = None,
                             ) -> str:
        """Deploy (or reconfigure) a single daemon on its target host.

        Builds the cephadm ``deploy`` command line from the spec (ports,
        osd fsid, extra container args, ...), runs it remotely, then primes
        and invalidates the local caches.  Returns a human-readable status
        message.

        :param daemon_spec: everything needed to deploy the daemon
        :param reconfig: True to only regenerate the daemon's config
        :param osd_uuid_map: optional pre-fetched osd id -> uuid map
        :raises OrchestratorError: on failure; for a first-time deploy the
            service's post_remove hook is run first to clean up (keyrings etc.)
        """
        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            try:
                image = ''
                start_time = datetime_now()
                ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

                if daemon_spec.daemon_type == 'container':
                    # custom containers carry their own image and ports
                    spec = cast(CustomContainerSpec,
                                self.mgr.spec_store[daemon_spec.service_name].spec)
                    image = spec.image
                    if spec.ports:
                        ports.extend(spec.ports)

                # TCP port to open in the host firewall
                if len(ports) > 0:
                    daemon_spec.extra_args.extend([
                        '--tcp-ports', ' '.join(map(str, ports))
                    ])

                # osd deployments needs an --osd-uuid arg
                if daemon_spec.daemon_type == 'osd':
                    if not osd_uuid_map:
                        osd_uuid_map = self.mgr.get_osd_uuid_map()
                    osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                    if not osd_uuid:
                        raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

                if reconfig:
                    daemon_spec.extra_args.append('--reconfig')
                if self.mgr.allow_ptrace:
                    daemon_spec.extra_args.append('--allow-ptrace')

                # older daemon specs may not have extra_container_args at all
                try:
                    eca = daemon_spec.extra_container_args
                    if eca:
                        for a in eca:
                            daemon_spec.extra_args.append(f'--extra-container-args={a}')
                except AttributeError:
                    eca = None

                if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
                    await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))

                self.log.info('%s daemon %s on %s' % (
                    'Reconfiguring' if reconfig else 'Deploying',
                    daemon_spec.name(), daemon_spec.host))

                out, err, code = await self._run_cephadm(
                    daemon_spec.host, daemon_spec.name(), 'deploy',
                    [
                        '--name', daemon_spec.name(),
                        '--meta-json', json.dumps({
                            'service_name': daemon_spec.service_name,
                            'ports': daemon_spec.ports,
                            'ip': daemon_spec.ip,
                            'deployed_by': self.mgr.get_active_mgr_digests(),
                            'rank': daemon_spec.rank,
                            'rank_generation': daemon_spec.rank_generation,
                            'extra_container_args': eca
                        }),
                        '--config-json', '-',
                    ] + daemon_spec.extra_args,
                    stdin=json.dumps(daemon_spec.final_config),
                    image=image,
                )

                if daemon_spec.daemon_type == 'agent':
                    # agents report back on their own; reset their bookkeeping
                    self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
                    self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1

                # refresh daemon state? (ceph daemon reconfig does not need it)
                if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                    if not code and daemon_spec.host in self.mgr.cache.daemons:
                        # prime cached service state with what we (should have)
                        # just created
                        sd = daemon_spec.to_daemon_description(
                            DaemonDescriptionStatus.starting, 'starting')
                        self.mgr.cache.add_daemon(daemon_spec.host, sd)
                        if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
                            self.mgr.requires_post_actions.add(daemon_spec.name())
                    self.mgr.cache.invalidate_host_daemons(daemon_spec.host)

                if daemon_spec.daemon_type != 'agent':
                    self.mgr.cache.update_daemon_config_deps(
                        daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
                    self.mgr.cache.save_host(daemon_spec.host)
                else:
                    self.mgr.agent_cache.update_agent_config_deps(
                        daemon_spec.host, daemon_spec.deps, start_time)
                    self.mgr.agent_cache.save_agent(daemon_spec.host)
                msg = "{} {} on host '{}'".format(
                    'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                if not code:
                    self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
                else:
                    what = 'reconfigure' if reconfig else 'deploy'
                    self.mgr.events.for_daemon(
                        daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
                return msg
            except OrchestratorError:
                redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
                if not reconfig and not redeploy:
                    # we have to clean up the daemon. E.g. keyrings.
                    servict_type = daemon_type_to_service(daemon_spec.daemon_type)
                    dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
                    self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
                raise
20effc67 1184 def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
f67539c2
TL
1185 """
1186 Remove a daemon
1187 """
1188 (daemon_type, daemon_id) = name.split('.', 1)
1189 daemon = orchestrator.DaemonDescription(
1190 daemon_type=daemon_type,
1191 daemon_id=daemon_id,
1192 hostname=host)
1193
1194 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1195
1196 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
1197
1198 # NOTE: we are passing the 'force' flag here, which means
1199 # we can delete a mon instances data.
1200 args = ['--name', name, '--force']
1201 self.log.info('Removing daemon %s from %s' % (name, host))
20effc67
TL
1202 out, err, code = self.mgr.wait_async(self._run_cephadm(
1203 host, name, 'rm-daemon', args))
f67539c2
TL
1204 if not code:
1205 # remove item from cache
1206 self.mgr.cache.rm_daemon(host, name)
1207 self.mgr.cache.invalidate_host_daemons(host)
1208
20effc67
TL
1209 if not no_post_remove:
1210 self.mgr.cephadm_services[daemon_type_to_service(
1211 daemon_type)].post_remove(daemon, is_failed_deploy=False)
f67539c2
TL
1212
1213 return "Removed {} from host '{}'".format(name, host)
adb31ebb 1214
20effc67
TL
1215 async def _run_cephadm_json(self,
1216 host: str,
1217 entity: Union[CephadmNoImage, str],
1218 command: str,
1219 args: List[str],
1220 no_fsid: Optional[bool] = False,
1221 image: Optional[str] = "",
1222 ) -> Any:
adb31ebb 1223 try:
20effc67 1224 out, err, code = await self._run_cephadm(
adb31ebb
TL
1225 host, entity, command, args, no_fsid=no_fsid, image=image)
1226 if code:
1227 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
1228 except Exception as e:
1229 raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
1230 try:
1231 return json.loads(''.join(out))
1232 except (ValueError, KeyError):
1233 msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
1234 self.log.exception(f'{msg}: {"".join(out)}')
1235 raise OrchestratorError(msg)
1236
20effc67
TL
    async def _run_cephadm(self,
                           host: str,
                           entity: Union[CephadmNoImage, str],
                           command: str,
                           args: List[str],
                           addr: Optional[str] = "",
                           stdin: Optional[str] = "",
                           no_fsid: Optional[bool] = False,
                           error_ok: Optional[bool] = False,
                           image: Optional[str] = "",
                           env_vars: Optional[List[str]] = None,
                           ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]

        Returns (out, err, exit_code); with error_ok a connection/exec
        failure is reported as ([], [str(e)], 1) instead of raising.
        """
        await self.mgr.ssh._remote_connection(host, addr)

        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # NOTE: this is a plain string (the parens do not make a tuple);
        # it is only ever used as a startswith() prefix below.
        bypass_image = ('agent')

        assert image or entity
        # Skip the image check for daemons deployed that are not ceph containers
        if not str(entity).startswith(bypass_image):
            if not image and entity is not cephadmNoImage:
                image = self.mgr._get_container_image(entity)

        final_args = []

        # global args
        if env_vars:
            for env_var_pair in env_vars:
                final_args.extend(['--env', env_var_pair])

        if image:
            final_args.extend(['--image', image])

        if not self.mgr.container_init:
            final_args += ['--no-container-init']

        # subcommand
        final_args.append(command)

        # subcommand args
        if not no_fsid:
            final_args += ['--fsid', self.mgr._cluster_fsid]

        final_args += args

        # exec
        self.log.debug('args: %s' % (' '.join(final_args)))
        if self.mgr.mode == 'root':
            # agent has cephadm binary as an extra file which is
            # therefore passed over stdin. Even for debug logs it's too much
            if stdin and 'agent' not in str(entity):
                self.log.debug('stdin: %s' % stdin)

            cmd = ['which', 'python3']
            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
            cmd = [python, self.mgr.cephadm_binary_path] + final_args

            try:
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
                if code == 2:
                    # exit code 2 may mean the cephadm binary is missing on
                    # the host: verify, (re)deploy it and retry once
                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
                    if code_ls == 2:
                        await self._deploy_cephadm_binary(host, addr)
                        out, err, code = await self.mgr.ssh._execute_command(
                            host, cmd, stdin=stdin, addr=addr)
                        # if there is an agent on this host, make sure it is using the most recent
                        # vesion of cephadm binary
                        if host in self.mgr.inventory:
                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')

            except Exception as e:
                # drop the cached connection; it may be broken
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise

        elif self.mgr.mode == 'cephadm-package':
            try:
                cmd = ['/usr/bin/cephadm'] + final_args
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise
        else:
            assert False, 'unsupported mode'

        self.log.debug(f'code: {code}')
        if out:
            self.log.debug(f'out: {out}')
        if err:
            self.log.debug(f'err: {err}')
        if code and not error_ok:
            raise OrchestratorError(
                f'cephadm exited with an error code: {code}, stderr: {err}')
        return [out], [err], code
1349 async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
f67539c2
TL
1350 # pick a random host...
1351 host = None
1352 for host_name in self.mgr.inventory.keys():
1353 host = host_name
1354 break
1355 if not host:
1356 raise OrchestratorError('no hosts defined')
1357 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
20effc67 1358 await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
f67539c2 1359
a4b75251
TL
1360 pullargs: List[str] = []
1361 if self.mgr.registry_insecure:
1362 pullargs.append("--insecure")
1363
20effc67 1364 j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
f67539c2
TL
1365
1366 r = ContainerInspectInfo(
1367 j['image_id'],
1368 j.get('ceph_version'),
1369 j.get('repo_digests')
1370 )
1371 self.log.debug(f'image {image_name} -> {r}')
1372 return r
1373
1374 # function responsible for logging single host into custom registry
20effc67
TL
1375 async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
1376 self.log.debug(
1377 f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
f67539c2 1378 # want to pass info over stdin rather than through normal list of args
20effc67 1379 out, err, code = await self._run_cephadm(
f67539c2 1380 host, 'mon', 'registry-login',
20effc67 1381 ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
f67539c2 1382 if code:
20effc67 1383 return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
f67539c2
TL
1384 return None
1385
20effc67 1386 async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
f67539c2
TL
1387 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1388 self.log.info(f"Deploying cephadm binary to {host}")
20effc67
TL
1389 await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
1390 self.mgr._cephadm.encode('utf-8'), addr=addr)