]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/serve.py
import ceph quincy 17.2.6
[ceph.git] / ceph / src / pybind / mgr / cephadm / serve.py
CommitLineData
2a845540 1import ipaddress
b3b6e05e 2import hashlib
f91f0fd5
TL
3import json
4import logging
b3b6e05e 5import uuid
33c7a0ef 6import os
f91f0fd5 7from collections import defaultdict
20effc67
TL
8from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
9 DefaultDict
f91f0fd5
TL
10
11from ceph.deployment import inventory
12from ceph.deployment.drive_group import DriveGroupSpec
b3b6e05e 13from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
20effc67 14from ceph.utils import datetime_now
f91f0fd5
TL
15
16import orchestrator
f67539c2
TL
17from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
18 DaemonDescriptionStatus, daemon_type_to_service
19from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
f91f0fd5 20from cephadm.schedule import HostAssignment
b3b6e05e 21from cephadm.autotune import MemoryAutotuner
f67539c2
TL
22from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
23 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
24from mgr_module import MonCommandFailed
b3b6e05e 25from mgr_util import format_bytes
f67539c2
TL
26
27from . import utils
f91f0fd5
TL
28
29if TYPE_CHECKING:
f67539c2 30 from cephadm.module import CephadmOrchestrator
f91f0fd5
TL
31
logger = logging.getLogger(__name__)

# Daemon types that require a follow-up ("post") action after they are
# (re)deployed or reconfigured.
# NOTE(review): the consumer of this list is not visible in this chunk
# (presumably the daemon-check pass) -- confirm before relying on this note.
REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
35
f91f0fd5
TL
36
class CephadmServe:
    """
    This class contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    Please see the `Note regarding network calls from CLI handlers`
    chapter in the cephadm developer guide.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        # Back-reference to the orchestrator module; cluster state, caches
        # and remote-execution helpers are all reached through it.
        self.mgr: "CephadmOrchestrator" = mgr
        # Module-level logger shared by all serve-thread helpers.
        self.log = logger
52
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        self.mgr.config_checker.load_network_config()

        while self.mgr.run:
            self.log.debug("serve loop start")

            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                # one-shot flag: reconcile dashboard <-> RGW credentials when
                # something has requested it and the dashboard module is on
                if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
                    self.mgr.need_connect_dashboard_rgw = False
                    if 'dashboard' in self.mgr.get('mgr_map')['modules']:
                        self.log.info('Checking dashboard <-> RGW credentials')
                        self.mgr.remote('dashboard', 'set_rgw_credentials')

                # everything below actually mutates the cluster, so it is
                # skipped entirely while cephadm is paused
                if not self.mgr.paused:
                    self._run_async_actions()

                    self.mgr.to_remove_osds.process_removal_queue()

                    self.mgr.migration.migrate()
                    if self.mgr.migration.is_migration_ongoing():
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    self._purge_deleted_services()

                    self._check_for_moved_osds()

                    if self.mgr.agent_helpers._handle_use_agent_setting():
                        continue

                    if self.mgr.upgrade.continue_upgrade():
                        continue

            except OrchestratorError as e:
                # record the failure against its subject (if any) and keep
                # the serve loop alive; transient errors must not kill it
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self.log.debug("serve loop sleep")
            self._serve_sleep()
            self.log.debug("serve loop wake")
        self.log.debug("serve exit")
116
adb31ebb 117 def _serve_sleep(self) -> None:
f67539c2
TL
118 sleep_interval = max(
119 30,
120 min(
121 self.mgr.host_check_interval,
122 self.mgr.facts_cache_timeout,
123 self.mgr.daemon_cache_timeout,
124 self.mgr.device_cache_timeout,
125 )
126 )
f91f0fd5 127 self.log.debug('Sleeping for %d seconds', sleep_interval)
f67539c2 128 self.mgr.event.wait(sleep_interval)
f91f0fd5
TL
129 self.mgr.event.clear()
130
adb31ebb 131 def _update_paused_health(self) -> None:
20effc67 132 self.log.debug('_update_paused_health')
f91f0fd5 133 if self.mgr.paused:
20effc67
TL
134 self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
135 "'ceph orch resume' to resume"])
f91f0fd5 136 else:
a4b75251 137 self.mgr.remove_health_warning('CEPHADM_PAUSED')
f91f0fd5 138
b3b6e05e
TL
    def _autotune_host_memory(self, host: str) -> None:
        """Compute and apply a per-host osd_memory_target via MemoryAutotuner.

        Uses the cached host facts for total memory; scales it by the
        configured autotune target ratio, then sets (or clears) the
        host-scoped osd_memory_target config option accordingly.
        """
        total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
        if not total_mem:
            # no memory fact available for this host -> nothing to tune
            val = None
        else:
            total_mem *= 1024   # kb -> bytes
            total_mem *= self.mgr.autotune_memory_target_ratio
            a = MemoryAutotuner(
                daemons=self.mgr.cache.get_daemons_by_host(host),
                config_get=self.mgr.get_foreign_ceph_option,
                total_mem=total_mem,
            )
            val, osds = a.tune()
            any_changed = False
            # drop any per-daemon overrides that differ from the tuned value,
            # so the host-level setting below takes effect
            for o in osds:
                if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
                    self.mgr.check_mon_command({
                        'prefix': 'config rm',
                        'who': o,
                        'name': 'osd_memory_target',
                    })
                    any_changed = True
        if val is not None:
            if any_changed:
                self.mgr.log.info(
                    f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
                )
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'config set',
                    # host-scoped option; use the short hostname
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                    'value': str(val),
                })
                if ret:
                    self.log.warning(
                        f'Unable to set osd_memory_target on {host} to {val}: {err}'
                    )
        else:
            # if osd memory autotuning is off, we don't want to remove these config
            # options as users may be using them. Since there is no way to set autotuning
            # on/off at a host level, best we can do is check if it is globally on.
            if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
                self.mgr.check_mon_command({
                    'prefix': 'config rm',
                    'who': f'osd/host:{host.split(".")[0]}',
                    'name': 'osd_memory_target',
                })
        # remember when we last autotuned this host
        self.mgr.cache.update_autotune(host)
187
    def _refresh_hosts_and_daemons(self) -> None:
        """Refresh per-host metadata (daemons, facts, networks, devices) for
        all hosts in parallel and update the related health warnings.
        """
        self.log.debug('_refresh_hosts_and_daemons')
        bad_hosts = []      # failures from check-host / registry login
        failures = []       # failures from metadata refreshes
        agents_down: List[str] = []

        @forall_hosts
        def refresh(host: str) -> None:

            # skip hosts that are in maintenance - they could be powered off
            if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
                return

            if self.mgr.use_agent:
                if self.mgr.agent_helpers._check_agent(host):
                    agents_down.append(host)

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)

            # without an agent (or with its agent down), the mgr has to pull
            # all metadata from the host itself
            if (
                    not self.mgr.use_agent
                    or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
                    or host in agents_down
            ):
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_facts_refresh(host):
                    self.log.debug(('Refreshing %s facts' % host))
                    r = self._refresh_facts(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_network_refresh(host):
                    self.log.debug(('Refreshing %s networks' % host))
                    r = self._refresh_host_networks(host)
                    if r:
                        failures.append(r)

                if self.mgr.cache.host_needs_device_refresh(host):
                    self.log.debug('refreshing %s devices' % host)
                    r = self._refresh_host_devices(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True
            elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
                # agent mode, but no agent daemon deployed on this host yet:
                # at least refresh the daemon list so the agent can be placed
                if self.mgr.cache.host_needs_daemon_refresh(host):
                    self.log.debug('refreshing %s daemons' % host)
                    r = self._refresh_host_daemons(host)
                    if r:
                        failures.append(r)
                self.mgr.cache.metadata_up_to_date[host] = True

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self.mgr.wait_async(self._registry_login(
                    host, json.loads(str(self.mgr.get_store('registry_credentials')))))
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            # the '_no_autotune_memory' host label opts a host out
            if (
                    self.mgr.cache.host_needs_autotune_memory(host)
                    and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
            ):
                self.log.debug(f"autotuning memory for {host}")
                self._autotune_host_memory(host)

        refresh(self.mgr.cache.get_hosts())

        self._write_all_client_files()

        self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)

        self.mgr.config_checker.run_checks()

        # clear old warnings first, then re-raise them from this pass
        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_REFRESH_FAILED',
        ]:
            self.mgr.remove_health_warning(k)
        if bad_hosts:
            self.mgr.set_health_warning(
                'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
        if failures:
            self.mgr.set_health_warning(
                'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
        self.mgr.update_failed_daemon_health_check()
f91f0fd5 287
adb31ebb 288 def _check_host(self, host: str) -> Optional[str]:
f91f0fd5 289 if host not in self.mgr.inventory:
adb31ebb 290 return None
f91f0fd5
TL
291 self.log.debug(' checking %s' % host)
292 try:
522d829b 293 addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
20effc67 294 out, err, code = self.mgr.wait_async(self._run_cephadm(
f91f0fd5 295 host, cephadmNoImage, 'check-host', [],
39ae355f 296 error_ok=True, no_fsid=True, log_output=self.mgr.log_refresh_metadata))
f91f0fd5
TL
297 self.mgr.cache.update_last_host_check(host)
298 self.mgr.cache.save_host(host)
299 if code:
522d829b 300 self.log.debug(' host %s (%s) failed check' % (host, addr))
f91f0fd5 301 if self.mgr.warn_on_failed_host_check:
522d829b 302 return 'host %s (%s) failed check: %s' % (host, addr, err)
f91f0fd5 303 else:
522d829b 304 self.log.debug(' host %s (%s) ok' % (host, addr))
f91f0fd5 305 except Exception as e:
522d829b
TL
306 self.log.debug(' host %s (%s) failed check' % (host, addr))
307 return 'host %s (%s) failed check: %s' % (host, addr, e)
adb31ebb 308 return None
f91f0fd5 309
adb31ebb 310 def _refresh_host_daemons(self, host: str) -> Optional[str]:
f91f0fd5 311 try:
39ae355f
TL
312 ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [],
313 no_fsid=True, log_output=self.mgr.log_refresh_metadata))
adb31ebb
TL
314 except OrchestratorError as e:
315 return str(e)
20effc67 316 self.mgr._process_ls_output(host, ls)
f91f0fd5
TL
317 return None
318
adb31ebb 319 def _refresh_facts(self, host: str) -> Optional[str]:
f91f0fd5 320 try:
20effc67 321 val = self.mgr.wait_async(self._run_cephadm_json(
39ae355f 322 host, cephadmNoImage, 'gather-facts', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
adb31ebb
TL
323 except OrchestratorError as e:
324 return str(e)
325
326 self.mgr.cache.update_host_facts(host, val)
327
328 return None
329
    def _refresh_host_devices(self, host: str) -> Optional[str]:
        """Re-scan the host's storage devices with ceph-volume inventory and
        update the device cache and OSDSpec previews.

        Returns an error string on failure, or None on success.
        """
        with_lsm = self.mgr.device_enhanced_scan
        inventory_args = ['--', 'inventory',
                          '--format=json-pretty',
                          '--filter-for-batch']
        if with_lsm:
            inventory_args.insert(-1, "--with-lsm")

        try:
            try:
                devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                                     inventory_args, log_output=self.mgr.log_refresh_metadata))
            except OrchestratorError as e:
                # older ceph-volume doesn't know --filter-for-batch;
                # retry once without it
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    rerun_args = inventory_args.copy()
                    rerun_args.remove('--filter-for-batch')
                    devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                                         rerun_args, log_output=self.mgr.log_refresh_metadata))
                else:
                    raise

        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d)' % (
            host, len(devices)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices(host, ret.devices)
        # device set changed -> OSDSpec previews for this host may be stale
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None
361
20effc67
TL
362 def _refresh_host_networks(self, host: str) -> Optional[str]:
363 try:
364 networks = self.mgr.wait_async(self._run_cephadm_json(
39ae355f 365 host, 'mon', 'list-networks', [], no_fsid=True, log_output=self.mgr.log_refresh_metadata))
20effc67
TL
366 except OrchestratorError as e:
367 return str(e)
368
369 self.log.debug('Refreshed host %s networks (%s)' % (
370 host, len(networks)))
371 self.mgr.cache.update_host_networks(host, networks)
372 self.mgr.cache.save_host(host)
373 return None
374
adb31ebb 375 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
f91f0fd5
TL
376 self.update_osdspec_previews(host)
377 self.mgr.cache.save_host(host)
378 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
adb31ebb 379 return None
f91f0fd5 380
adb31ebb 381 def update_osdspec_previews(self, search_host: str = '') -> None:
f91f0fd5
TL
382 # Set global 'pending' flag for host
383 self.mgr.cache.loading_osdspec_preview.add(search_host)
384 previews = []
385 # query OSDSpecs for host <search host> and generate/get the preview
386 # There can be multiple previews for one host due to multiple OSDSpecs.
387 previews.extend(self.mgr.osd_service.get_previews(search_host))
f67539c2 388 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
f91f0fd5
TL
389 self.mgr.cache.osdspec_previews[search_host] = previews
390 # Unset global 'pending' flag for host
391 self.mgr.cache.loading_osdspec_preview.remove(search_host)
392
39ae355f
TL
393 def _run_async_actions(self) -> None:
394 while self.mgr.scheduled_async_actions:
395 (self.mgr.scheduled_async_actions.pop(0))()
396
f91f0fd5
TL
    def _check_for_strays(self) -> None:
        """Compare the mon's server/daemon list against cephadm's inventory
        and raise CEPHADM_STRAY_HOST / CEPHADM_STRAY_DAEMON warnings for
        anything not managed by cephadm.
        """
        self.log.debug('_check_for_strays')
        # clear previous warnings; they are re-raised below if still valid
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            self.mgr.remove_health_warning(k)
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            self.log.debug(ls)
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                assert isinstance(host, str)
                daemons = item.get('services')  # misnomer!
                assert isinstance(daemons, list)
                missing_names = []
                for s in daemons:
                    daemon_id = s.get('id')
                    assert daemon_id
                    name = '%s.%s' % (s.get('type'), daemon_id)
                    # for these types the mon's id differs from the cephadm
                    # daemon name; map it via the daemon metadata
                    if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
                        metadata = self.mgr.get_metadata(
                            cast(str, s.get('type')), daemon_id, {})
                        assert metadata is not None
                        try:
                            if s.get('type') == 'rgw-nfs':
                                # https://tracker.ceph.com/issues/49573
                                name = metadata['id'][:-4]
                            else:
                                name = '%s.%s' % (s.get('type'), metadata['id'])
                        except (KeyError, TypeError):
                            self.log.debug(
                                "Failed to find daemon id for %s service %s" % (
                                    s.get('type'), s.get('id')
                                )
                            )
                    if s.get('type') == 'tcmu-runner':
                        # because we don't track tcmu-runner daemons in the host cache
                        # and don't have a way to check if the daemon is part of iscsi service
                        # we assume that all tcmu-runner daemons are managed by cephadm
                        managed.append(name)
                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.set_health_warning(
                    'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
456
    def _check_for_moved_osds(self) -> None:
        """Detect OSD ids that appear on more than one host (an OSD that was
        physically moved) and remove the stale, errored duplicates once the
        OSD is confirmed up on exactly one host.
        """
        self.log.debug('_check_for_moved_osds')
        # group all cached osd daemon descriptions by numeric OSD id
        all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in self.mgr.cache.get_daemons_by_type('osd'):
            assert dd.daemon_id
            all_osds[int(dd.daemon_id)].append(dd)
        for osd_id, dds in all_osds.items():
            if len(dds) <= 1:
                continue
            running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
            error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
            msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
            logger.info(msg)
            # only act when exactly one instance is running...
            if len(running) != 1:
                continue
            # ...and the osdmap agrees the OSD is up
            osd = self.mgr.get_osd_by_id(osd_id)
            if not osd or not osd['up']:
                continue
            for e in error:
                assert e.hostname
                try:
                    # skip post-remove actions: the OSD itself still exists
                    self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
                    self.mgr.events.for_daemon(
                        e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
                except OrchestratorError as ex:
                    self.mgr.events.from_orch_error(ex)
                    logger.exception(f'failed to remove duplicated daemon {e}')
f91f0fd5
TL
484
    def _apply_all_services(self) -> bool:
        """Apply every active service spec.

        Returns True if any spec application changed something (the caller
        then restarts the serve loop to refresh state).
        """
        self.log.debug('_apply_all_services')
        r = False
        specs = []  # type: List[ServiceSpec]
        # if metadata is not up to date, we still need to apply spec for agent
        # since the agent is the one who gather the metadata. If we don't we
        # end up stuck between wanting metadata to be up to date to apply specs
        # and needing to apply the agent spec to get up to date metadata
        if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
            self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
            try:
                specs.append(self.mgr.spec_store['agent'].spec)
            except Exception as e:
                # no agent spec stored yet; create/apply one and retry later
                self.log.debug(f'Failed to find agent spec: {e}')
                self.mgr.agent_helpers._apply_agent()
                return r
        else:
            for sn, spec in self.mgr.spec_store.active_specs.items():
                specs.append(spec)
        # clear spec-apply warnings; re-raised below as failures accumulate
        for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
            self.mgr.remove_health_warning(name)
        self.mgr.apply_spec_fails = []
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
                self.log.exception(msg)
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
                self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
                warnings = []
                for x in self.mgr.apply_spec_fails:
                    warnings.append(f'{x[0]}: {x[1]}')
                self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                            f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                            len(self.mgr.apply_spec_fails),
                                            warnings)
        self.mgr.update_watched_hosts()
        self.mgr.tuned_profile_utils._write_all_tuned_profiles()
        return r
526
f67539c2
TL
    def _apply_service_config(self, spec: ServiceSpec) -> None:
        """Push the spec's `config` section into the mon config store for the
        service's config section, raising health warnings for options that
        are invalid or fail to set.
        """
        if spec.config:
            section = utils.name_to_config_section(spec.service_name())
            # clear old warnings; re-raised below from this pass's results
            for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
                self.mgr.remove_health_warning(name)
            invalid_config_options = []
            options_failed_to_set = []
            for k, v in spec.config.items():
                try:
                    # probe the option: KeyError means it doesn't exist
                    current = self.mgr.get_foreign_ceph_option(section, k)
                except KeyError:
                    msg = f'Ignoring invalid {spec.service_name()} config option {k}'
                    self.log.warning(msg)
                    self.mgr.events.for_service(
                        spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
                    )
                    invalid_config_options.append(msg)
                    continue
                # only issue `config set` when the value actually changes
                if current != v:
                    self.log.debug(f'setting [{section}] {k} = {v}')
                    try:
                        self.mgr.check_mon_command({
                            'prefix': 'config set',
                            'name': k,
                            'value': str(v),
                            'who': section,
                        })
                    except MonCommandFailed as e:
                        msg = f'Failed to set {spec.service_name()} option {k}: {e}'
                        self.log.warning(msg)
                        options_failed_to_set.append(msg)

            if invalid_config_options:
                self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
                    invalid_config_options), invalid_config_options)
            if options_failed_to_set:
                self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
                    options_failed_to_set), options_failed_to_set)
f91f0fd5
TL
565
    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.

        Returns True if anything was actually deployed or removed.
        """
        self.mgr.migration.verify_no_migration()

        service_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        if service_type == 'agent':
            # agents need the cephadm endpoint's root cert before they can
            # be deployed; wait until it exists
            try:
                assert self.mgr.cherrypy_thread
                assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
            except Exception:
                self.log.info(
                    'Delaying applying agent spec until cephadm endpoint root cert created')
                return False

        self._apply_service_config(spec)

        if service_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            # can't know if daemon count changed; create_from_spec doesn't
            # return a solid indication
            return False

        svc = self.mgr.cephadm_services[service_type]
        daemons = self.mgr.cache.get_daemons_by_service(service_name)

        public_networks: List[str] = []
        if service_type == 'mon':
            out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
            if '/' in out:
                public_networks = [x.strip() for x in out.split(',')]
                self.log.debug('mon public_network(s) is %s' % public_networks)

        def matches_network(host):
            # type: (str) -> bool
            # make sure the host has at least one network that belongs to some configured public network(s)
            for pn in public_networks:
                public_network = ipaddress.ip_network(pn)
                for hn in self.mgr.cache.networks[host]:
                    host_network = ipaddress.ip_network(hn)
                    if host_network.overlaps(public_network):
                        return True

            host_networks = ','.join(self.mgr.cache.networks[host])
            pub_networks = ','.join(public_networks)
            self.log.info(
                f"Filtered out host {host}: does not belong to mon public_network(s): "
                f" {pub_networks}, host network(s): {host_networks}"
            )
            return False

        rank_map = None
        if svc.ranked():
            rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
        ha = HostAssignment(
            spec=spec,
            # agents may be scheduled on any non-draining host; other
            # services only on schedulable hosts
            hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
            ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
            unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
            draining_hosts=self.mgr.cache.get_draining_hosts(),
            daemons=daemons,
            networks=self.mgr.cache.networks,
            filter_new_host=(
                matches_network if service_type == 'mon'
                else None
            ),
            allow_colo=svc.allow_colo(),
            primary_daemon_type=svc.primary_daemon_type(),
            per_host_daemon_type=svc.per_host_daemon_type(),
            rank_map=rank_map,
        )

        try:
            all_slots, slots_to_add, daemons_to_remove = ha.place()
            # never try to remove daemons from hosts we can't reach
            daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
                'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
            self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
        except OrchestratorError as e:
            msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
            self.log.error(msg)
            self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
            self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
            warnings = []
            for x in self.mgr.apply_spec_fails:
                warnings.append(f'{x[0]}: {x[1]}')
            self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
                                        f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
                                        len(self.mgr.apply_spec_fails),
                                        warnings)
            return False

        r = None

        # sanity check
        final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
        if service_type in ['mon', 'mgr'] and final_count < 1:
            self.log.debug('cannot scale mon|mgr below 1)')
            return False

        # progress
        progress_id = str(uuid.uuid4())
        delta: List[str] = []
        if slots_to_add:
            delta += [f'+{len(slots_to_add)}']
        if daemons_to_remove:
            delta += [f'-{len(daemons_to_remove)}']
        progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
        progress_total = len(slots_to_add) + len(daemons_to_remove)
        progress_done = 0

        def update_progress() -> None:
            self.mgr.remote(
                'progress', 'update', progress_id,
                ev_msg=progress_title,
                ev_progress=(progress_done / progress_total),
                add_to_ceph_s=True,
            )

        if progress_total:
            update_progress()

        # add any?
        did_config = False

        self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
        self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)

        hosts_altered: Set[str] = set()

        try:
            # assign names
            for i in range(len(slots_to_add)):
                slot = slots_to_add[i]
                slot = slot.assign_name(self.mgr.get_unique_name(
                    slot.daemon_type,
                    slot.hostname,
                    [d for d in daemons if d not in daemons_to_remove],
                    prefix=spec.service_id,
                    forcename=slot.name,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                ))
                slots_to_add[i] = slot
                if rank_map is not None:
                    assert slot.rank is not None
                    assert slot.rank_generation is not None
                    assert rank_map[slot.rank][slot.rank_generation] is None
                    rank_map[slot.rank][slot.rank_generation] = slot.name

            if rank_map:
                # record the rank_map before we make changes so that if we fail the
                # next mgr will clean up.
                self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)

            # remove daemons now, since we are going to fence them anyway
            for d in daemons_to_remove:
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)
            daemons_to_remove = []

            # fence them
            svc.fence_old_ranks(spec, rank_map, len(all_slots))

            # create daemons
            daemon_place_fails = []
            for slot in slots_to_add:
                # first remove daemon with conflicting port or name?
                if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
                    for d in daemons_to_remove:
                        if (
                            d.hostname != slot.hostname
                            or not (set(d.ports or []) & set(slot.ports))
                            or (d.ip and slot.ip and d.ip != slot.ip)
                            and d.name() != slot.name
                        ):
                            continue
                        if d.name() != slot.name:
                            self.log.info(
                                f'Removing {d.name()} before deploying to {slot} to avoid a port or conflict'
                            )
                        # NOTE: we don't check ok-to-stop here to avoid starvation if
                        # there is only 1 gateway.
                        self._remove_daemon(d.name(), d.hostname)
                        daemons_to_remove.remove(d)
                        progress_done += 1
                        hosts_altered.add(d.hostname)
                        break

                # deploy new daemon
                daemon_id = slot.name
                if not did_config:
                    # service-level config is generated once per apply pass
                    svc.config(spec)
                    did_config = True

                daemon_spec = svc.make_daemon_spec(
                    slot.hostname, daemon_id, slot.network, spec,
                    daemon_type=slot.daemon_type,
                    ports=slot.ports,
                    ip=slot.ip,
                    rank=slot.rank,
                    rank_generation=slot.rank_generation,
                )
                self.log.debug('Placing %s.%s on host %s' % (
                    slot.daemon_type, daemon_id, slot.hostname))

                try:
                    daemon_spec = svc.prepare_create(daemon_spec)
                    self.mgr.wait_async(self._create_daemon(daemon_spec))
                    r = True
                    progress_done += 1
                    update_progress()
                    hosts_altered.add(daemon_spec.host)
                except (RuntimeError, OrchestratorError) as e:
                    msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
                           f"on {slot.hostname}: {e}")
                    self.mgr.events.for_service(spec, 'ERROR', msg)
                    self.mgr.log.error(msg)
                    daemon_place_fails.append(msg)
                    # only return "no change" if no one else has already succeeded.
                    # later successes will also change to True
                    if r is None:
                        r = False
                    progress_done += 1
                    update_progress()
                    continue

                # add to daemon list so next name(s) will also be unique
                sd = orchestrator.DaemonDescription(
                    hostname=slot.hostname,
                    daemon_type=slot.daemon_type,
                    daemon_id=daemon_id,
                )
                daemons.append(sd)

            if daemon_place_fails:
                self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
                    daemon_place_fails), daemon_place_fails)

            if service_type == 'mgr':
                active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
                if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
                    # We can't just remove the active mgr like any other daemon.
                    # Need to fail over later so it can be removed on next pass.
                    # This can be accomplished by scheduling a restart of the active mgr.
                    self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')

            # remove any?
            def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
                daemon_ids = [d.daemon_id for d in remove_daemons]
                assert None not in daemon_ids
                # setting force flag retains previous behavior
                r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
                return not r.retval

            while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
                # let's find a subset that is ok-to-stop
                daemons_to_remove.pop()
            for d in daemons_to_remove:
                r = True
                assert d.hostname is not None
                self._remove_daemon(d.name(), d.hostname)

                progress_done += 1
                update_progress()
                hosts_altered.add(d.hostname)

            self.mgr.remote('progress', 'complete', progress_id)
        except Exception as e:
            self.mgr.remote('progress', 'fail', progress_id, str(e))
            raise
        finally:
            if self.mgr.use_agent:
                # can only send ack to agents if we know for sure port they bound to
                hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
                    h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
                self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)

        if r is None:
            r = False
        return r
858
    def _check_daemons(self) -> None:
        """Reconcile every cached daemon against its service spec.

        For each daemon we may: remove it (orphan with no spec), skip it
        (offline host, unmanaged or deleted service), health-check its agent,
        or schedule a 'reconfig'/'redeploy' when its recorded config or
        dependencies look stale. Afterwards, run post-create checks for the
        daemon types listed in REQUIRES_POST_ACTIONS.
        """
        self.log.debug('_check_daemons')
        daemons = self.mgr.cache.get_daemons()
        # daemon_type -> daemons that may need a post-create action this pass
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
            assert dd.hostname is not None
            assert dd.daemon_type is not None
            assert dd.daemon_id is not None

            # any action we can try will fail for a daemon on an offline host,
            # including removing the daemon
            if dd.hostname in self.mgr.offline_hosts:
                continue

            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # ignore daemons for deleted services
            if dd.service_name() in self.mgr.spec_store.spec_deleted:
                continue

            # agents are health-checked rather than reconfigured like the rest
            if dd.daemon_type == 'agent':
                try:
                    self.mgr.agent_helpers._check_agent(dd.hostname)
                except Exception as e:
                    self.log.debug(
                        f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in REQUIRES_POST_ACTIONS:
                daemons_post[dd.daemon_type].append(dd)

            # mark whether this daemon is the service's active instance
            if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            # decide whether a reconfig/redeploy is needed; later branches are
            # only reached when the earlier, cheaper checks found no change
            deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
                self.log.debug(
                    f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
                self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
                dd.extra_container_args = spec.extra_container_args
                action = 'redeploy'
            elif spec is not None and hasattr(spec, 'extra_entrypoint_args') and dd.extra_entrypoint_args != spec.extra_entrypoint_args:
                self.log.info(f'Redeploying {dd.name()}, (entrypoint args changed) . . .')
                self.log.debug(
                    f'{dd.name()} daemon entrypoint args {dd.extra_entrypoint_args} -> {spec.extra_entrypoint_args}')
                dd.extra_entrypoint_args = spec.extra_entrypoint_args
                action = 'redeploy'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                # a scheduled redeploy outranks a newly computed reconfig
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
                    self.mgr._daemon_action(daemon_spec, action=action)
                    # clear the scheduled action now that it has been taken
                    if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
                        self.mgr.cache.save_host(dd.hostname)
                except OrchestratorError as e:
                    self.log.exception(e)
                    self.mgr.events.from_orch_error(e)
                    # skip post actions for this type after a failed action
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.log.exception(e)
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            run_post = False
            for d in daemon_descs:
                if d.name() in self.mgr.requires_post_actions:
                    self.mgr.requires_post_actions.remove(d.name())
                    run_post = True
            if run_post:
                self.mgr._get_cephadm_service(daemon_type_to_service(
                    daemon_type)).daemon_check_post(daemon_descs)
976
977 def _purge_deleted_services(self) -> None:
20effc67 978 self.log.debug('_purge_deleted_services')
f67539c2
TL
979 existing_services = self.mgr.spec_store.all_specs.items()
980 for service_name, spec in list(existing_services):
981 if service_name not in self.mgr.spec_store.spec_deleted:
982 continue
983 if self.mgr.cache.get_daemons_by_service(service_name):
984 continue
985 if spec.service_type in ['mon', 'mgr']:
986 continue
987
988 logger.info(f'Purge service {service_name}')
989
990 self.mgr.cephadm_services[spec.service_type].purge(service_name)
991 self.mgr.spec_store.finally_rm(service_name)
f91f0fd5 992
adb31ebb 993 def convert_tags_to_repo_digest(self) -> None:
f91f0fd5
TL
994 if not self.mgr.use_repo_digest:
995 return
996 settings = self.mgr.upgrade.get_distinct_container_image_settings()
997 digests: Dict[str, ContainerInspectInfo] = {}
998 for container_image_ref in set(settings.values()):
999 if not is_repo_digest(container_image_ref):
20effc67
TL
1000 image_info = self.mgr.wait_async(
1001 self._get_container_image_info(container_image_ref))
f67539c2
TL
1002 if image_info.repo_digests:
1003 # FIXME: we assume the first digest here is the best
1004 assert is_repo_digest(image_info.repo_digests[0]), image_info
f91f0fd5
TL
1005 digests[container_image_ref] = image_info
1006
1007 for entity, container_image_ref in settings.items():
1008 if not is_repo_digest(container_image_ref):
1009 image_info = digests[container_image_ref]
f67539c2
TL
1010 if image_info.repo_digests:
1011 # FIXME: we assume the first digest here is the best
1012 self.mgr.set_container_image(entity, image_info.repo_digests[0])
1013
20effc67
TL
    def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
        """Compute the set of managed client files each host should carry.

        Covers /etc/ceph/ceph.conf (when manage_etc_ceph_ceph_conf is on) and
        the configured client keyrings; every file is also mirrored into
        /var/lib/ceph/<fsid>/config. Placement errors are logged, not raised.

        :return: host -> path -> (mode, uid, gid, content, digest)
        """
        # host -> path -> (mode, uid, gid, content, digest)
        client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}

        # ceph.conf
        config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
        config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
        cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'

        if self.mgr.manage_etc_ceph_ceph_conf:
            try:
                # resolve which hosts should get ceph.conf via the normal
                # placement machinery (no daemons involved, just host slots)
                pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=pspec),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
            except Exception as e:
                self.mgr.log.warning(
                    f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')

        # client keyrings
        for ks in self.mgr.keys.keys.values():
            try:
                ret, keyring, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': ks.entity,
                })
                if ret:
                    self.log.warning(f'unable to fetch keyring for {ks.entity}')
                    continue
                digest = ''.join('%02x' % c for c in hashlib.sha256(
                    keyring.encode('utf-8')).digest())
                ha = HostAssignment(
                    spec=ServiceSpec('mon', placement=ks.placement),
                    hosts=self.mgr.cache.get_schedulable_hosts(),
                    unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
                    draining_hosts=self.mgr.cache.get_draining_hosts(),
                    daemons=[],
                    networks=self.mgr.cache.networks,
                )
                all_slots, _, _ = ha.place()
                for host in {s.hostname for s in all_slots}:
                    if host not in client_files:
                        client_files[host] = {}
                    # keyring hosts also get ceph.conf alongside the keyring
                    ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
                    client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
                    client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
                    ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
                    client_files[host][ks.path] = ceph_admin_key
                    client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
            except Exception as e:
                self.log.warning(
                    f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
        return client_files
1079
39ae355f
TL
1080 def _write_all_client_files(self) -> None:
1081 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
1082 client_files = self._calc_client_files()
1083 else:
1084 client_files = {}
1085
1086 @forall_hosts
1087 def _write_files(host: str) -> None:
1088 self._write_client_files(client_files, host)
1089
1090 _write_files(self.mgr.cache.get_hosts())
1091
20effc67
TL
1092 def _write_client_files(self,
1093 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
1094 host: str) -> None:
1095 updated_files = False
2a845540
TL
1096 if host in self.mgr.offline_hosts:
1097 return
20effc67
TL
1098 old_files = self.mgr.cache.get_host_client_files(host).copy()
1099 for path, m in client_files.get(host, {}).items():
1100 mode, uid, gid, content, digest = m
1101 if path in old_files:
1102 match = old_files[path] == (digest, mode, uid, gid)
1103 del old_files[path]
1104 if match:
1105 continue
1106 self.log.info(f'Updating {host}:{path}')
1107 self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
1108 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
1109 updated_files = True
1110 for path in old_files.keys():
2a845540
TL
1111 if path == '/etc/ceph/ceph.conf':
1112 continue
20effc67
TL
1113 self.log.info(f'Removing {host}:{path}')
1114 cmd = ['rm', '-f', path]
1115 self.mgr.ssh.check_execute_command(host, cmd)
1116 updated_files = True
1117 self.mgr.cache.removed_client_file(host, path)
1118 if updated_files:
1119 self.mgr.cache.save_host(host)
1120
    async def _create_daemon(self,
                             daemon_spec: CephadmDaemonDeploySpec,
                             reconfig: bool = False,
                             osd_uuid_map: Optional[Dict[str, Any]] = None,
                             ) -> str:
        """Deploy (or, with ``reconfig``, re-write config for) one daemon.

        Builds the cephadm 'deploy' invocation from the spec (ports, osd
        fsid, extra container/entrypoint args, custom config files), runs it
        on the target host, then primes/updates the relevant caches.

        :param daemon_spec: fully prepared deployment spec for the daemon
        :param reconfig: rewrite config only instead of a full (re)deploy
        :param osd_uuid_map: optional pre-fetched osd id -> uuid map
        :return: human-readable success message
        :raises OrchestratorError: e.g. osd missing from the osdmap; on a
            failed *fresh* deploy the partial daemon is cleaned up first
        """

        with set_exception_subject('service', orchestrator.DaemonDescription(
                daemon_type=daemon_spec.daemon_type,
                daemon_id=daemon_spec.daemon_id,
                hostname=daemon_spec.host,
        ).service_id(), overwrite=True):

            try:
                image = ''
                start_time = datetime_now()
                ports: List[int] = daemon_spec.ports if daemon_spec.ports else []

                # custom containers carry their own image and extra ports
                if daemon_spec.daemon_type == 'container':
                    spec = cast(CustomContainerSpec,
                                self.mgr.spec_store[daemon_spec.service_name].spec)
                    image = spec.image
                    if spec.ports:
                        ports.extend(spec.ports)

                # TCP port to open in the host firewall
                if len(ports) > 0:
                    daemon_spec.extra_args.extend([
                        '--tcp-ports', ' '.join(map(str, ports))
                    ])

                # osd deployments needs an --osd-uuid arg
                if daemon_spec.daemon_type == 'osd':
                    if not osd_uuid_map:
                        osd_uuid_map = self.mgr.get_osd_uuid_map()
                    osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
                    if not osd_uuid:
                        raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
                    daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])

                if reconfig:
                    daemon_spec.extra_args.append('--reconfig')
                if self.mgr.allow_ptrace:
                    daemon_spec.extra_args.append('--allow-ptrace')

                # user-specified extra runtime/entrypoint args from the spec
                daemon_spec, extra_container_args, extra_entrypoint_args = self._setup_extra_deployment_args(daemon_spec)

                # attach any user-provided custom config files to the payload
                if daemon_spec.service_name in self.mgr.spec_store:
                    configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
                    if configs is not None:
                        daemon_spec.final_config.update(
                            {'custom_config_files': [c.to_json() for c in configs]})

                if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
                    await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))

                self.log.info('%s daemon %s on %s' % (
                    'Reconfiguring' if reconfig else 'Deploying',
                    daemon_spec.name(), daemon_spec.host))

                # config payload goes over stdin ('--config-json -')
                out, err, code = await self._run_cephadm(
                    daemon_spec.host, daemon_spec.name(), 'deploy',
                    [
                        '--name', daemon_spec.name(),
                        '--meta-json', json.dumps({
                            'service_name': daemon_spec.service_name,
                            'ports': daemon_spec.ports,
                            'ip': daemon_spec.ip,
                            'deployed_by': self.mgr.get_active_mgr_digests(),
                            'rank': daemon_spec.rank,
                            'rank_generation': daemon_spec.rank_generation,
                            'extra_container_args': extra_container_args,
                            'extra_entrypoint_args': extra_entrypoint_args
                        }),
                        '--config-json', '-',
                    ] + daemon_spec.extra_args,
                    stdin=json.dumps(daemon_spec.final_config),
                    image=image,
                )

                if daemon_spec.daemon_type == 'agent':
                    self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
                    self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1

                # refresh daemon state? (ceph daemon reconfig does not need it)
                if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
                    if not code and daemon_spec.host in self.mgr.cache.daemons:
                        # prime cached service state with what we (should have)
                        # just created
                        sd = daemon_spec.to_daemon_description(
                            DaemonDescriptionStatus.starting, 'starting')
                        self.mgr.cache.add_daemon(daemon_spec.host, sd)
                        if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
                            self.mgr.requires_post_actions.add(daemon_spec.name())
                    self.mgr.cache.invalidate_host_daemons(daemon_spec.host)

                # agents track their config deps in a separate cache
                if daemon_spec.daemon_type != 'agent':
                    self.mgr.cache.update_daemon_config_deps(
                        daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
                    self.mgr.cache.save_host(daemon_spec.host)
                else:
                    self.mgr.agent_cache.update_agent_config_deps(
                        daemon_spec.host, daemon_spec.deps, start_time)
                    self.mgr.agent_cache.save_agent(daemon_spec.host)
                msg = "{} {} on host '{}'".format(
                    'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
                if not code:
                    self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
                else:
                    what = 'reconfigure' if reconfig else 'deploy'
                    self.mgr.events.for_daemon(
                        daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
                return msg
            except OrchestratorError:
                redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
                if not reconfig and not redeploy:
                    # we have to clean up the daemon. E.g. keyrings.
                    servict_type = daemon_type_to_service(daemon_spec.daemon_type)
                    dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
                    self.mgr.cephadm_services[servict_type].post_remove(dd, is_failed_deploy=True)
                raise
1241
39ae355f
TL
1242 def _setup_extra_deployment_args(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[CephadmDaemonDeploySpec, Optional[List[str]], Optional[List[str]]]:
1243 # this function is for handling any potential user specified
1244 # (in the service spec) extra runtime or entrypoint args for a daemon
1245 # we are going to deploy. Effectively just adds a set of extra args to
1246 # pass to the cephadm binary to indicate the daemon being deployed
1247 # needs extra runtime/entrypoint args. Returns the modified daemon spec
1248 # as well as what args were added (as those are included in unit.meta file)
1249 try:
1250 eca = daemon_spec.extra_container_args
1251 if eca:
1252 for a in eca:
1253 # args with spaces need to be split into multiple args
1254 # in order to work properly
1255 args = a.split(' ')
1256 for arg in args:
1257 if arg:
1258 daemon_spec.extra_args.append(f'--extra-container-args={arg}')
1259 except AttributeError:
1260 eca = None
1261 try:
1262 eea = daemon_spec.extra_entrypoint_args
1263 if eea:
1264 for a in eea:
1265 # args with spaces need to be split into multiple args
1266 # in order to work properly
1267 args = a.split(' ')
1268 for arg in args:
1269 if arg:
1270 daemon_spec.extra_args.append(f'--extra-entrypoint-args={arg}')
1271 except AttributeError:
1272 eea = None
1273 return daemon_spec, eca, eea
1274
    def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
        """
        Remove a daemon

        Runs the service's pre_remove hook, calls `cephadm rm-daemon` on the
        host (closing any TCP ports the daemon held), drops the daemon from
        the cache, and — unless ``no_post_remove`` — runs post_remove (for
        iscsi this is deferred to the serve loop rather than run inline).

        :param name: daemon name in '<type>.<id>' form
        :param host: hostname the daemon lives on
        :param no_post_remove: skip the service post_remove hook entirely
        :return: human-readable confirmation message
        """
        (daemon_type, daemon_id) = name.split('.', 1)
        daemon = orchestrator.DaemonDescription(
            daemon_type=daemon_type,
            daemon_id=daemon_id,
            hostname=host)

        with set_exception_subject('service', daemon.service_id(), overwrite=True):

            self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instances data.
            dd = self.mgr.cache.get_daemon(daemon.daemon_name)
            if dd.ports:
                args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
            else:
                args = ['--name', name, '--force']

            self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
            out, err, code = self.mgr.wait_async(self._run_cephadm(
                host, name, 'rm-daemon', args))
            if not code:
                # remove item from cache
                self.mgr.cache.rm_daemon(host, name)
            self.mgr.cache.invalidate_host_daemons(host)

            if not no_post_remove:
                if daemon_type not in ['iscsi']:
                    self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_type)].post_remove(daemon, is_failed_deploy=False)
                else:
                    # defer iscsi post_remove to the serve loop
                    self.mgr.scheduled_async_actions.append(lambda: self.mgr.cephadm_services[daemon_type_to_service(
                        daemon_type)].post_remove(daemon, is_failed_deploy=False))
                    self.mgr._kick_serve_loop()

        return "Removed {} from host '{}'".format(name, host)
adb31ebb 1314
20effc67
TL
    async def _run_cephadm_json(self,
                                host: str,
                                entity: Union[CephadmNoImage, str],
                                command: str,
                                args: List[str],
                                no_fsid: Optional[bool] = False,
                                error_ok: Optional[bool] = False,
                                image: Optional[str] = "",
                                log_output: Optional[bool] = True,
                                ) -> Any:
        """Run cephadm on *host* and parse its stdout as JSON.

        Thin wrapper over :meth:`_run_cephadm`; raises OrchestratorError on
        any execution failure, a non-zero exit code (even with ``error_ok``),
        or undecodable JSON output.

        NOTE(review): the OrchestratorError raised for a non-zero exit code
        is itself caught by the `except Exception` below and re-wrapped, so
        its message ends up nested inside the 'failed: ...' message.
        """
        try:
            out, err, code = await self._run_cephadm(
                host, entity, command, args, no_fsid=no_fsid, error_ok=error_ok,
                image=image, log_output=log_output)
            if code:
                raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
        except Exception as e:
            raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
        try:
            return json.loads(''.join(out))
        except (ValueError, KeyError):
            msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
            self.log.exception(f'{msg}: {"".join(out)}')
            raise OrchestratorError(msg)
1339
20effc67
TL
    async def _run_cephadm(self,
                           host: str,
                           entity: Union[CephadmNoImage, str],
                           command: str,
                           args: List[str],
                           addr: Optional[str] = "",
                           stdin: Optional[str] = "",
                           no_fsid: Optional[bool] = False,
                           error_ok: Optional[bool] = False,
                           image: Optional[str] = "",
                           env_vars: Optional[List[str]] = None,
                           log_output: Optional[bool] = True,
                           ) -> Tuple[List[str], List[str], int]:
        """
        Run cephadm on the remote host with the given command + args

        Important: You probably don't want to run _run_cephadm from CLI handlers

        :env_vars: in format -> [KEY=VALUE, ..]

        Builds the full cephadm command line (global flags, subcommand,
        --fsid, caller args) and executes it over SSH. In 'root' mode the
        cephadm binary is pushed to the host first if a run fails with exit
        code 2 and the binary is missing. Returns single-element out/err
        lists plus the exit code; with ``error_ok`` failures are returned
        rather than raised.
        """

        await self.mgr.ssh._remote_connection(host, addr)

        self.log.debug(f"_run_cephadm : command = {command}")
        self.log.debug(f"_run_cephadm : args = {args}")

        # NOTE: ('agent') is a plain str (not a tuple); str.startswith
        # accepts either, so this matches entities beginning with 'agent'
        bypass_image = ('agent')

        assert image or entity
        # Skip the image check for daemons deployed that are not ceph containers
        if not str(entity).startswith(bypass_image):
            if not image and entity is not cephadmNoImage:
                image = self.mgr._get_container_image(entity)

        final_args = []

        # global args
        if env_vars:
            for env_var_pair in env_vars:
                final_args.extend(['--env', env_var_pair])

        if image:
            final_args.extend(['--image', image])

        if not self.mgr.container_init:
            final_args += ['--no-container-init']

        if not self.mgr.cgroups_split:
            final_args += ['--no-cgroups-split']

        # subcommand
        final_args.append(command)

        # subcommand args
        if not no_fsid:
            final_args += ['--fsid', self.mgr._cluster_fsid]

        final_args += args

        # exec
        self.log.debug('args: %s' % (' '.join(final_args)))
        if self.mgr.mode == 'root':
            # agent has cephadm binary as an extra file which is
            # therefore passed over stdin. Even for debug logs it's too much
            if stdin and 'agent' not in str(entity):
                self.log.debug('stdin: %s' % stdin)

            cmd = ['which', 'python3']
            python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
            cmd = [python, self.mgr.cephadm_binary_path] + final_args

            try:
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
                # exit code 2 can mean the cephadm binary itself is missing;
                # check, push it over, and retry once
                if code == 2:
                    ls_cmd = ['ls', self.mgr.cephadm_binary_path]
                    out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr,
                                                                                  log_command=log_output)
                    if code_ls == 2:
                        await self._deploy_cephadm_binary(host, addr)
                        out, err, code = await self.mgr.ssh._execute_command(
                            host, cmd, stdin=stdin, addr=addr)
                        # if there is an agent on this host, make sure it is using the most recent
                        # version of cephadm binary
                        if host in self.mgr.inventory:
                            for agent in self.mgr.cache.get_daemons_by_type('agent', host):
                                self.mgr._schedule_daemon_action(agent.name(), 'redeploy')

            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise

        elif self.mgr.mode == 'cephadm-package':
            # packaged cephadm is installed at a fixed path on the host
            try:
                cmd = ['/usr/bin/cephadm'] + final_args
                out, err, code = await self.mgr.ssh._execute_command(
                    host, cmd, stdin=stdin, addr=addr)
            except Exception as e:
                await self.mgr.ssh._reset_con(host)
                if error_ok:
                    return [], [str(e)], 1
                raise
        else:
            assert False, 'unsupported mode'

        if log_output:
            self.log.debug(f'code: {code}')
            if out:
                self.log.debug(f'out: {out}')
            if err:
                self.log.debug(f'err: {err}')
        if code and not error_ok:
            raise OrchestratorError(
                f'cephadm exited with an error code: {code}, stderr: {err}')
        return [out], [err], code
1457
1458 async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
f67539c2
TL
1459 # pick a random host...
1460 host = None
1461 for host_name in self.mgr.inventory.keys():
1462 host = host_name
1463 break
1464 if not host:
1465 raise OrchestratorError('no hosts defined')
1466 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
20effc67 1467 await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
f67539c2 1468
39ae355f
TL
1469 j = None
1470 if not self.mgr.use_repo_digest:
1471 try:
1472 j = await self._run_cephadm_json(host, '', 'inspect-image', [],
1473 image=image_name, no_fsid=True,
1474 error_ok=True)
1475 except OrchestratorError:
1476 pass
a4b75251 1477
39ae355f
TL
1478 if not j:
1479 pullargs: List[str] = []
1480 if self.mgr.registry_insecure:
1481 pullargs.append("--insecure")
f67539c2 1482
39ae355f
TL
1483 j = await self._run_cephadm_json(host, '', 'pull', pullargs,
1484 image=image_name, no_fsid=True)
f67539c2
TL
1485 r = ContainerInspectInfo(
1486 j['image_id'],
1487 j.get('ceph_version'),
1488 j.get('repo_digests')
1489 )
1490 self.log.debug(f'image {image_name} -> {r}')
1491 return r
1492
1493 # function responsible for logging single host into custom registry
20effc67
TL
1494 async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
1495 self.log.debug(
1496 f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
f67539c2 1497 # want to pass info over stdin rather than through normal list of args
20effc67 1498 out, err, code = await self._run_cephadm(
f67539c2 1499 host, 'mon', 'registry-login',
20effc67 1500 ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
f67539c2 1501 if code:
20effc67 1502 return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
f67539c2
TL
1503 return None
1504
    async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
        """Copy the mgr's embedded cephadm binary onto *host*.

        Writes ``self.mgr._cephadm`` to ``cephadm_binary_path`` on the remote
        host via the ssh helper.
        """
        # Use tee (from coreutils) to create a copy of cephadm on the target machine
        # NOTE(review): the tee detail presumably lives inside
        # ssh._write_remote_file — confirm against that helper.
        self.log.info(f"Deploying cephadm binary to {host}")
        await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
                                              self.mgr._cephadm.encode('utf-8'), addr=addr)