import datetime
import ipaddress
import json
import logging
import socket
from copy import copy
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, NamedTuple, \
    Optional, Set, Tuple, Type, cast

import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent, service_to_daemon_types

from .utils import resolve_ip

if TYPE_CHECKING:
    from .module import CephadmOrchestrator
# Module-level logger plus the mon KV-store key prefixes used by the
# persistent caches below.
logger = logging.getLogger(__name__)

HOST_CACHE_PREFIX = "host."
SPEC_STORE_PREFIX = "spec."
30 The inventory stores a HostSpec for all hosts persistently.
def __init__(self, mgr: 'CephadmOrchestrator'):
    """Load the persisted host inventory from the mon store.

    Also fixes up legacy records: entries missing a 'hostname' key, and
    entries whose 'addr' is not an IP (resolved where safely possible).
    NOTE(review): several control-flow lines were truncated in the source;
    the try/except and save-on-adjust structure below is a reconstruction.
    """
    self.mgr = mgr
    adjusted_addrs = False

    def is_valid_ip(ip: str) -> bool:
        # ipaddress raises ValueError for anything that is not an IP literal
        try:
            ipaddress.ip_address(ip)
            return True
        except ValueError:
            return False

    try:
        i = self.mgr.get_store('inventory')
        if i:
            self._inventory: Dict[str, dict] = json.loads(i)
            # handle old clusters missing 'hostname' key from hostspec
            for k, v in self._inventory.items():
                if 'hostname' not in v:
                    v['hostname'] = k
                    adjusted_addrs = True

                # convert legacy non-IP addr?
                if is_valid_ip(str(v.get('addr'))):
                    continue
                if len(self._inventory) > 1:
                    if k == socket.gethostname():
                        # Never try to resolve our own host! This is
                        # fraught and can lead to either a loopback
                        # address (due to podman's futzing with
                        # /etc/hosts) or a private IP based on the CNI
                        # configuration. Instead, wait until the mgr
                        # fails over to another host and let them resolve
                        # this host.
                        continue
                    ip = resolve_ip(cast(str, v.get('addr')))
                else:
                    # we only have 1 node in the cluster, so we can't
                    # rely on another host doing the lookup. use the
                    # IP the mgr binds to.
                    ip = self.mgr.get_mgr_ip()
                if is_valid_ip(ip) and not ip.startswith('127.0.'):
                    self.mgr.log.info(
                        f"inventory: adjusted host {v['hostname']} addr '{v['addr']}' -> '{ip}'"
                    )
                    v['addr'] = ip
                    adjusted_addrs = True
            if adjusted_addrs:
                self.save()
        else:
            self._inventory = dict()
    except Exception:
        # any parse/load failure starts us with an empty inventory
        self._inventory = dict()
    logger.debug('Loaded inventory %s' % self._inventory)
def keys(self) -> List[str]:
    """Return the known host names."""
    return [hostname for hostname in self._inventory]
def __contains__(self, host: str) -> bool:
    """Membership test: is *host* in the inventory?"""
    return host in self._inventory
def assert_host(self, host: str) -> None:
    """Raise OrchestratorError if *host* is not in the inventory."""
    if host in self._inventory:
        return
    raise OrchestratorError('host %s does not exist' % host)
def add_host(self, spec: 'HostSpec') -> None:
    """Add (or replace) a host record and persist the inventory.

    The persistence call was truncated in the source; without it the
    mutation would be lost on mgr restart.
    """
    self._inventory[spec.hostname] = spec.to_json()
    self.save()
def rm_host(self, host: str) -> None:
    """Remove *host* (must exist) and persist the inventory.

    Restores the persistence call truncated from the source.
    """
    self.assert_host(host)
    del self._inventory[host]
    self.save()
def set_addr(self, host: str, addr: str) -> None:
    """Set the address of an existing host and persist the inventory.

    Restores the persistence call truncated from the source.
    """
    self.assert_host(host)
    self._inventory[host]['addr'] = addr
    self.save()
def add_label(self, host: str, label: str) -> None:
    """Add *label* to *host* (idempotent) and persist the inventory.

    Restores the persistence call truncated from the source.
    """
    self.assert_host(host)

    if 'labels' not in self._inventory[host]:
        self._inventory[host]['labels'] = list()
    if label not in self._inventory[host]['labels']:
        self._inventory[host]['labels'].append(label)
    self.save()
def rm_label(self, host: str, label: str) -> None:
    """Remove *label* from *host* (no-op if absent) and persist.

    Restores the persistence call truncated from the source.
    """
    self.assert_host(host)

    if 'labels' not in self._inventory[host]:
        self._inventory[host]['labels'] = list()
    if label in self._inventory[host]['labels']:
        self._inventory[host]['labels'].remove(label)
    self.save()
def has_label(self, host: str, label: str) -> bool:
    """Return True iff *host* exists and carries *label*.

    The `return (` wrapper was truncated in the source; the expression is
    reconstructed unambiguously from the visible operands.
    """
    return (
        host in self._inventory
        and label in self._inventory[host].get('labels', [])
    )
def get_addr(self, host: str) -> str:
    """Return the stored address of *host*, defaulting to the host name."""
    self.assert_host(host)
    record = self._inventory[host]
    return record.get('addr', host)
def filter_by_label(self, label: Optional[str] = '', as_hostspec: bool = False) -> Iterator:
    """Yield hosts carrying *label* (all hosts if label is empty/None).

    Yields HostSpec objects when as_hostspec is True, otherwise host names.
    NOTE(review): the as_hostspec/else branches were truncated in the
    source and are reconstructed here.
    """
    for h, hostspec in self._inventory.items():
        if not label or label in hostspec.get('labels', []):
            if as_hostspec:
                yield self.spec_from_dict(hostspec)
            else:
                yield h
def spec_from_dict(self, info: dict) -> 'HostSpec':
    """Build a HostSpec from a stored inventory record.

    Hosts listed in mgr.offline_hosts are reported with status 'Offline'.
    NOTE(review): the `return HostSpec(` wrapper was truncated in the
    source; reconstructed from the visible keyword arguments.
    """
    hostname = info['hostname']
    return HostSpec(
        hostname,
        addr=info.get('addr', hostname),
        labels=info.get('labels', []),
        status='Offline' if hostname in self.mgr.offline_hosts else info.get('status', ''),
    )
def all_specs(self) -> 'List[HostSpec]':
    """Return a HostSpec for every host in the inventory."""
    return [self.spec_from_dict(info) for info in self._inventory.values()]
def get_host_with_state(self, state: str = "") -> List[str]:
    """return a list of host names in a specific state"""
    matches = []
    for h in self._inventory:
        if self._inventory[h].get("status", "").lower() == state:
            matches.append(h)
    return matches
def save(self) -> None:
    """Persist the in-memory inventory to the mon KV store."""
    blob = json.dumps(self._inventory)
    self.mgr.set_store('inventory', blob)
class SpecDescription(NamedTuple):
    """A service spec together with its rank map and lifecycle timestamps.

    NOTE(review): the first field (the spec itself) was truncated from the
    source and is restored here; field order matters for tuple consumers.
    """
    spec: 'ServiceSpec'
    rank_map: Optional[Dict[int, Dict[int, Optional[str]]]]
    created: datetime.datetime
    deleted: Optional[datetime.datetime]
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Initialize the spec store; specs are loaded later via load().

    Restores the `self.mgr = mgr` assignment truncated from the source
    (every other method here dereferences self.mgr).
    """
    self.mgr = mgr
    # service_name -> spec
    self._specs = {}  # type: Dict[str, ServiceSpec]
    # service_name -> rank -> gen -> daemon_id
    self._rank_maps = {}  # type: Dict[str, Dict[int, Dict[int, Optional[str]]]]
    self.spec_created = {}  # type: Dict[str, datetime.datetime]
    self.spec_deleted = {}  # type: Dict[str, datetime.datetime]
    # preview-only specs are kept out of _specs
    self.spec_preview = {}  # type: Dict[str, ServiceSpec]
@property
def all_specs(self) -> 'Mapping[str, ServiceSpec]':
    """
    returns active and deleted specs. Returns read-only dict.
    """
    # NOTE(review): the @property decorator and return were truncated in
    # the source; callers (e.g. cleanup()) access this without ().
    return self._specs
def __contains__(self, name: str) -> bool:
    """Membership test against known (non-preview) specs."""
    return name in self._specs
def __getitem__(self, name: str) -> 'SpecDescription':
    """Return the SpecDescription for *name*; raise if unknown."""
    if name not in self._specs:
        raise OrchestratorError(f'Service {name} not found.')
    return SpecDescription(
        self._specs[name],
        self._rank_maps.get(name),
        self.spec_created[name],
        self.spec_deleted.get(name, None),
    )
@property
def active_specs(self) -> 'Mapping[str, ServiceSpec]':
    """Specs that are not marked deleted.

    NOTE(review): the @property decorator was truncated in the source.
    """
    return {k: v for k, v in self._specs.items() if k not in self.spec_deleted}
def load(self):
    # type: () -> None
    """Load all persisted specs (and rank maps) from the mon KV store.

    NOTE(review): the def line and try/except wrappers were truncated in
    the source and are reconstructed here.
    """
    for k, v in self.mgr.get_store_prefix(SPEC_STORE_PREFIX).items():
        service_name = k[len(SPEC_STORE_PREFIX):]
        try:
            j = cast(Dict[str, dict], json.loads(v))
            spec = ServiceSpec.from_json(j['spec'])
            created = str_to_datetime(cast(str, j['created']))
            self._specs[service_name] = spec
            self.spec_created[service_name] = created

            if 'deleted' in j:
                deleted = str_to_datetime(cast(str, j['deleted']))
                self.spec_deleted[service_name] = deleted

            if 'rank_map' in j and isinstance(j['rank_map'], dict):
                self._rank_maps[service_name] = {}
                for rank_str, m in j['rank_map'].items():
                    try:
                        rank = int(rank_str)
                    except ValueError:
                        logger.exception(f"failed to parse rank in {j['rank_map']}")
                        continue
                    if isinstance(m, dict):
                        self._rank_maps[service_name][rank] = {}
                        for gen_str, name in m.items():
                            try:
                                gen = int(gen_str)
                            except ValueError:
                                logger.exception(f"failed to parse gen in {j['rank_map']}")
                                continue
                            # BUG FIX: source had `m is None`, which is
                            # always False inside this isinstance(m, dict)
                            # branch; the map values are Optional[str], so
                            # None entries must be preserved as well.
                            if isinstance(name, str) or name is None:
                                self._rank_maps[service_name][rank][gen] = name

            self.mgr.log.debug('SpecStore: loaded spec for %s' % (
                service_name))
        except Exception as e:
            self.mgr.log.warning('unable to load spec for %s: %s' % (
                service_name, e))
            pass
def save(
        self,
        spec: 'ServiceSpec',
        update_create: bool = True,
) -> None:
    """Store *spec*; preview-only specs are stashed without persisting.

    NOTE(review): signature lines and the trailing `_save` call were
    truncated in the source and are reconstructed here.
    """
    name = spec.service_name()
    if spec.preview_only:
        self.spec_preview[name] = spec
        return None
    self._specs[name] = spec

    if update_create:
        self.spec_created[name] = datetime_now()

    self._save(name)
def save_rank_map(self,
                  name: str,
                  rank_map: Dict[int, Dict[int, Optional[str]]]) -> None:
    """Replace the rank map for *name* and persist it.

    Restores the truncated `name` parameter and `_save` call.
    """
    self._rank_maps[name] = rank_map
    self._save(name)
def _save(self, name: str) -> None:
    """Serialize spec *name* (plus rank map / deleted stamp) to the store
    and emit a 'service was created' event.

    NOTE(review): the `set_store` call line was truncated in the source;
    key and sort_keys usage reconstructed from the visible fragments.
    """
    data: Dict[str, Any] = {
        'spec': self._specs[name].to_json(),
        'created': datetime_to_str(self.spec_created[name]),
    }
    if name in self._rank_maps:
        data['rank_map'] = self._rank_maps[name]
    if name in self.spec_deleted:
        data['deleted'] = datetime_to_str(self.spec_deleted[name])

    self.mgr.set_store(
        SPEC_STORE_PREFIX + name,
        json.dumps(data, sort_keys=True),
    )
    self.mgr.events.for_service(self._specs[name],
                                OrchestratorEvent.INFO,
                                'service was created')
def rm(self, service_name: str) -> bool:
    """Mark *service_name* deleted; preview specs are dropped immediately.

    Returns True if the spec existed. NOTE(review): the return statements
    were truncated in the source and are reconstructed here.
    """
    if service_name not in self._specs:
        return False

    if self._specs[service_name].preview_only:
        self.finally_rm(service_name)
        return True

    self.spec_deleted[service_name] = datetime_now()
    self.save(self._specs[service_name], update_create=False)
    return True
def finally_rm(self, service_name):
    # type: (str) -> bool
    """Actually remove a spec and all its bookkeeping from memory and the
    mon store; return whether it existed.

    NOTE(review): the `if found:` guard and return were truncated in the
    source and are reconstructed here.
    """
    found = service_name in self._specs
    if found:
        del self._specs[service_name]
        if service_name in self._rank_maps:
            del self._rank_maps[service_name]
        del self.spec_created[service_name]
        if service_name in self.spec_deleted:
            del self.spec_deleted[service_name]
        self.mgr.set_store(SPEC_STORE_PREFIX + service_name, None)
    return found
def get_created(self, spec: 'ServiceSpec') -> Optional[datetime.datetime]:
    """Return when *spec* was created, or None if unknown."""
    service_name = spec.service_name()
    return self.spec_created.get(service_name)
class ClientKeyringSpec(object):
    """
    A client keyring file that we should maintain

    NOTE(review): the constructor's `entity` parameter, the uid/gid
    assignments and parts of to_json/from_json were truncated in the
    source and are reconstructed here.
    """

    def __init__(
            self,
            entity: str,
            placement: 'PlacementSpec',
            mode: Optional[int] = None,
            uid: Optional[int] = None,
            gid: Optional[int] = None,
    ) -> None:
        self.entity = entity
        self.placement = placement
        # sensible defaults: keyrings are secrets, owner root
        self.mode = mode or 0o600
        self.uid = uid or 0
        self.gid = gid or 0

    def validate(self) -> None:
        pass

    def to_json(self) -> Dict[str, Any]:
        return {
            'entity': self.entity,
            'placement': self.placement.to_json(),
            'mode': self.mode,
            'uid': self.uid,
            'gid': self.gid,
        }

    @property
    def path(self) -> str:
        return f'/etc/ceph/ceph.{self.entity}.keyring'

    @classmethod
    def from_json(cls: Type, data: dict) -> 'ClientKeyringSpec':
        c = data.copy()
        c['placement'] = PlacementSpec.from_json(c['placement'])
        return cls(**c)
class ClientKeyringStore():
    """
    Track client keyring files that we are supposed to maintain

    NOTE(review): the `json.loads` in load() and the persistence calls in
    update()/rm() were truncated in the source; without them updates
    would not survive a mgr restart.
    """

    def __init__(self, mgr):
        # type: (CephadmOrchestrator) -> None
        self.mgr: CephadmOrchestrator = mgr
        self.keys: Dict[str, ClientKeyringSpec] = {}

    def load(self) -> None:
        """Load the keyring specs from the mon store."""
        c = self.mgr.get_store('client_keyrings') or b'{}'
        j = json.loads(c)
        for e, d in j.items():
            self.keys[e] = ClientKeyringSpec.from_json(d)

    def save(self) -> None:
        """Persist all keyring specs to the mon store."""
        data = {
            k: v.to_json() for k, v in self.keys.items()
        }
        self.mgr.set_store('client_keyrings', json.dumps(data))

    def update(self, ks: 'ClientKeyringSpec') -> None:
        """Add/replace one keyring spec and persist."""
        self.keys[ks.entity] = ks
        self.save()

    def rm(self, entity: str) -> None:
        """Remove one keyring spec (if present) and persist."""
        if entity in self.keys:
            del self.keys[entity]
            self.save()
393 HostCache stores different things:
395 1. `daemons`: Deployed daemons O(daemons)
397 They're part of the configuration nowadays and need to be
398 persistent. The name "daemon cache" is unfortunately a bit misleading.
399 Like for example we really need to know where daemons are deployed on
400 hosts that are offline.
402 2. `devices`: ceph-volume inventory cache O(hosts)
404 As soon as this is populated, it becomes more or less read-only.
406 3. `networks`: network interfaces for each host. O(hosts)
408 This is needed in order to deploy MONs. As this is mostly read-only.
410 4. `last_client_files` O(hosts)
412 Stores the last digest and owner/mode for files we've pushed to /etc/ceph
413 (ceph.conf or client keyrings).
415 5. `scheduled_daemon_actions`: O(daemons)
417 Used to run daemon actions after deploying a daemon. We need to
418 store it persistently, in order to stay consistent across
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Set up the (initially empty) per-host caches; load() fills them."""
    self.mgr: CephadmOrchestrator = mgr
    # host -> daemon name -> DaemonDescription
    self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
    self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
    self.devices = {}  # type: Dict[str, List[inventory.Device]]
    self.facts = {}  # type: Dict[str, Dict[str, Any]]
    self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
    self.last_autotune = {}  # type: Dict[str, datetime.datetime]
    self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
    self.osdspec_last_applied = {}  # type: Dict[str, Dict[str, datetime.datetime]]
    self.networks = {}  # type: Dict[str, Dict[str, Dict[str, List[str]]]]
    self.last_device_update = {}  # type: Dict[str, datetime.datetime]
    self.last_device_change = {}  # type: Dict[str, datetime.datetime]
    # hosts queued for a forced refresh on the next serve loop
    self.daemon_refresh_queue = []  # type: List[str]
    self.device_refresh_queue = []  # type: List[str]
    self.osdspec_previews_refresh_queue = []  # type: List[str]

    # host -> daemon name -> dict
    self.daemon_config_deps = {}  # type: Dict[str, Dict[str, Dict[str,Any]]]
    self.last_host_check = {}  # type: Dict[str, datetime.datetime]
    self.loading_osdspec_preview = set()  # type: Set[str]
    # host -> path -> (digest, mode, uid, gid)
    self.last_client_files: Dict[str, Dict[str, Tuple[str, int, int, int]]] = {}
    self.registry_login_queue: Set[str] = set()

    self.scheduled_daemon_actions: Dict[str, Dict[str, str]] = {}
def load(self):
    # type: () -> None
    """Load the per-host caches from the mon KV store.

    Stray records for hosts no longer in the inventory are removed.
    NOTE(review): the def line, the stray-record `continue`, and the
    try/except wrappers were truncated in the source and reconstructed.
    """
    for k, v in self.mgr.get_store_prefix(HOST_CACHE_PREFIX).items():
        host = k[len(HOST_CACHE_PREFIX):]
        if host not in self.mgr.inventory:
            self.mgr.log.warning('removing stray HostCache host record %s' % (
                host))
            self.mgr.set_store(k, None)
            continue
        try:
            j = json.loads(v)
            if 'last_device_update' in j:
                self.last_device_update[host] = str_to_datetime(j['last_device_update'])
            else:
                self.device_refresh_queue.append(host)
            if 'last_device_change' in j:
                self.last_device_change[host] = str_to_datetime(j['last_device_change'])
            # for services, we ignore the persisted last_*_update
            # and always trigger a new scrape on mgr restart.
            self.daemon_refresh_queue.append(host)
            self.daemons[host] = {}
            self.osdspec_previews[host] = []
            self.osdspec_last_applied[host] = {}
            self.devices[host] = []
            self.networks[host] = {}
            self.daemon_config_deps[host] = {}
            for name, d in j.get('daemons', {}).items():
                self.daemons[host][name] = \
                    orchestrator.DaemonDescription.from_json(d)
            for d in j.get('devices', []):
                self.devices[host].append(inventory.Device.from_json(d))
            self.networks[host] = j.get('networks_and_interfaces', {})
            self.osdspec_previews[host] = j.get('osdspec_previews', {})
            self.last_client_files[host] = j.get('last_client_files', {})
            for name, ts in j.get('osdspec_last_applied', {}).items():
                self.osdspec_last_applied[host][name] = str_to_datetime(ts)

            for name, d in j.get('daemon_config_deps', {}).items():
                self.daemon_config_deps[host][name] = {
                    'deps': d.get('deps', []),
                    'last_config': str_to_datetime(d['last_config']),
                }
            if 'last_host_check' in j:
                self.last_host_check[host] = str_to_datetime(j['last_host_check'])
            self.registry_login_queue.add(host)
            self.scheduled_daemon_actions[host] = j.get('scheduled_daemon_actions', {})

            self.mgr.log.debug(
                'HostCache.load: host %s has %d daemons, '
                '%d devices, %d networks' % (
                    host, len(self.daemons[host]), len(self.devices[host]),
                    len(self.networks[host])))
        except Exception as e:
            self.mgr.log.warning('unable to load cached state for %s: %s' % (
                host, e))
            pass
def update_host_daemons(self, host, dm):
    # type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
    """Replace the cached daemon map for *host* and stamp the refresh time."""
    self.daemons[host] = dm
    self.last_daemon_update[host] = datetime_now()
def update_host_facts(self, host, facts):
    # type: (str, Dict[str, Dict[str, Any]]) -> None
    """Replace the cached gather-facts output for *host* and stamp it."""
    self.facts[host] = facts
    self.last_facts_update[host] = datetime_now()
def update_autotune(self, host: str) -> None:
    """Record that memory autotuning just ran for *host*."""
    self.last_autotune[host] = datetime_now()
def invalidate_autotune(self, host: str) -> None:
    """Forget the last autotune timestamp so *host* is retuned soon."""
    self.last_autotune.pop(host, None)
def devices_changed(self, host: str, b: 'List[inventory.Device]') -> bool:
    """Return True if device inventory *b* differs from the cached one.

    NOTE(review): the length shortcut and return statements were truncated
    in the source; reconstructed around the visible comparison dicts.
    """
    a = self.devices[host]
    if len(a) != len(b):
        return True
    # compare by path -> serialized form, order-insensitive
    aj = {d.path: d.to_json() for d in a}
    bj = {d.path: d.to_json() for d in b}
    if aj != bj:
        self.mgr.log.info("Detected new or changed devices on %s" % host)
        return True
    return False
def update_host_devices_networks(
        self,
        host: str,
        dls: 'List[inventory.Device]',
        nets: Dict[str, Dict[str, List[str]]]
) -> None:
    """Store fresh device/network scrape results for *host*.

    last_device_change is only bumped when the device set actually changed
    (or was never recorded). NOTE(review): the truncated signature and
    `if (...)` wrapper are reconstructed here.
    """
    if (
            host not in self.devices
            or host not in self.last_device_change
            or self.devices_changed(host, dls)
    ):
        self.last_device_change[host] = datetime_now()
    self.last_device_update[host] = datetime_now()
    self.devices[host] = dls
    self.networks[host] = nets
def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
    """Record the config dependencies and config time of daemon *name*.

    Restores the truncated `'deps'` entry of the stored dict.
    """
    self.daemon_config_deps[host][name] = {
        'deps': deps,
        'last_config': stamp,
    }
def update_last_host_check(self, host):
    # type: (str) -> None
    """Stamp the time of the last successful host check."""
    self.last_host_check[host] = datetime_now()
def update_osdspec_last_applied(self, host, service_name, ts):
    # type: (str, str, datetime.datetime) -> None
    """Record when an OSD spec was last applied on *host*."""
    self.osdspec_last_applied[host][service_name] = ts
def update_client_file(self,
                       host: str,
                       path: str,
                       digest: str,
                       mode: int,
                       uid: int,
                       gid: int) -> None:
    """Remember digest/owner/mode of a file pushed to /etc/ceph on *host*.

    NOTE(review): the parameter list was truncated in the source;
    reconstructed from the stored tuple (digest, mode, uid, gid).
    """
    if host not in self.last_client_files:
        self.last_client_files[host] = {}
    self.last_client_files[host][path] = (digest, mode, uid, gid)
def removed_client_file(self, host: str, path: str) -> None:
    """Forget a previously-pushed client file (no-op if unknown).

    Restores the truncated `if (...)` wrapper around the visible condition.
    """
    if (
        host in self.last_client_files
        and path in self.last_client_files[host]
    ):
        del self.last_client_files[host][path]
def prime_empty_host(self, host):
    # type: (str) -> None
    """
    Install an empty entry for a host
    """
    # seed every per-host container ...
    self.daemons[host] = {}
    self.devices[host] = []
    self.networks[host] = {}
    self.osdspec_previews[host] = []
    self.osdspec_last_applied[host] = {}
    self.daemon_config_deps[host] = {}
    self.last_client_files[host] = {}
    # ... and queue the host for all initial refreshes
    self.daemon_refresh_queue.append(host)
    self.device_refresh_queue.append(host)
    self.osdspec_previews_refresh_queue.append(host)
    self.registry_login_queue.add(host)
def invalidate_host_daemons(self, host):
    # type: (str) -> None
    """Queue *host* for a daemon re-scrape and wake the serve loop.

    NOTE(review): the trailing wakeup (`self.mgr.event.set()`) was
    truncated from the source — confirm against the mgr's event API.
    """
    self.daemon_refresh_queue.append(host)
    if host in self.last_daemon_update:
        del self.last_daemon_update[host]
    self.mgr.event.set()
def invalidate_host_devices(self, host):
    # type: (str) -> None
    """Queue *host* for a device re-scrape and wake the serve loop.

    NOTE(review): the trailing wakeup (`self.mgr.event.set()`) was
    truncated from the source — confirm against the mgr's event API.
    """
    self.device_refresh_queue.append(host)
    if host in self.last_device_update:
        del self.last_device_update[host]
    self.mgr.event.set()
def distribute_new_registry_login_info(self) -> None:
    """Queue every known host for a registry re-login."""
    all_hosts = self.mgr.inventory.keys()
    self.registry_login_queue = set(all_hosts)
def save_host(self, host: str) -> None:
    """Serialize the per-host cache entry for *host* into the mon store.

    NOTE(review): the seed keys 'daemons'/'devices' and the final
    set_store call were truncated in the source; reconstructed from the
    keys this method (and load()) visibly reads/writes.
    """
    j: Dict[str, Any] = {
        'daemons': {},
        'devices': [],
        'osdspec_previews': [],
        'osdspec_last_applied': {},
        'daemon_config_deps': {},
    }
    if host in self.last_daemon_update:
        j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
    if host in self.last_device_update:
        j['last_device_update'] = datetime_to_str(self.last_device_update[host])
    if host in self.last_device_change:
        j['last_device_change'] = datetime_to_str(self.last_device_change[host])
    if host in self.daemons:
        for name, dd in self.daemons[host].items():
            j['daemons'][name] = dd.to_json()
    if host in self.devices:
        for d in self.devices[host]:
            j['devices'].append(d.to_json())
    if host in self.networks:
        j['networks_and_interfaces'] = self.networks[host]
    if host in self.daemon_config_deps:
        for name, depi in self.daemon_config_deps[host].items():
            j['daemon_config_deps'][name] = {
                'deps': depi.get('deps', []),
                'last_config': datetime_to_str(depi['last_config']),
            }
    if host in self.osdspec_previews and self.osdspec_previews[host]:
        j['osdspec_previews'] = self.osdspec_previews[host]
    if host in self.osdspec_last_applied:
        for name, ts in self.osdspec_last_applied[host].items():
            j['osdspec_last_applied'][name] = datetime_to_str(ts)
    if host in self.last_host_check:
        j['last_host_check'] = datetime_to_str(self.last_host_check[host])
    if host in self.last_client_files:
        j['last_client_files'] = self.last_client_files[host]
    if host in self.scheduled_daemon_actions:
        j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]

    self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
def rm_host(self, host):
    # type: (str) -> None
    """Drop every cached record for *host* and delete its store key.

    Restores `del self.facts[host]` which was truncated from the source
    (its guarding `if` is visible, matching all the sibling clauses).
    """
    if host in self.daemons:
        del self.daemons[host]
    if host in self.devices:
        del self.devices[host]
    if host in self.facts:
        del self.facts[host]
    if host in self.last_facts_update:
        del self.last_facts_update[host]
    if host in self.last_autotune:
        del self.last_autotune[host]
    if host in self.osdspec_previews:
        del self.osdspec_previews[host]
    if host in self.osdspec_last_applied:
        del self.osdspec_last_applied[host]
    if host in self.loading_osdspec_preview:
        self.loading_osdspec_preview.remove(host)
    if host in self.networks:
        del self.networks[host]
    if host in self.last_daemon_update:
        del self.last_daemon_update[host]
    if host in self.last_device_update:
        del self.last_device_update[host]
    if host in self.last_device_change:
        del self.last_device_change[host]
    if host in self.daemon_config_deps:
        del self.daemon_config_deps[host]
    if host in self.scheduled_daemon_actions:
        del self.scheduled_daemon_actions[host]
    if host in self.last_client_files:
        del self.last_client_files[host]
    self.mgr.set_store(HOST_CACHE_PREFIX + host, None)
def get_hosts(self):
    # type: () -> List[str]
    """Return the hosts that have a daemon-cache entry.

    NOTE(review): def line and accumulator were truncated in the source;
    name grounded by the `self.get_hosts()` call in daemon_cache_filled.
    """
    r = []
    for host, di in self.daemons.items():
        r.append(host)
    return r
def get_facts(self, host: str) -> Dict[str, Any]:
    """Return the cached gather-facts output for *host* ({} if unknown)."""
    try:
        return self.facts[host]
    except KeyError:
        return {}
def get_daemons(self):
    # type: () -> List[orchestrator.DaemonDescription]
    """Return every cached daemon description across all hosts.

    NOTE(review): accumulator/return lines were truncated in the source.
    """
    r = []
    for host, dm in self.daemons.items():
        for name, dd in dm.items():
            r.append(dd)
    return r
def get_daemons_by_host(self, host: str) -> 'List[orchestrator.DaemonDescription]':
    """Return the daemon descriptions cached for *host* ([] if unknown)."""
    dm = self.daemons.get(host, {})
    return list(dm.values())
def get_daemon(self, daemon_name: str) -> 'orchestrator.DaemonDescription':
    """Find a daemon description by its full name; raise if unknown.

    Restores the truncated `return dd` on match (the only sensible body
    of the visible `if dd.name() == daemon_name:`).
    """
    assert not daemon_name.startswith('ha-rgw.')
    for _, dm in self.daemons.items():
        for _, dd in dm.items():
            if dd.name() == daemon_name:
                return dd
    raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
def get_daemons_with_volatile_status(self) -> 'Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]':
    """Yield (host, daemons) with statuses adjusted for offline/maintenance.

    Each DaemonDescription is shallow-copied so the cache is not mutated.
    NOTE(review): the `copy(dd_orig)` and `return dd` lines were truncated
    in the source and are reconstructed here.
    """
    def alter(host: str, dd_orig: 'orchestrator.DaemonDescription') -> 'orchestrator.DaemonDescription':
        dd = copy(dd_orig)
        if host in self.mgr.offline_hosts:
            dd.status = orchestrator.DaemonDescriptionStatus.error
            dd.status_desc = 'host is offline'
        elif self.mgr.inventory._inventory[host].get("status", "").lower() == "maintenance":
            # We do not refresh daemons on hosts in maintenance mode, so stored daemon statuses
            # could be wrong. We must assume maintenance is working and daemons are stopped
            dd.status = orchestrator.DaemonDescriptionStatus.stopped
            dd.status_desc = 'stopped'
        dd.events = self.mgr.events.get_for_daemon(dd.name())
        return dd

    for host, dm in self.daemons.items():
        yield host, {name: alter(host, d) for name, d in dm.items()}
def get_daemons_by_service(self, service_name):
    # type: (str) -> List[orchestrator.DaemonDescription]
    """Return every cached daemon belonging to *service_name*.

    Restores the truncated append/return of the accumulator.
    """
    assert not service_name.startswith('keepalived.')
    assert not service_name.startswith('haproxy.')

    result = []  # type: List[orchestrator.DaemonDescription]
    for host, dm in self.daemons.items():
        for name, d in dm.items():
            if d.service_name() == service_name:
                result.append(d)
    return result
def get_daemons_by_type(self, service_type):
    # type: (str) -> List[orchestrator.DaemonDescription]
    """Return every cached daemon whose type belongs to *service_type*.

    Restores the truncated append/return, and hoists the invariant
    service_to_daemon_types() lookup out of the loop.
    """
    assert service_type not in ['keepalived', 'haproxy']

    daemon_types = service_to_daemon_types(service_type)
    result = []  # type: List[orchestrator.DaemonDescription]
    for host, dm in self.daemons.items():
        for name, d in dm.items():
            if d.daemon_type in daemon_types:
                result.append(d)
    return result
def get_daemon_types(self, hostname: str) -> List[str]:
    """Provide a list of the types of daemons on the host"""
    # Restores the truncated set accumulator and list() return;
    # a set is used so each type appears once.
    result = set()
    for _d, dm in self.daemons[hostname].items():
        assert dm.daemon_type is not None, f'no daemon type for {dm!r}'
        result.add(dm.daemon_type)
    return list(result)
def get_daemon_names(self):
    # type: () -> List[str]
    """Return the names of all cached daemons across all hosts.

    NOTE(review): accumulator/return lines were truncated in the source.
    """
    r = []
    for host, dm in self.daemons.items():
        for name, dd in dm.items():
            r.append(name)
    return r
def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
    """Return (deps, last_config time) for a daemon, or (None, None).

    Restores the truncated `(None, None)` fallback return.
    """
    if host in self.daemon_config_deps:
        if name in self.daemon_config_deps[host]:
            return self.daemon_config_deps[host][name].get('deps', []), \
                self.daemon_config_deps[host][name].get('last_config', None)
    return None, None
def get_host_client_files(self, host: str) -> Dict[str, Tuple[str, int, int, int]]:
    """Return path -> (digest, mode, uid, gid) for files pushed to *host*."""
    files = self.last_client_files.get(host)
    return files if files is not None else {}
def host_needs_daemon_refresh(self, host):
    # type: (str) -> bool
    """Decide whether the daemon list of *host* should be re-scraped.

    NOTE(review): the return statements were truncated in the source;
    reconstructed from the standard skip/queue/cutoff pattern visible
    in the sibling host_needs_* methods.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping daemon refresh')
        return False
    if host in self.daemon_refresh_queue:
        self.daemon_refresh_queue.remove(host)
        return True
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.daemon_cache_timeout)
    if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
        return True
    return False
def host_needs_facts_refresh(self, host):
    # type: (str) -> bool
    """Decide whether gather-facts should be re-run on *host*.

    NOTE(review): returns reconstructed (truncated in source).
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
        return False
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.facts_cache_timeout)
    if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
        return True
    return False
def host_needs_autotune_memory(self, host):
    # type: (str) -> bool
    """Decide whether memory autotuning is due on *host*.

    NOTE(review): returns reconstructed (truncated in source).
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping autotune')
        return False
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.autotune_interval)
    if host not in self.last_autotune or self.last_autotune[host] < cutoff:
        return True
    return False
def host_had_daemon_refresh(self, host: str) -> bool:
    """Return whether we ever scraped daemons for *host*.

    A host with cached daemons counts as refreshed even without a
    timestamp. Restores the truncated early returns.
    """
    if host in self.last_daemon_update:
        return True
    if host not in self.daemons:
        return False
    return bool(self.daemons[host])
def host_needs_device_refresh(self, host):
    # type: (str) -> bool
    """Decide whether the device inventory of *host* should be re-scraped.

    NOTE(review): returns reconstructed (truncated in source).
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping device refresh')
        return False
    if host in self.device_refresh_queue:
        self.device_refresh_queue.remove(host)
        return True
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.device_cache_timeout)
    if host not in self.last_device_update or self.last_device_update[host] < cutoff:
        return True
    return False
def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
    """Decide whether osdspec previews should be regenerated for *host*.

    Restores the truncated returns: only explicitly queued hosts refresh.
    """
    if host in self.mgr.offline_hosts:
        logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
        return False
    if host in self.osdspec_previews_refresh_queue:
        self.osdspec_previews_refresh_queue.remove(host)
        return True
    # Since this is dependent on other factors (device and spec) this does not need
    # to be updated periodically.
    return False
def host_needs_check(self, host):
    # type: (str) -> bool
    """True when the periodic host check for *host* is overdue."""
    cutoff = datetime_now() - datetime.timedelta(
        seconds=self.mgr.host_check_interval)
    never_checked = host not in self.last_host_check
    return never_checked or self.last_host_check[host] < cutoff
def osdspec_needs_apply(self, host: str, spec: 'ServiceSpec') -> bool:
    """Decide whether an OSD spec must be (re)applied on *host*.

    NOTE(review): the `if (...)` wrapper and early returns were truncated
    in the source and are reconstructed here.
    """
    if (
        host not in self.devices
        or host not in self.last_device_change
        or host not in self.last_device_update
        or host not in self.osdspec_last_applied
        or spec.service_name() not in self.osdspec_last_applied[host]
    ):
        # never applied here (or devices never scraped): apply
        return True
    created = self.mgr.spec_store.get_created(spec)
    if not created or created > self.last_device_change[host]:
        return True
    # re-apply when devices changed since the last application
    return self.osdspec_last_applied[host][spec.service_name()] < self.last_device_change[host]
def host_needs_registry_login(self, host: str) -> bool:
    """True once per queued host: pop *host* from the login queue.

    Restores the truncated return statements.
    """
    if host in self.mgr.offline_hosts:
        return False
    if host in self.registry_login_queue:
        self.registry_login_queue.remove(host)
        return True
    return False
def add_daemon(self, host, dd):
    # type: (str, orchestrator.DaemonDescription) -> None
    """Insert one daemon description into the cache of a known host."""
    assert host in self.daemons
    daemon_map = self.daemons[host]
    daemon_map[dd.name()] = dd
def rm_daemon(self, host: str, name: str) -> None:
    """Drop one daemon from the cache (no-op if host/daemon unknown)."""
    assert not name.startswith('ha-rgw.')

    if host in self.daemons and name in self.daemons[host]:
        del self.daemons[host][name]
def daemon_cache_filled(self) -> bool:
    """Whether the daemon cache is complete,

    i.e. we have checked the daemons for each hosts at least once.
    excluding offline hosts.

    We're not checking for `host_needs_daemon_refresh`, as this might never be
    False for all hosts.
    """
    return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
               for h in self.get_hosts())
def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
    """Queue *action* for a daemon; a lower-priority action never
    overrides an already-scheduled higher-priority one.

    NOTE(review): the priorities table was truncated from the source;
    the ordering below (start < restart < reconfig < redeploy < stop)
    should be confirmed against the orchestrator's action semantics.
    """
    assert not daemon_name.startswith('ha-rgw.')

    priorities = {
        'start': 1,
        'restart': 2,
        'reconfig': 3,
        'redeploy': 4,
        'stop': 5,
    }
    existing_action = self.scheduled_daemon_actions.get(host, {}).get(daemon_name, None)
    if existing_action and priorities[existing_action] > priorities[action]:
        logger.debug(
            f'skipping {action}ing {daemon_name}, cause {existing_action} already scheduled.')
        return

    if host not in self.scheduled_daemon_actions:
        self.scheduled_daemon_actions[host] = {}
    self.scheduled_daemon_actions[host][daemon_name] = action
def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
    """Drop a scheduled action; prune the host entry when it empties."""
    host_actions = self.scheduled_daemon_actions.get(host)
    if host_actions is None:
        return
    if daemon_name in host_actions:
        del host_actions[daemon_name]
    if not host_actions:
        del self.scheduled_daemon_actions[host]
def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
    """Return the pending action for a daemon, or None."""
    assert not daemon.startswith('ha-rgw.')

    host_actions = self.scheduled_daemon_actions.get(host, {})
    return host_actions.get(daemon)
def __init__(self, mgr):
    # type: (CephadmOrchestrator) -> None
    """Event store: kind:subject -> bounded list of OrchestratorEvents."""
    self.mgr: CephadmOrchestrator = mgr
    self.events = {}  # type: Dict[str, List[OrchestratorEvent]]
def add(self, event: 'OrchestratorEvent') -> None:
    """Append *event*, de-duplicating by message and keeping only the
    last five events per subject.

    Restores the truncated early `return` of the duplicate check.
    """
    if event.kind_subject() not in self.events:
        self.events[event.kind_subject()] = [event]

    for e in self.events[event.kind_subject()]:
        if e.message == event.message:
            return

    self.events[event.kind_subject()].append(event)

    # limit to five events for now.
    self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
def for_service(self, spec: 'ServiceSpec', level: str, message: str) -> None:
    """Record an event against a service.

    Restores the truncated `self.add(e)` — without it the constructed
    event would be discarded.
    """
    e = OrchestratorEvent(datetime_now(), 'service',
                          spec.service_name(), level, message)
    self.add(e)
def from_orch_error(self, e: 'OrchestratorError') -> None:
    """Record an ERROR event from an OrchestratorError, if it names a subject.

    NOTE(review): the OrchestratorEvent argument lines were truncated in
    the source; (kind, subject) unpacking of e.event_subject reconstructed.
    """
    if e.event_subject is not None:
        self.add(OrchestratorEvent(
            datetime_now(),
            e.event_subject[0],
            e.event_subject[1],
            "ERROR",
            str(e)
        ))
def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
    """Record an event against a daemon.

    Restores the truncated `self.add(e)`.
    """
    e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
    self.add(e)
def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
    """Record an ERROR daemon event carrying the exception text.

    NOTE(review): the body was entirely truncated in the source; delegating
    to for_daemon with str(e) is the minimal consistent reconstruction.
    """
    self.for_daemon(
        daemon_name,
        "ERROR",
        str(e)
    )
def cleanup(self) -> None:
    # Needs to be properly done, in case events are persistently stored.
    """Drop events whose service/daemon subject no longer exists.

    Restores the truncated `unknowns.append(k_s)` in the service branch
    (symmetric with the visible daemon branch).
    """
    unknowns: List[str] = []
    daemons = self.mgr.cache.get_daemon_names()
    specs = self.mgr.spec_store.all_specs.keys()
    for k_s, v in self.events.items():
        kind, subject = k_s.split(':')
        if kind == 'service':
            if subject not in specs:
                unknowns.append(k_s)
        elif kind == 'daemon':
            if subject not in daemons:
                unknowns.append(k_s)

    for k_s in unknowns:
        del self.events[k_s]
def get_for_service(self, name: str) -> 'List[OrchestratorEvent]':
    """Return all recorded events for service *name* ([] if none)."""
    key = 'service:' + name
    return self.events.get(key, [])
def get_for_daemon(self, name: str) -> 'List[OrchestratorEvent]':
    """Return all recorded events for daemon *name* ([] if none)."""
    key = 'daemon:' + name
    return self.events.get(key, [])