]>
Commit | Line | Data |
---|---|---|
f91f0fd5 TL |
1 | import datetime |
2 | import json | |
3 | import logging | |
4 | from collections import defaultdict | |
adb31ebb | 5 | from typing import TYPE_CHECKING, Optional, List, Callable, cast, Set, Dict, Union, Any |
f91f0fd5 TL |
6 | |
7 | try: | |
8 | import remoto | |
9 | except ImportError: | |
10 | remoto = None | |
11 | ||
12 | from ceph.deployment import inventory | |
13 | from ceph.deployment.drive_group import DriveGroupSpec | |
14 | from ceph.deployment.service_spec import ServiceSpec, HostPlacementSpec, RGWSpec | |
adb31ebb | 15 | from ceph.utils import str_to_datetime, datetime_now |
f91f0fd5 TL |
16 | |
17 | import orchestrator | |
18 | from cephadm.schedule import HostAssignment | |
19 | from cephadm.upgrade import CEPH_UPGRADE_ORDER | |
adb31ebb | 20 | from cephadm.utils import forall_hosts, cephadmNoImage, is_repo_digest, CephadmNoImage |
f91f0fd5 TL |
21 | from orchestrator import OrchestratorError |
22 | ||
23 | if TYPE_CHECKING: | |
24 | from cephadm.module import CephadmOrchestrator, ContainerInspectInfo | |
25 | ||
# Module-level logger shared by CephadmServe (assigned to self.log).
logger = logging.getLogger(__name__)

# Daemon types participating in the ceph upgrade sequence; used by
# _check_daemons() to decide which daemons need a reconfig when the
# monmap or extra ceph.conf content changes.
CEPH_TYPES = set(CEPH_UPGRADE_ORDER)
29 | ||
30 | ||
class CephadmServe:
    """
    This module contains functions that are executed in the
    serve() thread. Thus they don't block the CLI.

    On the other hand, these functions should *not* be called from
    CLI handlers, to avoid blocking the CLI.
    """

    def __init__(self, mgr: "CephadmOrchestrator"):
        # Back-reference to the orchestrator; all cluster state, caches and
        # remote command execution are reached through it.
        self.mgr: "CephadmOrchestrator" = mgr
        self.log = logger
43 | ||
    def serve(self) -> None:
        """
        The main loop of cephadm.

        A command handler will typically change the declarative state
        of cephadm. This loop will then attempt to apply this new state.

        Each iteration: refresh host/daemon state, surface stray and
        paused health warnings, and (unless paused) process OSD
        removals, migrations, spec application, daemon reconciliation
        and upgrades.  OrchestratorErrors are recorded as events rather
        than aborting the loop.
        """
        self.log.debug("serve starting")
        while self.mgr.run:

            try:

                self.convert_tags_to_repo_digest()

                # refresh daemons
                self.log.debug('refreshing hosts and daemons')
                self._refresh_hosts_and_daemons()

                self._check_for_strays()

                self._update_paused_health()

                if not self.mgr.paused:
                    self.mgr.to_remove_osds.process_removal_queue()

                    self.mgr.migration.migrate()
                    # while a migration is in progress, skip the rest of the
                    # reconciliation work for this iteration
                    if self.mgr.migration.is_migration_ongoing():
                        continue

                    if self._apply_all_services():
                        continue  # did something, refresh

                    self._check_daemons()

                    if self.mgr.upgrade.continue_upgrade():
                        continue

            except OrchestratorError as e:
                if e.event_subject:
                    self.mgr.events.from_orch_error(e)

            # wait (interruptibly, via self.mgr.event) before the next pass
            self._serve_sleep()
        self.log.debug("serve exit")
87 | ||
adb31ebb | 88 | def _serve_sleep(self) -> None: |
f91f0fd5 TL |
89 | sleep_interval = 600 |
90 | self.log.debug('Sleeping for %d seconds', sleep_interval) | |
91 | ret = self.mgr.event.wait(sleep_interval) | |
92 | self.mgr.event.clear() | |
93 | ||
adb31ebb | 94 | def _update_paused_health(self) -> None: |
f91f0fd5 TL |
95 | if self.mgr.paused: |
96 | self.mgr.health_checks['CEPHADM_PAUSED'] = { | |
97 | 'severity': 'warning', | |
98 | 'summary': 'cephadm background work is paused', | |
99 | 'count': 1, | |
100 | 'detail': ["'ceph orch resume' to resume"], | |
101 | } | |
102 | self.mgr.set_health_checks(self.mgr.health_checks) | |
103 | else: | |
104 | if 'CEPHADM_PAUSED' in self.mgr.health_checks: | |
105 | del self.mgr.health_checks['CEPHADM_PAUSED'] | |
106 | self.mgr.set_health_checks(self.mgr.health_checks) | |
107 | ||
    def _refresh_hosts_and_daemons(self) -> None:
        """Refresh cached state for every host and publish health warnings.

        Runs the per-host ``refresh`` closure across all hosts (in
        parallel, via the ``forall_hosts`` decorator), collecting host
        check failures into ``bad_hosts`` and probe failures into
        ``failures``, then translates those lists (plus any failed
        daemons in the cache) into CEPHADM_* health checks.
        """
        bad_hosts = []
        failures = []

        @forall_hosts
        def refresh(host: str) -> None:

            # each check below is gated by the cache so a full refresh is
            # only done when the corresponding data is stale
            if self.mgr.cache.host_needs_check(host):
                r = self._check_host(host)
                if r is not None:
                    bad_hosts.append(r)
            if self.mgr.cache.host_needs_daemon_refresh(host):
                self.log.debug('refreshing %s daemons' % host)
                r = self._refresh_host_daemons(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_registry_login(host) and self.mgr.registry_url:
                self.log.debug(f"Logging `{host}` into custom registry")
                r = self.mgr._registry_login(host, self.mgr.registry_url,
                                             self.mgr.registry_username, self.mgr.registry_password)
                if r:
                    bad_hosts.append(r)

            if self.mgr.cache.host_needs_device_refresh(host):
                self.log.debug('refreshing %s devices' % host)
                r = self._refresh_host_devices(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_facts_refresh(host):
                self.log.info(('refreshing %s facts' % host))
                r = self._refresh_facts(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_osdspec_preview_refresh(host):
                self.log.debug(f"refreshing OSDSpec previews for {host}")
                r = self._refresh_host_osdspec_previews(host)
                if r:
                    failures.append(r)

            if self.mgr.cache.host_needs_new_etc_ceph_ceph_conf(host):
                self.log.debug(f"deploying new /etc/ceph/ceph.conf on `{host}`")
                r = self._deploy_etc_ceph_ceph_conf(host)
                if r:
                    bad_hosts.append(r)

        refresh(self.mgr.cache.get_hosts())

        # clear any previously-raised checks first so stale warnings drop
        # out even when no new failures occurred this round
        health_changed = False
        for k in [
                'CEPHADM_HOST_CHECK_FAILED',
                'CEPHADM_FAILED_DAEMON',
                'CEPHADM_REFRESH_FAILED',
        ]:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
                health_changed = True
        if bad_hosts:
            self.mgr.health_checks['CEPHADM_HOST_CHECK_FAILED'] = {
                'severity': 'warning',
                'summary': '%d hosts fail cephadm check' % len(bad_hosts),
                'count': len(bad_hosts),
                'detail': bad_hosts,
            }
            health_changed = True
        if failures:
            self.mgr.health_checks['CEPHADM_REFRESH_FAILED'] = {
                'severity': 'warning',
                'summary': 'failed to probe daemons or devices',
                'count': len(failures),
                'detail': failures,
            }
            health_changed = True
        # daemons whose cached status is negative are reported separately
        failed_daemons = []
        for dd in self.mgr.cache.get_daemons():
            if dd.status < 0:
                failed_daemons.append('daemon %s on %s is in %s state' % (
                    dd.name(), dd.hostname, dd.status_desc
                ))
        if failed_daemons:
            self.mgr.health_checks['CEPHADM_FAILED_DAEMON'] = {
                'severity': 'warning',
                'summary': '%d failed cephadm daemon(s)' % len(failed_daemons),
                'count': len(failed_daemons),
                'detail': failed_daemons,
            }
            health_changed = True
        if health_changed:
            self.mgr.set_health_checks(self.mgr.health_checks)
199 | ||
    def _check_host(self, host: str) -> Optional[str]:
        """Run `cephadm check-host` on a host.

        Returns an error string when the check fails (and
        ``warn_on_failed_host_check`` is enabled, or the command itself
        raised); returns None on success or for hosts not in the
        inventory.
        """
        if host not in self.mgr.inventory:
            return None
        self.log.debug(' checking %s' % host)
        try:
            # check-host is run without a cluster fsid (no_fsid=True) and
            # without pulling a container image (cephadmNoImage)
            out, err, code = self.mgr._run_cephadm(
                host, cephadmNoImage, 'check-host', [],
                error_ok=True, no_fsid=True)
            self.mgr.cache.update_last_host_check(host)
            self.mgr.cache.save_host(host)
            if code:
                self.log.debug(' host %s failed check' % host)
                if self.mgr.warn_on_failed_host_check:
                    return 'host %s failed check: %s' % (host, err)
            else:
                self.log.debug(' host %s ok' % host)
        except Exception as e:
            self.log.debug(' host %s failed check' % host)
            return 'host %s failed check: %s' % (host, e)
        return None
f91f0fd5 | 220 | |
    def _refresh_host_daemons(self, host: str) -> Optional[str]:
        """Re-scan the daemons deployed on a host via `cephadm ls`.

        Parses the listing into DaemonDescription objects and replaces
        the host's entry in the daemon cache.  Returns an error string
        on failure, None on success.
        """
        try:
            ls = self._run_cephadm_json(host, 'mon', 'ls', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)
        dm = {}
        for d in ls:
            # keep only cephadm-style daemons belonging to *this* cluster
            # with a well-formed '<type>.<id>' name
            if not d['style'].startswith('cephadm'):
                continue
            if d['fsid'] != self.mgr._cluster_fsid:
                continue
            if '.' not in d['name']:
                continue
            sd = orchestrator.DaemonDescription()
            sd.last_refresh = datetime_now()
            for k in ['created', 'started', 'last_configured', 'last_deployed']:
                v = d.get(k, None)
                if v:
                    setattr(sd, k, str_to_datetime(d[k]))
            sd.daemon_type = d['name'].split('.')[0]
            sd.daemon_id = '.'.join(d['name'].split('.')[1:])
            sd.hostname = host
            sd.container_id = d.get('container_id')
            if sd.container_id:
                # shorten the hash
                sd.container_id = sd.container_id[0:12]
            sd.container_image_name = d.get('container_image_name')
            sd.container_image_id = d.get('container_image_id')
            sd.version = d.get('version')
            if sd.daemon_type == 'osd':
                sd.osdspec_affinity = self.mgr.osd_service.get_osdspec_affinity(sd.daemon_id)
            if 'state' in d:
                sd.status_desc = d['state']
                # map cephadm's state string onto the orchestrator's numeric
                # status: 1 running, 0 stopped, -1 error/unknown
                sd.status = {
                    'running': 1,
                    'stopped': 0,
                    'error': -1,
                    'unknown': -1,
                }[d['state']]
            else:
                sd.status_desc = 'unknown'
                sd.status = None
            dm[sd.name()] = sd
        self.log.debug('Refreshed host %s daemons (%d)' % (host, len(dm)))
        self.mgr.cache.update_host_daemons(host, dm)
        self.mgr.cache.save_host(host)
        return None
268 | ||
adb31ebb | 269 | def _refresh_facts(self, host: str) -> Optional[str]: |
f91f0fd5 | 270 | try: |
adb31ebb TL |
271 | val = self._run_cephadm_json(host, cephadmNoImage, 'gather-facts', [], no_fsid=True) |
272 | except OrchestratorError as e: | |
273 | return str(e) | |
274 | ||
275 | self.mgr.cache.update_host_facts(host, val) | |
276 | ||
277 | return None | |
278 | ||
    def _refresh_host_devices(self, host: str) -> Optional[str]:
        """Re-scan a host's storage devices and networks.

        Runs `ceph-volume inventory` (retrying without
        --filter-for-batch for versions that do not support that flag)
        and `list-networks`, then updates the host cache and the
        OSDSpec previews.  Returns an error string on failure, None on
        success.
        """
        try:
            try:
                devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                 ['--', 'inventory', '--format=json', '--filter-for-batch'])
            except OrchestratorError as e:
                # older ceph-volume rejects --filter-for-batch; fall back
                # to the plain inventory invocation
                if 'unrecognized arguments: --filter-for-batch' in str(e):
                    devices = self._run_cephadm_json(host, 'osd', 'ceph-volume',
                                                     ['--', 'inventory', '--format=json'])
                else:
                    raise

            networks = self._run_cephadm_json(host, 'mon', 'list-networks', [], no_fsid=True)
        except OrchestratorError as e:
            return str(e)

        self.log.debug('Refreshed host %s devices (%d) networks (%s)' % (
            host, len(devices), len(networks)))
        ret = inventory.Devices.from_json(devices)
        self.mgr.cache.update_host_devices_networks(host, ret.devices, networks)
        self.update_osdspec_previews(host)
        self.mgr.cache.save_host(host)
        return None
302 | ||
adb31ebb | 303 | def _refresh_host_osdspec_previews(self, host: str) -> Optional[str]: |
f91f0fd5 TL |
304 | self.update_osdspec_previews(host) |
305 | self.mgr.cache.save_host(host) | |
306 | self.log.debug(f'Refreshed OSDSpec previews for host <{host}>') | |
adb31ebb | 307 | return None |
f91f0fd5 | 308 | |
adb31ebb | 309 | def update_osdspec_previews(self, search_host: str = '') -> None: |
f91f0fd5 TL |
310 | # Set global 'pending' flag for host |
311 | self.mgr.cache.loading_osdspec_preview.add(search_host) | |
312 | previews = [] | |
313 | # query OSDSpecs for host <search host> and generate/get the preview | |
314 | # There can be multiple previews for one host due to multiple OSDSpecs. | |
315 | previews.extend(self.mgr.osd_service.get_previews(search_host)) | |
316 | self.log.debug(f"Loading OSDSpec previews to HostCache") | |
317 | self.mgr.cache.osdspec_previews[search_host] = previews | |
318 | # Unset global 'pending' flag for host | |
319 | self.mgr.cache.loading_osdspec_preview.remove(search_host) | |
320 | ||
    def _deploy_etc_ceph_ceph_conf(self, host: str) -> Optional[str]:
        """Write a minimal ceph.conf to /etc/ceph on a host.

        Creates /etc/ceph via the remoto connection, streams the config
        to `dd` over stdin, and records the deployment time in the host
        cache.  Returns an error string on failure, None on success.
        """
        config = self.mgr.get_minimal_ceph_conf()

        try:
            with self.mgr._remote_connection(host) as tpl:
                conn, connr = tpl
                out, err, code = remoto.process.check(
                    conn,
                    ['mkdir', '-p', '/etc/ceph'])
                if code:
                    return f'failed to create /etc/ceph on {host}: {err}'
                # write the file via dd so the content travels over stdin
                out, err, code = remoto.process.check(
                    conn,
                    ['dd', 'of=/etc/ceph/ceph.conf'],
                    stdin=config.encode('utf-8')
                )
                if code:
                    return f'failed to create /etc/ceph/ceph.conf on {host}: {err}'
                self.mgr.cache.update_last_etc_ceph_ceph_conf(host)
                self.mgr.cache.save_host(host)
        except OrchestratorError as e:
            return f'failed to create /etc/ceph/ceph.conf on {host}: {str(e)}'
        return None
344 | ||
    def _check_for_strays(self) -> None:
        """Raise CEPHADM_STRAY_HOST / CEPHADM_STRAY_DAEMON warnings.

        Compares the daemons the cluster reports (``list_servers``)
        against the daemons cephadm manages; daemons on hosts missing
        from the inventory count as stray hosts, daemons not in the
        cache count as stray daemons.
        """
        self.log.debug('_check_for_strays')
        # always clear previous warnings first; they are re-raised below
        # if still applicable
        for k in ['CEPHADM_STRAY_HOST',
                  'CEPHADM_STRAY_DAEMON']:
            if k in self.mgr.health_checks:
                del self.mgr.health_checks[k]
        if self.mgr.warn_on_stray_hosts or self.mgr.warn_on_stray_daemons:
            ls = self.mgr.list_servers()
            managed = self.mgr.cache.get_daemon_names()
            host_detail = []  # type: List[str]
            host_num_daemons = 0
            daemon_detail = []  # type: List[str]
            for item in ls:
                host = item.get('hostname')
                daemons = item.get('services')  # misnomer!
                missing_names = []
                for s in daemons:
                    name = '%s.%s' % (s.get('type'), s.get('id'))
                    if s.get('type') == 'rbd-mirror':
                        # rbd-mirror registers under a gid; resolve the real
                        # daemon id from its metadata when possible
                        defaults = defaultdict(lambda: None, {'id': None})
                        metadata = self.mgr.get_metadata(
                            "rbd-mirror", s.get('id'), default=defaults)
                        if metadata['id']:
                            name = '%s.%s' % (s.get('type'), metadata['id'])
                        else:
                            self.log.debug(
                                "Failed to find daemon id for rbd-mirror service %s" % (s.get('id')))

                    if host not in self.mgr.inventory:
                        missing_names.append(name)
                        host_num_daemons += 1
                    if name not in managed:
                        daemon_detail.append(
                            'stray daemon %s on host %s not managed by cephadm' % (name, host))
                if missing_names:
                    host_detail.append(
                        'stray host %s has %d stray daemons: %s' % (
                            host, len(missing_names), missing_names))
            if self.mgr.warn_on_stray_hosts and host_detail:
                self.mgr.health_checks['CEPHADM_STRAY_HOST'] = {
                    'severity': 'warning',
                    'summary': '%d stray host(s) with %s daemon(s) '
                    'not managed by cephadm' % (
                        len(host_detail), host_num_daemons),
                    'count': len(host_detail),
                    'detail': host_detail,
                }
            if self.mgr.warn_on_stray_daemons and daemon_detail:
                self.mgr.health_checks['CEPHADM_STRAY_DAEMON'] = {
                    'severity': 'warning',
                    'summary': '%d stray daemon(s) not managed by cephadm' % (
                        len(daemon_detail)),
                    'count': len(daemon_detail),
                    'detail': daemon_detail,
                }
        self.mgr.set_health_checks(self.mgr.health_checks)
401 | ||
402 | def _apply_all_services(self) -> bool: | |
403 | r = False | |
404 | specs = [] # type: List[ServiceSpec] | |
405 | for sn, spec in self.mgr.spec_store.specs.items(): | |
406 | specs.append(spec) | |
407 | for spec in specs: | |
408 | try: | |
409 | if self._apply_service(spec): | |
410 | r = True | |
411 | except Exception as e: | |
412 | self.log.exception('Failed to apply %s spec %s: %s' % ( | |
413 | spec.service_name(), spec, e)) | |
414 | self.mgr.events.for_service(spec, 'ERROR', 'Failed to apply: ' + str(e)) | |
415 | ||
416 | return r | |
417 | ||
adb31ebb | 418 | def _config_fn(self, service_type: str) -> Optional[Callable[[ServiceSpec], None]]: |
f91f0fd5 TL |
419 | fn = { |
420 | 'mds': self.mgr.mds_service.config, | |
421 | 'rgw': self.mgr.rgw_service.config, | |
422 | 'nfs': self.mgr.nfs_service.config, | |
423 | 'iscsi': self.mgr.iscsi_service.config, | |
424 | }.get(service_type) | |
425 | return cast(Callable[[ServiceSpec], None], fn) | |
426 | ||
    def _apply_service(self, spec: ServiceSpec) -> bool:
        """
        Schedule a service. Deploy new daemons or remove old ones, depending
        on the target label and count specified in the placement.

        Returns True if anything changed (daemon added or removed),
        False otherwise.  Note ``r`` is tri-state: None (nothing
        attempted), False (attempted, nothing changed / first failure),
        True (at least one change succeeded).
        """
        self.mgr.migration.verify_no_migration()

        daemon_type = spec.service_type
        service_name = spec.service_name()
        if spec.unmanaged:
            self.log.debug('Skipping unmanaged service %s' % service_name)
            return False
        if spec.preview_only:
            self.log.debug('Skipping preview_only service %s' % service_name)
            return False
        self.log.debug('Applying service %s spec' % service_name)

        config_func = self._config_fn(daemon_type)

        if daemon_type == 'osd':
            self.mgr.osd_service.create_from_spec(cast(DriveGroupSpec, spec))
            # TODO: return True would result in a busy loop
            # can't know if daemon count changed; create_from_spec doesn't
            # return a solid indication
            return False

        daemons = self.mgr.cache.get_daemons_by_service(service_name)

        # mons may only be placed on hosts with an IP in the public network
        public_network = None
        if daemon_type == 'mon':
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'config get',
                'who': 'mon',
                'key': 'public_network',
            })
            if '/' in out:
                public_network = out.strip()
                self.log.debug('mon public_network is %s' % public_network)

        def matches_network(host):
            # type: (str) -> bool
            if not public_network:
                return False
            # make sure we have 1 or more IPs for that network on that
            # host
            return len(self.mgr.cache.networks[host].get(public_network, [])) > 0

        ha = HostAssignment(
            spec=spec,
            hosts=self.mgr._hosts_with_daemon_inventory(),
            get_daemons_func=self.mgr.cache.get_daemons_by_service,
            filter_new_host=matches_network if daemon_type == 'mon' else None,
        )

        hosts: List[HostPlacementSpec] = ha.place()
        self.log.debug('Usable hosts: %s' % hosts)

        r = None

        # sanity check
        if daemon_type in ['mon', 'mgr'] and len(hosts) < 1:
            self.log.debug('cannot scale mon|mgr below 1 (hosts=%s)' % hosts)
            return False

        # add any?
        did_config = False

        add_daemon_hosts: Set[HostPlacementSpec] = ha.add_daemon_hosts(hosts)
        self.log.debug('Hosts that will receive new daemons: %s' % add_daemon_hosts)

        remove_daemon_hosts: Set[orchestrator.DaemonDescription] = ha.remove_daemon_hosts(hosts)
        self.log.debug('Hosts that will loose daemons: %s' % remove_daemon_hosts)

        for host, network, name in add_daemon_hosts:
            daemon_id = self.mgr.get_unique_name(daemon_type, host, daemons,
                                                 prefix=spec.service_id,
                                                 forcename=name)

            # run the service-level config hook once, before the first
            # daemon of this apply round is created
            if not did_config and config_func:
                if daemon_type == 'rgw':
                    # rgw's config hook also needs the daemon id
                    rgw_config_func = cast(Callable[[RGWSpec, str], None], config_func)
                    rgw_config_func(cast(RGWSpec, spec), daemon_id)
                else:
                    config_func(spec)
                did_config = True

            daemon_spec = self.mgr.cephadm_services[daemon_type].make_daemon_spec(
                host, daemon_id, network, spec)
            self.log.debug('Placing %s.%s on host %s' % (
                daemon_type, daemon_id, host))

            try:
                daemon_spec = self.mgr.cephadm_services[daemon_type].prepare_create(daemon_spec)
                self.mgr._create_daemon(daemon_spec)
                r = True
            except (RuntimeError, OrchestratorError) as e:
                # NOTE(review): the two f-string fragments concatenate with no
                # space before 'on {host}' — candidate message fix
                self.mgr.events.for_service(spec, 'ERROR',
                                            f"Failed while placing {daemon_type}.{daemon_id}"
                                            f"on {host}: {e}")
                # only return "no change" if no one else has already succeeded.
                # later successes will also change to True
                if r is None:
                    r = False
                continue

            # add to daemon list so next name(s) will also be unique
            sd = orchestrator.DaemonDescription(
                hostname=host,
                daemon_type=daemon_type,
                daemon_id=daemon_id,
            )
            daemons.append(sd)

        # remove any?
        def _ok_to_stop(remove_daemon_hosts: Set[orchestrator.DaemonDescription]) -> bool:
            daemon_ids = [d.daemon_id for d in remove_daemon_hosts]
            r = self.mgr.cephadm_services[daemon_type].ok_to_stop(daemon_ids)
            return not r.retval

        while remove_daemon_hosts and not _ok_to_stop(remove_daemon_hosts):
            # let's find a subset that is ok-to-stop
            remove_daemon_hosts.pop()
        for d in remove_daemon_hosts:
            r = True
            # NOTE: we are passing the 'force' flag here, which means
            # we can delete a mon instances data.
            self.mgr._remove_daemon(d.name(), d.hostname)

        if r is None:
            r = False
        return r
558 | ||
    def _check_daemons(self) -> None:
        """Reconcile cached daemons against their expected configuration.

        Removes orphan daemons (no matching spec), marks the active
        daemon per service, and schedules a 'reconfig' (upgraded to
        'redeploy' if one was already requested) whenever the last
        config time is unknown, dependencies changed, or — for core
        ceph daemons — the monmap or extra ceph.conf is newer than the
        last config.  Finally runs queued post-create actions per
        daemon type.
        """

        daemons = self.mgr.cache.get_daemons()
        daemons_post: Dict[str, List[orchestrator.DaemonDescription]] = defaultdict(list)
        for dd in daemons:
            # orphan?
            spec = self.mgr.spec_store.specs.get(dd.service_name(), None)
            if not spec and dd.daemon_type not in ['mon', 'mgr', 'osd']:
                # (mon and mgr specs should always exist; osds aren't matched
                # to a service spec)
                self.log.info('Removing orphan daemon %s...' % dd.name())
                self.mgr._remove_daemon(dd.name(), dd.hostname)

            # ignore unmanaged services
            if spec and spec.unmanaged:
                continue

            # These daemon types require additional configs after creation
            if dd.daemon_type in ['grafana', 'iscsi', 'prometheus', 'alertmanager', 'nfs']:
                daemons_post[dd.daemon_type].append(dd)

            if self.mgr.cephadm_services[dd.daemon_type].get_active_daemon(
                    self.mgr.cache.get_daemons_by_service(dd.service_name())).daemon_id == dd.daemon_id:
                dd.is_active = True
            else:
                dd.is_active = False

            # decide whether this daemon needs a reconfig/redeploy
            deps = self.mgr._calc_daemon_deps(dd.daemon_type, dd.daemon_id)
            last_deps, last_config = self.mgr.cache.get_daemon_last_config_deps(
                dd.hostname, dd.name())
            if last_deps is None:
                last_deps = []
            action = self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name())
            if not last_config:
                self.log.info('Reconfiguring %s (unknown last config time)...' % (
                    dd.name()))
                action = 'reconfig'
            elif last_deps != deps:
                self.log.debug('%s deps %s -> %s' % (dd.name(), last_deps,
                                                     deps))
                self.log.info('Reconfiguring %s (dependencies changed)...' % (
                    dd.name()))
                action = 'reconfig'
            elif self.mgr.last_monmap and \
                    self.mgr.last_monmap > last_config and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (monmap changed)...' % dd.name())
                action = 'reconfig'
            elif self.mgr.extra_ceph_conf_is_newer(last_config) and \
                    dd.daemon_type in CEPH_TYPES:
                self.log.info('Reconfiguring %s (extra config changed)...' % dd.name())
                action = 'reconfig'
            if action:
                # a scheduled 'redeploy' takes precedence over a computed
                # 'reconfig'
                if self.mgr.cache.get_scheduled_daemon_action(dd.hostname, dd.name()) == 'redeploy' \
                        and action == 'reconfig':
                    action = 'redeploy'
                try:
                    self.mgr._daemon_action(
                        daemon_type=dd.daemon_type,
                        daemon_id=dd.daemon_id,
                        host=dd.hostname,
                        action=action
                    )
                    self.mgr.cache.rm_scheduled_daemon_action(dd.hostname, dd.name())
                except OrchestratorError as e:
                    self.mgr.events.from_orch_error(e)
                    # skip post actions for this type after a failure
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...
                except Exception as e:
                    self.mgr.events.for_daemon_from_exception(dd.name(), e)
                    if dd.daemon_type in daemons_post:
                        del daemons_post[dd.daemon_type]
                    # continue...

        # do daemon post actions
        for daemon_type, daemon_descs in daemons_post.items():
            if daemon_type in self.mgr.requires_post_actions:
                self.mgr.requires_post_actions.remove(daemon_type)
                self.mgr._get_cephadm_service(daemon_type).daemon_check_post(daemon_descs)
639 | ||
adb31ebb | 640 | def convert_tags_to_repo_digest(self) -> None: |
f91f0fd5 TL |
641 | if not self.mgr.use_repo_digest: |
642 | return | |
643 | settings = self.mgr.upgrade.get_distinct_container_image_settings() | |
644 | digests: Dict[str, ContainerInspectInfo] = {} | |
645 | for container_image_ref in set(settings.values()): | |
646 | if not is_repo_digest(container_image_ref): | |
647 | image_info = self.mgr._get_container_image_info(container_image_ref) | |
648 | if image_info.repo_digest: | |
649 | assert is_repo_digest(image_info.repo_digest), image_info | |
650 | digests[container_image_ref] = image_info | |
651 | ||
652 | for entity, container_image_ref in settings.items(): | |
653 | if not is_repo_digest(container_image_ref): | |
654 | image_info = digests[container_image_ref] | |
655 | if image_info.repo_digest: | |
656 | self.mgr.set_container_image(entity, image_info.repo_digest) | |
adb31ebb TL |
657 | |
658 | def _run_cephadm_json(self, | |
659 | host: str, | |
660 | entity: Union[CephadmNoImage, str], | |
661 | command: str, | |
662 | args: List[str], | |
663 | no_fsid: Optional[bool] = False, | |
664 | image: Optional[str] = "", | |
665 | ) -> Any: | |
666 | try: | |
667 | out, err, code = self.mgr._run_cephadm( | |
668 | host, entity, command, args, no_fsid=no_fsid, image=image) | |
669 | if code: | |
670 | raise OrchestratorError(f'host {host} `cephadm {command}` returned {code}: {err}') | |
671 | except Exception as e: | |
672 | raise OrchestratorError(f'host {host} `cephadm {command}` failed: {e}') | |
673 | try: | |
674 | return json.loads(''.join(out)) | |
675 | except (ValueError, KeyError): | |
676 | msg = f'host {host} `cephadm {command}` failed: Cannot decode JSON' | |
677 | self.log.exception(f'{msg}: {"".join(out)}') | |
678 | raise OrchestratorError(msg) | |
679 |