5 from typing
import TYPE_CHECKING
, Dict
, List
, Iterator
, Optional
, Any
, Tuple
, Set
10 from ceph
.deployment
import inventory
11 from ceph
.deployment
.service_spec
import ServiceSpec
12 from orchestrator
import OrchestratorError
, HostSpec
15 from .module
import CephadmOrchestrator
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Key prefixes used with the mgr store (get_store_prefix()/set_store()):
# one persisted record per host for the host cache, one per service spec.
HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
# strftime()/strptime() format for the timestamps written to / read from
# those persisted records.
DATEFMT = '%Y-%m-%dT%H:%M:%S.%f'
26 def __init__(self
, mgr
: 'CephadmOrchestrator'):
29 i
= self
.mgr
.get_store('inventory')
31 self
._inventory
: Dict
[str, dict] = json
.loads(i
)
33 self
._inventory
= dict()
34 logger
.debug('Loaded inventory %s' % self
._inventory
)
def keys(self) -> List[str]:
    """Return the hostnames currently present in the inventory, as a new list."""
    # Iterating the mapping yields its keys in insertion order.
    return list(self._inventory)
def __contains__(self, host: str) -> bool:
    """Support ``host in inventory``: True when *host* is a known hostname."""
    known_hosts = self._inventory
    return host in known_hosts
def assert_host(self, host):
    """Ensure *host* is present in the inventory.

    :param host: hostname to look up
    :raises OrchestratorError: when the host is not in the inventory
    """
    if host in self._inventory:
        return
    raise OrchestratorError('host %s does not exist' % host)
46 def add_host(self
, spec
: HostSpec
):
47 self
._inventory
[spec
.hostname
] = spec
.to_json()
50 def rm_host(self
, host
: str):
51 self
.assert_host(host
)
52 del self
._inventory
[host
]
55 def set_addr(self
, host
, addr
):
56 self
.assert_host(host
)
57 self
._inventory
[host
]['addr'] = addr
60 def add_label(self
, host
, label
):
61 self
.assert_host(host
)
63 if 'labels' not in self
._inventory
[host
]:
64 self
._inventory
[host
]['labels'] = list()
65 if label
not in self
._inventory
[host
]['labels']:
66 self
._inventory
[host
]['labels'].append(label
)
69 def rm_label(self
, host
, label
):
70 self
.assert_host(host
)
72 if 'labels' not in self
._inventory
[host
]:
73 self
._inventory
[host
]['labels'] = list()
74 if label
in self
._inventory
[host
]['labels']:
75 self
._inventory
[host
]['labels'].remove(label
)
def get_addr(self, host) -> str:
    """Return the address recorded for *host*.

    Falls back to the hostname itself when the inventory entry has no
    ``'addr'`` key.  Whatever ``assert_host`` raises for an unknown host
    propagates to the caller.
    """
    self.assert_host(host)
    entry = self._inventory[host]
    return entry['addr'] if 'addr' in entry else host
82 def filter_by_label(self
, label
: Optional
[str] = '', as_hostspec
: bool = False) -> Iterator
:
83 for h
, hostspec
in self
._inventory
.items():
84 if not label
or label
in hostspec
.get('labels', []):
89 def spec_from_dict(self
, info
):
90 hostname
= info
['hostname']
93 addr
=info
.get('addr', hostname
),
94 labels
=info
.get('labels', []),
95 status
='Offline' if hostname
in self
.mgr
.offline_hosts
else info
.get('status', ''),
def all_specs(self) -> Iterator[HostSpec]:
    """Lazily yield ``spec_from_dict(entry)`` for every inventory entry."""
    # Bind the converter once up front, as map() would.
    to_spec = self.spec_from_dict
    return (to_spec(entry) for entry in self._inventory.values())
102 self
.mgr
.set_store('inventory', json
.dumps(self
._inventory
))
106 def __init__(self
, mgr
):
107 # type: (CephadmOrchestrator) -> None
109 self
.specs
= {} # type: Dict[str, ServiceSpec]
110 self
.spec_created
= {} # type: Dict[str, datetime.datetime]
114 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(SPEC_STORE_PREFIX
)):
115 service_name
= k
[len(SPEC_STORE_PREFIX
):]
118 spec
= ServiceSpec
.from_json(v
['spec'])
119 created
= datetime
.datetime
.strptime(v
['created'], DATEFMT
)
120 self
.specs
[service_name
] = spec
121 self
.spec_created
[service_name
] = created
122 self
.mgr
.log
.debug('SpecStore: loaded spec for %s' % (
124 except Exception as e
:
125 self
.mgr
.log
.warning('unable to load spec for %s: %s' % (
129 def save(self
, spec
):
130 # type: (ServiceSpec) -> None
131 self
.specs
[spec
.service_name()] = spec
132 self
.spec_created
[spec
.service_name()] = datetime
.datetime
.utcnow()
134 SPEC_STORE_PREFIX
+ spec
.service_name(),
136 'spec': spec
.to_json(),
137 'created': self
.spec_created
[spec
.service_name()].strftime(DATEFMT
),
141 def rm(self
, service_name
):
142 # type: (str) -> bool
143 found
= service_name
in self
.specs
145 del self
.specs
[service_name
]
146 del self
.spec_created
[service_name
]
147 self
.mgr
.set_store(SPEC_STORE_PREFIX
+ service_name
, None)
150 def find(self
, service_name
: Optional
[str] = None) -> List
[ServiceSpec
]:
152 for sn
, spec
in self
.specs
.items():
153 if not service_name
or \
154 sn
== service_name
or \
155 sn
.startswith(service_name
+ '.'):
157 self
.mgr
.log
.debug('SpecStore: find spec for %s returned: %s' % (
158 service_name
, specs
))
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Create an empty cache bound to *mgr*.

    Only initialises the containers; nothing is read from the store here.
    The dict-typed containers are indexed by hostname elsewhere in the class.
    """
    self.mgr: CephadmOrchestrator = mgr

    # Cached per-host state.
    self.daemons: Dict[str, Dict[str, orchestrator.DaemonDescription]] = {}
    self.devices: Dict[str, List[inventory.Device]] = {}
    self.networks: Dict[str, Dict[str, List[str]]] = {}
    self.osdspec_previews: Dict[str, List[Dict[str, Any]]] = {}
    self.daemon_config_deps: Dict[str, Dict[str, Dict[str, Any]]] = {}

    # Timestamps of the most recent refresh / check per host.
    self.last_daemon_update: Dict[str, datetime.datetime] = {}
    self.last_device_update: Dict[str, datetime.datetime] = {}
    self.last_host_check: Dict[str, datetime.datetime] = {}

    # Hosts queued for a refresh of the corresponding kind of state.
    self.daemon_refresh_queue: List[str] = []
    self.device_refresh_queue: List[str] = []
    self.osdspec_previews_refresh_queue: List[str] = []

    # NOTE(review): presumably hosts whose OSD spec preview is currently
    # being generated -- confirm against the code that fills this set.
    self.loading_osdspec_preview: Set[str] = set()
180 for k
, v
in six
.iteritems(self
.mgr
.get_store_prefix(HOST_CACHE_PREFIX
)):
181 host
= k
[len(HOST_CACHE_PREFIX
):]
182 if host
not in self
.mgr
.inventory
:
183 self
.mgr
.log
.warning('removing stray HostCache host record %s' % (
185 self
.mgr
.set_store(k
, None)
188 if 'last_device_update' in j
:
189 self
.last_device_update
[host
] = datetime
.datetime
.strptime(
190 j
['last_device_update'], DATEFMT
)
192 self
.device_refresh_queue
.append(host
)
193 # for services, we ignore the persisted last_*_update
194 # and always trigger a new scrape on mgr restart.
195 self
.daemon_refresh_queue
.append(host
)
196 self
.daemons
[host
] = {}
197 self
.osdspec_previews
[host
] = []
198 self
.devices
[host
] = []
199 self
.networks
[host
] = {}
200 self
.daemon_config_deps
[host
] = {}
201 for name
, d
in j
.get('daemons', {}).items():
202 self
.daemons
[host
][name
] = \
203 orchestrator
.DaemonDescription
.from_json(d
)
204 for d
in j
.get('devices', []):
205 self
.devices
[host
].append(inventory
.Device
.from_json(d
))
206 self
.networks
[host
] = j
.get('networks', {})
207 self
.osdspec_previews
[host
] = j
.get('osdspec_previews', {})
209 for name
, d
in j
.get('daemon_config_deps', {}).items():
210 self
.daemon_config_deps
[host
][name
] = {
211 'deps': d
.get('deps', []),
212 'last_config': datetime
.datetime
.strptime(
213 d
['last_config'], DATEFMT
),
215 if 'last_host_check' in j
:
216 self
.last_host_check
[host
] = datetime
.datetime
.strptime(
217 j
['last_host_check'], DATEFMT
)
219 'HostCache.load: host %s has %d daemons, '
220 '%d devices, %d networks' % (
221 host
, len(self
.daemons
[host
]), len(self
.devices
[host
]),
222 len(self
.networks
[host
])))
223 except Exception as e
:
224 self
.mgr
.log
.warning('unable to load cached state for %s: %s' % (
def update_host_daemons(self, host, dm):
    # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
    """Replace the cached daemon map of *host* and record when that happened.

    :param host: hostname whose cache entry is replaced
    :param dm: daemon map stored (not copied) as ``self.daemons[host]``
    """
    self.daemons[host] = dm
    refreshed_at = datetime.datetime.utcnow()
    self.last_daemon_update[host] = refreshed_at
def update_host_devices_networks(self, host, dls, nets):
    # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
    """Replace the cached devices and networks of *host* and timestamp it.

    :param host: hostname whose cache entries are replaced
    :param dls: device list stored (not copied) as ``self.devices[host]``
    :param nets: network mapping stored (not copied) as ``self.networks[host]``
    """
    self.networks[host] = nets
    self.devices[host] = dls
    refreshed_at = datetime.datetime.utcnow()
    self.last_device_update[host] = refreshed_at
239 def update_daemon_config_deps(self
, host
, name
, deps
, stamp
):
240 self
.daemon_config_deps
[host
][name
] = {
242 'last_config': stamp
,
def update_last_host_check(self, host):
    # type: (str) -> None
    """Record the current (naive UTC) time as the last check of *host*."""
    checked_at = datetime.datetime.utcnow()
    self.last_host_check[host] = checked_at
249 def prime_empty_host(self
, host
):
250 # type: (str) -> None
252 Install an empty entry for a host
254 self
.daemons
[host
] = {}
255 self
.devices
[host
] = []
256 self
.networks
[host
] = {}
257 self
.osdspec_previews
[host
] = []
258 self
.daemon_config_deps
[host
] = {}
259 self
.daemon_refresh_queue
.append(host
)
260 self
.device_refresh_queue
.append(host
)
261 self
.osdspec_previews_refresh_queue
.append(host
)
263 def invalidate_host_daemons(self
, host
):
264 # type: (str) -> None
265 self
.daemon_refresh_queue
.append(host
)
266 if host
in self
.last_daemon_update
:
267 del self
.last_daemon_update
[host
]
270 def invalidate_host_devices(self
, host
):
271 # type: (str) -> None
272 self
.device_refresh_queue
.append(host
)
273 if host
in self
.last_device_update
:
274 del self
.last_device_update
[host
]
277 def save_host(self
, host
):
278 # type: (str) -> None
282 'osdspec_previews': [],
283 'daemon_config_deps': {},
285 if host
in self
.last_daemon_update
:
286 j
['last_daemon_update'] = self
.last_daemon_update
[host
].strftime(DATEFMT
) # type: ignore
287 if host
in self
.last_device_update
:
288 j
['last_device_update'] = self
.last_device_update
[host
].strftime(DATEFMT
) # type: ignore
289 for name
, dd
in self
.daemons
[host
].items():
290 j
['daemons'][name
] = dd
.to_json() # type: ignore
291 for d
in self
.devices
[host
]:
292 j
['devices'].append(d
.to_json()) # type: ignore
293 j
['networks'] = self
.networks
[host
]
294 for name
, depi
in self
.daemon_config_deps
[host
].items():
295 j
['daemon_config_deps'][name
] = { # type: ignore
296 'deps': depi
.get('deps', []),
297 'last_config': depi
['last_config'].strftime(DATEFMT
),
299 if self
.osdspec_previews
[host
]:
300 j
['osdspec_previews'] = self
.osdspec_previews
[host
]
302 if host
in self
.last_host_check
:
303 j
['last_host_check'] = self
.last_host_check
[host
].strftime(DATEFMT
)
304 self
.mgr
.set_store(HOST_CACHE_PREFIX
+ host
, json
.dumps(j
))
def rm_host(self, host):
    # type: (str) -> None
    """Forget everything cached for *host* and delete its persisted record.

    Removing a host that is not cached is not an error: every in-memory
    removal is a no-op when the host is absent, and the store key is cleared
    regardless.

    :param host: hostname to drop from the cache
    """
    per_host_maps = (
        self.daemons,
        self.devices,
        self.osdspec_previews,
        self.networks,
        self.last_daemon_update,
        self.last_device_update,
        self.daemon_config_deps,
        # Without this, a host that is removed and later re-added keeps its
        # old check timestamp, and host_needs_check() reports it as recently
        # checked although the new entry never was.
        self.last_host_check,
    )
    for per_host in per_host_maps:
        per_host.pop(host, None)
    self.loading_osdspec_preview.discard(host)

    # Storing None under the host's key removes the persisted cache record.
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
327 # type: () -> List[str]
329 for host
, di
in self
.daemons
.items():
333 def get_daemons(self
):
334 # type: () -> List[orchestrator.DaemonDescription]
336 for host
, dm
in self
.daemons
.items():
337 for name
, dd
in dm
.items():
341 def get_daemons_with_volatile_status(self
) -> Iterator
[Tuple
[str, Dict
[str, orchestrator
.DaemonDescription
]]]:
342 for host
, dm
in self
.daemons
.items():
343 if host
in self
.mgr
.offline_hosts
:
344 def set_offline(dd
: orchestrator
.DaemonDescription
) -> orchestrator
.DaemonDescription
:
347 ret
.status_desc
= 'host is offline'
349 yield host
, {name
: set_offline(d
) for name
, d
in dm
.items()}
353 def get_daemons_by_service(self
, service_name
):
354 # type: (str) -> List[orchestrator.DaemonDescription]
355 result
= [] # type: List[orchestrator.DaemonDescription]
356 for host
, dm
in self
.daemons
.items():
357 for name
, d
in dm
.items():
358 if name
.startswith(service_name
+ '.'):
362 def get_daemon_names(self
):
363 # type: () -> List[str]
365 for host
, dm
in self
.daemons
.items():
366 for name
, dd
in dm
.items():
370 def get_daemon_last_config_deps(self
, host
, name
):
371 if host
in self
.daemon_config_deps
:
372 if name
in self
.daemon_config_deps
[host
]:
373 return self
.daemon_config_deps
[host
][name
].get('deps', []), \
374 self
.daemon_config_deps
[host
][name
].get('last_config', None)
377 def host_needs_daemon_refresh(self
, host
):
378 # type: (str) -> bool
379 if host
in self
.mgr
.offline_hosts
:
380 logger
.debug(f
'Host "{host}" marked as offline. Skipping daemon refresh')
382 if host
in self
.daemon_refresh_queue
:
383 self
.daemon_refresh_queue
.remove(host
)
385 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
386 seconds
=self
.mgr
.daemon_cache_timeout
)
387 if host
not in self
.last_daemon_update
or self
.last_daemon_update
[host
] < cutoff
:
391 def host_needs_device_refresh(self
, host
):
392 # type: (str) -> bool
393 if host
in self
.mgr
.offline_hosts
:
394 logger
.debug(f
'Host "{host}" marked as offline. Skipping device refresh')
396 if host
in self
.device_refresh_queue
:
397 self
.device_refresh_queue
.remove(host
)
399 cutoff
= datetime
.datetime
.utcnow() - datetime
.timedelta(
400 seconds
=self
.mgr
.device_cache_timeout
)
401 if host
not in self
.last_device_update
or self
.last_device_update
[host
] < cutoff
:
405 def host_needs_osdspec_preview_refresh(self
, host
):
406 if host
in self
.mgr
.offline_hosts
:
407 logger
.debug(f
'Host "{host}" marked as offline. Skipping osdspec preview refresh')
409 if host
in self
.osdspec_previews_refresh_queue
:
410 self
.osdspec_previews_refresh_queue
.remove(host
)
412 # Since this is dependent on other factors (device and spec) this does not need
413 # to be updated periodically.
def host_needs_check(self, host):
    # type: (str) -> bool
    """Tell whether *host* is due for a host check.

    True when no check has been recorded for the host, or when the recorded
    one is older than ``self.mgr.host_check_interval`` seconds.
    """
    max_age = datetime.timedelta(seconds=self.mgr.host_check_interval)
    cutoff = datetime.datetime.utcnow() - max_age
    if host not in self.last_host_check:
        return True
    return self.last_host_check[host] < cutoff
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """Store *dd* in the daemon map of *host*, keyed by ``dd.name()``.

    The host must already have a daemon map (asserted); an existing daemon
    of the same name is overwritten.
    """
    assert host in self.daemons
    host_daemons = self.daemons[host]
    host_daemons[dd.name()] = dd
def rm_daemon(self, host, name):
    """Drop daemon *name* from the cached daemon map of *host*.

    Silently does nothing when either the host or the daemon is not cached.
    """
    if host not in self.daemons:
        return
    self.daemons[host].pop(name, None)