import datetime
import json
import logging
from collections import defaultdict
from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Union, Any

try:
    import remoto
except ImportError:
    remoto = None

from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec
from ceph.utils import str_to_datetime, datetime_now

import orchestrator
from cephadm.schedule import HostAssignment
from cephadm.upgrade import CEPH_UPGRADE_ORDER
from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, CephadmNoImage
from orchestrator import OrchestratorError

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator, ContainerInspectInfo

logger = logging.getLogger(__name__)

CEPH_TYPES = set(CEPH_UPGRADE_ORDER)


class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        self.mgr: "CephadmOrchestrator" = mgr
        self.log = logger

    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.
        """
        self.log.debug("serve starting")
        while self.mgr.run:

            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                if not self.mgr.paused:
                    self.mgr.to_remove_osds.process_removal_queue()

                    self.mgr.migration.migrate()
                    if self.mgr.migration.is_migration_ongoing():
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    if self.mgr.upgrade.continue_upgrade():
                        continue

            except OrchestratorError as e:
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            self._serve_sleep()
        self.log.debug("serve exit")

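    # Illustration: a command handler typically records the new declarative
    # state and then wakes this loop rather than waiting out the sleep below,
    # along the lines of (sketch only; the exact calls vary by handler):
    #
    #   self.mgr.spec_store.save(spec)   # persist the desired state
    #   self.mgr.event.set()             # cut _serve_sleep() short
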
    def _serve_sleep(self) -> None:
        sleep_interval = 600
        self.log.debug('Sleeping for %d seconds', sleep_interval)
        ret = self.mgr.event.wait(sleep_interval)
        self.mgr.event.clear()

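    # The 600s above is a ceiling, not a fixed period: mgr.event is set by
    # handlers to wake serve() immediately, and event.wait()'s return value
    # is unused because the loop re-evaluates all state on every pass anyway.
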
    def _update_paused_health(self) -> None:
        if self.mgr.paused:
            self.mgr.health_checks['CEPHADM_PAUSED'] = {
                'severity': 'warning',
                'summary': 'cephadm background work is paused',
                'count': 1,
                'detail': ["'ceph orch resume' to resume"],
            }
            self.mgr.set_health_checks(self.mgr.health_checks)
        else:
            if 'CEPHADM_PAUSED' in self.mgr.health_checks:
                del self.mgr.health_checks['CEPHADM_PAUSED']
                self.mgr.set_health_checks(self.mgr.health_checks)

    def _refresh_hosts_and_daemons(self) -> None:
        bad_hosts = []
        failures = []

        @forall_hosts
        def refresh(host: str) -> None:

            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)
            if self.mgr.cache.host_needs_daemon_refresh(host):
                self.log.debug('refreshing %s daemons' % host)
                r = self._refresh_host_daemons(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self.mgr._registry_login(host, self.mgr.registry_url,
                                             self.mgr.registry_username, self.mgr.registry_password)
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_device_refresh(host):
                self.log.debug('refreshing %s devices' % host)
                r = self._refresh_host_devices(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_facts_refresh(host):
                self.log.info('refreshing %s facts' % host)
                r = self._refresh_facts(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_new_etc_ceph_ceph_conf(host):
                self.log.debug(f"deploying new /etc/ceph/ceph.conf on `{host}`")
                r = self._deploy_etc_ceph_ceph_conf(host)
                if r:
                    bad_hosts.append(r)

        refresh(self.mgr.cache.get_hosts())

        health_changed = False
        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_FAILED_DAEMON',
                'CEPHADM_REFRESH_FAILED',
        ]:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
                health_changed = True
        if bad_hosts:
            self.mgr.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
                'severity': 'warning',
                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
                'count': len(bad_hosts),
                'detail': bad_hosts,
            }
            health_changed = True
        if failures:
            self.mgr.health_checks['CEPHADM_REFRESH_FAILED'] = {
                'severity': 'warning',
                'summary': 'failed to probe daemons or devices',
                'count': len(failures),
                'detail': failures,
            }
            health_changed = True
        failed_daemons = []
        for dd in self.mgr.cache.get_daemons():
            if dd.status < 0:
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        if failed_daemons:
            self.mgr.health_checks['CEPHADM_FAILED_DAEMON'] = {
                'severity': 'warning',
                'summary': '%d failed cephadm daemon(s)' % len(failed_daemons),
                'count': len(failed_daemons),
                'detail': failed_daemons,
            }
            health_changed = True
        if health_changed:
            self.mgr.set_health_checks(self.mgr.health_checks)

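    # forall_hosts fans refresh() out across all hosts concurrently, so
    # bad_hosts/failures above are shared accumulators; list.append is atomic
    # under the GIL, which is what keeps this safe without explicit locking.
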
    def _check_host(self, host: str) -> Optional[str]:
        if host not in self.mgr.inventory:
            return None
        self.log.debug(' checking %s' % host)
        try:
            out, err, code = self.mgr._run_cephadm(
                host, cephadmNoImage, 'check-host', [],
                error_ok=True, no_fsid=True)
            self.mgr.cache.update_last_host_check(host)
            self.mgr.cache.save_host(host)
            if code:
                self.log.debug(' host %s failed check' % host)
                if self.mgr.warn_on_failed_host_check:
                    return 'host %s failed check: %s' % (host, err)
            else:
                self.log.debug(' host %s ok' % host)
        except Exception as e:
            self.log.debug(' host %s failed check' % host)
            return 'host %s failed check: %s' % (host, e)
        return None

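    # `cephadm check-host` performs host-side sanity checks (container
    # runtime, time sync, and the like). With error_ok=True a failing check
    # surfaces through the exit code instead of an exception, and it only
    # becomes a health warning when warn_on_failed_host_check is enabled.
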
    def _refresh_host_daemons(self, host: str) -> Optional[str]:
        try:
            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)
        dm = {}
        for d in ls:
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self.mgr._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime_now()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, str_to_datetime(d[k]))
            sd.daemon_type = d['name'].split('.')[0]
            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.version = d.get('version')
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                sd.status_desc = d['state']
                sd.status = {
                    'running': 1,
                    'stopped': 0,
                    'error': -1,
                    'unknown': -1,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.mgr.cache.update_host_daemons(host, dm)
        self.mgr.cache.save_host(host)
        return None

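    # For reference, the `cephadm ls` entries parsed above look roughly like
    # the following (abbreviated sketch, not an exhaustive schema):
    #
    #   {
    #       "style": "cephadm:v1",
    #       "name": "mon.host1",        # "<daemon_type>.<daemon_id>"
    #       "fsid": "...",
    #       "state": "running",         # or "stopped", "error", "unknown"
    #       "container_id": "...",
    #       "container_image_name": "...",
    #       "version": "...",
    #       "created": "...",           # ISO timestamp, if known
    #   }
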
    def _refresh_facts(self, host: str) -> Optional[str]:
        try:
            val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.mgr.cache.update_host_facts(host, val)

        return None

    def _refresh_host_devices(self, host: str) -> Optional[str]:
        try:
            try:
                devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                 ['--', 'inventory', '--format=json', '--filter-for-batch'])
            except OrchestratorError as e:
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                     ['--', 'inventory', '--format=json'])
                else:
                    raise

            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
            host, len(devices), len(networks)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None

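    # The nested try above is a compatibility fallback: older ceph-volume
    # builds don't recognize --filter-for-batch, so the matched error text
    # triggers a retry without that flag instead of failing the whole
    # device refresh.
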
    def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]:
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        self.log.debug(f'Refreshed OSDSpec previews for host <{host}>')
        return None

    def update_osdspec_previews(self, search_host: str = '') -> None:
        # Set global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.add(search_host)
        previews = []
        # query OSDSpecs for host <search host> and generate/get the preview
        # There can be multiple previews for one host due to multiple OSDSpecs.
        previews.extend(self.mgr.osd_service.get_previews(search_host))
        self.log.debug("Loading OSDSpec previews to HostCache")
        self.mgr.cache.osdspec_previews[search_host] = previews
        # Unset global 'pending' flag for host
        self.mgr.cache.loading_osdspec_preview.remove(search_host)

    def _deploy_etc_ceph_ceph_conf(self, host: str) -> Optional[str]:
        config = self.mgr.get_minimal_ceph_conf()

        try:
            with self.mgr._remote_connection(host) as tpl:
                conn, connr = tpl
                out, err, code = remoto.process.check(
                    conn,
                    ['mkdir', '-p', '/etc/ceph'])
                if code:
                    return f'failed to create /etc/ceph on {host}: {err}'
                out, err, code = remoto.process.check(
                    conn,
                    ['dd', 'of=/etc/ceph/ceph.conf'],
                    stdin=config.encode('utf-8')
                )
                if code:
                    return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
                self.mgr.cache.update_last_etc_ceph_ceph_conf(host)
                self.mgr.cache.save_host(host)
        except OrchestratorError as e:
            return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
        return None

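    # The minimal ceph.conf is streamed over the remoto connection as stdin
    # to `dd of=/etc/ceph/ceph.conf`, so the file is written on the remote
    # host without staging a temporary copy there first.
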
    def _check_for_strays(self) -> None:
        self.log.debug('_check_for_strays')
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                daemons = item.get('services')  # misnomer!
                missing_names = []
                for s in daemons:
                    name = '%s.%s' % (s.get('type'), s.get('id'))
                    if s.get('type') == 'rbd-mirror':
                        defaults = defaultdict(lambda: None, {'id': None})
                        metadata = self.mgr.get_metadata(
                            "rbd-mirror", s.get('id'), default=defaults)
                        if metadata['id']:
                            name = '%s.%s' % (s.get('type'), metadata['id'])
                        else:
                            self.log.debug(
                                "Failed to find daemon id for rbd-mirror service %s" % (s.get('id')))

                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.health_checks['CEPHADM_STRAY_HOST'] = {
                    'severity': 'warning',
                    'summary': '%d stray host(s) with %s daemon(s) '
                               'not managed by cephadm' % (
                                   len(host_detail), host_num_daemons),
                    'count': len(host_detail),
                    'detail': host_detail,
                }
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.health_checks['CEPHADM_STRAY_DAEMON'] = {
                    'severity': 'warning',
                    'summary': '%d stray daemon(s) not managed by cephadm' % (
                        len(daemon_detail)),
                    'count': len(daemon_detail),
                    'detail': daemon_detail,
                }
        self.mgr.set_health_checks(self.mgr.health_checks)

    def _apply_all_services(self) -> bool:
        r = False
        specs = []  # type: List[ServiceSpec]
        for sn, spec in self.mgr.spec_store.specs.items():
            specs.append(spec)
        for spec in specs:
            try:
                if self._apply_service(spec):
                    r = True
            except Exception as e:
                self.log.exception('Failed to apply %s spec %s: %s' % (
                    spec.service_name(), spec, e))
                self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e))

        return r

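    # A True return propagates to serve(), which skips the sleep and starts
    # the next iteration immediately ("did something, refresh"); per-spec
    # failures are logged and recorded as service events but do not abort
    # the remaining specs.
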
    def _config_fn(self, service_type: str) -> Optional[Callable[[ServiceSpec], None]]:
        fn = {
            'mds': self.mgr.mds_service.config,
            'rgw': self.mgr.rgw_service.config,
            'nfs': self.mgr.nfs_service.config,
            'iscsi': self.mgr.iscsi_service.config,
        }.get(service_type)
        return cast(Callable[[ServiceSpec], None], fn)

    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.
        """
        self.mgr.migration.verify_no_migration()

        daemon_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        config_func = self._config_fn(daemon_type)

        if daemon_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: returning True here would result in a busy loop; we
            # can't tell whether the daemon count changed, because
            # create_from_spec doesn't give a solid indication.
            return False

        daemons = self.mgr.cache.get_daemons_by_service(service_name)

        public_network = None
        if daemon_type == 'mon':
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            if '/' in out:
                public_network = out.strip()
                self.log.debug('mon public_network is %s' % public_network)

        def matches_network(host):
            # type: (str) -> bool
            if not public_network:
                return False
            # make sure we have 1 or more IPs for that network on that
            # host
            return len(self.mgr.cache.networks[host].get(public_network, [])) > 0

        ha = HostAssignment(
            spec=spec,
            hosts=self.mgr._hosts_with_daemon_inventory(),
            get_daemons_func=self.mgr.cache.get_daemons_by_service,
            filter_new_host=matches_network if daemon_type == 'mon' else None,
        )

        hosts: List[HostPlacementSpec] = ha.place()
        self.log.debug('Usable hosts: %s' % hosts)

        r = None

        # sanity check
        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
            return False

        # add any?
        did_config = False

        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)

        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
        self.log.debug('Hosts that will lose daemons: %s' % remove_daemon_hosts)

        for host, network, name in add_daemon_hosts:
            daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
                                                 prefix=spec.service_id,
                                                 forcename=name)

            if not did_config and config_func:
                if daemon_type == 'rgw':
                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec(
                host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))

            try:
                daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec)
                self.mgr._create_daemon(daemon_spec)
                r = True
            except (RuntimeError, OrchestratorError) as e:
                self.mgr.events.for_service(spec, 'ERROR',
                                            f"Failed while placing {daemon_type}.{daemon_id} "
                                            f"on {host}: {e}")
                # only return "no change" if no one else has already succeeded.
                # later successes will also change to True
                if r is None:
                    r = False
                continue

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        # remove any?
        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
            r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            return not r.retval

        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
            # let's find a subset that is ok-to-stop
            remove_daemon_hosts.pop()
        for d in remove_daemon_hosts:
            r = True
            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instance's data.
            self.mgr._remove_daemon(d.name(), d.hostname)

        if r is None:
            r = False
        return r

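    # The removal path is deliberately conservative: while the candidate set
    # fails ok_to_stop(), an arbitrary member is popped and the check is
    # retried, so only a subset that is actually safe to stop is removed this
    # pass; anything left over is reconsidered on the next serve() cycle.
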
    def _check_daemons(self) -> None:

        daemons = self.mgr.cache.get_daemons()
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.specs.get(dd.service_name(), None)
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self.mgr._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[dd.daemon_type].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            deps = self.mgr._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    self.mgr._daemon_action(
                        daemon_type=dd.daemon_type,
                        daemon_id=dd.daemon_id,
                        host=dd.hostname,
                        action=action
                    )
                    self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            if daemon_type in self.mgr.requires_post_actions:
                self.mgr.requires_post_actions.remove(daemon_type)
                self.mgr._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)

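    # 'reconfig' rewrites a daemon's configuration in place, while 'redeploy'
    # recreates its container; a scheduled 'redeploy' therefore wins over a
    # computed 'reconfig' above, since redeploying implies reconfiguring.
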
    def convert_tags_to_repo_digest(self) -> None:
        if not self.mgr.use_repo_digest:
            return
        settings = self.mgr.upgrade.get_distinct_container_image_settings()
        digests: Dict[str, ContainerInspectInfo] = {}
        for container_image_ref in set(settings.values()):
            if not is_repo_digest(container_image_ref):
                image_info = self.mgr._get_container_image_info(container_image_ref)
                if image_info.repo_digest:
                    assert is_repo_digest(image_info.repo_digest), image_info
                digests[container_image_ref] = image_info

        for entity, container_image_ref in settings.items():
            if not is_repo_digest(container_image_ref):
                image_info = digests[container_image_ref]
                if image_info.repo_digest:
                    self.mgr.set_container_image(entity, image_info.repo_digest)

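    # Illustration: with use_repo_digest enabled, a floating tag such as
    # docker.io/ceph/ceph:v15 is pinned to the immutable digest form the
    # registry reports, e.g. docker.io/ceph/ceph@sha256:<digest>, so every
    # host resolves exactly the same image.
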
    def _run_cephadm_json(self,
                          host: str,
                          entity: Union[CephadmNoImage, str],
                          command: str,
                          args: List[str],
                          no_fsid: Optional[bool] = False,
                          image: Optional[str] = "",
                          ) -> Any:
        try:
            out, err, code = self.mgr._run_cephadm(
                host, entity, command, args, no_fsid=no_fsid, image=image)
            if code:
                raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}')
        except Exception as e:
            raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}')
        try:
            return json.loads(''.join(out))
        except (ValueError, KeyError):
            msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON'
            self.log.exception(f'{msg}: {"".join(out)}')
            raise OrchestratorError(msg)
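
    # _run_cephadm_json() funnels every failure mode (non-zero exit code,
    # transport errors, undecodable JSON) into OrchestratorError, which is
    # why the per-host refreshers above can simply `return str(e)` and let
    # that string surface as a health-check detail.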