]>
Commit | Line | Data |
---|---|---|
1 | import datetime | |
2 | from copy import copy | |
3 | import json | |
4 | import logging | |
5 | from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set | |
6 | ||
7 | import six | |
8 | ||
9 | import orchestrator | |
10 | from ceph.deployment import inventory | |
11 | from ceph.deployment.service_spec import ServiceSpec | |
12 | from orchestrator import OrchestratorError, HostSpec | |
13 | ||
14 | if TYPE_CHECKING: | |
15 | from .module import CephadmOrchestrator | |
16 | ||
17 | ||
logger = logging.getLogger(__name__)

# Key prefixes under which per-host cache records and service specs are
# stored in the mgr key/value store.
HOST_CACHE_PREFIX: str = "host."
SPEC_STORE_PREFIX: str = "spec."
# strftime/strptime format for every persisted timestamp in this module
# (the timestamps written here come from datetime.datetime.utcnow()).
DATEFMT: str = '%Y-%m-%dT%H:%M:%S.%f'
23 | ||
24 | ||
class Inventory:
    """
    The set of hosts managed by the orchestrator, persisted in the mgr store.

    ``self._inventory`` maps hostname -> the ``HostSpec.to_json()`` dict for
    that host. It is loaded from, and saved back to, the store key
    ``'inventory'`` as JSON; every mutating method saves immediately.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
        else:
            self._inventory = dict()
        # Lazy %-args: the dict is only rendered when DEBUG logging is enabled.
        logger.debug('Loaded inventory %s', self._inventory)

    def keys(self) -> List[str]:
        """Return the hostnames in the inventory."""
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """Raise ``OrchestratorError`` unless *host* is in the inventory."""
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: 'HostSpec') -> None:
        """Add (or overwrite) the entry for ``spec.hostname`` and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """Remove *host*; raises ``OrchestratorError`` if it is unknown."""
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """Record *addr* as the network address of *host* and persist."""
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """Attach *label* to *host* (no duplicates) and persist."""
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', [])
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """Detach *label* from *host* if present and persist."""
        self.assert_host(host)
        labels = self._inventory[host].setdefault('labels', [])
        if label in labels:
            labels.remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """Return the stored address of *host*, falling back to the hostname."""
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """
        Yield one item per host that carries *label*.

        :param label: label to match; an empty string or None matches every host.
        :param as_hostspec: yield the stored host dict instead of the hostname.
        """
        for h, hostspec in self._inventory.items():
            if not label or label in hostspec.get('labels', []):
                # Exactly one item per host: either the dict or the name.
                if as_hostspec:
                    yield hostspec
                else:
                    yield h

    def spec_from_dict(self, info: dict) -> 'HostSpec':
        """Build a ``HostSpec`` from a stored dict, reporting offline hosts as 'Offline'."""
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> Iterator['HostSpec']:
        """Return a lazy iterator of ``HostSpec`` objects for every host."""
        return map(self.spec_from_dict, self._inventory.values())

    def save(self) -> None:
        """Persist the whole inventory as JSON under the store key 'inventory'."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
103 | ||
104 | ||
class SpecStore:
    """
    In-memory cache of service specs backed by the mgr key/value store.

    Each spec is stored under ``SPEC_STORE_PREFIX + service_name`` as JSON of
    the form ``{"created": <DATEFMT timestamp>, "spec": <spec JSON>}``.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        # service_name -> spec
        self.specs = {}  # type: Dict[str, ServiceSpec]
        # service_name -> time the spec was last saved (utcnow)
        self.spec_created = {}  # type: Dict[str, datetime.datetime]

    def load(self):
        # type: () -> None
        """Populate the cache from the store; unparsable entries are logged and skipped."""
        stored = self.mgr.get_store_prefix(SPEC_STORE_PREFIX)
        for key, raw in six.iteritems(stored):
            name = key[len(SPEC_STORE_PREFIX):]
            try:
                record = json.loads(raw)
                loaded_spec = ServiceSpec.from_json(record['spec'])
                stamp = datetime.datetime.strptime(record['created'], DATEFMT)
                self.specs[name] = loaded_spec
                self.spec_created[name] = stamp
                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    name))
            except Exception as e:
                # Best effort: a single bad record must not abort loading.
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    name, e))

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """Cache *spec*, stamp it with the current UTC time and persist it."""
        name = spec.service_name()
        now = datetime.datetime.utcnow()
        self.specs[name] = spec
        self.spec_created[name] = now
        payload = {
            'spec': spec.to_json(),
            'created': now.strftime(DATEFMT),
        }
        self.mgr.set_store(SPEC_STORE_PREFIX + name,
                           json.dumps(payload, sort_keys=True))

    def rm(self, service_name):
        # type: (str) -> bool
        """
        Forget *service_name* and delete its stored record.

        The store key is cleared even when the spec was not cached.
        Returns whether the spec was cached.
        """
        existed = service_name in self.specs
        if existed:
            self.specs.pop(service_name)
            self.spec_created.pop(service_name)
        self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return existed

    def find(self, service_name: Optional[str] = None) -> List['ServiceSpec']:
        """
        Return the cached specs matching *service_name*.

        A spec matches when no name is given, when its name equals
        *service_name*, or when its name starts with ``service_name + '.'``.
        """
        matches = [
            spec for name, spec in self.specs.items()
            if not service_name
            or name == service_name
            or name.startswith(service_name + '.')
        ]
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
            service_name, matches))
        return matches
160 | ||
161 | class HostCache(): | |
162 | def __init__(self, mgr): | |
163 | # type: (CephadmOrchestrator) -> None | |
164 | self.mgr: CephadmOrchestrator = mgr | |
165 | self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]] | |
166 | self.last_daemon_update = {} # type: Dict[str, datetime.datetime] | |
167 | self.devices = {} # type: Dict[str, List[inventory.Device]] | |
168 | self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]] | |
169 | self.networks = {} # type: Dict[str, Dict[str, List[str]]] | |
170 | self.last_device_update = {} # type: Dict[str, datetime.datetime] | |
171 | self.daemon_refresh_queue = [] # type: List[str] | |
172 | self.device_refresh_queue = [] # type: List[str] | |
173 | self.osdspec_previews_refresh_queue = [] # type: List[str] | |
174 | self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]] | |
175 | self.last_host_check = {} # type: Dict[str, datetime.datetime] | |
176 | self.loading_osdspec_preview = set() # type: Set[str] | |
177 | ||
def load(self):
    # type: () -> None
    """
    Rebuild the in-memory cache from the ``HOST_CACHE_PREFIX`` keys in the store.

    Records for hosts that are no longer in ``mgr.inventory`` are deleted
    from the store and skipped. Each loaded host is queued for a daemon
    refresh, because persisted daemon timestamps are ignored on restart.
    Hosts with no persisted ``last_device_update`` are also queued for a
    device refresh. A record that fails to parse is logged and otherwise
    ignored (best effort).
    """
    for k, v in six.iteritems(self.mgr.get_store_prefix(HOST_CACHE_PREFIX)):
        host = k[len(HOST_CACHE_PREFIX):]
        if host not in self.mgr.inventory:
            self.mgr.log.warning('removing stray HostCache host record %s' % (
                host))
            self.mgr.set_store(k, None)
            # The record was just deleted. Do not bring the host back into
            # memory, where it would show up in get_hosts() and be refreshed.
            continue
        try:
            j = json.loads(v)
            if 'last_device_update' in j:
                self.last_device_update[host] = datetime.datetime.strptime(
                    j['last_device_update'], DATEFMT)
            else:
                self.device_refresh_queue.append(host)
            # for services, we ignore the persisted last_*_update
            # and always trigger a new scrape on mgr restart.
            self.daemon_refresh_queue.append(host)
            self.daemons[host] = {}
            self.osdspec_previews[host] = []
            self.devices[host] = []
            self.networks[host] = {}
            self.daemon_config_deps[host] = {}
            for name, d in j.get('daemons', {}).items():
                self.daemons[host][name] = \
                    orchestrator.DaemonDescription.from_json(d)
            for d in j.get('devices', []):
                self.devices[host].append(inventory.Device.from_json(d))
            self.networks[host] = j.get('networks', {})
            # Default to a list, matching __init__'s type and prime_empty_host().
            self.osdspec_previews[host] = j.get('osdspec_previews', [])

            for name, d in j.get('daemon_config_deps', {}).items():
                self.daemon_config_deps[host][name] = {
                    'deps': d.get('deps', []),
                    'last_config': datetime.datetime.strptime(
                        d['last_config'], DATEFMT),
                }
            if 'last_host_check' in j:
                self.last_host_check[host] = datetime.datetime.strptime(
                    j['last_host_check'], DATEFMT)
            self.mgr.log.debug(
                'HostCache.load: host %s has %d daemons, '
                '%d devices, %d networks' % (
                    host, len(self.daemons[host]), len(self.devices[host]),
                    len(self.networks[host])))
        except Exception as e:
            # Best effort: one corrupt record must not block the others.
            self.mgr.log.warning('unable to load cached state for %s: %s' % (
                host, e))
227 | ||
228 | def update_host_daemons(self, host, dm): | |
229 | # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None | |
230 | self.daemons[host] = dm | |
231 | self.last_daemon_update[host] = datetime.datetime.utcnow() | |
232 | ||
233 | def update_host_devices_networks(self, host, dls, nets): | |
234 | # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None | |
235 | self.devices[host] = dls | |
236 | self.networks[host] = nets | |
237 | self.last_device_update[host] = datetime.datetime.utcnow() | |
238 | ||
239 | def update_daemon_config_deps(self, host, name, deps, stamp): | |
240 | self.daemon_config_deps[host][name] = { | |
241 | 'deps': deps, | |
242 | 'last_config': stamp, | |
243 | } | |
244 | ||
245 | def update_last_host_check(self, host): | |
246 | # type: (str) -> None | |
247 | self.last_host_check[host] = datetime.datetime.utcnow() | |
248 | ||
249 | def prime_empty_host(self, host): | |
250 | # type: (str) -> None | |
251 | """ | |
252 | Install an empty entry for a host | |
253 | """ | |
254 | self.daemons[host] = {} | |
255 | self.devices[host] = [] | |
256 | self.networks[host] = {} | |
257 | self.osdspec_previews[host] = [] | |
258 | self.daemon_config_deps[host] = {} | |
259 | self.daemon_refresh_queue.append(host) | |
260 | self.device_refresh_queue.append(host) | |
261 | self.osdspec_previews_refresh_queue.append(host) | |
262 | ||
263 | def invalidate_host_daemons(self, host): | |
264 | # type: (str) -> None | |
265 | self.daemon_refresh_queue.append(host) | |
266 | if host in self.last_daemon_update: | |
267 | del self.last_daemon_update[host] | |
268 | self.mgr.event.set() | |
269 | ||
270 | def invalidate_host_devices(self, host): | |
271 | # type: (str) -> None | |
272 | self.device_refresh_queue.append(host) | |
273 | if host in self.last_device_update: | |
274 | del self.last_device_update[host] | |
275 | self.mgr.event.set() | |
276 | ||
def save_host(self, host):
    # type: (str) -> None
    """
    Persist the cached state of *host* as JSON under ``HOST_CACHE_PREFIX + host``.

    ``last_daemon_update`` is written, but ``load()`` deliberately ignores it.
    Key order in the emitted JSON matches earlier versions of this method.
    """
    deps = {
        name: {
            'deps': info.get('deps', []),
            'last_config': info['last_config'].strftime(DATEFMT),
        }
        for name, info in self.daemon_config_deps[host].items()
    }
    record: Dict[str, Any] = {
        'daemons': {name: dd.to_json() for name, dd in self.daemons[host].items()},
        'devices': [dev.to_json() for dev in self.devices[host]],
        # An empty/falsy preview collection is always stored as a list.
        'osdspec_previews': self.osdspec_previews[host] or [],
        'daemon_config_deps': deps,
    }
    for field, stamps in (('last_daemon_update', self.last_daemon_update),
                          ('last_device_update', self.last_device_update)):
        if host in stamps:
            record[field] = stamps[host].strftime(DATEFMT)
    record['networks'] = self.networks[host]
    if host in self.last_host_check:
        record['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
    self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(record))
305 | ||
def rm_host(self, host):
    # type: (str) -> None
    """
    Forget all cached state for *host* and delete its persisted record.

    Besides the per-host caches, this also drops the host from
    ``last_host_check`` and from every refresh queue, so nothing stale is
    left behind. If the host is re-added, ``prime_empty_host()`` queues it
    again.
    """
    for cache in (self.daemons, self.devices, self.osdspec_previews,
                  self.networks, self.last_daemon_update,
                  self.last_device_update, self.daemon_config_deps,
                  self.last_host_check):
        cache.pop(host, None)
    self.loading_osdspec_preview.discard(host)
    # Queues may hold duplicates. Filter in place so that any other
    # reference to these lists sees the removal.
    for queue in (self.daemon_refresh_queue, self.device_refresh_queue,
                  self.osdspec_previews_refresh_queue):
        queue[:] = [h for h in queue if h != host]
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
325 | ||
326 | def get_hosts(self): | |
327 | # type: () -> List[str] | |
328 | r = [] | |
329 | for host, di in self.daemons.items(): | |
330 | r.append(host) | |
331 | return r | |
332 | ||
333 | def get_daemons(self): | |
334 | # type: () -> List[orchestrator.DaemonDescription] | |
335 | r = [] | |
336 | for host, dm in self.daemons.items(): | |
337 | for name, dd in dm.items(): | |
338 | r.append(dd) | |
339 | return r | |
340 | ||
341 | def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]: | |
342 | for host, dm in self.daemons.items(): | |
343 | if host in self.mgr.offline_hosts: | |
344 | def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription: | |
345 | ret = copy(dd) | |
346 | ret.status = -1 | |
347 | ret.status_desc = 'host is offline' | |
348 | return ret | |
349 | yield host, {name: set_offline(d) for name, d in dm.items()} | |
350 | else: | |
351 | yield host, dm | |
352 | ||
353 | def get_daemons_by_service(self, service_name): | |
354 | # type: (str) -> List[orchestrator.DaemonDescription] | |
355 | result = [] # type: List[orchestrator.DaemonDescription] | |
356 | for host, dm in self.daemons.items(): | |
357 | for name, d in dm.items(): | |
358 | if name.startswith(service_name + '.'): | |
359 | result.append(d) | |
360 | return result | |
361 | ||
362 | def get_daemon_names(self): | |
363 | # type: () -> List[str] | |
364 | r = [] | |
365 | for host, dm in self.daemons.items(): | |
366 | for name, dd in dm.items(): | |
367 | r.append(name) | |
368 | return r | |
369 | ||
370 | def get_daemon_last_config_deps(self, host, name): | |
371 | if host in self.daemon_config_deps: | |
372 | if name in self.daemon_config_deps[host]: | |
373 | return self.daemon_config_deps[host][name].get('deps', []), \ | |
374 | self.daemon_config_deps[host][name].get('last_config', None) | |
375 | return None, None | |
376 | ||
377 | def host_needs_daemon_refresh(self, host): | |
378 | # type: (str) -> bool | |
379 | if host in self.mgr.offline_hosts: | |
380 | logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh') | |
381 | return False | |
382 | if host in self.daemon_refresh_queue: | |
383 | self.daemon_refresh_queue.remove(host) | |
384 | return True | |
385 | cutoff = datetime.datetime.utcnow() - datetime.timedelta( | |
386 | seconds=self.mgr.daemon_cache_timeout) | |
387 | if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff: | |
388 | return True | |
389 | return False | |
390 | ||
391 | def host_needs_device_refresh(self, host): | |
392 | # type: (str) -> bool | |
393 | if host in self.mgr.offline_hosts: | |
394 | logger.debug(f'Host "{host}" marked as offline. Skipping device refresh') | |
395 | return False | |
396 | if host in self.device_refresh_queue: | |
397 | self.device_refresh_queue.remove(host) | |
398 | return True | |
399 | cutoff = datetime.datetime.utcnow() - datetime.timedelta( | |
400 | seconds=self.mgr.device_cache_timeout) | |
401 | if host not in self.last_device_update or self.last_device_update[host] < cutoff: | |
402 | return True | |
403 | return False | |
404 | ||
405 | def host_needs_osdspec_preview_refresh(self, host): | |
406 | if host in self.mgr.offline_hosts: | |
407 | logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh') | |
408 | return False | |
409 | if host in self.osdspec_previews_refresh_queue: | |
410 | self.osdspec_previews_refresh_queue.remove(host) | |
411 | return True | |
412 | # Since this is dependent on other factors (device and spec) this does not need | |
413 | # to be updated periodically. | |
414 | return False | |
415 | ||
416 | def host_needs_check(self, host): | |
417 | # type: (str) -> bool | |
418 | cutoff = datetime.datetime.utcnow() - datetime.timedelta( | |
419 | seconds=self.mgr.host_check_interval) | |
420 | return host not in self.last_host_check or self.last_host_check[host] < cutoff | |
421 | ||
422 | def add_daemon(self, host, dd): | |
423 | # type: (str, orchestrator.DaemonDescription) -> None | |
424 | assert host in self.daemons | |
425 | self.daemons[host][dd.name()] = dd | |
426 | ||
427 | def rm_daemon(self, host, name): | |
428 | if host in self.daemons: | |
429 | if name in self.daemons[host]: | |
430 | del self.daemons[host][name] |