]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/cephadm/inventory.py
import 15.2.4
[ceph.git] / ceph / src / pybind / mgr / cephadm / inventory.py
CommitLineData
e306af50
TL
1import datetime
2from copy import copy
3import json
4import logging
5from typing import TYPE_CHECKING, Dict, List, Iterator, Optional, Any, Tuple, Set
6
7import six
8
9import orchestrator
10from ceph.deployment import inventory
11from ceph.deployment.service_spec import ServiceSpec
12from orchestrator import OrchestratorError, HostSpec
13
14if TYPE_CHECKING:
15 from .module import CephadmOrchestrator
16
17
logger = logging.getLogger(__name__)

# Key prefix of the per-host records HostCache keeps in the mgr store
# (the full key is HOST_CACHE_PREFIX + hostname).
HOST_CACHE_PREFIX = "host."
# Key prefix of the per-service records SpecStore keeps in the mgr store
# (the full key is SPEC_STORE_PREFIX + service name).
SPEC_STORE_PREFIX = "spec."
# strftime/strptime format used for every timestamp persisted by this module:
# naive date and time with microseconds, no timezone designator.
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
23
24
class Inventory:
    """
    The hosts known to cephadm.

    The inventory is held in memory as ``_inventory`` (hostname -> host
    record dict, as produced by ``HostSpec.to_json()`` in :meth:`add_host`)
    and is persisted as a single JSON document under the ``inventory`` key
    of the mgr store on every mutation.
    """

    def __init__(self, mgr: 'CephadmOrchestrator'):
        self.mgr = mgr
        # load inventory
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
        else:
            self._inventory = dict()
        logger.debug('Loaded inventory %s', self._inventory)

    def keys(self) -> List[str]:
        """Return the names of all hosts in the inventory."""
        return list(self._inventory.keys())

    def __contains__(self, host: str) -> bool:
        return host in self._inventory

    def assert_host(self, host: str) -> None:
        """
        Ensure ``host`` is part of the inventory.

        :raises OrchestratorError: if the host is unknown
        """
        if host not in self._inventory:
            raise OrchestratorError('host %s does not exist' % host)

    def add_host(self, spec: HostSpec) -> None:
        """Add (or overwrite) the record for ``spec.hostname`` and persist."""
        self._inventory[spec.hostname] = spec.to_json()
        self.save()

    def rm_host(self, host: str) -> None:
        """
        Remove ``host`` from the inventory and persist.

        :raises OrchestratorError: if the host is unknown
        """
        self.assert_host(host)
        del self._inventory[host]
        self.save()

    def set_addr(self, host: str, addr: str) -> None:
        """
        Set the address of ``host`` and persist.

        :raises OrchestratorError: if the host is unknown
        """
        self.assert_host(host)
        self._inventory[host]['addr'] = addr
        self.save()

    def add_label(self, host: str, label: str) -> None:
        """
        Add ``label`` to ``host`` unless it is already present, then persist.

        :raises OrchestratorError: if the host is unknown
        """
        self.assert_host(host)

        # Records without a 'labels' key get an empty list first.
        labels = self._inventory[host].setdefault('labels', [])
        if label not in labels:
            labels.append(label)
        self.save()

    def rm_label(self, host: str, label: str) -> None:
        """
        Remove ``label`` from ``host`` if it is present, then persist.

        :raises OrchestratorError: if the host is unknown
        """
        self.assert_host(host)

        # Records without a 'labels' key get an empty list first.
        labels = self._inventory[host].setdefault('labels', [])
        if label in labels:
            labels.remove(label)
        self.save()

    def get_addr(self, host: str) -> str:
        """
        Return the address of ``host``, falling back to the hostname itself
        when no address is recorded.

        :raises OrchestratorError: if the host is unknown
        """
        self.assert_host(host)
        return self._inventory[host].get('addr', host)

    def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
        """
        Iterate over the hosts carrying ``label``.

        :param label: label to filter on; an empty/None label matches every host
        :param as_hostspec: yield the stored host record (a dict) instead of
            the hostname
        :return: exactly one item per matching host
        """
        for h, hostspec in self._inventory.items():
            if not label or label in hostspec.get('labels', []):
                if as_hostspec:
                    # NOTE(review): this is the raw record dict, not a HostSpec
                    # object; confirm what callers expect.
                    yield hostspec
                else:
                    # Exactly one item per host: the hostname is only yielded
                    # when the record was not requested.
                    yield h

    def spec_from_dict(self, info: dict) -> HostSpec:
        """
        Build a ``HostSpec`` from a host record.

        The status is forced to ``'Offline'`` when the host is listed in
        ``mgr.offline_hosts``; otherwise the recorded status (default ``''``)
        is used.
        """
        hostname = info['hostname']
        return HostSpec(
            hostname,
            addr=info.get('addr', hostname),
            labels=info.get('labels', []),
            status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
        )

    def all_specs(self) -> Iterator[HostSpec]:
        """Return a lazy iterator of ``HostSpec`` objects for all hosts."""
        return map(self.spec_from_dict, self._inventory.values())

    def save(self) -> None:
        """Persist the whole inventory as JSON under the ``inventory`` key."""
        self.mgr.set_store('inventory', json.dumps(self._inventory))
103
104
class SpecStore():
    """
    Service specs keyed by service name, together with the time each spec
    was saved.

    Every spec is persisted in the mgr store under
    ``SPEC_STORE_PREFIX + <service name>`` as a JSON object with the keys
    ``spec`` and ``created``.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr = mgr
        self.specs = {}  # type: Dict[str, ServiceSpec]
        self.spec_created = {}  # type: Dict[str, datetime.datetime]

    def load(self):
        # type: () -> None
        """
        Populate ``specs`` and ``spec_created`` from the mgr store.

        A record that cannot be decoded is logged as a warning and skipped.
        """
        prefix_len = len(SPEC_STORE_PREFIX)
        stored = self.mgr.get_store_prefix(SPEC_STORE_PREFIX)
        for key, raw in stored.items():
            service_name = key[prefix_len:]
            try:
                record = json.loads(raw)
                spec = ServiceSpec.from_json(record['spec'])
                created = datetime.datetime.strptime(record['created'], DATEFMT)
                self.specs[service_name] = spec
                self.spec_created[service_name] = created
                self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                    service_name))
            except Exception as e:
                # Best effort: one broken record must not prevent loading the rest.
                self.mgr.log.warning('unable to load spec for %s: %s' % (
                    service_name, e))

    def save(self, spec):
        # type: (ServiceSpec) -> None
        """Record ``spec`` with the current UTC time and persist it."""
        name = spec.service_name()
        self.specs[name] = spec
        created = datetime.datetime.utcnow()
        self.spec_created[name] = created
        payload = json.dumps({
            'spec': spec.to_json(),
            'created': created.strftime(DATEFMT),
        }, sort_keys=True)
        self.mgr.set_store(SPEC_STORE_PREFIX + name, payload)

    def rm(self, service_name):
        # type: (str) -> bool
        """
        Forget the spec of ``service_name`` and delete its stored record.

        :return: True if a spec existed, False otherwise (nothing is touched)
        """
        if service_name not in self.specs:
            return False
        del self.specs[service_name]
        del self.spec_created[service_name]
        self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
        return True

    def find(self, service_name: Optional[str] = None) -> List[ServiceSpec]:
        """
        Return the specs matching ``service_name``.

        A spec matches when its name equals ``service_name`` or starts with
        ``service_name + '.'``; an empty/None ``service_name`` matches all.
        """
        def wanted(candidate):
            if not service_name:
                return True
            return candidate == service_name or candidate.startswith(service_name + '.')

        specs = [spec for sn, spec in self.specs.items() if wanted(sn)]
        self.mgr.log.debug('SpecStore: find spec for %s returned: %s' % (
            service_name, specs))
        return specs
160
class HostCache():
    """
    In-memory cache of per-host state: daemons, devices, networks, OSD spec
    previews, daemon config dependencies and the timestamps of the last
    daemon/device refresh and host check.

    The state of each host is persisted as one JSON document in the mgr
    store under ``HOST_CACHE_PREFIX + <host>`` (see :meth:`save_host`) and
    read back by :meth:`load`.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.daemons = {}   # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
        self.last_daemon_update = {}   # type: Dict[str, datetime.datetime]
        self.devices = {}              # type: Dict[str, List[inventory.Device]]
        self.osdspec_previews = {}     # type: Dict[str, List[Dict[str, Any]]]
        self.networks = {}             # type: Dict[str, Dict[str, List[str]]]
        self.last_device_update = {}   # type: Dict[str, datetime.datetime]
        self.daemon_refresh_queue = []  # type: List[str]
        self.device_refresh_queue = []  # type: List[str]
        self.osdspec_previews_refresh_queue = []  # type: List[str]
        self.daemon_config_deps = {}   # type: Dict[str, Dict[str, Dict[str,Any]]]
        self.last_host_check = {}      # type: Dict[str, datetime.datetime]
        self.loading_osdspec_preview = set()  # type: Set[str]

    def load(self):
        # type: () -> None
        """
        Populate the cache from the records persisted in the mgr store.

        Records of hosts that are not in ``mgr.inventory`` are deleted from
        the store and skipped. A record that fails to decode is logged as a
        warning and loading continues with the next host.
        """
        for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
            host = k[len(HOST_CACHE_PREFIX):]
            if host not in self.mgr.inventory:
                self.mgr.log.warning('removing stray HostCache host record %s' % (
                    host))
                self.mgr.set_store(k, None)
                # The record was just deleted: do not bring the host back
                # into the in-memory cache.
                continue
            try:
                j = json.loads(v)
                if 'last_device_update' in j:
                    self.last_device_update[host] = datetime.datetime.strptime(
                        j['last_device_update'], DATEFMT)
                else:
                    self.device_refresh_queue.append(host)
                # for services, we ignore the persisted last_*_update
                # and always trigger a new scrape on mgr restart.
                self.daemon_refresh_queue.append(host)
                self.daemons[host] = {}
                self.osdspec_previews[host] = []
                self.devices[host] = []
                self.networks[host] = {}
                self.daemon_config_deps[host] = {}
                for name, d in j.get('daemons', {}).items():
                    self.daemons[host][name] = \
                        orchestrator.DaemonDescription.from_json(d)
                for d in j.get('devices', []):
                    self.devices[host].append(inventory.Device.from_json(d))
                self.networks[host] = j.get('networks', {})
                # previews are a list everywhere else, so default to a list
                self.osdspec_previews[host] = j.get('osdspec_previews', [])

                for name, d in j.get('daemon_config_deps', {}).items():
                    self.daemon_config_deps[host][name] = {
                        'deps': d.get('deps', []),
                        'last_config': datetime.datetime.strptime(
                            d['last_config'], DATEFMT),
                    }
                if 'last_host_check' in j:
                    self.last_host_check[host] = datetime.datetime.strptime(
                        j['last_host_check'], DATEFMT)
                self.mgr.log.debug(
                    'HostCache.load: host %s has %d daemons, '
                    '%d devices, %d networks' % (
                        host, len(self.daemons[host]), len(self.devices[host]),
                        len(self.networks[host])))
            except Exception as e:
                # Best effort: a broken record must not prevent loading the
                # remaining hosts.
                self.mgr.log.warning('unable to load cached state for %s: %s' % (
                    host, e))

    def update_host_daemons(self, host, dm):
        # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
        """Replace the daemons of ``host`` and stamp the update with UTC now."""
        self.daemons[host] = dm
        self.last_daemon_update[host] = datetime.datetime.utcnow()

    def update_host_devices_networks(self, host, dls, nets):
        # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
        """Replace devices and networks of ``host`` and stamp the update with UTC now."""
        self.devices[host] = dls
        self.networks[host] = nets
        self.last_device_update[host] = datetime.datetime.utcnow()

    def update_daemon_config_deps(self, host, name, deps, stamp):
        """
        Record the config dependencies ``deps`` of daemon ``name`` on ``host``
        together with ``stamp`` (stored as ``last_config``).

        The host must already have an entry in ``daemon_config_deps``.
        """
        self.daemon_config_deps[host][name] = {
            'deps': deps,
            'last_config': stamp,
        }

    def update_last_host_check(self, host):
        # type: (str) -> None
        """Stamp the last check of ``host`` with UTC now."""
        self.last_host_check[host] = datetime.datetime.utcnow()

    def prime_empty_host(self, host):
        # type: (str) -> None
        """
        Install an empty entry for a host
        """
        self.daemons[host] = {}
        self.devices[host] = []
        self.networks[host] = {}
        self.osdspec_previews[host] = []
        self.daemon_config_deps[host] = {}
        self.daemon_refresh_queue.append(host)
        self.device_refresh_queue.append(host)
        self.osdspec_previews_refresh_queue.append(host)

    def invalidate_host_daemons(self, host):
        # type: (str) -> None
        """Queue a daemon refresh for ``host``, drop its timestamp and signal ``mgr.event``."""
        self.daemon_refresh_queue.append(host)
        self.last_daemon_update.pop(host, None)
        self.mgr.event.set()

    def invalidate_host_devices(self, host):
        # type: (str) -> None
        """Queue a device refresh for ``host``, drop its timestamp and signal ``mgr.event``."""
        self.device_refresh_queue.append(host)
        self.last_device_update.pop(host, None)
        self.mgr.event.set()

    def save_host(self, host):
        # type: (str) -> None
        """
        Persist the cached state of ``host`` as JSON in the mgr store.

        The host must have entries in ``daemons``, ``devices``, ``networks``,
        ``daemon_config_deps`` and ``osdspec_previews`` (as installed by
        :meth:`prime_empty_host` or :meth:`load`).
        """
        j = {   # type: ignore
            'daemons': {},
            'devices': [],
            'osdspec_previews': [],
            'daemon_config_deps': {},
        }
        if host in self.last_daemon_update:
            j['last_daemon_update'] = self.last_daemon_update[host].strftime(DATEFMT)  # type: ignore
        if host in self.last_device_update:
            j['last_device_update'] = self.last_device_update[host].strftime(DATEFMT)  # type: ignore
        for name, dd in self.daemons[host].items():
            j['daemons'][name] = dd.to_json()  # type: ignore
        for d in self.devices[host]:
            j['devices'].append(d.to_json())  # type: ignore
        j['networks'] = self.networks[host]
        for name, depi in self.daemon_config_deps[host].items():
            j['daemon_config_deps'][name] = {   # type: ignore
                'deps': depi.get('deps', []),
                'last_config': depi['last_config'].strftime(DATEFMT),
            }
        if self.osdspec_previews[host]:
            j['osdspec_previews'] = self.osdspec_previews[host]

        if host in self.last_host_check:
            j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))

    def rm_host(self, host):
        # type: (str) -> None
        """
        Drop all cached state of ``host`` and delete its persisted record.

        Hosts that are not cached are handled without error.
        """
        self.daemons.pop(host, None)
        self.devices.pop(host, None)
        self.osdspec_previews.pop(host, None)
        self.loading_osdspec_preview.discard(host)
        self.networks.pop(host, None)
        self.last_daemon_update.pop(host, None)
        self.last_device_update.pop(host, None)
        # also forget the host check stamp so no entry outlives the host
        self.last_host_check.pop(host, None)
        self.daemon_config_deps.pop(host, None)
        self.mgr.set_store(HOST_CACHE_PREFIX + host, None)

    def get_hosts(self):
        # type: () -> List[str]
        """Return the names of all hosts that have a ``daemons`` entry."""
        return list(self.daemons)

    def get_daemons(self):
        # type: () -> List[orchestrator.DaemonDescription]
        """Return the daemon descriptions of all hosts as one flat list."""
        return [dd for dm in self.daemons.values() for dd in dm.values()]

    def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
        """
        Iterate over ``(host, daemons)`` pairs.

        For hosts listed in ``mgr.offline_hosts`` the daemons are shallow
        copies with ``status = -1`` and ``status_desc = 'host is offline'``;
        the cached descriptions themselves are left unmodified.
        """
        def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
            ret = copy(dd)
            ret.status = -1
            ret.status_desc = 'host is offline'
            return ret

        for host, dm in self.daemons.items():
            if host in self.mgr.offline_hosts:
                yield host, {name: set_offline(d) for name, d in dm.items()}
            else:
                yield host, dm

    def get_daemons_by_service(self, service_name):
        # type: (str) -> List[orchestrator.DaemonDescription]
        """Return all daemons whose name starts with ``service_name + '.'``."""
        prefix = service_name + '.'
        return [
            d
            for dm in self.daemons.values()
            for name, d in dm.items()
            if name.startswith(prefix)
        ]

    def get_daemon_names(self):
        # type: () -> List[str]
        """Return the names of the daemons of all hosts as one flat list."""
        return [name for dm in self.daemons.values() for name in dm]

    def get_daemon_last_config_deps(self, host, name):
        """
        Return ``(deps, last_config)`` recorded for daemon ``name`` on
        ``host``, or ``(None, None)`` when nothing is recorded.
        """
        host_deps = self.daemon_config_deps.get(host, {})
        if name in host_deps:
            entry = host_deps[name]
            return entry.get('deps', []), entry.get('last_config', None)
        return None, None

    def host_needs_daemon_refresh(self, host):
        # type: (str) -> bool
        """
        Tell whether the daemons of ``host`` should be refreshed.

        Offline hosts never need a refresh. A queued host is taken off the
        queue (one occurrence) and needs a refresh; otherwise a refresh is
        needed when the last update is missing or older than
        ``mgr.daemon_cache_timeout`` seconds.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
            return False
        if host in self.daemon_refresh_queue:
            self.daemon_refresh_queue.remove(host)
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.daemon_cache_timeout)
        return host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff

    def host_needs_device_refresh(self, host):
        # type: (str) -> bool
        """
        Tell whether the devices of ``host`` should be refreshed.

        Offline hosts never need a refresh. A queued host is taken off the
        queue (one occurrence) and needs a refresh; otherwise a refresh is
        needed when the last update is missing or older than
        ``mgr.device_cache_timeout`` seconds.
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
            return False
        if host in self.device_refresh_queue:
            self.device_refresh_queue.remove(host)
            return True
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.device_cache_timeout)
        return host not in self.last_device_update or self.last_device_update[host] < cutoff

    def host_needs_osdspec_preview_refresh(self, host):
        """
        Tell whether the OSD spec previews of ``host`` should be refreshed.

        Offline hosts never need a refresh; otherwise only queued hosts do
        (one occurrence is taken off the queue).
        """
        if host in self.mgr.offline_hosts:
            logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
            return False
        if host in self.osdspec_previews_refresh_queue:
            self.osdspec_previews_refresh_queue.remove(host)
            return True
        # Since this is dependent on other factors (device and spec) this does not need
        # to be updated periodically.
        return False

    def host_needs_check(self, host):
        # type: (str) -> bool
        """
        Tell whether ``host`` is due for a check: it was never checked or the
        last check is older than ``mgr.host_check_interval`` seconds.
        """
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
            seconds=self.mgr.host_check_interval)
        return host not in self.last_host_check or self.last_host_check[host] < cutoff

    def add_daemon(self, host, dd):
        # type: (str, orchestrator.DaemonDescription) -> None
        """Cache ``dd`` under ``dd.name()`` for ``host``; the host must already be cached."""
        assert host in self.daemons
        self.daemons[host][dd.name()] = dd

    def rm_daemon(self, host, name):
        """Forget daemon ``name`` of ``host``; unknown hosts or daemons are ignored."""
        if host in self.daemons:
            self.daemons[host].pop(name, None)