import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
-from cephadm.utils import str_to_datetime, datetime_to_str
+from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
if TYPE_CHECKING:
class Inventory:
+ """
+ The inventory stores a HostSpec for all hosts persistently.
+ """
+
def __init__(self, mgr: 'CephadmOrchestrator'):
self.mgr = mgr
# load inventory
i = self.mgr.get_store('inventory')
if i:
self._inventory: Dict[str, dict] = json.loads(i)
+ # handle old clusters missing 'hostname' key from hostspec
+ for k, v in self._inventory.items():
+ if 'hostname' not in v:
+ v['hostname'] = k
else:
self._inventory = dict()
logger.debug('Loaded inventory %s' % self._inventory)
def __contains__(self, host: str) -> bool:
return host in self._inventory
- def assert_host(self, host):
+ def assert_host(self, host: str) -> None:
if host not in self._inventory:
raise OrchestratorError('host %s does not exist' % host)
- def add_host(self, spec: HostSpec):
+ def add_host(self, spec: HostSpec) -> None:
self._inventory[spec.hostname] = spec.to_json()
self.save()
- def rm_host(self, host: str):
+ def rm_host(self, host: str) -> None:
self.assert_host(host)
del self._inventory[host]
self.save()
- def set_addr(self, host, addr):
+ def set_addr(self, host: str, addr: str) -> None:
self.assert_host(host)
self._inventory[host]['addr'] = addr
self.save()
- def add_label(self, host, label):
+ def add_label(self, host: str, label: str) -> None:
self.assert_host(host)
if 'labels' not in self._inventory[host]:
self._inventory[host]['labels'].append(label)
self.save()
- def rm_label(self, host, label):
+ def rm_label(self, host: str, label: str) -> None:
self.assert_host(host)
if 'labels' not in self._inventory[host]:
self._inventory[host]['labels'].remove(label)
self.save()
- def get_addr(self, host) -> str:
+ def get_addr(self, host: str) -> str:
self.assert_host(host)
return self._inventory[host].get('addr', host)
else:
yield h
- def spec_from_dict(self, info) -> HostSpec:
+ def spec_from_dict(self, info: dict) -> HostSpec:
hostname = info['hostname']
return HostSpec(
hostname,
def all_specs(self) -> List[HostSpec]:
return list(map(self.spec_from_dict, self._inventory.values()))
- def save(self):
+ def save(self) -> None:
self.mgr.set_store('inventory', json.dumps(self._inventory))
self.spec_preview[spec.service_name()] = spec
return None
self.specs[spec.service_name()] = spec
- self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
+ self.spec_created[spec.service_name()] = datetime_now()
self.mgr.set_store(
SPEC_STORE_PREFIX + spec.service_name(),
json.dumps({
class HostCache():
+ """
+ HostCache stores different things:
+
+ 1. `daemons`: Deployed daemons O(daemons)
+
+ They're part of the configuration nowadays and need to be
+ persistent. The name "daemon cache" is unfortunately a bit misleading.
+ Like for example we really need to know where daemons are deployed on
+ hosts that are offline.
+
+ 2. `devices`: ceph-volume inventory cache O(hosts)
+
+ As soon as this is populated, it becomes more or less read-only.
+
+ 3. `networks`: network interfaces for each host. O(hosts)
+
+ This is needed in order to deploy MONs. As this is mostly read-only.
+
+ 4. `last_etc_ceph_ceph_conf` O(hosts)
+
+ Stores the last refresh time for the /etc/ceph/ceph.conf. Used
+ to avoid deploying new configs when failing over to a new mgr.
+
+ 5. `scheduled_daemon_actions`: O(daemons)
+
+ Used to run daemon actions after deploying a daemon. We need to
+ store it persistently, in order to stay consistent across
+ MGR failovers.
+ """
+
def __init__(self, mgr):
# type: (CephadmOrchestrator) -> None
self.mgr: CephadmOrchestrator = mgr
self.daemons = {} # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
self.last_daemon_update = {} # type: Dict[str, datetime.datetime]
self.devices = {} # type: Dict[str, List[inventory.Device]]
+ self.facts = {} # type: Dict[str, Dict[str, Any]]
+ self.last_facts_update = {} # type: Dict[str, datetime.datetime]
self.osdspec_previews = {} # type: Dict[str, List[Dict[str, Any]]]
self.networks = {} # type: Dict[str, Dict[str, List[str]]]
self.last_device_update = {} # type: Dict[str, datetime.datetime]
def update_host_daemons(self, host, dm):
# type: (str, Dict[str, orchestrator.DaemonDescription]) -> None
self.daemons[host] = dm
- self.last_daemon_update[host] = datetime.datetime.utcnow()
+ self.last_daemon_update[host] = datetime_now()
+
+ def update_host_facts(self, host, facts):
+ # type: (str, Dict[str, Dict[str, Any]]) -> None
+ self.facts[host] = facts
+ self.last_facts_update[host] = datetime.datetime.utcnow()
def update_host_devices_networks(self, host, dls, nets):
# type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
self.devices[host] = dls
self.networks[host] = nets
- self.last_device_update[host] = datetime.datetime.utcnow()
+ self.last_device_update[host] = datetime_now()
- def update_daemon_config_deps(self, host, name, deps, stamp):
+ def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
self.daemon_config_deps[host][name] = {
'deps': deps,
'last_config': stamp,
def update_last_host_check(self, host):
# type: (str) -> None
- self.last_host_check[host] = datetime.datetime.utcnow()
+ self.last_host_check[host] = datetime_now()
def prime_empty_host(self, host):
# type: (str) -> None
del self.last_device_update[host]
self.mgr.event.set()
- def distribute_new_registry_login_info(self):
+ def distribute_new_registry_login_info(self) -> None:
self.registry_login_queue = set(self.mgr.inventory.keys())
def save_host(self, host: str) -> None:
j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
if host in self.last_device_update:
j['last_device_update'] = datetime_to_str(self.last_device_update[host])
- for name, dd in self.daemons[host].items():
- j['daemons'][name] = dd.to_json()
- for d in self.devices[host]:
- j['devices'].append(d.to_json())
- j['networks'] = self.networks[host]
- for name, depi in self.daemon_config_deps[host].items():
- j['daemon_config_deps'][name] = {
- 'deps': depi.get('deps', []),
- 'last_config': datetime_to_str(depi['last_config']),
- }
- if self.osdspec_previews[host]:
+ if host in self.daemons:
+ for name, dd in self.daemons[host].items():
+ j['daemons'][name] = dd.to_json()
+ if host in self.devices:
+ for d in self.devices[host]:
+ j['devices'].append(d.to_json())
+ if host in self.networks:
+ j['networks'] = self.networks[host]
+ if host in self.daemon_config_deps:
+ for name, depi in self.daemon_config_deps[host].items():
+ j['daemon_config_deps'][name] = {
+ 'deps': depi.get('deps', []),
+ 'last_config': datetime_to_str(depi['last_config']),
+ }
+ if host in self.osdspec_previews and self.osdspec_previews[host]:
j['osdspec_previews'] = self.osdspec_previews[host]
if host in self.last_host_check:
if host in self.last_etc_ceph_ceph_conf:
j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
- if self.scheduled_daemon_actions.get(host, {}):
+ if host in self.scheduled_daemon_actions:
j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
del self.daemons[host]
if host in self.devices:
del self.devices[host]
+ if host in self.facts:
+ del self.facts[host]
+ if host in self.last_facts_update:
+ del self.last_facts_update[host]
if host in self.osdspec_previews:
del self.osdspec_previews[host]
if host in self.loading_osdspec_preview:
raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
- def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+ def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
dd = copy(dd_orig)
if host in self.mgr.offline_hosts:
dd.status = -1
r.append(name)
return r
- def get_daemon_last_config_deps(self, host, name) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+ def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
if host in self.daemon_config_deps:
if name in self.daemon_config_deps[host]:
return self.daemon_config_deps[host][name].get('deps', []), \
if host in self.daemon_refresh_queue:
self.daemon_refresh_queue.remove(host)
return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ cutoff = datetime_now() - datetime.timedelta(
seconds=self.mgr.daemon_cache_timeout)
if host not in self.last_daemon_update or self.last_daemon_update[host] < cutoff:
return True
return False
+ def host_needs_facts_refresh(self, host):
+ # type: (str) -> bool
+ if host in self.mgr.offline_hosts:
+ logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
+ return False
+ cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ seconds=self.mgr.facts_cache_timeout)
+ if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
+ return True
+ return False
+
def host_had_daemon_refresh(self, host: str) -> bool:
"""
... at least once.
if host in self.device_refresh_queue:
self.device_refresh_queue.remove(host)
return True
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ cutoff = datetime_now() - datetime.timedelta(
seconds=self.mgr.device_cache_timeout)
if host not in self.last_device_update or self.last_device_update[host] < cutoff:
return True
return False
- def host_needs_osdspec_preview_refresh(self, host):
+ def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
if host in self.mgr.offline_hosts:
logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
return False
def host_needs_check(self, host):
# type: (str) -> bool
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+ cutoff = datetime_now() - datetime.timedelta(
seconds=self.mgr.host_check_interval)
return host not in self.last_host_check or self.last_host_check[host] < cutoff
- def host_needs_new_etc_ceph_ceph_conf(self, host: str):
+ def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
if not self.mgr.manage_etc_ceph_ceph_conf:
return False
if self.mgr.paused:
# already up to date:
return False
- def update_last_etc_ceph_ceph_conf(self, host: str):
+ def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
if not self.mgr.last_monmap:
return
- self.last_etc_ceph_ceph_conf[host] = datetime.datetime.utcnow()
+ self.last_etc_ceph_ceph_conf[host] = datetime_now()
def host_needs_registry_login(self, host: str) -> bool:
if host in self.mgr.offline_hosts:
assert host in self.daemons
self.daemons[host][dd.name()] = dd
- def rm_daemon(self, host, name):
+ def rm_daemon(self, host: str, name: str) -> None:
if host in self.daemons:
if name in self.daemons[host]:
del self.daemons[host][name]
- def daemon_cache_filled(self):
+ def daemon_cache_filled(self) -> bool:
"""
i.e. we have checked the daemons for each hosts at least once.
excluding offline hosts.
return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
for h in self.get_hosts())
- def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
+ def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
priorities = {
'start': 1,
'restart': 2,
self.scheduled_daemon_actions[host] = {}
self.scheduled_daemon_actions[host][daemon_name] = action
- def rm_scheduled_daemon_action(self, host: str, daemon_name: str):
+ def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
if host in self.scheduled_daemon_actions:
if daemon_name in self.scheduled_daemon_actions[host]:
del self.scheduled_daemon_actions[host][daemon_name]
if not self.scheduled_daemon_actions[host]:
del self.scheduled_daemon_actions[host]
- def get_scheduled_daemon_action(self, host, daemon) -> Optional[str]:
+ def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
return self.scheduled_daemon_actions.get(host, {}).get(daemon)
# limit to five events for now.
self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
- def for_service(self, spec: ServiceSpec, level, message) -> None:
- e = OrchestratorEvent(datetime.datetime.utcnow(), 'service',
+ def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
+ e = OrchestratorEvent(datetime_now(), 'service',
spec.service_name(), level, message)
self.add(e)
- def from_orch_error(self, e: OrchestratorError):
+ def from_orch_error(self, e: OrchestratorError) -> None:
if e.event_subject is not None:
self.add(OrchestratorEvent(
- datetime.datetime.utcnow(),
+ datetime_now(),
e.event_subject[0],
e.event_subject[1],
"ERROR",
str(e)
))
- def for_daemon(self, daemon_name, level, message):
- e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
+ def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
+ e = OrchestratorEvent(datetime_now(), 'daemon', daemon_name, level, message)
self.add(e)
- def for_daemon_from_exception(self, daemon_name, e: Exception):
+ def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
self.for_daemon(
daemon_name,
"ERROR",
for k_s in unknowns:
del self.events[k_s]
- def get_for_service(self, name) -> List[OrchestratorEvent]:
+ def get_for_service(self, name: str) -> List[OrchestratorEvent]:
return self.events.get('service:' + name, [])
- def get_for_daemon(self, name) -> List[OrchestratorEvent]:
+ def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
return self.events.get('daemon:' + name, [])