ceph/src/pybind/mgr/cephadm/serve.py (ceph.git, quincy 17.2.1)
1 import hashlib
2 import json
3 import logging
4 import uuid
5 import os
6 from collections import defaultdict
7 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
8 DefaultDict
9
10 from ceph.deployment import inventory
11 from ceph.deployment.drive_group import DriveGroupSpec
12 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
13 from ceph.utils import datetime_now
14
15 import orchestrator
16 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
17 DaemonDescriptionStatus, daemon_type_to_service
18 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
19 from cephadm.schedule import HostAssignment
20 from cephadm.autotune import MemoryAutotuner
21 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
22 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
23 from mgr_module import MonCommandFailed
24 from mgr_util import format_bytes
25
26 from . import utils
27
28 if TYPE_CHECKING:
29 from cephadm.module import CephadmOrchestrator
30
31 logger = logging.getLogger(__name__)
32
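# daemon types that need an extra configuration step after their container is
# created; _create_daemon records them in mgr.requires_post_actions and
# _check_daemons later runs the owning service's daemon_check_post() for them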
33 REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
34
35
36 class CephadmServe:
37 """
38 This module contains functions that are executed in the
39 serve() thread. Thus they don't block the CLI.
40
41 Please see the `Note regarding network calls from CLI handlers`
42 chapter in the cephadm developer guide.
43
44     On the other hand, these functions should *not* be called from
45     CLI handlers, to avoid blocking the CLI.
46 """
47
48 def __init__(self, mgr: "CephadmOrchestrator"):
49 self.mgr: "CephadmOrchestrator" = mgr
50 self.log = logger
51
52 def serve(self) -> None:
53 """
54 The main loop of cephadm.
55
56 A command handler will typically change the declarative state
57 of cephadm. This loop will then attempt to apply this new state.
58 """
59 self.log.debug("serve starting")
60 self.mgr.config_checker.load_network_config()
61
62 while self.mgr.run:
63 self.log.debug("serve loop start")
64
65 try:
66
67 self.convert_tags_to_repo_digest()
68
69                 # refresh hosts and daemons
70 self.log.debug('refreshing hosts and daemons')
71 self._refresh_hosts_and_daemons()
72
73 self._check_for_strays()
74
75 self._update_paused_health()
76
77 if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
78 self.mgr.need_connect_dashboard_rgw = False
79 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
80 self.log.info('Checking dashboard <-> RGW credentials')
81 self.mgr.remote('dashboard', 'set_rgw_credentials')
82
83 if not self.mgr.paused:
84 self.mgr.to_remove_osds.process_removal_queue()
85
86 self.mgr.migration.migrate()
87 if self.mgr.migration.is_migration_ongoing():
88 continue
89
90 if self._apply_all_services():
91 continue # did something, refresh
92
93 self._check_daemons()
94
95 self._purge_deleted_services()
96
97 self._check_for_moved_osds()
98
99 if self.mgr.agent_helpers._handle_use_agent_setting():
100 continue
101
102 if self.mgr.upgrade.continue_upgrade():
103 continue
104
105 except OrchestratorError as e:
106 if e.event_subject:
107 self.mgr.events.from_orch_error(e)
108
109 self.log.debug("serve loop sleep")
110 self._serve_sleep()
111 self.log.debug("serve loop wake")
112 self.log.debug("serve exit")
113
114 def _serve_sleep(self) -> None:
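# sleep for the smallest of the configured check/cache intervals, clamped to a
# minimum of 30 seconds, so the loop never busy-spins while still waking in
# time for the next scheduled refresh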
115 sleep_interval = max(
116 30,
117 min(
118 self.mgr.host_check_interval,
119 self.mgr.facts_cache_timeout,
120 self.mgr.daemon_cache_timeout,
121 self.mgr.device_cache_timeout,
122 )
123 )
124 self.log.debug('Sleeping for %d seconds', sleep_interval)
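# the wait is interruptible: handlers that change state (e.g. via
# CephadmOrchestrator._kick_serve_loop()) set this event to wake serve() early;
# it is cleared afterwards so the next iteration blocks for the full interval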
125 self.mgr.event.wait(sleep_interval)
126 self.mgr.event.clear()
127
128 def _update_paused_health(self) -> None:
129 self.log.debug('_update_paused_health')
130 if self.mgr.paused:
131 self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
132 "'ceph orch resume' to resume"])
133 else:
134 self.mgr.remove_health_warning('CEPHADM_PAUSED')
135
136 def _autotune_host_memory(self, host: str) -> None:
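# spread the host's memory budget (total RAM * autotune_memory_target_ratio)
# across its OSDs via MemoryAutotuner and publish the result as a host-masked
# option ('osd/host:<hostname>'), clearing any conflicting per-OSD
# osd_memory_target overrides first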
137 total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
138 if not total_mem:
139 val = None
140 else:
141 total_mem *= 1024 # kb -> bytes
142 total_mem *= self.mgr.autotune_memory_target_ratio
143 a = MemoryAutotuner(
144 daemons=self.mgr.cache.get_daemons_by_host(host),
145 config_get=self.mgr.get_foreign_ceph_option,
146 total_mem=total_mem,
147 )
148 val, osds = a.tune()
149 any_changed = False
150 for o in osds:
151 if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
152 self.mgr.check_mon_command({
153 'prefix': 'config rm',
154 'who': o,
155 'name': 'osd_memory_target',
156 })
157 any_changed = True
158 if val is not None:
159 if any_changed:
160 self.mgr.log.info(
161 f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
162 )
163 ret, out, err = self.mgr.mon_command({
164 'prefix': 'config set',
165 'who': f'osd/host:{host}',
166 'name': 'osd_memory_target',
167 'value': str(val),
168 })
169 if ret:
170 self.log.warning(
171 f'Unable to set osd_memory_target on {host} to {val}: {err}'
172 )
173 else:
174 self.mgr.check_mon_command({
175 'prefix': 'config rm',
176 'who': f'osd/host:{host}',
177 'name': 'osd_memory_target',
178 })
179 self.mgr.cache.update_autotune(host)
180
181 def _refresh_hosts_and_daemons(self) -> None:
182 self.log.debug('_refresh_hosts_and_daemons')
183 bad_hosts = []
184 failures = []
185
186 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
187 client_files = self._calc_client_files()
188 else:
189 client_files = {}
190
191 agents_down: List[str] = []
192
193 @forall_hosts
194 def refresh(host: str) -> None:
195
196 # skip hosts that are in maintenance - they could be powered off
197 if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
198 return
199
200 if self.mgr.use_agent:
201 if self.mgr.agent_helpers._check_agent(host):
202 agents_down.append(host)
203
204 if self.mgr.cache.host_needs_check(host):
205 r = self._check_host(host)
206 if r is not None:
207 bad_hosts.append(r)
208
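# when the agent can't be relied on for this host (agents disabled, host
# draining, or this host's agent reported down), fall back to gathering
# daemon/fact/network/device metadata directly over SSH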
209 if (
210 not self.mgr.use_agent
211 or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
212 or host in agents_down
213 ):
214 if self.mgr.cache.host_needs_daemon_refresh(host):
215 self.log.debug('refreshing %s daemons' % host)
216 r = self._refresh_host_daemons(host)
217 if r:
218 failures.append(r)
219
220 if self.mgr.cache.host_needs_facts_refresh(host):
221 self.log.debug(('Refreshing %s facts' % host))
222 r = self._refresh_facts(host)
223 if r:
224 failures.append(r)
225
226 if self.mgr.cache.host_needs_network_refresh(host):
227 self.log.debug(('Refreshing %s networks' % host))
228 r = self._refresh_host_networks(host)
229 if r:
230 failures.append(r)
231
232 if self.mgr.cache.host_needs_device_refresh(host):
233 self.log.debug('refreshing %s devices' % host)
234 r = self._refresh_host_devices(host)
235 if r:
236 failures.append(r)
237 self.mgr.cache.metadata_up_to_date[host] = True
238 elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
239 if self.mgr.cache.host_needs_daemon_refresh(host):
240 self.log.debug('refreshing %s daemons' % host)
241 r = self._refresh_host_daemons(host)
242 if r:
243 failures.append(r)
244 self.mgr.cache.metadata_up_to_date[host] = True
245
246 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
247 self.log.debug(f"Logging `{host}` into custom registry")
248 r = self.mgr.wait_async(self._registry_login(
249 host, json.loads(str(self.mgr.get_store('registry_credentials')))))
250 if r:
251 bad_hosts.append(r)
252
253 if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
254 self.log.debug(f"refreshing OSDSpec previews for {host}")
255 r = self._refresh_host_osdspec_previews(host)
256 if r:
257 failures.append(r)
258
259 if (
260 self.mgr.cache.host_needs_autotune_memory(host)
261 and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
262 ):
263 self.log.debug(f"autotuning memory for {host}")
264 self._autotune_host_memory(host)
265
266 self._write_client_files(client_files, host)
267
268 refresh(self.mgr.cache.get_hosts())
269
270 self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
271
272 self.mgr.config_checker.run_checks()
273
274 for k in [
275 'CEPHADM_HOST_CHECK_FAILED',
276 'CEPHADM_REFRESH_FAILED',
277 ]:
278 self.mgr.remove_health_warning(k)
279 if bad_hosts:
280 self.mgr.set_health_warning(
281 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
282 if failures:
283 self.mgr.set_health_warning(
284 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
285 self.mgr.update_failed_daemon_health_check()
286
287 def _check_host(self, host: str) -> Optional[str]:
288 if host not in self.mgr.inventory:
289 return None
290 self.log.debug(' checking %s' % host)
291 try:
292 addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
293 out, err, code = self.mgr.wait_async(self._run_cephadm(
294 host, cephadmNoImage, 'check-host', [],
295 error_ok=True, no_fsid=True))
296 self.mgr.cache.update_last_host_check(host)
297 self.mgr.cache.save_host(host)
298 if code:
299 self.log.debug(' host %s (%s) failed check' % (host, addr))
300 if self.mgr.warn_on_failed_host_check:
301 return 'host %s (%s) failed check: %s' % (host, addr, err)
302 else:
303 self.log.debug(' host %s (%s) ok' % (host, addr))
304 except Exception as e:
305 self.log.debug(' host %s (%s) failed check' % (host, addr))
306 return 'host %s (%s) failed check: %s' % (host, addr, e)
307 return None
308
309 def _refresh_host_daemons(self, host: str) -> Optional[str]:
310 try:
311 ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
312 except OrchestratorError as e:
313 return str(e)
314 self.mgr._process_ls_output(host, ls)
315 return None
316
317 def _refresh_facts(self, host: str) -> Optional[str]:
318 try:
319 val = self.mgr.wait_async(self._run_cephadm_json(
320 host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
321 except OrchestratorError as e:
322 return str(e)
323
324 self.mgr.cache.update_host_facts(host, val)
325
326 return None
327
328 def _refresh_host_devices(self, host: str) -> Optional[str]:
329 with_lsm = self.mgr.device_enhanced_scan
330 inventory_args = ['--', 'inventory',
331 '--format=json-pretty',
332 '--filter-for-batch']
333 if with_lsm:
334 inventory_args.insert(-1, "--with-lsm")
335
336 try:
337 try:
338 devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
339 inventory_args))
340 except OrchestratorError as e:
341 if 'unrecognized arguments: --filter-for-batch' in str(e):
342 rerun_args = inventory_args.copy()
343 rerun_args.remove('--filter-for-batch')
344 devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
345 rerun_args))
346 else:
347 raise
348
349 except OrchestratorError as e:
350 return str(e)
351
352 self.log.debug('Refreshed host %s devices (%d)' % (
353 host, len(devices)))
354 ret = inventory.Devices.from_json(devices)
355 self.mgr.cache.update_host_devices(host, ret.devices)
356 self.update_osdspec_previews(host)
357 self.mgr.cache.save_host(host)
358 return None
359
360 def _refresh_host_networks(self, host: str) -> Optional[str]:
361 try:
362 networks = self.mgr.wait_async(self._run_cephadm_json(
363 host, 'mon', 'list-networks', [], no_fsid=True))
364 except OrchestratorError as e:
365 return str(e)
366
367 self.log.debug('Refreshed host %s networks (%s)' % (
368 host, len(networks)))
369 self.mgr.cache.update_host_networks(host, networks)
370 self.mgr.cache.save_host(host)
371 return None
372
373 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
374 self.update_osdspec_previews(host)
375 self.mgr.cache.save_host(host)
376 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
377 return None
378
379 def update_osdspec_previews(self, search_host: str = '') -> None:
380 # Set global 'pending' flag for host
381 self.mgr.cache.loading_osdspec_preview.add(search_host)
382 previews = []
383 # query OSDSpecs for host <search host> and generate/get the preview
384 # There can be multiple previews for one host due to multiple OSDSpecs.
385 previews.extend(self.mgr.osd_service.get_previews(search_host))
386 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
387 self.mgr.cache.osdspec_previews[search_host] = previews
388 # Unset global 'pending' flag for host
389 self.mgr.cache.loading_osdspec_preview.remove(search_host)
390
391 def _check_for_strays(self) -> None:
392 self.log.debug('_check_for_strays')
393 for k in ['CEPHADM_STRAY_HOST',
394 'CEPHADM_STRAY_DAEMON']:
395 self.mgr.remove_health_warning(k)
396 if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
397 ls = self.mgr.list_servers()
398 self.log.debug(ls)
399 managed = self.mgr.cache.get_daemon_names()
400 host_detail = [] # type: List[str]
401 host_num_daemons = 0
402 daemon_detail = [] # type: List[str]
403 for item in ls:
404 host = item.get('hostname')
405 assert isinstance(host, str)
406 daemons = item.get('services') # misnomer!
407 assert isinstance(daemons, list)
408 missing_names = []
409 for s in daemons:
410 daemon_id = s.get('id')
411 assert daemon_id
412 name = '%s.%s' % (s.get('type'), daemon_id)
413 if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
414 metadata = self.mgr.get_metadata(
415 cast(str, s.get('type')), daemon_id, {})
416 assert metadata is not None
417 try:
418 if s.get('type') == 'rgw-nfs':
419 # https://tracker.ceph.com/issues/49573
420 name = metadata['id'][:-4]
421 else:
422 name = '%s.%s' % (s.get('type'), metadata['id'])
423 except (KeyError, TypeError):
424 self.log.debug(
425 "Failed to find daemon id for %s service %s" % (
426 s.get('type'), s.get('id')
427 )
428 )
429 if s.get('type') == 'tcmu-runner':
430 # because we don't track tcmu-runner daemons in the host cache
431                     # and don't have a way to check if the daemon is part of an iscsi service
432 # we assume that all tcmu-runner daemons are managed by cephadm
433 managed.append(name)
434 if host not in self.mgr.inventory:
435 missing_names.append(name)
436 host_num_daemons += 1
437 if name not in managed:
438 daemon_detail.append(
439 'stray daemon %s on host %s not managed by cephadm' % (name, host))
440 if missing_names:
441 host_detail.append(
442 'stray host %s has %d stray daemons: %s' % (
443 host, len(missing_names), missing_names))
444 if self.mgr.warn_on_stray_hosts and host_detail:
445 self.mgr.set_health_warning(
446 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
447 if self.mgr.warn_on_stray_daemons and daemon_detail:
448 self.mgr.set_health_warning(
449 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
450
451 def _check_for_moved_osds(self) -> None:
452 self.log.debug('_check_for_moved_osds')
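# an OSD that was physically moved to a new host can appear twice in the cache:
# an 'error' entry on the old host and a 'running' entry on the new one. Once
# the running copy is confirmed up in the osdmap, remove the stale duplicates.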
453 all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
454 for dd in self.mgr.cache.get_daemons_by_type('osd'):
455 assert dd.daemon_id
456 all_osds[int(dd.daemon_id)].append(dd)
457 for osd_id, dds in all_osds.items():
458 if len(dds) <= 1:
459 continue
460 running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
461 error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
462 msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
463 logger.info(msg)
464 if len(running) != 1:
465 continue
466 osd = self.mgr.get_osd_by_id(osd_id)
467 if not osd or not osd['up']:
468 continue
469 for e in error:
470 assert e.hostname
471 try:
472 self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
473 self.mgr.events.for_daemon(
474 e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
475 except OrchestratorError as ex:
476 self.mgr.events.from_orch_error(ex)
477 logger.exception(f'failed to remove duplicated daemon {e}')
478
479 def _apply_all_services(self) -> bool:
480 self.log.debug('_apply_all_services')
481 r = False
482 specs = [] # type: List[ServiceSpec]
483         # if metadata is not up to date, we still need to apply the spec for the agent,
484         # since the agent is the one that gathers the metadata. If we don't, we
485 # end up stuck between wanting metadata to be up to date to apply specs
486 # and needing to apply the agent spec to get up to date metadata
487 if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
488 self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
489 try:
490 specs.append(self.mgr.spec_store['agent'].spec)
491 except Exception as e:
492 self.log.debug(f'Failed to find agent spec: {e}')
493 self.mgr.agent_helpers._apply_agent()
494 return r
495 else:
496 for sn, spec in self.mgr.spec_store.active_specs.items():
497 specs.append(spec)
498 for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
499 self.mgr.remove_health_warning(name)
500 self.mgr.apply_spec_fails = []
501 for spec in specs:
502 try:
503 if self._apply_service(spec):
504 r = True
505 except Exception as e:
506 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
507 self.log.exception(msg)
508 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
509 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
510 warnings = []
511 for x in self.mgr.apply_spec_fails:
512 warnings.append(f'{x[0]}: {x[1]}')
513 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
514 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
515 len(self.mgr.apply_spec_fails),
516 warnings)
517 self.mgr.update_watched_hosts()
518 return r
519
520 def _apply_service_config(self, spec: ServiceSpec) -> None:
521 if spec.config:
522 section = utils.name_to_config_section(spec.service_name())
523 for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
524 self.mgr.remove_health_warning(name)
525 invalid_config_options = []
526 options_failed_to_set = []
527 for k, v in spec.config.items():
528 try:
529 current = self.mgr.get_foreign_ceph_option(section, k)
530 except KeyError:
531 msg = f'Ignoring invalid {spec.service_name()} config option {k}'
532 self.log.warning(msg)
533 self.mgr.events.for_service(
534 spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
535 )
536 invalid_config_options.append(msg)
537 continue
538 if current != v:
539 self.log.debug(f'setting [{section}] {k} = {v}')
540 try:
541 self.mgr.check_mon_command({
542 'prefix': 'config set',
543 'name': k,
544 'value': str(v),
545 'who': section,
546 })
547 except MonCommandFailed as e:
548 msg = f'Failed to set {spec.service_name()} option {k}: {e}'
549 self.log.warning(msg)
550 options_failed_to_set.append(msg)
551
552 if invalid_config_options:
553 self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
554 invalid_config_options), invalid_config_options)
555 if options_failed_to_set:
556 self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
557 options_failed_to_set), options_failed_to_set)
558
559 def _apply_service(self, spec: ServiceSpec) -> bool:
560 """
561 Schedule a service. Deploy new daemons or remove old ones, depending
562 on the target label and count specified in the placement.
563 """
564 self.mgr.migration.verify_no_migration()
565
566 service_type = spec.service_type
567 service_name = spec.service_name()
568 if spec.unmanaged:
569 self.log.debug('Skipping unmanaged service %s' % service_name)
570 return False
571 if spec.preview_only:
572 self.log.debug('Skipping preview_only service %s' % service_name)
573 return False
574 self.log.debug('Applying service %s spec' % service_name)
575
576 if service_type == 'agent':
577 try:
578 assert self.mgr.cherrypy_thread
579 assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
580 except Exception:
581 self.log.info(
582 'Delaying applying agent spec until cephadm endpoint root cert created')
583 return False
584
585 self._apply_service_config(spec)
586
587 if service_type == 'osd':
588 self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
589             # TODO: returning True here would result in a busy loop
590 # can't know if daemon count changed; create_from_spec doesn't
591 # return a solid indication
592 return False
593
594 svc = self.mgr.cephadm_services[service_type]
595 daemons = self.mgr.cache.get_daemons_by_service(service_name)
596
597 public_networks: List[str] = []
598 if service_type == 'mon':
599 out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
600 if '/' in out:
601 public_networks = [x.strip() for x in out.split(',')]
602 self.log.debug('mon public_network(s) is %s' % public_networks)
603
604 def matches_network(host):
605 # type: (str) -> bool
606 # make sure we have 1 or more IPs for any of those networks on that
607 # host
608 for network in public_networks:
609 if len(self.mgr.cache.networks[host].get(network, [])) > 0:
610 return True
611 self.log.info(
612 f"Filtered out host {host}: does not belong to mon public_network"
613 f" ({','.join(public_networks)})"
614 )
615 return False
616
617 rank_map = None
618 if svc.ranked():
619 rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
620 ha = HostAssignment(
621 spec=spec,
622 hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
623 ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
624 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
625 daemons=daemons,
626 networks=self.mgr.cache.networks,
627 filter_new_host=(
628 matches_network if service_type == 'mon'
629 else None
630 ),
631 allow_colo=svc.allow_colo(),
632 primary_daemon_type=svc.primary_daemon_type(),
633 per_host_daemon_type=svc.per_host_daemon_type(),
634 rank_map=rank_map,
635 )
636
637 try:
638 all_slots, slots_to_add, daemons_to_remove = ha.place()
639 daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
640 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
641 self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
642 except OrchestratorError as e:
643 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
644 self.log.error(msg)
645 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
646 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
647 warnings = []
648 for x in self.mgr.apply_spec_fails:
649 warnings.append(f'{x[0]}: {x[1]}')
650 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
651 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
652 len(self.mgr.apply_spec_fails),
653 warnings)
654 return False
655
656 r = None
657
658 # sanity check
659 final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
660 if service_type in ['mon', 'mgr'] and final_count < 1:
661             self.log.debug('cannot scale mon|mgr below 1')
662 return False
663
664 # progress
665 progress_id = str(uuid.uuid4())
666 delta: List[str] = []
667 if slots_to_add:
668 delta += [f'+{len(slots_to_add)}']
669 if daemons_to_remove:
670 delta += [f'-{len(daemons_to_remove)}']
671 progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
672 progress_total = len(slots_to_add) + len(daemons_to_remove)
673 progress_done = 0
674
675 def update_progress() -> None:
676 self.mgr.remote(
677 'progress', 'update', progress_id,
678 ev_msg=progress_title,
679 ev_progress=(progress_done / progress_total),
680 add_to_ceph_s=True,
681 )
682
683 if progress_total:
684 update_progress()
685
686 # add any?
687 did_config = False
688
689 self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
690 self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
691
692 hosts_altered: Set[str] = set()
693
694 try:
695 # assign names
696 for i in range(len(slots_to_add)):
697 slot = slots_to_add[i]
698 slot = slot.assign_name(self.mgr.get_unique_name(
699 slot.daemon_type,
700 slot.hostname,
701 [d for d in daemons if d not in daemons_to_remove],
702 prefix=spec.service_id,
703 forcename=slot.name,
704 rank=slot.rank,
705 rank_generation=slot.rank_generation,
706 ))
707 slots_to_add[i] = slot
708 if rank_map is not None:
709 assert slot.rank is not None
710 assert slot.rank_generation is not None
711 assert rank_map[slot.rank][slot.rank_generation] is None
712 rank_map[slot.rank][slot.rank_generation] = slot.name
713
714 if rank_map:
715 # record the rank_map before we make changes so that if we fail the
716 # next mgr will clean up.
717 self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
718
719 # remove daemons now, since we are going to fence them anyway
720 for d in daemons_to_remove:
721 assert d.hostname is not None
722 self._remove_daemon(d.name(), d.hostname)
723 daemons_to_remove = []
724
725 # fence them
726 svc.fence_old_ranks(spec, rank_map, len(all_slots))
727
728 # create daemons
729 daemon_place_fails = []
730 for slot in slots_to_add:
731 # first remove daemon with conflicting port or name?
732 if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
733 for d in daemons_to_remove:
734 if (
735 d.hostname != slot.hostname
736 or not (set(d.ports or []) & set(slot.ports))
737 or (d.ip and slot.ip and d.ip != slot.ip)
738 and d.name() != slot.name
739 ):
740 continue
741 if d.name() != slot.name:
742 self.log.info(
743                             f'Removing {d.name()} before deploying to {slot} to avoid a port or name conflict'
744 )
745 # NOTE: we don't check ok-to-stop here to avoid starvation if
746 # there is only 1 gateway.
747 self._remove_daemon(d.name(), d.hostname)
748 daemons_to_remove.remove(d)
749 progress_done += 1
750 hosts_altered.add(d.hostname)
751 break
752
753 # deploy new daemon
754 daemon_id = slot.name
755 if not did_config:
756 svc.config(spec)
757 did_config = True
758
759 daemon_spec = svc.make_daemon_spec(
760 slot.hostname, daemon_id, slot.network, spec,
761 daemon_type=slot.daemon_type,
762 ports=slot.ports,
763 ip=slot.ip,
764 rank=slot.rank,
765 rank_generation=slot.rank_generation,
766 )
767 self.log.debug('Placing %s.%s on host %s' % (
768 slot.daemon_type, daemon_id, slot.hostname))
769
770 try:
771 daemon_spec = svc.prepare_create(daemon_spec)
772 self.mgr.wait_async(self._create_daemon(daemon_spec))
773 r = True
774 progress_done += 1
775 update_progress()
776 hosts_altered.add(daemon_spec.host)
777 except (RuntimeError, OrchestratorError) as e:
778 msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
779 f"on {slot.hostname}: {e}")
780 self.mgr.events.for_service(spec, 'ERROR', msg)
781 self.mgr.log.error(msg)
782 daemon_place_fails.append(msg)
783                         # only report "no change" if no other placement has already succeeded;
784                         # a later success will still flip r to True
785 if r is None:
786 r = False
787 progress_done += 1
788 update_progress()
789 continue
790
791 # add to daemon list so next name(s) will also be unique
792 sd = orchestrator.DaemonDescription(
793 hostname=slot.hostname,
794 daemon_type=slot.daemon_type,
795 daemon_id=daemon_id,
796 )
797 daemons.append(sd)
798
799 if daemon_place_fails:
800 self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
801 daemon_place_fails), daemon_place_fails)
802
803 if service_type == 'mgr':
804 active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
805 if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
806 # We can't just remove the active mgr like any other daemon.
807 # Need to fail over later so it can be removed on next pass.
808 # This can be accomplished by scheduling a restart of the active mgr.
809 self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')
810
811 # remove any?
812 def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
813 daemon_ids = [d.daemon_id for d in remove_daemons]
814 assert None not in daemon_ids
815 # setting force flag retains previous behavior
816 r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
817 return not r.retval
818
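# if stopping all of them at once isn't safe, trim daemons from the end of the
# removal list until the remaining subset passes the ok-to-stop check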
819 while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
820 # let's find a subset that is ok-to-stop
821 daemons_to_remove.pop()
822 for d in daemons_to_remove:
823 r = True
824 assert d.hostname is not None
825 self._remove_daemon(d.name(), d.hostname)
826
827 progress_done += 1
828 update_progress()
829 hosts_altered.add(d.hostname)
830
831 self.mgr.remote('progress', 'complete', progress_id)
832 except Exception as e:
833 self.mgr.remote('progress', 'fail', progress_id, str(e))
834 raise
835 finally:
836 if self.mgr.use_agent:
837                 # can only send an ack to agents if we know for sure the port they bound to
838 hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
839 h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
840 self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
841
842 if r is None:
843 r = False
844 return r
845
846 def _check_daemons(self) -> None:
847 self.log.debug('_check_daemons')
848 daemons = self.mgr.cache.get_daemons()
849 daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
850 for dd in daemons:
851 # orphan?
852 spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
853 assert dd.hostname is not None
854 assert dd.daemon_type is not None
855 assert dd.daemon_id is not None
856 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
857 # (mon and mgr specs should always exist; osds aren't matched
858 # to a service spec)
859 self.log.info('Removing orphan daemon %s...' % dd.name())
860 self._remove_daemon(dd.name(), dd.hostname)
861
862 # ignore unmanaged services
863 if spec and spec.unmanaged:
864 continue
865
866 # ignore daemons for deleted services
867 if dd.service_name() in self.mgr.spec_store.spec_deleted:
868 continue
869
870 if dd.daemon_type == 'agent':
871 try:
872 self.mgr.agent_helpers._check_agent(dd.hostname)
873 except Exception as e:
874 self.log.debug(
875 f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
876 continue
877
878 # These daemon types require additional configs after creation
879 if dd.daemon_type in REQUIRES_POST_ACTIONS:
880 daemons_post[dd.daemon_type].append(dd)
881
882 if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
883 self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
884 dd.is_active = True
885 else:
886 dd.is_active = False
887
888 deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
889 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
890 dd.hostname, dd.name())
891 if last_deps is None:
892 last_deps = []
893 action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
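# decide whether the daemon needs a 'reconfig' (rewrite its config files) or a
# 'redeploy' (recreate the container): missing/stale config, changed
# dependencies, a newer monmap or newer extra ceph.conf trigger a reconfig,
# while changed extra_container_args requires a redeploy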
894 if not last_config:
895 self.log.info('Reconfiguring %s (unknown last config time)...' % (
896 dd.name()))
897 action = 'reconfig'
898 elif last_deps != deps:
899 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
900 deps))
901 self.log.info('Reconfiguring %s (dependencies changed)...' % (
902 dd.name()))
903 action = 'reconfig'
904 elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
905 self.log.debug(
906 f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
907 self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
908 dd.extra_container_args = spec.extra_container_args
909 action = 'redeploy'
910 elif self.mgr.last_monmap and \
911 self.mgr.last_monmap > last_config and \
912 dd.daemon_type in CEPH_TYPES:
913 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
914 action = 'reconfig'
915 elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
916 dd.daemon_type in CEPH_TYPES:
917 self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
918 action = 'reconfig'
919 if action:
920 if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
921 and action == 'reconfig':
922 action = 'redeploy'
923 try:
924 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
925 self.mgr._daemon_action(daemon_spec, action=action)
926 if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
927 self.mgr.cache.save_host(dd.hostname)
928 except OrchestratorError as e:
929 self.mgr.events.from_orch_error(e)
930 if dd.daemon_type in daemons_post:
931 del daemons_post[dd.daemon_type]
932 # continue...
933 except Exception as e:
934 self.mgr.events.for_daemon_from_exception(dd.name(), e)
935 if dd.daemon_type in daemons_post:
936 del daemons_post[dd.daemon_type]
937 # continue...
938
939 # do daemon post actions
940 for daemon_type, daemon_descs in daemons_post.items():
941 run_post = False
942 for d in daemon_descs:
943 if d.name() in self.mgr.requires_post_actions:
944 self.mgr.requires_post_actions.remove(d.name())
945 run_post = True
946 if run_post:
947 self.mgr._get_cephadm_service(daemon_type_to_service(
948 daemon_type)).daemon_check_post(daemon_descs)
949
950 def _purge_deleted_services(self) -> None:
951 self.log.debug('_purge_deleted_services')
952 existing_services = self.mgr.spec_store.all_specs.items()
953 for service_name, spec in list(existing_services):
954 if service_name not in self.mgr.spec_store.spec_deleted:
955 continue
956 if self.mgr.cache.get_daemons_by_service(service_name):
957 continue
958 if spec.service_type in ['mon', 'mgr']:
959 continue
960
961 logger.info(f'Purge service {service_name}')
962
963 self.mgr.cephadm_services[spec.service_type].purge(service_name)
964 self.mgr.spec_store.finally_rm(service_name)
965
966 def convert_tags_to_repo_digest(self) -> None:
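# with use_repo_digest enabled, resolve container image references that are
# plain tags into repo digests (repo@sha256:...) so every daemon pins exactly
# the same image even if the tag later moves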
967 if not self.mgr.use_repo_digest:
968 return
969 settings = self.mgr.upgrade.get_distinct_container_image_settings()
970 digests: Dict[str, ContainerInspectInfo] = {}
971 for container_image_ref in set(settings.values()):
972 if not is_repo_digest(container_image_ref):
973 image_info = self.mgr.wait_async(
974 self._get_container_image_info(container_image_ref))
975 if image_info.repo_digests:
976 # FIXME: we assume the first digest here is the best
977 assert is_repo_digest(image_info.repo_digests[0]), image_info
978 digests[container_image_ref] = image_info
979
980 for entity, container_image_ref in settings.items():
981 if not is_repo_digest(container_image_ref):
982 image_info = digests[container_image_ref]
983 if image_info.repo_digests:
984 # FIXME: we assume the first digest here is the best
985 self.mgr.set_container_image(entity, image_info.repo_digests[0])
986
987 def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
988 # host -> path -> (mode, uid, gid, content, digest)
989 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
990
991 # ceph.conf
992 config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
993 config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
994 cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'
995
996 if self.mgr.manage_etc_ceph_ceph_conf:
997 try:
998 pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
999 ha = HostAssignment(
1000 spec=ServiceSpec('mon', placement=pspec),
1001 hosts=self.mgr.cache.get_schedulable_hosts(),
1002 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
1003 daemons=[],
1004 networks=self.mgr.cache.networks,
1005 )
1006 all_slots, _, _ = ha.place()
1007 for host in {s.hostname for s in all_slots}:
1008 if host not in client_files:
1009 client_files[host] = {}
1010 ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
1011 client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
1012 client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
1013 except Exception as e:
1014 self.mgr.log.warning(
1015 f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
1016
1017 # client keyrings
1018 for ks in self.mgr.keys.keys.values():
1019 try:
1020 ret, keyring, err = self.mgr.mon_command({
1021 'prefix': 'auth get',
1022 'entity': ks.entity,
1023 })
1024 if ret:
1025 self.log.warning(f'unable to fetch keyring for {ks.entity}')
1026 continue
1027 digest = ''.join('%02x' % c for c in hashlib.sha256(
1028 keyring.encode('utf-8')).digest())
1029 ha = HostAssignment(
1030 spec=ServiceSpec('mon', placement=ks.placement),
1031 hosts=self.mgr.cache.get_schedulable_hosts(),
1032 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
1033 daemons=[],
1034 networks=self.mgr.cache.networks,
1035 )
1036 all_slots, _, _ = ha.place()
1037 for host in {s.hostname for s in all_slots}:
1038 if host not in client_files:
1039 client_files[host] = {}
1040 ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
1041 client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
1042 client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
1043 ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
1044 client_files[host][ks.path] = ceph_admin_key
1045 client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
1046 except Exception as e:
1047 self.log.warning(
1048 f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
1049 return client_files
1050
1051 def _write_client_files(self,
1052 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
1053 host: str) -> None:
1054 updated_files = False
1055 old_files = self.mgr.cache.get_host_client_files(host).copy()
1056 for path, m in client_files.get(host, {}).items():
1057 mode, uid, gid, content, digest = m
1058 if path in old_files:
1059 match = old_files[path] == (digest, mode, uid, gid)
1060 del old_files[path]
1061 if match:
1062 continue
1063 self.log.info(f'Updating {host}:{path}')
1064 self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
1065 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
1066 updated_files = True
1067 for path in old_files.keys():
1068 self.log.info(f'Removing {host}:{path}')
1069 cmd = ['rm', '-f', path]
1070 self.mgr.ssh.check_execute_command(host, cmd)
1071 updated_files = True
1072 self.mgr.cache.removed_client_file(host, path)
1073 if updated_files:
1074 self.mgr.cache.save_host(host)
1075
1076 async def _create_daemon(self,
1077 daemon_spec: CephadmDaemonDeploySpec,
1078 reconfig: bool = False,
1079 osd_uuid_map: Optional[Dict[str, Any]] = None,
1080 ) -> str:
1081
1082 with set_exception_subject('service', orchestrator.DaemonDescription(
1083 daemon_type=daemon_spec.daemon_type,
1084 daemon_id=daemon_spec.daemon_id,
1085 hostname=daemon_spec.host,
1086 ).service_id(), overwrite=True):
1087
1088 try:
1089 image = ''
1090 start_time = datetime_now()
1091 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1092
1093 if daemon_spec.daemon_type == 'container':
1094 spec = cast(CustomContainerSpec,
1095 self.mgr.spec_store[daemon_spec.service_name].spec)
1096 image = spec.image
1097 if spec.ports:
1098 ports.extend(spec.ports)
1099
1100                 # TCP ports to open in the host firewall
1101 if len(ports) > 0:
1102 daemon_spec.extra_args.extend([
1103 '--tcp-ports', ' '.join(map(str, ports))
1104 ])
1105
1106                 # osd deployments need an --osd-fsid arg (the osd's uuid from the osdmap)
1107 if daemon_spec.daemon_type == 'osd':
1108 if not osd_uuid_map:
1109 osd_uuid_map = self.mgr.get_osd_uuid_map()
1110 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1111 if not osd_uuid:
1112 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1113 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1114
1115 if reconfig:
1116 daemon_spec.extra_args.append('--reconfig')
1117 if self.mgr.allow_ptrace:
1118 daemon_spec.extra_args.append('--allow-ptrace')
1119
1120 try:
1121 eca = daemon_spec.extra_container_args
1122 if eca:
1123 for a in eca:
1124 daemon_spec.extra_args.append(f'--extra-container-args={a}')
1125 except AttributeError:
1126 eca = None
1127
1128 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
1129 await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))
1130
1131 self.log.info('%s daemon %s on %s' % (
1132 'Reconfiguring' if reconfig else 'Deploying',
1133 daemon_spec.name(), daemon_spec.host))
1134
1135 out, err, code = await self._run_cephadm(
1136 daemon_spec.host, daemon_spec.name(), 'deploy',
1137 [
1138 '--name', daemon_spec.name(),
1139 '--meta-json', json.dumps({
1140 'service_name': daemon_spec.service_name,
1141 'ports': daemon_spec.ports,
1142 'ip': daemon_spec.ip,
1143 'deployed_by': self.mgr.get_active_mgr_digests(),
1144 'rank': daemon_spec.rank,
1145 'rank_generation': daemon_spec.rank_generation,
1146 'extra_container_args': eca
1147 }),
1148 '--config-json', '-',
1149 ] + daemon_spec.extra_args,
1150 stdin=json.dumps(daemon_spec.final_config),
1151 image=image,
1152 )
1153
1154 if daemon_spec.daemon_type == 'agent':
1155 self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
1156 self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
1157
1158 # refresh daemon state? (ceph daemon reconfig does not need it)
1159 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
1160 if not code and daemon_spec.host in self.mgr.cache.daemons:
1161 # prime cached service state with what we (should have)
1162 # just created
1163 sd = daemon_spec.to_daemon_description(
1164 DaemonDescriptionStatus.starting, 'starting')
1165 self.mgr.cache.add_daemon(daemon_spec.host, sd)
1166 if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
1167 self.mgr.requires_post_actions.add(daemon_spec.name())
1168 self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
1169
1170 if daemon_spec.daemon_type != 'agent':
1171 self.mgr.cache.update_daemon_config_deps(
1172 daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
1173 self.mgr.cache.save_host(daemon_spec.host)
1174 else:
1175 self.mgr.agent_cache.update_agent_config_deps(
1176 daemon_spec.host, daemon_spec.deps, start_time)
1177 self.mgr.agent_cache.save_agent(daemon_spec.host)
1178 msg = "{} {} on host '{}'".format(
1179 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1180 if not code:
1181 self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1182 else:
1183 what = 'reconfigure' if reconfig else 'deploy'
1184 self.mgr.events.for_daemon(
1185 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1186 return msg
1187 except OrchestratorError:
1188 redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
1189 if not reconfig and not redeploy:
1190 # we have to clean up the daemon. E.g. keyrings.
1191                     service_type = daemon_type_to_service(daemon_spec.daemon_type)
1192                     dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
1193                     self.mgr.cephadm_services[service_type].post_remove(dd, is_failed_deploy=True)
1194 raise
1195
1196 def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
1197 """
1198 Remove a daemon
1199 """
1200 (daemon_type, daemon_id) = name.split('.', 1)
1201 daemon = orchestrator.DaemonDescription(
1202 daemon_type=daemon_type,
1203 daemon_id=daemon_id,
1204 hostname=host)
1205
1206 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1207
1208 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
1209 # NOTE: we are passing the 'force' flag here, which means
1210             # we can delete a mon instance's data.
1211 dd = self.mgr.cache.get_daemon(daemon.daemon_name)
1212 if dd.ports:
1213 args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
1214 else:
1215 args = ['--name', name, '--force']
1216
1217 self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
1218 out, err, code = self.mgr.wait_async(self._run_cephadm(
1219 host, name, 'rm-daemon', args))
1220 if not code:
1221 # remove item from cache
1222 self.mgr.cache.rm_daemon(host, name)
1223 self.mgr.cache.invalidate_host_daemons(host)
1224
1225 if not no_post_remove:
1226 self.mgr.cephadm_services[daemon_type_to_service(
1227 daemon_type)].post_remove(daemon, is_failed_deploy=False)
1228
1229 return "Removed {} from host '{}'".format(name, host)
1230
1231 async def _run_cephadm_json(self,
1232 host: str,
1233 entity: Union[CephadmNoImage, str],
1234 command: str,
1235 args: List[str],
1236 no_fsid: Optional[bool] = False,
1237 image: Optional[str] = "",
1238 ) -> Any:
1239 try:
1240 out, err, code = await self._run_cephadm(
1241 host, entity, command, args, no_fsid=no_fsid, image=image)
1242 if code:
1243 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
1244 except Exception as e:
1245 raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
1246 try:
1247 return json.loads(''.join(out))
1248 except (ValueError, KeyError):
1249 msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
1250 self.log.exception(f'{msg}: {"".join(out)}')
1251 raise OrchestratorError(msg)
1252
1253 async def _run_cephadm(self,
1254 host: str,
1255 entity: Union[CephadmNoImage, str],
1256 command: str,
1257 args: List[str],
1258 addr: Optional[str] = "",
1259 stdin: Optional[str] = "",
1260 no_fsid: Optional[bool] = False,
1261 error_ok: Optional[bool] = False,
1262 image: Optional[str] = "",
1263 env_vars: Optional[List[str]] = None,
1264 ) -> Tuple[List[str], List[str], int]:
1265 """
1266 Run cephadm on the remote host with the given command + args
1267
1268 Important: You probably don't want to run _run_cephadm from CLI handlers
1269
1270 :env_vars: in format -> [KEY=VALUE, ..]
1271 """
1272
1273 await self.mgr.ssh._remote_connection(host, addr)
1274
1275 self.log.debug(f"_run_cephadm : command = {command}")
1276 self.log.debug(f"_run_cephadm : args = {args}")
1277
1278 bypass_image = ('agent')
1279
1280 assert image or entity
1281         # Skip resolving the container image for deployed daemons that are not ceph containers
1282 if not str(entity).startswith(bypass_image):
1283 if not image and entity is not cephadmNoImage:
1284 image = self.mgr._get_container_image(entity)
1285
1286 final_args = []
1287
1288 # global args
1289 if env_vars:
1290 for env_var_pair in env_vars:
1291 final_args.extend(['--env', env_var_pair])
1292
1293 if image:
1294 final_args.extend(['--image', image])
1295
1296 if not self.mgr.container_init:
1297 final_args += ['--no-container-init']
1298
1299 # subcommand
1300 final_args.append(command)
1301
1302 # subcommand args
1303 if not no_fsid:
1304 final_args += ['--fsid', self.mgr._cluster_fsid]
1305
1306 final_args += args
1307
1308 # exec
1309 self.log.debug('args: %s' % (' '.join(final_args)))
1310 if self.mgr.mode == 'root':
1311             # the agent carries the cephadm binary as an extra file, which is
1312             # therefore passed over stdin; it's too large to print even in debug logs
1313 if stdin and 'agent' not in str(entity):
1314 self.log.debug('stdin: %s' % stdin)
1315
1316 cmd = ['which', 'python3']
1317 python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
1318 cmd = [python, self.mgr.cephadm_binary_path] + final_args
1319
1320 try:
1321 out, err, code = await self.mgr.ssh._execute_command(
1322 host, cmd, stdin=stdin, addr=addr)
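# exit code 2 is taken as a hint that the copied cephadm binary may be missing
# on the host; confirm with 'ls' and, if so, re-deploy the binary and retry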
1323 if code == 2:
1324 ls_cmd = ['ls', self.mgr.cephadm_binary_path]
1325 out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
1326 if code_ls == 2:
1327 await self._deploy_cephadm_binary(host, addr)
1328 out, err, code = await self.mgr.ssh._execute_command(
1329 host, cmd, stdin=stdin, addr=addr)
1330 # if there is an agent on this host, make sure it is using the most recent
1331                     # version of the cephadm binary
1332 if host in self.mgr.inventory:
1333 for agent in self.mgr.cache.get_daemons_by_type('agent', host):
1334 self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
1335
1336 except Exception as e:
1337 await self.mgr.ssh._reset_con(host)
1338 if error_ok:
1339 return [], [str(e)], 1
1340 raise
1341
1342 elif self.mgr.mode == 'cephadm-package':
1343 try:
1344 cmd = ['/usr/bin/cephadm'] + final_args
1345 out, err, code = await self.mgr.ssh._execute_command(
1346 host, cmd, stdin=stdin, addr=addr)
1347 except Exception as e:
1348 await self.mgr.ssh._reset_con(host)
1349 if error_ok:
1350 return [], [str(e)], 1
1351 raise
1352 else:
1353 assert False, 'unsupported mode'
1354
1355 self.log.debug(f'code: {code}')
1356 if out:
1357 self.log.debug(f'out: {out}')
1358 if err:
1359 self.log.debug(f'err: {err}')
1360 if code and not error_ok:
1361 raise OrchestratorError(
1362 f'cephadm exited with an error code: {code}, stderr: {err}')
1363 return [out], [err], code
1364
1365 async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
1366         # pick an arbitrary host (the first one in the inventory)...
1367 host = None
1368 for host_name in self.mgr.inventory.keys():
1369 host = host_name
1370 break
1371 if not host:
1372 raise OrchestratorError('no hosts defined')
1373 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
1374 await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
1375
1376 pullargs: List[str] = []
1377 if self.mgr.registry_insecure:
1378 pullargs.append("--insecure")
1379
1380 j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
1381
1382 r = ContainerInspectInfo(
1383 j['image_id'],
1384 j.get('ceph_version'),
1385 j.get('repo_digests')
1386 )
1387 self.log.debug(f'image {image_name} -> {r}')
1388 return r
1389
1390     # function responsible for logging a single host into the custom registry
1391 async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
1392 self.log.debug(
1393 f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
1394 # want to pass info over stdin rather than through normal list of args
1395 out, err, code = await self._run_cephadm(
1396 host, 'mon', 'registry-login',
1397 ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
1398 if code:
1399 return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
1400 return None
1401
1402 async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
1403 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1404 self.log.info(f"Deploying cephadm binary to {host}")
1405 await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
1406 self.mgr._cephadm.encode('utf-8'), addr=addr)