import orchestrator
from ceph.deployment import inventory
from ceph.deployment.service_spec import ServiceSpec
-from orchestrator import OrchestratorError, HostSpec
+from orchestrator import OrchestratorError, HostSpec, OrchestratorEvent
if TYPE_CHECKING:
from .module import CephadmOrchestrator
for h, hostspec in self._inventory.items():
if not label or label in hostspec.get('labels', []):
if as_hostspec:
- yield hostspec
- yield h
+ yield self.spec_from_dict(hostspec)
+ else:
+ yield h
def spec_from_dict(self, info):
hostname = info['hostname']
self.mgr = mgr
self.specs = {} # type: Dict[str, ServiceSpec]
self.spec_created = {} # type: Dict[str, datetime.datetime]
+ self.spec_preview = {} # type: Dict[str, ServiceSpec]
def load(self):
# type: () -> None
def save(self, spec):
# type: (ServiceSpec) -> None
+ if spec.preview_only:
+ self.spec_preview[spec.service_name()] = spec
+ return None
self.specs[spec.service_name()] = spec
self.spec_created[spec.service_name()] = datetime.datetime.utcnow()
self.mgr.set_store(
'created': self.spec_created[spec.service_name()].strftime(DATEFMT),
}, sort_keys=True),
)
+ self.mgr.events.for_service(spec, OrchestratorEvent.INFO, 'service was created')
def rm(self, service_name):
# type: (str) -> bool
self.daemon_refresh_queue = [] # type: List[str]
self.device_refresh_queue = [] # type: List[str]
self.osdspec_previews_refresh_queue = [] # type: List[str]
+
+ # host -> daemon name -> dict
self.daemon_config_deps = {} # type: Dict[str, Dict[str, Dict[str,Any]]]
self.last_host_check = {} # type: Dict[str, datetime.datetime]
self.loading_osdspec_preview = set() # type: Set[str]
+ self.last_etc_ceph_ceph_conf: Dict[str, datetime.datetime] = {}
+ self.registry_login_queue: Set[str] = set()
def load(self):
# type: () -> None
if 'last_host_check' in j:
self.last_host_check[host] = datetime.datetime.strptime(
j['last_host_check'], DATEFMT)
+ if 'last_etc_ceph_ceph_conf' in j:
+ self.last_etc_ceph_ceph_conf[host] = datetime.datetime.strptime(
+ j['last_etc_ceph_ceph_conf'], DATEFMT)
+ self.registry_login_queue.add(host)
self.mgr.log.debug(
'HostCache.load: host %s has %d daemons, '
'%d devices, %d networks' % (
self.daemon_refresh_queue.append(host)
self.device_refresh_queue.append(host)
self.osdspec_previews_refresh_queue.append(host)
+ self.registry_login_queue.add(host)
def invalidate_host_daemons(self, host):
# type: (str) -> None
if host in self.last_device_update:
del self.last_device_update[host]
self.mgr.event.set()
+
+ def distribute_new_registry_login_info(self):
+ self.registry_login_queue = set(self.mgr.inventory.keys())
def save_host(self, host):
# type: (str) -> None
if host in self.last_host_check:
j['last_host_check'] = self.last_host_check[host].strftime(DATEFMT)
+
+ if host in self.last_etc_ceph_ceph_conf:
+ j['last_etc_ceph_ceph_conf'] = self.last_etc_ceph_ceph_conf[host].strftime(DATEFMT)
+
self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
def rm_host(self, host):
r.append(dd)
return r
+ def get_daemon(self, daemon_name: str) -> orchestrator.DaemonDescription:
+ for _, dm in self.daemons.items():
+ for _, dd in dm.items():
+ if dd.name() == daemon_name:
+ return dd
+ raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
+
def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
- for host, dm in self.daemons.items():
+ def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+ dd = copy(dd_orig)
if host in self.mgr.offline_hosts:
- def set_offline(dd: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
- ret = copy(dd)
- ret.status = -1
- ret.status_desc = 'host is offline'
- return ret
- yield host, {name: set_offline(d) for name, d in dm.items()}
- else:
- yield host, dm
+ dd.status = -1
+ dd.status_desc = 'host is offline'
+ dd.events = self.mgr.events.get_for_daemon(dd.name())
+ return dd
+
+ for host, dm in self.daemons.items():
+ yield host, {name: alter(host, d) for name, d in dm.items()}
def get_daemons_by_service(self, service_name):
# type: (str) -> List[orchestrator.DaemonDescription]
result = [] # type: List[orchestrator.DaemonDescription]
for host, dm in self.daemons.items():
for name, d in dm.items():
- if name.startswith(service_name + '.'):
+ if d.service_name() == service_name:
+ result.append(d)
+ return result
+
+ def get_daemons_by_type(self, service_type):
+ # type: (str) -> List[orchestrator.DaemonDescription]
+ result = [] # type: List[orchestrator.DaemonDescription]
+ for host, dm in self.daemons.items():
+ for name, d in dm.items():
+ if d.daemon_type == service_type:
result.append(d)
return result
seconds=self.mgr.host_check_interval)
return host not in self.last_host_check or self.last_host_check[host] < cutoff
+ def host_needs_new_etc_ceph_ceph_conf(self, host: str):
+ if not self.mgr.manage_etc_ceph_ceph_conf:
+ return False
+ if self.mgr.paused:
+ return False
+ if host in self.mgr.offline_hosts:
+ return False
+ if not self.mgr.last_monmap:
+ return False
+ if host not in self.last_etc_ceph_ceph_conf:
+ return True
+ if self.mgr.last_monmap > self.last_etc_ceph_ceph_conf[host]:
+ return True
+ # already up to date:
+ return False
+
+ def update_last_etc_ceph_ceph_conf(self, host: str):
+ if not self.mgr.last_monmap:
+ return
+ self.last_etc_ceph_ceph_conf[host] = self.mgr.last_monmap
+
+ def host_needs_registry_login(self, host):
+ if host in self.mgr.offline_hosts:
+ return False
+ if host in self.registry_login_queue:
+ self.registry_login_queue.remove(host)
+ return True
+ return False
+
def add_daemon(self, host, dd):
# type: (str, orchestrator.DaemonDescription) -> None
assert host in self.daemons
def rm_daemon(self, host, name):
if host in self.daemons:
if name in self.daemons[host]:
- del self.daemons[host][name]
\ No newline at end of file
+ del self.daemons[host][name]
+
+ def daemon_cache_filled(self):
+ """
+ i.e. we have checked the daemons for each hosts at least once.
+ excluding offline hosts.
+
+ We're not checking for `host_needs_daemon_refresh`, as this might never be
+ False for all hosts.
+ """
+ return all((h in self.last_daemon_update or h in self.mgr.offline_hosts)
+ for h in self.get_hosts())
+
+
+class EventStore():
+ def __init__(self, mgr):
+ # type: (CephadmOrchestrator) -> None
+ self.mgr: CephadmOrchestrator = mgr
+ self.events = {} # type: Dict[str, List[OrchestratorEvent]]
+
+ def add(self, event: OrchestratorEvent) -> None:
+
+ if event.kind_subject() not in self.events:
+ self.events[event.kind_subject()] = [event]
+
+ for e in self.events[event.kind_subject()]:
+ if e.message == event.message:
+ return
+
+ self.events[event.kind_subject()].append(event)
+
+ # limit to five events for now.
+ self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
+
+ def for_service(self, spec: ServiceSpec, level, message) -> None:
+ e = OrchestratorEvent(datetime.datetime.utcnow(), 'service', spec.service_name(), level, message)
+ self.add(e)
+
+ def from_orch_error(self, e: OrchestratorError):
+ if e.event_subject is not None:
+ self.add(OrchestratorEvent(
+ datetime.datetime.utcnow(),
+ e.event_subject[0],
+ e.event_subject[1],
+ "ERROR",
+ str(e)
+ ))
+
+
+ def for_daemon(self, daemon_name, level, message):
+ e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
+ self.add(e)
+
+ def for_daemon_from_exception(self, daemon_name, e: Exception):
+ self.for_daemon(
+ daemon_name,
+ "ERROR",
+ str(e)
+ )
+
+ def cleanup(self) -> None:
+ # Needs to be properly done, in case events are persistently stored.
+
+ unknowns: List[str] = []
+ daemons = self.mgr.cache.get_daemon_names()
+ specs = self.mgr.spec_store.specs.keys()
+ for k_s, v in self.events.items():
+ kind, subject = k_s.split(':')
+ if kind == 'service':
+ if subject not in specs:
+ unknowns.append(k_s)
+ elif kind == 'daemon':
+ if subject not in daemons:
+ unknowns.append(k_s)
+
+ for k_s in unknowns:
+ del self.events[k_s]
+
+ def get_for_service(self, name) -> List[OrchestratorEvent]:
+ return self.events.get('service:' + name, [])
+
+ def get_for_daemon(self, name) -> List[OrchestratorEvent]:
+ return self.events.get('daemon:' + name, [])