ceph/src/pybind/mgr/cephadm/serve.py
1 import ipaddress
2 import hashlib
3 import json
4 import logging
5 import uuid
6 import os
7 from collections import defaultdict
8 from typing import TYPE_CHECKING, Optional, List, cast, Dict, Any, Union, Tuple, Set, \
9 DefaultDict
10
11 from ceph.deployment import inventory
12 from ceph.deployment.drive_group import DriveGroupSpec
13 from ceph.deployment.service_spec import ServiceSpec, CustomContainerSpec, PlacementSpec
14 from ceph.utils import datetime_now
15
16 import orchestrator
17 from orchestrator import OrchestratorError, set_exception_subject, OrchestratorEvent, \
18 DaemonDescriptionStatus, daemon_type_to_service
19 from cephadm.services.cephadmservice import CephadmDaemonDeploySpec
20 from cephadm.schedule import HostAssignment
21 from cephadm.autotune import MemoryAutotuner
22 from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, \
23 CephadmNoImage, CEPH_TYPES, ContainerInspectInfo
24 from mgr_module import MonCommandFailed
25 from mgr_util import format_bytes
26
27 from . import utils
28
29 if TYPE_CHECKING:
30 from cephadm.module import CephadmOrchestrator
31
32 logger = logging.getLogger(__name__)
33
34 REQUIRES_POST_ACTIONS = ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'rgw']
35
36
37 class CephadmServe:
38 """
39 This module contains functions that are executed in the
40 serve() thread. Thus they don't block the CLI.
41
42 Please see the `Note regarding network calls from CLI handlers`
43 chapter in the cephadm developer guide.
44
45 On the other hand, these functions should *not* be called from
46 CLI handlers, to avoid blocking the CLI.
47 """
48
49 def __init__(self, mgr: "CephadmOrchestrator"):
50 self.mgr: "CephadmOrchestrator" = mgr
51 self.log = logger
52
53 def serve(self) -> None:
54 """
55 The main loop of cephadm.
56
57 A command handler will typically change the declarative state
58 of cephadm. This loop will then attempt to apply this new state.
59 """
60 self.log.debug("serve starting")
61 self.mgr.config_checker.load_network_config()
62
63 while self.mgr.run:
64 self.log.debug("serve loop start")
65
66 try:
67
68 self.convert_tags_to_repo_digest()
69
70 # refresh daemons
71 self.log.debug('refreshing hosts and daemons')
72 self._refresh_hosts_and_daemons()
73
74 self._check_for_strays()
75
76 self._update_paused_health()
77
78 if self.mgr.need_connect_dashboard_rgw and self.mgr.config_dashboard:
79 self.mgr.need_connect_dashboard_rgw = False
80 if 'dashboard' in self.mgr.get('mgr_map')['modules']:
81 self.log.info('Checking dashboard <-> RGW credentials')
82 self.mgr.remote('dashboard', 'set_rgw_credentials')
83
84 if not self.mgr.paused:
85 self.mgr.to_remove_osds.process_removal_queue()
86
87 self.mgr.migration.migrate()
88 if self.mgr.migration.is_migration_ongoing():
89 continue
90
91 if self._apply_all_services():
92 continue # did something, refresh
93
94 self._check_daemons()
95
96 self._purge_deleted_services()
97
98 self._check_for_moved_osds()
99
100 if self.mgr.agent_helpers._handle_use_agent_setting():
101 continue
102
103 if self.mgr.upgrade.continue_upgrade():
104 continue
105
106 except OrchestratorError as e:
107 if e.event_subject:
108 self.mgr.events.from_orch_error(e)
109
110 self.log.debug("serve loop sleep")
111 self._serve_sleep()
112 self.log.debug("serve loop wake")
113 self.log.debug("serve exit")
114
115 def _serve_sleep(self) -> None:
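# Sleep until the next scheduled refresh is due: the shortest of the configured
# host-check / facts / daemon / device cache intervals, but never less than 30
# seconds. mgr.event lets other threads wake the serve loop early.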
116 sleep_interval = max(
117 30,
118 min(
119 self.mgr.host_check_interval,
120 self.mgr.facts_cache_timeout,
121 self.mgr.daemon_cache_timeout,
122 self.mgr.device_cache_timeout,
123 )
124 )
125 self.log.debug('Sleeping for %d seconds', sleep_interval)
126 self.mgr.event.wait(sleep_interval)
127 self.mgr.event.clear()
128
129 def _update_paused_health(self) -> None:
130 self.log.debug('_update_paused_health')
131 if self.mgr.paused:
132 self.mgr.set_health_warning('CEPHADM_PAUSED', 'cephadm background work is paused', 1, [
133 "'ceph orch resume' to resume"])
134 else:
135 self.mgr.remove_health_warning('CEPHADM_PAUSED')
136
137 def _autotune_host_memory(self, host: str) -> None:
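# Compute a per-host OSD memory budget (total RAM from the host facts scaled by
# autotune_memory_target_ratio), let MemoryAutotuner split it across the host's
# daemons, drop per-OSD osd_memory_target overrides that no longer match, and
# store one host-masked value via 'config set osd/host:<shortname>'.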
138 total_mem = self.mgr.cache.get_facts(host).get('memory_total_kb', 0)
139 if not total_mem:
140 val = None
141 else:
142 total_mem *= 1024 # kb -> bytes
143 total_mem *= self.mgr.autotune_memory_target_ratio
144 a = MemoryAutotuner(
145 daemons=self.mgr.cache.get_daemons_by_host(host),
146 config_get=self.mgr.get_foreign_ceph_option,
147 total_mem=total_mem,
148 )
149 val, osds = a.tune()
150 any_changed = False
151 for o in osds:
152 if self.mgr.get_foreign_ceph_option(o, 'osd_memory_target') != val:
153 self.mgr.check_mon_command({
154 'prefix': 'config rm',
155 'who': o,
156 'name': 'osd_memory_target',
157 })
158 any_changed = True
159 if val is not None:
160 if any_changed:
161 self.mgr.log.info(
162 f'Adjusting osd_memory_target on {host} to {format_bytes(val, 6)}'
163 )
164 ret, out, err = self.mgr.mon_command({
165 'prefix': 'config set',
166 'who': f'osd/host:{host.split(".")[0]}',
167 'name': 'osd_memory_target',
168 'value': str(val),
169 })
170 if ret:
171 self.log.warning(
172 f'Unable to set osd_memory_target on {host} to {val}: {err}'
173 )
174 else:
175 # if osd memory autotuning is off, we don't want to remove these config
176 # options, as users may be using them. Since there is no way to set autotuning
177 # on/off at a host level, the best we can do is check if it is globally on.
178 if self.mgr.get_foreign_ceph_option('osd', 'osd_memory_target_autotune'):
179 self.mgr.check_mon_command({
180 'prefix': 'config rm',
181 'who': f'osd/host:{host.split(".")[0]}',
182 'name': 'osd_memory_target',
183 })
184 self.mgr.cache.update_autotune(host)
185
186 def _refresh_hosts_and_daemons(self) -> None:
187 self.log.debug('_refresh_hosts_and_daemons')
188 bad_hosts = []
189 failures = []
190
191 if self.mgr.manage_etc_ceph_ceph_conf or self.mgr.keys.keys:
192 client_files = self._calc_client_files()
193 else:
194 client_files = {}
195
196 agents_down: List[str] = []
197
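# refresh() below runs once per host; the @forall_hosts decorator executes it
# for every cached host (in parallel) and per-host errors are collected into
# bad_hosts / failures for the health warnings raised further down.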
198 @forall_hosts
199 def refresh(host: str) -> None:
200
201 # skip hosts that are in maintenance - they could be powered off
202 if self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
203 return
204
205 if self.mgr.use_agent:
206 if self.mgr.agent_helpers._check_agent(host):
207 agents_down.append(host)
208
209 if self.mgr.cache.host_needs_check(host):
210 r = self._check_host(host)
211 if r is not None:
212 bad_hosts.append(r)
213
214 if (
215 not self.mgr.use_agent
216 or host not in [h.hostname for h in self.mgr.cache.get_non_draining_hosts()]
217 or host in agents_down
218 ):
219 if self.mgr.cache.host_needs_daemon_refresh(host):
220 self.log.debug('refreshing %s daemons' % host)
221 r = self._refresh_host_daemons(host)
222 if r:
223 failures.append(r)
224
225 if self.mgr.cache.host_needs_facts_refresh(host):
226 self.log.debug(('Refreshing %s facts' % host))
227 r = self._refresh_facts(host)
228 if r:
229 failures.append(r)
230
231 if self.mgr.cache.host_needs_network_refresh(host):
232 self.log.debug(('Refreshing %s networks' % host))
233 r = self._refresh_host_networks(host)
234 if r:
235 failures.append(r)
236
237 if self.mgr.cache.host_needs_device_refresh(host):
238 self.log.debug('refreshing %s devices' % host)
239 r = self._refresh_host_devices(host)
240 if r:
241 failures.append(r)
242 self.mgr.cache.metadata_up_to_date[host] = True
243 elif not self.mgr.cache.get_daemons_by_type('agent', host=host):
244 if self.mgr.cache.host_needs_daemon_refresh(host):
245 self.log.debug('refreshing %s daemons' % host)
246 r = self._refresh_host_daemons(host)
247 if r:
248 failures.append(r)
249 self.mgr.cache.metadata_up_to_date[host] = True
250
251 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.get_store('registry_credentials'):
252 self.log.debug(f"Logging `{host}` into custom registry")
253 r = self.mgr.wait_async(self._registry_login(
254 host, json.loads(str(self.mgr.get_store('registry_credentials')))))
255 if r:
256 bad_hosts.append(r)
257
258 if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
259 self.log.debug(f"refreshing OSDSpec previews for {host}")
260 r = self._refresh_host_osdspec_previews(host)
261 if r:
262 failures.append(r)
263
264 if (
265 self.mgr.cache.host_needs_autotune_memory(host)
266 and not self.mgr.inventory.has_label(host, '_no_autotune_memory')
267 ):
268 self.log.debug(f"autotuning memory for {host}")
269 self._autotune_host_memory(host)
270
271 self._write_client_files(client_files, host)
272
273 refresh(self.mgr.cache.get_hosts())
274
275 self.mgr.agent_helpers._update_agent_down_healthcheck(agents_down)
276
277 self.mgr.config_checker.run_checks()
278
279 for k in [
280 'CEPHADM_HOST_CHECK_FAILED',
281 'CEPHADM_REFRESH_FAILED',
282 ]:
283 self.mgr.remove_health_warning(k)
284 if bad_hosts:
285 self.mgr.set_health_warning(
286 'CEPHADM_HOST_CHECK_FAILED', f'{len(bad_hosts)} hosts fail cephadm check', len(bad_hosts), bad_hosts)
287 if failures:
288 self.mgr.set_health_warning(
289 'CEPHADM_REFRESH_FAILED', 'failed to probe daemons or devices', len(failures), failures)
290 self.mgr.update_failed_daemon_health_check()
291
292 def _check_host(self, host: str) -> Optional[str]:
293 if host not in self.mgr.inventory:
294 return None
295 self.log.debug(' checking %s' % host)
296 try:
297 addr = self.mgr.inventory.get_addr(host) if host in self.mgr.inventory else host
298 out, err, code = self.mgr.wait_async(self._run_cephadm(
299 host, cephadmNoImage, 'check-host', [],
300 error_ok=True, no_fsid=True))
301 self.mgr.cache.update_last_host_check(host)
302 self.mgr.cache.save_host(host)
303 if code:
304 self.log.debug(' host %s (%s) failed check' % (host, addr))
305 if self.mgr.warn_on_failed_host_check:
306 return 'host %s (%s) failed check: %s' % (host, addr, err)
307 else:
308 self.log.debug(' host %s (%s) ok' % (host, addr))
309 except Exception as e:
310 self.log.debug(' host %s (%s) failed check' % (host, addr))
311 return 'host %s (%s) failed check: %s' % (host, addr, e)
312 return None
313
314 def _refresh_host_daemons(self, host: str) -> Optional[str]:
315 try:
316 ls = self.mgr.wait_async(self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True))
317 except OrchestratorError as e:
318 return str(e)
319 self.mgr._process_ls_output(host, ls)
320 return None
321
322 def _refresh_facts(self, host: str) -> Optional[str]:
323 try:
324 val = self.mgr.wait_async(self._run_cephadm_json(
325 host, cephadmNoImage, 'gather-facts', [], no_fsid=True))
326 except OrchestratorError as e:
327 return str(e)
328
329 self.mgr.cache.update_host_facts(host, val)
330
331 return None
332
333 def _refresh_host_devices(self, host: str) -> Optional[str]:
334 with_lsm = self.mgr.device_enhanced_scan
335 inventory_args = ['--', 'inventory',
336 '--format=json-pretty',
337 '--filter-for-batch']
338 if with_lsm:
339 inventory_args.insert(-1, "--with-lsm")
340
341 try:
342 try:
343 devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
344 inventory_args))
345 except OrchestratorError as e:
346 if 'unrecognized arguments: --filter-for-batch' in str(e):
347 rerun_args = inventory_args.copy()
348 rerun_args.remove('--filter-for-batch')
349 devices = self.mgr.wait_async(self._run_cephadm_json(host, 'osd', 'ceph-volume',
350 rerun_args))
351 else:
352 raise
353
354 except OrchestratorError as e:
355 return str(e)
356
357 self.log.debug('Refreshed host %s devices (%d)' % (
358 host, len(devices)))
359 ret = inventory.Devices.from_json(devices)
360 self.mgr.cache.update_host_devices(host, ret.devices)
361 self.update_osdspec_previews(host)
362 self.mgr.cache.save_host(host)
363 return None
364
365 def _refresh_host_networks(self, host: str) -> Optional[str]:
366 try:
367 networks = self.mgr.wait_async(self._run_cephadm_json(
368 host, 'mon', 'list-networks', [], no_fsid=True))
369 except OrchestratorError as e:
370 return str(e)
371
372 self.log.debug('Refreshed host %s networks (%s)' % (
373 host, len(networks)))
374 self.mgr.cache.update_host_networks(host, networks)
375 self.mgr.cache.save_host(host)
376 return None
377
378 def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
379 self.update_osdspec_previews(host)
380 self.mgr.cache.save_host(host)
381 self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
382 return None
383
384 def update_osdspec_previews(self, search_host: str = '') -> None:
385 # Set global 'pending' flag for host
386 self.mgr.cache.loading_osdspec_preview.add(search_host)
387 previews = []
388 # query OSDSpecs for host <search host> and generate/get the preview
389 # There can be multiple previews for one host due to multiple OSDSpecs.
390 previews.extend(self.mgr.osd_service.get_previews(search_host))
391 self.log.debug(f'Loading OSDSpec previews to HostCache for host <{search_host}>')
392 self.mgr.cache.osdspec_previews[search_host] = previews
393 # Unset global 'pending' flag for host
394 self.mgr.cache.loading_osdspec_preview.remove(search_host)
395
396 def _check_for_strays(self) -> None:
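# A "stray" is a daemon reported by the cluster (mgr list_servers()) that is not
# managed by cephadm, or a host carrying such daemons that is not in the cephadm
# inventory. Raise CEPHADM_STRAY_HOST / CEPHADM_STRAY_DAEMON accordingly,
# honoring the warn_on_stray_* settings.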
397 self.log.debug('_check_for_strays')
398 for k in ['CEPHADM_STRAY_HOST',
399 'CEPHADM_STRAY_DAEMON']:
400 self.mgr.remove_health_warning(k)
401 if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
402 ls = self.mgr.list_servers()
403 self.log.debug(ls)
404 managed = self.mgr.cache.get_daemon_names()
405 host_detail = [] # type: List[str]
406 host_num_daemons = 0
407 daemon_detail = [] # type: List[str]
408 for item in ls:
409 host = item.get('hostname')
410 assert isinstance(host, str)
411 daemons = item.get('services') # misnomer!
412 assert isinstance(daemons, list)
413 missing_names = []
414 for s in daemons:
415 daemon_id = s.get('id')
416 assert daemon_id
417 name = '%s.%s' % (s.get('type'), daemon_id)
418 if s.get('type') in ['rbd-mirror', 'cephfs-mirror', 'rgw', 'rgw-nfs']:
419 metadata = self.mgr.get_metadata(
420 cast(str, s.get('type')), daemon_id, {})
421 assert metadata is not None
422 try:
423 if s.get('type') == 'rgw-nfs':
424 # https://tracker.ceph.com/issues/49573
425 name = metadata['id'][:-4]
426 else:
427 name = '%s.%s' % (s.get('type'), metadata['id'])
428 except (KeyError, TypeError):
429 self.log.debug(
430 "Failed to find daemon id for %s service %s" % (
431 s.get('type'), s.get('id')
432 )
433 )
434 if s.get('type') == 'tcmu-runner':
435 # because we don't track tcmu-runner daemons in the host cache
436 # and don't have a way to check if the daemon is part of an iscsi service,
437 # we assume that all tcmu-runner daemons are managed by cephadm
438 managed.append(name)
439 if host not in self.mgr.inventory:
440 missing_names.append(name)
441 host_num_daemons += 1
442 if name not in managed:
443 daemon_detail.append(
444 'stray daemon %s on host %s not managed by cephadm' % (name, host))
445 if missing_names:
446 host_detail.append(
447 'stray host %s has %d stray daemons: %s' % (
448 host, len(missing_names), missing_names))
449 if self.mgr.warn_on_stray_hosts and host_detail:
450 self.mgr.set_health_warning(
451 'CEPHADM_STRAY_HOST', f'{len(host_detail)} stray host(s) with {host_num_daemons} daemon(s) not managed by cephadm', len(host_detail), host_detail)
452 if self.mgr.warn_on_stray_daemons and daemon_detail:
453 self.mgr.set_health_warning(
454 'CEPHADM_STRAY_DAEMON', f'{len(daemon_detail)} stray daemon(s) not managed by cephadm', len(daemon_detail), daemon_detail)
455
456 def _check_for_moved_osds(self) -> None:
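# Detect OSD ids that cephadm currently sees on more than one host (e.g. after a
# drive was physically moved). If exactly one copy is running and the OSD is up,
# remove the leftover error-state duplicates from their old hosts.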
457 self.log.debug('_check_for_moved_osds')
458 all_osds: DefaultDict[int, List[orchestrator.DaemonDescription]] = defaultdict(list)
459 for dd in self.mgr.cache.get_daemons_by_type('osd'):
460 assert dd.daemon_id
461 all_osds[int(dd.daemon_id)].append(dd)
462 for osd_id, dds in all_osds.items():
463 if len(dds) <= 1:
464 continue
465 running = [dd for dd in dds if dd.status == DaemonDescriptionStatus.running]
466 error = [dd for dd in dds if dd.status == DaemonDescriptionStatus.error]
467 msg = f'Found duplicate OSDs: {", ".join(str(dd) for dd in dds)}'
468 logger.info(msg)
469 if len(running) != 1:
470 continue
471 osd = self.mgr.get_osd_by_id(osd_id)
472 if not osd or not osd['up']:
473 continue
474 for e in error:
475 assert e.hostname
476 try:
477 self._remove_daemon(e.name(), e.hostname, no_post_remove=True)
478 self.mgr.events.for_daemon(
479 e.name(), 'INFO', f"Removed duplicated daemon on host '{e.hostname}'")
480 except OrchestratorError as ex:
481 self.mgr.events.from_orch_error(ex)
482 logger.exception(f'failed to remove duplicated daemon {e}')
483
484 def _apply_all_services(self) -> bool:
485 self.log.debug('_apply_all_services')
486 r = False
487 specs = [] # type: List[ServiceSpec]
488 # if metadata is not up to date, we still need to apply the agent spec,
489 # since the agent is the one that gathers the metadata. If we don't, we
490 # end up stuck between wanting metadata to be up to date to apply specs
491 # and needing to apply the agent spec to get up-to-date metadata
492 if self.mgr.use_agent and not self.mgr.cache.all_host_metadata_up_to_date():
493 self.log.info('Metadata not up to date on all hosts. Skipping non agent specs')
494 try:
495 specs.append(self.mgr.spec_store['agent'].spec)
496 except Exception as e:
497 self.log.debug(f'Failed to find agent spec: {e}')
498 self.mgr.agent_helpers._apply_agent()
499 return r
500 else:
501 for sn, spec in self.mgr.spec_store.active_specs.items():
502 specs.append(spec)
503 for name in ['CEPHADM_APPLY_SPEC_FAIL', 'CEPHADM_DAEMON_PLACE_FAIL']:
504 self.mgr.remove_health_warning(name)
505 self.mgr.apply_spec_fails = []
506 for spec in specs:
507 try:
508 if self._apply_service(spec):
509 r = True
510 except Exception as e:
511 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
512 self.log.exception(msg)
513 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
514 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
515 warnings = []
516 for x in self.mgr.apply_spec_fails:
517 warnings.append(f'{x[0]}: {x[1]}')
518 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
519 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
520 len(self.mgr.apply_spec_fails),
521 warnings)
522 self.mgr.update_watched_hosts()
523 self.mgr.tuned_profile_utils._write_all_tuned_profiles()
524 return r
525
526 def _apply_service_config(self, spec: ServiceSpec) -> None:
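# Apply the 'config:' section of a service spec: each key is set via
# 'config set' on the service's config section. Unrecognized options raise a
# CEPHADM_INVALID_CONFIG_OPTION health warning and options that fail to set
# raise CEPHADM_FAILED_SET_OPTION.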
527 if spec.config:
528 section = utils.name_to_config_section(spec.service_name())
529 for name in ['CEPHADM_INVALID_CONFIG_OPTION', 'CEPHADM_FAILED_SET_OPTION']:
530 self.mgr.remove_health_warning(name)
531 invalid_config_options = []
532 options_failed_to_set = []
533 for k, v in spec.config.items():
534 try:
535 current = self.mgr.get_foreign_ceph_option(section, k)
536 except KeyError:
537 msg = f'Ignoring invalid {spec.service_name()} config option {k}'
538 self.log.warning(msg)
539 self.mgr.events.for_service(
540 spec, OrchestratorEvent.ERROR, f'Invalid config option {k}'
541 )
542 invalid_config_options.append(msg)
543 continue
544 if current != v:
545 self.log.debug(f'setting [{section}] {k} = {v}')
546 try:
547 self.mgr.check_mon_command({
548 'prefix': 'config set',
549 'name': k,
550 'value': str(v),
551 'who': section,
552 })
553 except MonCommandFailed as e:
554 msg = f'Failed to set {spec.service_name()} option {k}: {e}'
555 self.log.warning(msg)
556 options_failed_to_set.append(msg)
557
558 if invalid_config_options:
559 self.mgr.set_health_warning('CEPHADM_INVALID_CONFIG_OPTION', f'Ignoring {len(invalid_config_options)} invalid config option(s)', len(
560 invalid_config_options), invalid_config_options)
561 if options_failed_to_set:
562 self.mgr.set_health_warning('CEPHADM_FAILED_SET_OPTION', f'Failed to set {len(options_failed_to_set)} option(s)', len(
563 options_failed_to_set), options_failed_to_set)
564
565 def _apply_service(self, spec: ServiceSpec) -> bool:
566 """
567 Schedule a service. Deploy new daemons or remove old ones, depending
568 on the target label and count specified in the placement.
569 """
570 self.mgr.migration.verify_no_migration()
571
572 service_type = spec.service_type
573 service_name = spec.service_name()
574 if spec.unmanaged:
575 self.log.debug('Skipping unmanaged service %s' % service_name)
576 return False
577 if spec.preview_only:
578 self.log.debug('Skipping preview_only service %s' % service_name)
579 return False
580 self.log.debug('Applying service %s spec' % service_name)
581
582 if service_type == 'agent':
583 try:
584 assert self.mgr.cherrypy_thread
585 assert self.mgr.cherrypy_thread.ssl_certs.get_root_cert()
586 except Exception:
587 self.log.info(
588 'Delaying applying agent spec until cephadm endpoint root cert created')
589 return False
590
591 self._apply_service_config(spec)
592
593 if service_type == 'osd':
594 self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
595 # TODO: returning True would result in a busy loop
596 # can't know if daemon count changed; create_from_spec doesn't
597 # return a solid indication
598 return False
599
600 svc = self.mgr.cephadm_services[service_type]
601 daemons = self.mgr.cache.get_daemons_by_service(service_name)
602
603 public_networks: List[str] = []
604 if service_type == 'mon':
605 out = str(self.mgr.get_foreign_ceph_option('mon', 'public_network'))
606 if '/' in out:
607 public_networks = [x.strip() for x in out.split(',')]
608 self.log.debug('mon public_network(s) is %s' % public_networks)
609
610 def matches_network(host):
611 # type: (str) -> bool
612 # make sure the host has at least one network that belongs to some configured public network(s)
613 for pn in public_networks:
614 public_network = ipaddress.ip_network(pn)
615 for hn in self.mgr.cache.networks[host]:
616 host_network = ipaddress.ip_network(hn)
617 if host_network.overlaps(public_network):
618 return True
619
620 host_networks = ','.join(self.mgr.cache.networks[host])
621 pub_networks = ','.join(public_networks)
622 self.log.info(
623 f"Filtered out host {host}: does not belong to mon public_network(s): "
624 f" {pub_networks}, host network(s): {host_networks}"
625 )
626 return False
627
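# For ranked services, rank_map records which daemon name holds each
# rank/generation; it is persisted in the spec store before changes are made so
# a newly active mgr can clean up if this one fails mid-way.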
628 rank_map = None
629 if svc.ranked():
630 rank_map = self.mgr.spec_store[spec.service_name()].rank_map or {}
631 ha = HostAssignment(
632 spec=spec,
633 hosts=self.mgr.cache.get_non_draining_hosts() if spec.service_name(
634 ) == 'agent' else self.mgr.cache.get_schedulable_hosts(),
635 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
636 draining_hosts=self.mgr.cache.get_draining_hosts(),
637 daemons=daemons,
638 networks=self.mgr.cache.networks,
639 filter_new_host=(
640 matches_network if service_type == 'mon'
641 else None
642 ),
643 allow_colo=svc.allow_colo(),
644 primary_daemon_type=svc.primary_daemon_type(),
645 per_host_daemon_type=svc.per_host_daemon_type(),
646 rank_map=rank_map,
647 )
648
649 try:
650 all_slots, slots_to_add, daemons_to_remove = ha.place()
651 daemons_to_remove = [d for d in daemons_to_remove if (d.hostname and self.mgr.inventory._inventory[d.hostname].get(
652 'status', '').lower() not in ['maintenance', 'offline'] and d.hostname not in self.mgr.offline_hosts)]
653 self.log.debug('Add %s, remove %s' % (slots_to_add, daemons_to_remove))
654 except OrchestratorError as e:
655 msg = f'Failed to apply {spec.service_name()} spec {spec}: {str(e)}'
656 self.log.error(msg)
657 self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))
658 self.mgr.apply_spec_fails.append((spec.service_name(), str(e)))
659 warnings = []
660 for x in self.mgr.apply_spec_fails:
661 warnings.append(f'{x[0]}: {x[1]}')
662 self.mgr.set_health_warning('CEPHADM_APPLY_SPEC_FAIL',
663 f"Failed to apply {len(self.mgr.apply_spec_fails)} service(s): {','.join(x[0] for x in self.mgr.apply_spec_fails)}",
664 len(self.mgr.apply_spec_fails),
665 warnings)
666 return False
667
668 r = None
669
670 # sanity check
671 final_count = len(daemons) + len(slots_to_add) - len(daemons_to_remove)
672 if service_type in ['mon', 'mgr'] and final_count < 1:
673 self.log.debug('cannot scale mon|mgr below 1')
674 return False
675
676 # progress
677 progress_id = str(uuid.uuid4())
678 delta: List[str] = []
679 if slots_to_add:
680 delta += [f'+{len(slots_to_add)}']
681 if daemons_to_remove:
682 delta += [f'-{len(daemons_to_remove)}']
683 progress_title = f'Updating {spec.service_name()} deployment ({" ".join(delta)} -> {len(all_slots)})'
684 progress_total = len(slots_to_add) + len(daemons_to_remove)
685 progress_done = 0
686
687 def update_progress() -> None:
688 self.mgr.remote(
689 'progress', 'update', progress_id,
690 ev_msg=progress_title,
691 ev_progress=(progress_done / progress_total),
692 add_to_ceph_s=True,
693 )
694
695 if progress_total:
696 update_progress()
697
698 # add any?
699 did_config = False
700
701 self.log.debug('Hosts that will receive new daemons: %s' % slots_to_add)
702 self.log.debug('Daemons that will be removed: %s' % daemons_to_remove)
703
704 hosts_altered: Set[str] = set()
705
706 try:
707 # assign names
708 for i in range(len(slots_to_add)):
709 slot = slots_to_add[i]
710 slot = slot.assign_name(self.mgr.get_unique_name(
711 slot.daemon_type,
712 slot.hostname,
713 [d for d in daemons if d not in daemons_to_remove],
714 prefix=spec.service_id,
715 forcename=slot.name,
716 rank=slot.rank,
717 rank_generation=slot.rank_generation,
718 ))
719 slots_to_add[i] = slot
720 if rank_map is not None:
721 assert slot.rank is not None
722 assert slot.rank_generation is not None
723 assert rank_map[slot.rank][slot.rank_generation] is None
724 rank_map[slot.rank][slot.rank_generation] = slot.name
725
726 if rank_map:
727 # record the rank_map before we make changes so that if we fail the
728 # next mgr will clean up.
729 self.mgr.spec_store.save_rank_map(spec.service_name(), rank_map)
730
731 # remove daemons now, since we are going to fence them anyway
732 for d in daemons_to_remove:
733 assert d.hostname is not None
734 self._remove_daemon(d.name(), d.hostname)
735 daemons_to_remove = []
736
737 # fence them
738 svc.fence_old_ranks(spec, rank_map, len(all_slots))
739
740 # create daemons
741 daemon_place_fails = []
742 for slot in slots_to_add:
743 # first remove daemon with conflicting port or name?
744 if slot.ports or slot.name in [d.name() for d in daemons_to_remove]:
745 for d in daemons_to_remove:
746 if (
747 d.hostname != slot.hostname
748 or not (set(d.ports or []) & set(slot.ports))
749 or (d.ip and slot.ip and d.ip != slot.ip)
750 and d.name() != slot.name
751 ):
752 continue
753 if d.name() != slot.name:
754 self.log.info(
755 f'Removing {d.name()} before deploying to {slot} to avoid a port or name conflict'
756 )
757 # NOTE: we don't check ok-to-stop here to avoid starvation if
758 # there is only 1 gateway.
759 self._remove_daemon(d.name(), d.hostname)
760 daemons_to_remove.remove(d)
761 progress_done += 1
762 hosts_altered.add(d.hostname)
763 break
764
765 # deploy new daemon
766 daemon_id = slot.name
767 if not did_config:
768 svc.config(spec)
769 did_config = True
770
771 daemon_spec = svc.make_daemon_spec(
772 slot.hostname, daemon_id, slot.network, spec,
773 daemon_type=slot.daemon_type,
774 ports=slot.ports,
775 ip=slot.ip,
776 rank=slot.rank,
777 rank_generation=slot.rank_generation,
778 )
779 self.log.debug('Placing %s.%s on host %s' % (
780 slot.daemon_type, daemon_id, slot.hostname))
781
782 try:
783 daemon_spec = svc.prepare_create(daemon_spec)
784 self.mgr.wait_async(self._create_daemon(daemon_spec))
785 r = True
786 progress_done += 1
787 update_progress()
788 hosts_altered.add(daemon_spec.host)
789 except (RuntimeError, OrchestratorError) as e:
790 msg = (f"Failed while placing {slot.daemon_type}.{daemon_id} "
791 f"on {slot.hostname}: {e}")
792 self.mgr.events.for_service(spec, 'ERROR', msg)
793 self.mgr.log.error(msg)
794 daemon_place_fails.append(msg)
795 # only return "no change" if no one else has already succeeded.
796 # later successes will also change to True
797 if r is None:
798 r = False
799 progress_done += 1
800 update_progress()
801 continue
802
803 # add to daemon list so next name(s) will also be unique
804 sd = orchestrator.DaemonDescription(
805 hostname=slot.hostname,
806 daemon_type=slot.daemon_type,
807 daemon_id=daemon_id,
808 )
809 daemons.append(sd)
810
811 if daemon_place_fails:
812 self.mgr.set_health_warning('CEPHADM_DAEMON_PLACE_FAIL', f'Failed to place {len(daemon_place_fails)} daemon(s)', len(
813 daemon_place_fails), daemon_place_fails)
814
815 if service_type == 'mgr':
816 active_mgr = svc.get_active_daemon(self.mgr.cache.get_daemons_by_type('mgr'))
817 if active_mgr.daemon_id in [d.daemon_id for d in daemons_to_remove]:
818 # We can't just remove the active mgr like any other daemon.
819 # Need to fail over later so it can be removed on next pass.
820 # This can be accomplished by scheduling a restart of the active mgr.
821 self.mgr._schedule_daemon_action(active_mgr.name(), 'restart')
822
823 # remove any?
824 def _ok_to_stop(remove_daemons: List[orchestrator.DaemonDescription]) -> bool:
825 daemon_ids = [d.daemon_id for d in remove_daemons]
826 assert None not in daemon_ids
827 # setting force flag retains previous behavior
828 r = svc.ok_to_stop(cast(List[str], daemon_ids), force=True)
829 return not r.retval
830
831 while daemons_to_remove and not _ok_to_stop(daemons_to_remove):
832 # let's find a subset that is ok-to-stop
833 daemons_to_remove.pop()
834 for d in daemons_to_remove:
835 r = True
836 assert d.hostname is not None
837 self._remove_daemon(d.name(), d.hostname)
838
839 progress_done += 1
840 update_progress()
841 hosts_altered.add(d.hostname)
842
843 self.mgr.remote('progress', 'complete', progress_id)
844 except Exception as e:
845 self.mgr.remote('progress', 'fail', progress_id, str(e))
846 raise
847 finally:
848 if self.mgr.use_agent:
849 # can only send acks to agents if we know for sure which port they bound to
850 hosts_altered = set([h for h in hosts_altered if (h in self.mgr.agent_cache.agent_ports and h in [
851 h2.hostname for h2 in self.mgr.cache.get_non_draining_hosts()])])
852 self.mgr.agent_helpers._request_agent_acks(hosts_altered, increment=True)
853
854 if r is None:
855 r = False
856 return r
857
858 def _check_daemons(self) -> None:
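# Walk every cached daemon and decide whether it needs a reconfig or redeploy:
# orphaned daemons are removed, and a daemon is reconfigured/redeployed when its
# calculated dependencies, the monmap, the extra ceph.conf, or its
# extra_container_args changed, or when an action was explicitly scheduled.
# Post-create actions for daemon types in REQUIRES_POST_ACTIONS run at the end.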
859 self.log.debug('_check_daemons')
860 daemons = self.mgr.cache.get_daemons()
861 daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
862 for dd in daemons:
863 # orphan?
864 spec = self.mgr.spec_store.active_specs.get(dd.service_name(), None)
865 assert dd.hostname is not None
866 assert dd.daemon_type is not None
867 assert dd.daemon_id is not None
868 if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
869 # (mon and mgr specs should always exist; osds aren't matched
870 # to a service spec)
871 self.log.info('Removing orphan daemon %s...' % dd.name())
872 self._remove_daemon(dd.name(), dd.hostname)
873
874 # ignore unmanaged services
875 if spec and spec.unmanaged:
876 continue
877
878 # ignore daemons for deleted services
879 if dd.service_name() in self.mgr.spec_store.spec_deleted:
880 continue
881
882 if dd.daemon_type == 'agent':
883 try:
884 self.mgr.agent_helpers._check_agent(dd.hostname)
885 except Exception as e:
886 self.log.debug(
887 f'Agent {dd.name()} could not be checked in _check_daemons: {e}')
888 continue
889
890 # These daemon types require additional configs after creation
891 if dd.daemon_type in REQUIRES_POST_ACTIONS:
892 daemons_post[dd.daemon_type].append(dd)
893
894 if self.mgr.cephadm_services[daemon_type_to_service(dd.daemon_type)].get_active_daemon(
895 self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
896 dd.is_active = True
897 else:
898 dd.is_active = False
899
900 deps = self.mgr._calc_daemon_deps(spec, dd.daemon_type, dd.daemon_id)
901 last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
902 dd.hostname, dd.name())
903 if last_deps is None:
904 last_deps = []
905 action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
906 if not last_config:
907 self.log.info('Reconfiguring %s (unknown last config time)...' % (
908 dd.name()))
909 action = 'reconfig'
910 elif last_deps != deps:
911 self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
912 deps))
913 self.log.info('Reconfiguring %s (dependencies changed)...' % (
914 dd.name()))
915 action = 'reconfig'
916 elif spec is not None and hasattr(spec, 'extra_container_args') and dd.extra_container_args != spec.extra_container_args:
917 self.log.debug(
918 f'{dd.name()} container cli args {dd.extra_container_args} -> {spec.extra_container_args}')
919 self.log.info(f'Redeploying {dd.name()}, (container cli args changed) . . .')
920 dd.extra_container_args = spec.extra_container_args
921 action = 'redeploy'
922 elif self.mgr.last_monmap and \
923 self.mgr.last_monmap > last_config and \
924 dd.daemon_type in CEPH_TYPES:
925 self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
926 action = 'reconfig'
927 elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
928 dd.daemon_type in CEPH_TYPES:
929 self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
930 action = 'reconfig'
931 if action:
932 if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
933 and action == 'reconfig':
934 action = 'redeploy'
935 try:
936 daemon_spec = CephadmDaemonDeploySpec.from_daemon_description(dd)
937 self.mgr._daemon_action(daemon_spec, action=action)
938 if self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name()):
939 self.mgr.cache.save_host(dd.hostname)
940 except OrchestratorError as e:
941 self.mgr.events.from_orch_error(e)
942 if dd.daemon_type in daemons_post:
943 del daemons_post[dd.daemon_type]
944 # continue...
945 except Exception as e:
946 self.mgr.events.for_daemon_from_exception(dd.name(), e)
947 if dd.daemon_type in daemons_post:
948 del daemons_post[dd.daemon_type]
949 # continue...
950
951 # do daemon post actions
952 for daemon_type, daemon_descs in daemons_post.items():
953 run_post = False
954 for d in daemon_descs:
955 if d.name() in self.mgr.requires_post_actions:
956 self.mgr.requires_post_actions.remove(d.name())
957 run_post = True
958 if run_post:
959 self.mgr._get_cephadm_service(daemon_type_to_service(
960 daemon_type)).daemon_check_post(daemon_descs)
961
962 def _purge_deleted_services(self) -> None:
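# Finish removal of services that were marked deleted in the spec store: once a
# deleted service has no daemons left (and is not mon/mgr), call the service's
# purge() hook and drop the spec for good.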
963 self.log.debug('_purge_deleted_services')
964 existing_services = self.mgr.spec_store.all_specs.items()
965 for service_name, spec in list(existing_services):
966 if service_name not in self.mgr.spec_store.spec_deleted:
967 continue
968 if self.mgr.cache.get_daemons_by_service(service_name):
969 continue
970 if spec.service_type in ['mon', 'mgr']:
971 continue
972
973 logger.info(f'Purge service {service_name}')
974
975 self.mgr.cephadm_services[spec.service_type].purge(service_name)
976 self.mgr.spec_store.finally_rm(service_name)
977
978 def convert_tags_to_repo_digest(self) -> None:
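# When use_repo_digest is enabled, resolve any container_image settings that are
# plain tags into repo digests (repo@sha256:...) so that all daemons run the
# exact same image even if the tag is re-pushed later.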
979 if not self.mgr.use_repo_digest:
980 return
981 settings = self.mgr.upgrade.get_distinct_container_image_settings()
982 digests: Dict[str, ContainerInspectInfo] = {}
983 for container_image_ref in set(settings.values()):
984 if not is_repo_digest(container_image_ref):
985 image_info = self.mgr.wait_async(
986 self._get_container_image_info(container_image_ref))
987 if image_info.repo_digests:
988 # FIXME: we assume the first digest here is the best
989 assert is_repo_digest(image_info.repo_digests[0]), image_info
990 digests[container_image_ref] = image_info
991
992 for entity, container_image_ref in settings.items():
993 if not is_repo_digest(container_image_ref):
994 image_info = digests[container_image_ref]
995 if image_info.repo_digests:
996 # FIXME: we assume the first digest here is the best
997 self.mgr.set_container_image(entity, image_info.repo_digests[0])
998
999 def _calc_client_files(self) -> Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]]:
1000 # host -> path -> (mode, uid, gid, content, digest)
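# Build the client files each host should carry: a minimal /etc/ceph/ceph.conf
# (plus a copy under /var/lib/ceph/<fsid>/config) when manage_etc_ceph_ceph_conf
# is on, and any managed client keyrings from self.mgr.keys, each placed on the
# hosts selected by its PlacementSpec via HostAssignment.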
1001 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]] = {}
1002
1003 # ceph.conf
1004 config = self.mgr.get_minimal_ceph_conf().encode('utf-8')
1005 config_digest = ''.join('%02x' % c for c in hashlib.sha256(config).digest())
1006 cluster_cfg_dir = f'/var/lib/ceph/{self.mgr._cluster_fsid}/config'
1007
1008 if self.mgr.manage_etc_ceph_ceph_conf:
1009 try:
1010 pspec = PlacementSpec.from_string(self.mgr.manage_etc_ceph_ceph_conf_hosts)
1011 ha = HostAssignment(
1012 spec=ServiceSpec('mon', placement=pspec),
1013 hosts=self.mgr.cache.get_schedulable_hosts(),
1014 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
1015 draining_hosts=self.mgr.cache.get_draining_hosts(),
1016 daemons=[],
1017 networks=self.mgr.cache.networks,
1018 )
1019 all_slots, _, _ = ha.place()
1020 for host in {s.hostname for s in all_slots}:
1021 if host not in client_files:
1022 client_files[host] = {}
1023 ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
1024 client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
1025 client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
1026 except Exception as e:
1027 self.mgr.log.warning(
1028 f'unable to calc conf hosts: {self.mgr.manage_etc_ceph_ceph_conf_hosts}: {e}')
1029
1030 # client keyrings
1031 for ks in self.mgr.keys.keys.values():
1032 try:
1033 ret, keyring, err = self.mgr.mon_command({
1034 'prefix': 'auth get',
1035 'entity': ks.entity,
1036 })
1037 if ret:
1038 self.log.warning(f'unable to fetch keyring for {ks.entity}')
1039 continue
1040 digest = ''.join('%02x' % c for c in hashlib.sha256(
1041 keyring.encode('utf-8')).digest())
1042 ha = HostAssignment(
1043 spec=ServiceSpec('mon', placement=ks.placement),
1044 hosts=self.mgr.cache.get_schedulable_hosts(),
1045 unreachable_hosts=self.mgr.cache.get_unreachable_hosts(),
1046 draining_hosts=self.mgr.cache.get_draining_hosts(),
1047 daemons=[],
1048 networks=self.mgr.cache.networks,
1049 )
1050 all_slots, _, _ = ha.place()
1051 for host in {s.hostname for s in all_slots}:
1052 if host not in client_files:
1053 client_files[host] = {}
1054 ceph_conf = (0o644, 0, 0, bytes(config), str(config_digest))
1055 client_files[host]['/etc/ceph/ceph.conf'] = ceph_conf
1056 client_files[host][f'{cluster_cfg_dir}/ceph.conf'] = ceph_conf
1057 ceph_admin_key = (ks.mode, ks.uid, ks.gid, keyring.encode('utf-8'), digest)
1058 client_files[host][ks.path] = ceph_admin_key
1059 client_files[host][f'{cluster_cfg_dir}/{os.path.basename(ks.path)}'] = ceph_admin_key
1060 except Exception as e:
1061 self.log.warning(
1062 f'unable to calc client keyring {ks.entity} placement {ks.placement}: {e}')
1063 return client_files
1064
1065 def _write_client_files(self,
1066 client_files: Dict[str, Dict[str, Tuple[int, int, int, bytes, str]]],
1067 host: str) -> None:
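# Push the pre-computed client files to this host over ssh: files whose
# digest/mode/owner already match the cache are skipped, changed files are
# (re)written, and previously managed files that are no longer wanted are
# removed (except /etc/ceph/ceph.conf, which is left in place).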
1068 updated_files = False
1069 if host in self.mgr.offline_hosts:
1070 return
1071 old_files = self.mgr.cache.get_host_client_files(host).copy()
1072 for path, m in client_files.get(host, {}).items():
1073 mode, uid, gid, content, digest = m
1074 if path in old_files:
1075 match = old_files[path] == (digest, mode, uid, gid)
1076 del old_files[path]
1077 if match:
1078 continue
1079 self.log.info(f'Updating {host}:{path}')
1080 self.mgr.ssh.write_remote_file(host, path, content, mode, uid, gid)
1081 self.mgr.cache.update_client_file(host, path, digest, mode, uid, gid)
1082 updated_files = True
1083 for path in old_files.keys():
1084 if path == '/etc/ceph/ceph.conf':
1085 continue
1086 self.log.info(f'Removing {host}:{path}')
1087 cmd = ['rm', '-f', path]
1088 self.mgr.ssh.check_execute_command(host, cmd)
1089 updated_files = True
1090 self.mgr.cache.removed_client_file(host, path)
1091 if updated_files:
1092 self.mgr.cache.save_host(host)
1093
1094 async def _create_daemon(self,
1095 daemon_spec: CephadmDaemonDeploySpec,
1096 reconfig: bool = False,
1097 osd_uuid_map: Optional[Dict[str, Any]] = None,
1098 ) -> str:
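# Assemble and run the 'cephadm deploy' (or reconfig) call for one daemon:
# gather --tcp-ports, the --osd-fsid for OSDs, extra container args and custom
# config files, pass the final config and meta JSON over stdin, then prime the
# daemon cache and emit an event with the result.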
1099
1100 with set_exception_subject('service', orchestrator.DaemonDescription(
1101 daemon_type=daemon_spec.daemon_type,
1102 daemon_id=daemon_spec.daemon_id,
1103 hostname=daemon_spec.host,
1104 ).service_id(), overwrite=True):
1105
1106 try:
1107 image = ''
1108 start_time = datetime_now()
1109 ports: List[int] = daemon_spec.ports if daemon_spec.ports else []
1110
1111 if daemon_spec.daemon_type == 'container':
1112 spec = cast(CustomContainerSpec,
1113 self.mgr.spec_store[daemon_spec.service_name].spec)
1114 image = spec.image
1115 if spec.ports:
1116 ports.extend(spec.ports)
1117
1118 # TCP ports to open in the host firewall
1119 if len(ports) > 0:
1120 daemon_spec.extra_args.extend([
1121 '--tcp-ports', ' '.join(map(str, ports))
1122 ])
1123
1124 # osd deployments need an --osd-fsid arg (the OSD's uuid)
1125 if daemon_spec.daemon_type == 'osd':
1126 if not osd_uuid_map:
1127 osd_uuid_map = self.mgr.get_osd_uuid_map()
1128 osd_uuid = osd_uuid_map.get(daemon_spec.daemon_id)
1129 if not osd_uuid:
1130 raise OrchestratorError('osd.%s not in osdmap' % daemon_spec.daemon_id)
1131 daemon_spec.extra_args.extend(['--osd-fsid', osd_uuid])
1132
1133 if reconfig:
1134 daemon_spec.extra_args.append('--reconfig')
1135 if self.mgr.allow_ptrace:
1136 daemon_spec.extra_args.append('--allow-ptrace')
1137
1138 try:
1139 eca = daemon_spec.extra_container_args
1140 if eca:
1141 for a in eca:
1142 daemon_spec.extra_args.append(f'--extra-container-args={a}')
1143 except AttributeError:
1144 eca = None
1145
1146 if daemon_spec.service_name in self.mgr.spec_store:
1147 configs = self.mgr.spec_store[daemon_spec.service_name].spec.custom_configs
1148 if configs is not None:
1149 daemon_spec.final_config.update(
1150 {'custom_config_files': [c.to_json() for c in configs]})
1151
1152 if self.mgr.cache.host_needs_registry_login(daemon_spec.host) and self.mgr.registry_url:
1153 await self._registry_login(daemon_spec.host, json.loads(str(self.mgr.get_store('registry_credentials'))))
1154
1155 self.log.info('%s daemon %s on %s' % (
1156 'Reconfiguring' if reconfig else 'Deploying',
1157 daemon_spec.name(), daemon_spec.host))
1158
1159 out, err, code = await self._run_cephadm(
1160 daemon_spec.host, daemon_spec.name(), 'deploy',
1161 [
1162 '--name', daemon_spec.name(),
1163 '--meta-json', json.dumps({
1164 'service_name': daemon_spec.service_name,
1165 'ports': daemon_spec.ports,
1166 'ip': daemon_spec.ip,
1167 'deployed_by': self.mgr.get_active_mgr_digests(),
1168 'rank': daemon_spec.rank,
1169 'rank_generation': daemon_spec.rank_generation,
1170 'extra_container_args': eca
1171 }),
1172 '--config-json', '-',
1173 ] + daemon_spec.extra_args,
1174 stdin=json.dumps(daemon_spec.final_config),
1175 image=image,
1176 )
1177
1178 if daemon_spec.daemon_type == 'agent':
1179 self.mgr.agent_cache.agent_timestamp[daemon_spec.host] = datetime_now()
1180 self.mgr.agent_cache.agent_counter[daemon_spec.host] = 1
1181
1182 # refresh daemon state? (ceph daemon reconfig does not need it)
1183 if not reconfig or daemon_spec.daemon_type not in CEPH_TYPES:
1184 if not code and daemon_spec.host in self.mgr.cache.daemons:
1185 # prime cached service state with what we (should have)
1186 # just created
1187 sd = daemon_spec.to_daemon_description(
1188 DaemonDescriptionStatus.starting, 'starting')
1189 self.mgr.cache.add_daemon(daemon_spec.host, sd)
1190 if daemon_spec.daemon_type in REQUIRES_POST_ACTIONS:
1191 self.mgr.requires_post_actions.add(daemon_spec.name())
1192 self.mgr.cache.invalidate_host_daemons(daemon_spec.host)
1193
1194 if daemon_spec.daemon_type != 'agent':
1195 self.mgr.cache.update_daemon_config_deps(
1196 daemon_spec.host, daemon_spec.name(), daemon_spec.deps, start_time)
1197 self.mgr.cache.save_host(daemon_spec.host)
1198 else:
1199 self.mgr.agent_cache.update_agent_config_deps(
1200 daemon_spec.host, daemon_spec.deps, start_time)
1201 self.mgr.agent_cache.save_agent(daemon_spec.host)
1202 msg = "{} {} on host '{}'".format(
1203 'Reconfigured' if reconfig else 'Deployed', daemon_spec.name(), daemon_spec.host)
1204 if not code:
1205 self.mgr.events.for_daemon(daemon_spec.name(), OrchestratorEvent.INFO, msg)
1206 else:
1207 what = 'reconfigure' if reconfig else 'deploy'
1208 self.mgr.events.for_daemon(
1209 daemon_spec.name(), OrchestratorEvent.ERROR, f'Failed to {what}: {err}')
1210 return msg
1211 except OrchestratorError:
1212 redeploy = daemon_spec.name() in self.mgr.cache.get_daemon_names()
1213 if not reconfig and not redeploy:
1214 # we have to clean up the daemon. E.g. keyrings.
1215 service_type = daemon_type_to_service(daemon_spec.daemon_type)
1216 dd = daemon_spec.to_daemon_description(DaemonDescriptionStatus.error, 'failed')
1217 self.mgr.cephadm_services[service_type].post_remove(dd, is_failed_deploy=True)
1218 raise
1219
1220 def _remove_daemon(self, name: str, host: str, no_post_remove: bool = False) -> str:
1221 """
1222 Remove a daemon
1223 """
1224 (daemon_type, daemon_id) = name.split('.', 1)
1225 daemon = orchestrator.DaemonDescription(
1226 daemon_type=daemon_type,
1227 daemon_id=daemon_id,
1228 hostname=host)
1229
1230 with set_exception_subject('service', daemon.service_id(), overwrite=True):
1231
1232 self.mgr.cephadm_services[daemon_type_to_service(daemon_type)].pre_remove(daemon)
1233 # NOTE: we are passing the 'force' flag here, which means
1234 # we can delete a mon instance's data.
1235 dd = self.mgr.cache.get_daemon(daemon.daemon_name)
1236 if dd.ports:
1237 args = ['--name', name, '--force', '--tcp-ports', ' '.join(map(str, dd.ports))]
1238 else:
1239 args = ['--name', name, '--force']
1240
1241 self.log.info('Removing daemon %s from %s -- ports %s' % (name, host, dd.ports))
1242 out, err, code = self.mgr.wait_async(self._run_cephadm(
1243 host, name, 'rm-daemon', args))
1244 if not code:
1245 # remove item from cache
1246 self.mgr.cache.rm_daemon(host, name)
1247 self.mgr.cache.invalidate_host_daemons(host)
1248
1249 if not no_post_remove:
1250 self.mgr.cephadm_services[daemon_type_to_service(
1251 daemon_type)].post_remove(daemon, is_failed_deploy=False)
1252
1253 return "Removed {} from host '{}'".format(name, host)
1254
1255 async def _run_cephadm_json(self,
1256 host: str,
1257 entity: Union[CephadmNoImage, str],
1258 command: str,
1259 args: List[str],
1260 no_fsid: Optional[bool] = False,
1261 image: Optional[str] = "",
1262 ) -> Any:
1263 try:
1264 out, err, code = await self._run_cephadm(
1265 host, entity, command, args, no_fsid=no_fsid, image=image)
1266 if code:
1267 raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
1268 except Exception as e:
1269 raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
1270 try:
1271 return json.loads(''.join(out))
1272 except (ValueError, KeyError):
1273 msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
1274 self.log.exception(f'{msg}: {"".join(out)}')
1275 raise OrchestratorError(msg)
1276
1277 async def _run_cephadm(self,
1278 host: str,
1279 entity: Union[CephadmNoImage, str],
1280 command: str,
1281 args: List[str],
1282 addr: Optional[str] = "",
1283 stdin: Optional[str] = "",
1284 no_fsid: Optional[bool] = False,
1285 error_ok: Optional[bool] = False,
1286 image: Optional[str] = "",
1287 env_vars: Optional[List[str]] = None,
1288 ) -> Tuple[List[str], List[str], int]:
1289 """
1290 Run cephadm on the remote host with the given command + args
1291
1292 Important: You probably don't want to run _run_cephadm from CLI handlers
1293
1294 :env_vars: in format -> [KEY=VALUE, ..]
1295 """
1296
1297 await self.mgr.ssh._remote_connection(host, addr)
1298
1299 self.log.debug(f"_run_cephadm : command = {command}")
1300 self.log.debug(f"_run_cephadm : args = {args}")
1301
1302 bypass_image = ('agent')
1303
1304 assert image or entity
1306 # Skip the image check for deployed daemons that are not ceph containers
1306 if not str(entity).startswith(bypass_image):
1307 if not image and entity is not cephadmNoImage:
1308 image = self.mgr._get_container_image(entity)
1309
1310 final_args = []
1311
1312 # global args
1313 if env_vars:
1314 for env_var_pair in env_vars:
1315 final_args.extend(['--env', env_var_pair])
1316
1317 if image:
1318 final_args.extend(['--image', image])
1319
1320 if not self.mgr.container_init:
1321 final_args += ['--no-container-init']
1322
1323 # subcommand
1324 final_args.append(command)
1325
1326 # subcommand args
1327 if not no_fsid:
1328 final_args += ['--fsid', self.mgr._cluster_fsid]
1329
1330 final_args += args
1331
1332 # exec
1333 self.log.debug('args: %s' % (' '.join(final_args)))
1334 if self.mgr.mode == 'root':
1335 # the agent ships the cephadm binary as an extra file, so its stdin
1336 # payload is huge; even for debug logs it's too much to print
1337 if stdin and 'agent' not in str(entity):
1338 self.log.debug('stdin: %s' % stdin)
1339
1340 cmd = ['which', 'python3']
1341 python = await self.mgr.ssh._check_execute_command(host, cmd, addr=addr)
1342 cmd = [python, self.mgr.cephadm_binary_path] + final_args
1343
1344 try:
1345 out, err, code = await self.mgr.ssh._execute_command(
1346 host, cmd, stdin=stdin, addr=addr)
1347 if code == 2:
1348 ls_cmd = ['ls', self.mgr.cephadm_binary_path]
1349 out_ls, err_ls, code_ls = await self.mgr.ssh._execute_command(host, ls_cmd, addr=addr)
1350 if code_ls == 2:
1351 await self._deploy_cephadm_binary(host, addr)
1352 out, err, code = await self.mgr.ssh._execute_command(
1353 host, cmd, stdin=stdin, addr=addr)
1354 # if there is an agent on this host, make sure it is using the most recent
1355 # version of the cephadm binary
1356 if host in self.mgr.inventory:
1357 for agent in self.mgr.cache.get_daemons_by_type('agent', host):
1358 self.mgr._schedule_daemon_action(agent.name(), 'redeploy')
1359
1360 except Exception as e:
1361 await self.mgr.ssh._reset_con(host)
1362 if error_ok:
1363 return [], [str(e)], 1
1364 raise
1365
1366 elif self.mgr.mode == 'cephadm-package':
1367 try:
1368 cmd = ['/usr/bin/cephadm'] + final_args
1369 out, err, code = await self.mgr.ssh._execute_command(
1370 host, cmd, stdin=stdin, addr=addr)
1371 except Exception as e:
1372 await self.mgr.ssh._reset_con(host)
1373 if error_ok:
1374 return [], [str(e)], 1
1375 raise
1376 else:
1377 assert False, 'unsupported mode'
1378
1379 self.log.debug(f'code: {code}')
1380 if out:
1381 self.log.debug(f'out: {out}')
1382 if err:
1383 self.log.debug(f'err: {err}')
1384 if code and not error_ok:
1385 raise OrchestratorError(
1386 f'cephadm exited with an error code: {code}, stderr: {err}')
1387 return [out], [err], code
1388
1389 async def _get_container_image_info(self, image_name: str) -> ContainerInspectInfo:
1390 # pick a random host...
1391 host = None
1392 for host_name in self.mgr.inventory.keys():
1393 host = host_name
1394 break
1395 if not host:
1396 raise OrchestratorError('no hosts defined')
1397 if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
1398 await self._registry_login(host, json.loads(str(self.mgr.get_store('registry_credentials'))))
1399
1400 pullargs: List[str] = []
1401 if self.mgr.registry_insecure:
1402 pullargs.append("--insecure")
1403
1404 j = await self._run_cephadm_json(host, '', 'pull', pullargs, image=image_name, no_fsid=True)
1405
1406 r = ContainerInspectInfo(
1407 j['image_id'],
1408 j.get('ceph_version'),
1409 j.get('repo_digests')
1410 )
1411 self.log.debug(f'image {image_name} -> {r}')
1412 return r
1413
1414 # function responsible for logging a single host into a custom registry
1415 async def _registry_login(self, host: str, registry_json: Dict[str, str]) -> Optional[str]:
1416 self.log.debug(
1417 f"Attempting to log host {host} into custom registry @ {registry_json['url']}")
1418 # want to pass info over stdin rather than through normal list of args
1419 out, err, code = await self._run_cephadm(
1420 host, 'mon', 'registry-login',
1421 ['--registry-json', '-'], stdin=json.dumps(registry_json), error_ok=True)
1422 if code:
1423 return f"Host {host} failed to login to {registry_json['url']} as {registry_json['username']} with given password"
1424 return None
1425
1426 async def _deploy_cephadm_binary(self, host: str, addr: Optional[str] = None) -> None:
1427 # Use tee (from coreutils) to create a copy of cephadm on the target machine
1428 self.log.info(f"Deploying cephadm binary to {host}")
1429 await self.mgr.ssh._write_remote_file(host, self.mgr.cephadm_binary_path,
1430 self.mgr._cephadm.encode('utf-8'), addr=addr)