]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/inventory.py
update dh_systemd restart patch for pacific
[ceph.git] / ceph / src / pybind / mgr / cephadm / inventory.py
CommitLineData
e306af50
TL
1import datetime
2from copy import copy
3import json
4import logging
f67539c2
TL
5from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set, Mapping, cast, \
6 NamedTuple
e306af50
TL
7
8import orchestrator
9from ceph.deployment import inventory
10from ceph.deployment.service_spec import ServiceSpec
adb31ebb 11from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
f67539c2 12from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types
e306af50
TL
13
14if TYPE_CHECKING:
15 from .module import CephadmOrchestrator
16
17
# Logger shared by every class in this module.
logger: logging.Logger = logging.getLogger(__name__)

# Key prefixes used in the mgr key/value store:
#   "host.<hostname>"      -> one HostCache record per host
#   "spec.<service_name>"  -> one SpecStore record per service spec
HOST_CACHE_PREFIX: str = "host."
SPEC_STORE_PREFIX: str = "spec."
e306af50
TL
22
23
class Inventory:
    """
    The inventory stores a HostSpec for all hosts persistently.

    In memory it is a dict mapping hostname to the JSON form of that host's
    HostSpec; :meth:`save` writes the whole dict back to the mgr store key
    ``inventory`` as one JSON document.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k
        else:
            self._inventory = dict()
        # Lazy %-args: the inventory may be large, so only format it when
        # debug logging is actually enabled.
        logger.debug('Loaded inventory %s', self._inventory)

    def keys(self) -> List[str]:
        """Return the names of all known hosts."""
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """
        :raises OrchestratorError: if ``host`` is not in the inventory.
        """
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add (or overwrite) the entry for ``spec.hostname`` and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """Remove ``host`` and persist. Raises if the host is unknown."""
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Set the address of ``host`` and persist. Raises if unknown."""
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Add ``label`` to ``host`` (no duplicates) and persist."""
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label not in self._inventory[host]['labels']:
            self._inventory[host]['labels'].append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Remove ``label`` from ``host`` if present and persist."""
        self.assert_host(host)

        if 'labels' not in self._inventory[host]:
            self._inventory[host]['labels'] = list()
        if label in self._inventory[host]['labels']:
            self._inventory[host]['labels'].remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """Return the address of ``host``, falling back to the hostname."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """
        Yield hosts carrying ``label`` (all hosts if ``label`` is empty).

        :param as_hostspec: yield HostSpec objects instead of hostnames.
        """
        for h, hostspec in self._inventory.items():
            if not label or label in hostspec.get('labels', []):
                if as_hostspec:
                    yield self.spec_from_dict(hostspec)
                else:
                    yield h

    def spec_from_dict(self, info: dict) -> HostSpec:
        """
        Build a HostSpec from a stored inventory entry.

        The status is reported as 'Offline' for hosts currently in
        ``mgr.offline_hosts``, otherwise the stored status (or '').
        """
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> List[HostSpec]:
        """Return a HostSpec for every host in the inventory."""
        return list(map(self.spec_from_dict, self._inventory.values()))

    def get_host_with_state(self, state: str = "") -> List[str]:
        """return a list of host names in a specific state

        The comparison is case-insensitive on both sides; the default ``""``
        selects hosts that have no status set.
        """
        # Previously only the stored status was lower-cased, so passing a
        # mixed-case ``state`` could never match anything.
        wanted = state.lower()
        return [h for h, spec in self._inventory.items()
                if spec.get("status", "").lower() == wanted]

    def save(self) -> None:
        """Persist the whole inventory to the mgr store key 'inventory'."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
116
f67539c2
TL
class SpecDescription(NamedTuple):
    """
    A stored service spec together with its bookkeeping timestamps,
    as returned by ``SpecStore.__getitem__``.
    """

    # the service spec itself
    spec: ServiceSpec
    # time the spec was stored (refreshed whenever it is re-saved with
    # update_create=True)
    created: datetime.datetime
    # time the spec was marked deleted, or None while it is still active
    deleted: Optional[datetime.datetime]
121
122
e306af50
TL
class SpecStore():
    """
    Persistent store of service specs, keyed by service name.

    Each spec is saved under ``SPEC_STORE_PREFIX + service_name`` as JSON with
    its ``created`` timestamp and, once marked for removal, a ``deleted``
    timestamp. Deleted specs stay in the store until :meth:`finally_rm`.
    Preview-only specs are kept in memory only (``spec_preview``).
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self._specs = {}  # type: Dict[str, ServiceSpec]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]
        self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
        self.spec_preview = {}  # type: Dict[str, ServiceSpec]

    @property
    def all_specs(self) -> Mapping[str, ServiceSpec]:
        """
        returns active and deleted specs. Returns read-only dict.
        """
        return self._specs

    def __contains__(self, name: str) -> bool:
        return name in self._specs

    def __getitem__(self, name: str) -> SpecDescription:
        """
        :raises OrchestratorError: if no spec named ``name`` is stored.
        """
        if name not in self._specs:
            raise OrchestratorError(f'Service {name} not found.')
        return SpecDescription(self._specs[name],
                               self.spec_created[name],
                               self.spec_deleted.get(name, None))

    @property
    def active_specs(self) -> Mapping[str, ServiceSpec]:
        """Specs that have not been marked deleted."""
        return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}

    def load(self):
        # type: () -> None
        """
        Populate the store from every ``spec.*`` key in the mgr store.

        A record that raises while loading is logged as a warning and the
        loop continues with the next record.
        """
        for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
            service_name = k[len(SPEC_STORE_PREFIX):]
            try:
                j = cast(Dict[str, dict], json.loads(v))
                spec = ServiceSpec.from_json(j['spec'])
                created = str_to_datetime(cast(str, j['created']))
                self._specs[service_name] = spec
                self.spec_created[service_name] = created

                # Test the parsed document, not the raw JSON string ``v``: a
                # substring test on ``v`` also matches "deleted" anywhere in
                # the spec body, after which ``j['deleted']`` raises KeyError
                # and a spurious load warning is emitted.
                if 'deleted' in j:
                    deleted = str_to_datetime(cast(str, j['deleted']))
                    self.spec_deleted[service_name] = deleted

                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(self, spec: ServiceSpec, update_create: bool = True) -> None:
        """
        Store ``spec`` in memory and persist it.

        Preview-only specs are only remembered in ``spec_preview`` and are not
        persisted. ``update_create=False`` keeps the existing ``created``
        timestamp (used when re-saving a spec to record its deletion).
        """
        name = spec.service_name()
        if spec.preview_only:
            self.spec_preview[name] = spec
            return None
        self._specs[name] = spec

        if update_create:
            self.spec_created[name] = datetime_now()

        data = {
            'spec': spec.to_json(),
            'created': datetime_to_str(self.spec_created[name]),
        }
        if name in self.spec_deleted:
            data['deleted'] = datetime_to_str(self.spec_deleted[name])

        self.mgr.set_store(
            SPEC_STORE_PREFIX + name,
            json.dumps(data, sort_keys=True),
        )
        self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')

    def rm(self, service_name: str) -> bool:
        """
        Mark ``service_name`` as deleted (preview-only specs are removed
        immediately).

        :returns: False if the spec is unknown, True otherwise.
        """
        if service_name not in self._specs:
            return False

        if self._specs[service_name].preview_only:
            self.finally_rm(service_name)
            return True

        self.spec_deleted[service_name] = datetime_now()
        self.save(self._specs[service_name], update_create=False)
        return True

    def finally_rm(self, service_name):
        # type: (str) -> bool
        """
        Drop ``service_name`` from memory and from the mgr store.

        :returns: whether the spec was present.
        """
        found = service_name in self._specs
        if found:
            del self._specs[service_name]
            del self.spec_created[service_name]
            if service_name in self.spec_deleted:
                del self.spec_deleted[service_name]
            self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return found

    def get_created(self, spec: ServiceSpec) -> Optional[datetime.datetime]:
        """Return the stored creation time of ``spec``, or None."""
        return self.spec_created.get(spec.service_name())
e306af50 223
f91f0fd5 224
class HostCache():
    """
    HostCache stores different things:

    1. `daemons`: Deployed daemons O(daemons)

    They're part of the configuration nowadays and need to be
    persistent. The name "daemon cache" is unfortunately a bit misleading.
    For example, we really need to know where daemons are deployed on
    hosts that are offline.

    2. `devices`: ceph-volume inventory cache O(hosts)

    As soon as this is populated, it becomes more or less read-only.

    3. `networks`: network interfaces for each host. O(hosts)

    This is needed in order to deploy MONs. It is mostly read-only.

    4. `last_etc_ceph_ceph_conf` O(hosts)

    Stores the last refresh time for the /etc/ceph/ceph.conf. Used
    to avoid deploying new configs when failing over to a new mgr.

    5. `scheduled_daemon_actions`: O(daemons)

    Used to run daemon actions after deploying a daemon. We need to
    store it persistently, in order to stay consistent across
    MGR failovers.

    Per-host state is persisted as one JSON record under
    ``HOST_CACHE_PREFIX + host`` (see :meth:`save_host` / :meth:`load`).
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
        self.devices = {}  # type: Dict[str, List[inventory.Device]]
        self.facts = {}  # type: Dict[str, Dict[str, Any]]
        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
        self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
        self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
        self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
        self.last_device_update = {}  # type: Dict[str, datetime.datetime]
        self.last_device_change = {}  # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]

        # host -> daemon name -> dict
        self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}  # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]
        self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
        self.registry_login_queue: Set[str] = set()

        self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}

    def load(self):
        # type: () -> None
        """
        Rebuild the in-memory cache from the ``host.*`` keys in the mgr store.

        Records for hosts no longer in the inventory are deleted from the
        store and skipped. Daemons are always queued for a fresh refresh
        instead of trusting the persisted timestamps. If loading a record
        raises, a warning is logged and the next record is processed.
        """
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # The record has just been deleted from the store; loading it
                # anyway would resurrect the removed host in get_hosts() and
                # in the daemon refresh queue.
                continue
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = str_to_datetime(j['last_device_update'])
                else:
                    self.device_refresh_queue.append(host)
                if 'last_device_change' in j:
                    self.last_device_change[host] = str_to_datetime(j['last_device_change'])
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.osdspec_last_applied[host] = {}
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks_and_interfaces', {})
                # previews are a list (see __init__ / save_host), so an old
                # record without the key must default to [] rather than {}
                self.osdspec_previews[host] = j.get('osdspec_previews', [])
                for name, ts in j.get('osdspec_last_applied', {}).items():
                    self.osdspec_last_applied[host][name] = str_to_datetime(ts)

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': str_to_datetime(d['last_config']),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = str_to_datetime(j['last_host_check'])
                if 'last_etc_ceph_ceph_conf' in j:
                    self.last_etc_ceph_ceph_conf[host] = str_to_datetime(
                        j['last_etc_ceph_ceph_conf'])
                self.registry_login_queue.add(host)
                self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})

                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        """Replace the daemons of ``host`` and stamp the update time."""
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime_now()

    def update_host_facts(self, host, facts):
        # type: (str, Dict[str, Dict[str, Any]]) -> None
        """Replace the gathered facts of ``host`` and stamp the update time."""
        self.facts[host] = facts
        self.last_facts_update[host] = datetime_now()

    def devices_changed(self, host: str, b: List[inventory.Device]) -> bool:
        """Return True if ``b`` differs from the cached devices of ``host``."""
        a = self.devices[host]
        if len(a) != len(b):
            return True
        aj = {d.path: d.to_json() for d in a}
        bj = {d.path: d.to_json() for d in b}
        if aj != bj:
            self.mgr.log.info("Detected new or changed devices on %s" % host)
            return True
        return False

    def update_host_devices_networks(
            self,
            host: str,
            dls: List[inventory.Device],
            nets: Dict[str, Dict[str, List[str]]]
    ) -> None:
        """
        Store freshly scraped devices and networks for ``host``.

        ``last_device_change`` only moves when the device list actually
        changed (or nothing was known before); ``last_device_update`` is
        stamped on every call.
        """
        if (
                host not in self.devices
                or host not in self.last_device_change
                or self.devices_changed(host, dls)
        ):
            self.last_device_change[host] = datetime_now()
        self.last_device_update[host] = datetime_now()
        self.devices[host] = dls
        self.networks[host] = nets

    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
        """Record the config dependencies of daemon ``name`` and when it was configured."""
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        self.last_host_check[host] = datetime_now()

    def update_osdspec_last_applied(self, host, service_name, ts):
        # type: (str, str, datetime.datetime) -> None
        self.osdspec_last_applied[host][service_name] = ts

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        and queue it for daemon, device and osdspec-preview refreshes as well
        as a registry login.
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.osdspec_last_applied[host] = {}
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)
        self.registry_login_queue.add(host)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        """Queue a daemon refresh for ``host`` and wake up the mgr loop."""
        self.daemon_refresh_queue.append(host)
        if host in self.last_daemon_update:
            del self.last_daemon_update[host]
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        """Queue a device refresh for ``host`` and wake up the mgr loop."""
        self.device_refresh_queue.append(host)
        if host in self.last_device_update:
            del self.last_device_update[host]
        self.mgr.event.set()

    def distribute_new_registry_login_info(self) -> None:
        """Queue a registry login for every host in the inventory."""
        self.registry_login_queue = set(self.mgr.inventory.keys())

    def save_host(self, host: str) -> None:
        """Persist the cached state of ``host`` as one JSON record."""
        j: Dict[str, Any] = {
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'osdspec_last_applied': {},
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
        if host in self.last_device_update:
            j['last_device_update'] = datetime_to_str(self.last_device_update[host])
        if host in self.last_device_change:
            j['last_device_change'] = datetime_to_str(self.last_device_change[host])
        if host in self.daemons:
            for name, dd in self.daemons[host].items():
                j['daemons'][name] = dd.to_json()
        if host in self.devices:
            for d in self.devices[host]:
                j['devices'].append(d.to_json())
        if host in self.networks:
            j['networks_and_interfaces'] = self.networks[host]
        if host in self.daemon_config_deps:
            for name, depi in self.daemon_config_deps[host].items():
                j['daemon_config_deps'][name] = {
                    'deps': depi.get('deps', []),
                    'last_config': datetime_to_str(depi['last_config']),
                }
        if host in self.osdspec_previews and self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]
        if host in self.osdspec_last_applied:
            for name, ts in self.osdspec_last_applied[host].items():
                j['osdspec_last_applied'][name] = datetime_to_str(ts)

        if host in self.last_host_check:
            j['last_host_check'] = datetime_to_str(self.last_host_check[host])

        if host in self.last_etc_ceph_ceph_conf:
            j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]

        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        """Forget everything cached for ``host`` and delete its store record."""
        per_host_maps: List[Dict[str, Any]] = [
            self.daemons,
            self.devices,
            self.facts,
            self.last_facts_update,
            self.osdspec_previews,
            self.osdspec_last_applied,
            self.networks,
            self.last_daemon_update,
            self.last_device_update,
            self.last_device_change,
            self.daemon_config_deps,
            self.scheduled_daemon_actions,
            # These are persisted in the record deleted below as well; drop
            # them so a host that is re-added later starts clean (e.g. gets a
            # fresh /etc/ceph/ceph.conf and an immediate host check).
            self.last_host_check,
            self.last_etc_ceph_ceph_conf,
        ]
        for per_host in per_host_maps:
            per_host.pop(host, None)
        self.loading_osdspec_preview.discard(host)
        self.registry_login_queue.discard(host)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        """Hosts that have a daemon entry in the cache."""
        return list(self.daemons)

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """All cached daemons across all hosts."""
        return [dd for dm in self.daemons.values() for dd in dm.values()]

    def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
        """
        :raises OrchestratorError: if no cached daemon has that name.
        """
        assert not daemon_name.startswith('ha-rgw.')
        for _, dm in self.daemons.items():
            for _, dd in dm.items():
                if dd.name() == daemon_name:
                    return dd
        raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """
        Yield ``(host, daemons)`` with copies of each daemon whose status is
        forced to error for offline hosts and whose events are attached.
        The cached objects themselves are not modified.
        """
        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            dd = copy(dd_orig)
            if host in self.mgr.offline_hosts:
                dd.status = orchestrator.DaemonDescriptionStatus.error
                dd.status_desc = 'host is offline'
            dd.events = self.mgr.events.get_for_daemon(dd.name())
            return dd

        for host, dm in self.daemons.items():
            yield host, {name: alter(host, d) for name, d in dm.items()}

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """All cached daemons belonging to ``service_name``."""
        assert not service_name.startswith('keepalived.')
        assert not service_name.startswith('haproxy.')

        return [d for dm in self.daemons.values() for d in dm.values()
                if d.service_name() == service_name]

    def get_daemons_by_type(self, service_type):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """All cached daemons whose daemon type belongs to ``service_type``."""
        assert service_type not in ['keepalived', 'haproxy']

        # loop-invariant: compute the daemon-type mapping once
        daemon_types = service_to_daemon_types(service_type)
        return [d for dm in self.daemons.values() for d in dm.values()
                if d.daemon_type in daemon_types]

    def get_daemon_types(self, hostname: str) -> List[str]:
        """Provide a list of the types of daemons on the host"""
        result = set()
        for _d, dm in self.daemons[hostname].items():
            assert dm.daemon_type is not None, f'no daemon type for {dm!r}'
            result.add(dm.daemon_type)
        return list(result)

    def get_daemon_names(self):
        # type: () -> List[str]
        """Names of all cached daemons across all hosts."""
        return [name for dm in self.daemons.values() for name in dm]

    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
        """Return ``(deps, last_config)`` for a daemon, or ``(None, None)``."""
        if host in self.daemon_config_deps:
            if name in self.daemon_config_deps[host]:
                return self.daemon_config_deps[host][name].get('deps', []), \
                    self.daemon_config_deps[host][name].get('last_config', None)
        return None, None

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        """
        True if ``host`` is queued (consumes one queue entry) or its daemon
        data is older than ``mgr.daemon_cache_timeout``. Offline hosts never
        need a refresh.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
            return True
        return False

    def host_needs_facts_refresh(self, host):
        # type: (str) -> bool
        """True if facts of an online ``host`` are older than ``mgr.facts_cache_timeout``."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
            return False
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.facts_cache_timeout)
        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
            return True
        return False

    def host_had_daemon_refresh(self, host: str) -> bool:
        """
        ... at least once.
        """
        if host in self.last_daemon_update:
            return True
        if host not in self.daemons:
            return False
        return bool(self.daemons[host])

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        """
        True if ``host`` is queued (consumes one queue entry) or its device
        data is older than ``mgr.device_cache_timeout``. Offline hosts never
        need a refresh.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        if host not in self.last_device_update or self.last_device_update[host] < cutoff:
            return True
        return False

    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
        """True only if an online ``host`` is queued (consumes one queue entry)."""
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        """True if ``host`` was never checked or not within ``mgr.host_check_interval``."""
        cutoff = datetime_now() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
        """
        True if /etc/ceph/ceph.conf should be (re)written on ``host``: the
        feature is enabled, the mgr is not paused, the host is online, a
        monmap is known and either the host never got a conf or the monmap /
        extra ceph.conf changed since.
        """
        if not self.mgr.manage_etc_ceph_ceph_conf:
            return False
        if self.mgr.paused:
            return False
        if host in self.mgr.offline_hosts:
            return False
        if not self.mgr.last_monmap:
            return False
        if host not in self.last_etc_ceph_ceph_conf:
            return True
        if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
            return True
        if self.mgr.extra_ceph_conf_is_newer(self.last_etc_ceph_ceph_conf[host]):
            return True
        # already up to date:
        return False

    def osdspec_needs_apply(self, host: str, spec: ServiceSpec) -> bool:
        """
        True unless ``spec`` was applied to ``host`` after the last device
        change and the spec was created before that change.
        """
        if (
            host not in self.devices
            or host not in self.last_device_change
            or host not in self.last_device_update
            or host not in self.osdspec_last_applied
            or spec.service_name() not in self.osdspec_last_applied[host]
        ):
            return True
        created = self.mgr.spec_store.get_created(spec)
        if not created or created > self.last_device_change[host]:
            return True
        return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]

    def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
        """Stamp the ceph.conf deployment time for ``host`` (only once a monmap is known)."""
        if not self.mgr.last_monmap:
            return
        self.last_etc_ceph_ceph_conf[host] = datetime_now()

    def host_needs_registry_login(self, host: str) -> bool:
        """True only if an online ``host`` is queued for login (consumes the entry)."""
        if host in self.mgr.offline_hosts:
            return False
        if host in self.registry_login_queue:
            self.registry_login_queue.remove(host)
            return True
        return False

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Add or replace daemon ``dd`` on an already-primed ``host``."""
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host: str, name: str) -> None:
        """Forget daemon ``name`` on ``host`` if it is cached."""
        assert not name.startswith('ha-rgw.')

        if host in self.daemons:
            if name in self.daemons[host]:
                del self.daemons[host][name]

    def daemon_cache_filled(self) -> bool:
        """
        i.e. we have checked the daemons for each host at least once,
        excluding offline hosts.

        We're not checking for `host_needs_daemon_refresh`, as this might never be
        False for all hosts.
        """
        return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                   for h in self.get_hosts())

    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
        """
        Schedule ``action`` for a daemon unless an action with a higher
        priority is already scheduled for it.
        """
        assert not daemon_name.startswith('ha-rgw.')

        priorities = {
            'start': 1,
            'restart': 2,
            'reconfig': 3,
            'redeploy': 4,
            'stop': 5,
        }
        existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
        if existing_action and priorities[existing_action] > priorities[action]:
            logger.debug(
                f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
            return

        self.scheduled_daemon_actions.setdefault(host, {})[daemon_name] = action

    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
        """Drop the scheduled action of a daemon; prune the host entry when empty."""
        if host in self.scheduled_daemon_actions:
            if daemon_name in self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host][daemon_name]
            if not self.scheduled_daemon_actions[host]:
                del self.scheduled_daemon_actions[host]

    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
        """Return the scheduled action for ``daemon`` on ``host``, or None."""
        assert not daemon.startswith('ha-rgw.')

        return self.scheduled_daemon_actions.get(host, {}).get(daemon)
744
f6b5b4d7
TL
745
class EventStore():
    """
    In-memory store of recent orchestrator events, keyed by
    ``event.kind_subject()`` (e.g. ``'service:<name>'`` or
    ``'daemon:<name>'``). At most five events are kept per key.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.events = {}  # type: Dict[str, List[OrchestratorEvent]]

    def add(self, event: OrchestratorEvent) -> None:
        """
        Record ``event`` unless an event with the same message is already
        stored for the same kind/subject.
        """
        key = event.kind_subject()
        known = self.events.setdefault(key, [])
        if any(e.message == event.message for e in known):
            return
        known.append(event)
        # limit to five events for now.
        self.events[key] = known[-5:]

    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
        """Record a service-level event for ``spec``."""
        e = OrchestratorEvent(datetime_now(), 'service',
                              spec.service_name(), level, message)
        self.add(e)

    def from_orch_error(self, e: OrchestratorError) -> None:
        """Record an ERROR event for ``e`` if it carries an event subject."""
        if e.event_subject is not None:
            self.add(OrchestratorEvent(
                datetime_now(),
                e.event_subject[0],
                e.event_subject[1],
                "ERROR",
                str(e)
            ))

    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
        """Record a daemon-level event for ``daemon_name``."""
        e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
        self.add(e)

    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
        """Record an ERROR event for ``daemon_name`` using ``str(e)`` as message."""
        self.for_daemon(
            daemon_name,
            "ERROR",
            str(e)
        )

    def cleanup(self) -> None:
        """
        Drop events for services and daemons that no longer exist.
        """
        # Needs to be properly done, in case events are persistently stored.

        daemons = set(self.mgr.cache.get_daemon_names())
        specs = self.mgr.spec_store.all_specs.keys()
        unknowns: List[str] = []
        for k_s in self.events:
            # partition (not split) so a subject containing ':' cannot break
            # the two-name unpacking with a ValueError
            kind, _, subject = k_s.partition(':')
            if kind == 'service':
                if subject not in specs:
                    unknowns.append(k_s)
            elif kind == 'daemon':
                if subject not in daemons:
                    unknowns.append(k_s)

        for k_s in unknowns:
            del self.events[k_s]

    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
        """Recent events for service ``name`` (empty list if none)."""
        return self.events.get('service:' + name, [])

    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
        """Recent events for daemon ``name`` (empty list if none)."""
        return self.events.get('daemon:' + name, [])