import json
import errno
+import logging
+from functools import wraps
+
import six
import os
-import datetime
import tempfile
import multiprocessing.pool
remoto = None
remoto_import_error = str(e)
-DATEFMT = '%Y-%m-%d %H:%M:%S.%f'
+logger = logging.getLogger(__name__)
# high-level TODO:
# - bring over some of the protections from ceph-deploy that guard against
# multiple bootstrapping / initialization
-class SSHReadCompletion(orchestrator.ReadCompletion):
+class SSHCompletionmMixin(object):
def __init__(self, result):
if isinstance(result, multiprocessing.pool.AsyncResult):
self._result = [result]
def result(self):
return list(map(lambda r: r.get(), self._result))
+class SSHReadCompletion(SSHCompletionmMixin, orchestrator.ReadCompletion):
@property
def is_complete(self):
return all(map(lambda r: r.ready(), self._result))
-class SSHReadCompletionReady(SSHReadCompletion):
- def __init__(self, result):
- self._result = result
-
- @property
- def result(self):
- return self._result
-
- @property
- def is_complete(self):
- return True
-class SSHWriteCompletion(orchestrator.WriteCompletion):
- def __init__(self, result):
- super(SSHWriteCompletion, self).__init__()
- if isinstance(result, multiprocessing.pool.AsyncResult):
- self._result = [result]
- else:
- self._result = result
- assert isinstance(self._result, list)
-
- @property
- def result(self):
- return list(map(lambda r: r.get(), self._result))
+class SSHWriteCompletion(SSHCompletionmMixin, orchestrator.WriteCompletion):
@property
def is_persistent(self):
def __getattr__(self, name):
return getattr(self.conn, name)
+
+def log_exceptions(f):
+ if six.PY3:
+ return f
+ else:
+        # Python 2 has no exception chaining, so the real
+        # exception would be lost; log it here before re-raising.
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ try:
+ return f(*args, **kwargs)
+ except Exception:
+ logger.exception('something went wrong.')
+ raise
+ return wrapper
+
+
class SSHOrchestrator(MgrModule, orchestrator.Orchestrator):
_STORE_HOST_PREFIX = "host"
self._cluster_fsid = None
self._worker_pool = multiprocessing.pool.ThreadPool(1)
+        # The keys in inventory_cache are authoritative.
+        #   You must not call remove_outdated().
+        # The values are cached by instance.
+        # A cached value is invalidated by:
+        # 1. timeout
+        # 2. the refresh parameter
+ self.inventory_cache = orchestrator.OutdatablePersistentDict(self, self._STORE_HOST_PREFIX)
+
def handle_command(self, inbuf, command):
if command["prefix"] == "ssh set-ssh-config":
return self._set_ssh_config(inbuf, command)
complete = True
for c in completions:
+ if c.is_complete:
+ continue
+
if not isinstance(c, SSHReadCompletion) and \
not isinstance(c, SSHWriteCompletion):
raise TypeError("unexpected completion: {}".format(c.__class__))
- if c.is_complete:
- continue
-
complete = False
return complete
- @staticmethod
- def time_from_string(timestr):
- # drop the 'Z' timezone indication, it's always UTC
- timestr = timestr.rstrip('Z')
- return datetime.datetime.strptime(timestr, DATEFMT)
-
def _get_cluster_fsid(self):
"""
Fetch and cache the cluster fsid.
"""
if isinstance(hosts, six.string_types):
hosts = [hosts]
- unregistered_hosts = []
- for host in hosts:
- key = self._hostname_to_store_key(host)
- if not self.get_store(key):
- unregistered_hosts.append(host)
+ keys = self.inventory_cache.keys()
+ unregistered_hosts = set(hosts) - keys
if unregistered_hosts:
+ logger.warning('keys = {}'.format(keys))
raise RuntimeError("Host(s) {} not registered".format(
", ".join(map(lambda h: "'{}'".format(h),
unregistered_hosts))))
conn.remote_module.write_keyring(keyring_path, keyring)
return keyring_path
- def _hostname_to_store_key(self, host):
- return "{}.{}".format(self._STORE_HOST_PREFIX, host)
-
def _get_hosts(self, wanted=None):
- if wanted:
- hosts_info = []
- for host in wanted:
- key = self._hostname_to_store_key(host)
- info = self.get_store(key)
- if info:
- hosts_info.append((key, info))
- else:
- hosts_info = six.iteritems(self.get_store_prefix(self._STORE_HOST_PREFIX))
-
- return list(map(lambda kv: (kv[0], json.loads(kv[1])), hosts_info))
+ return self.inventory_cache.items_filtered(wanted)
def add_host(self, host):
"""
:param host: host name
"""
+ @log_exceptions
def run(host):
- key = self._hostname_to_store_key(host)
- self.set_store(key, json.dumps({
- "host": host,
- "inventory": None,
- "last_inventory_refresh": None
- }))
+ self.inventory_cache[host] = orchestrator.OutdatableData()
return "Added host '{}'".format(host)
return SSHWriteCompletion(
:param host: host name
"""
+ @log_exceptions
def run(host):
- key = self._hostname_to_store_key(host)
- self.set_store(key, None)
+ del self.inventory_cache[host]
return "Removed host '{}'".format(host)
return SSHWriteCompletion(
TODO:
- InventoryNode probably needs to be able to report labels
"""
- nodes = []
- for key, host_info in self._get_hosts():
- node = orchestrator.InventoryNode(host_info["host"], [])
- nodes.append(node)
- return SSHReadCompletionReady(nodes)
+ nodes = [orchestrator.InventoryNode(host_name, []) for host_name in self.inventory_cache]
+ return orchestrator.TrivialReadCompletion(nodes)
def _get_device_inventory(self, host):
"""
# this implies the returned hosts are registered
hosts = self._get_hosts()
- def run(key, host_info):
- updated = False
- host = host_info["host"]
-
- if not host_info["inventory"]:
- self.log.info("caching inventory for '{}'".format(host))
- host_info["inventory"] = self._get_device_inventory(host)
- updated = True
- else:
- timeout_min = int(self.get_module_option(
- "inventory_cache_timeout_min",
- self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
-
- cutoff = datetime.datetime.utcnow() - datetime.timedelta(
- minutes=timeout_min)
-
- last_update = self.time_from_string(host_info["last_inventory_refresh"])
+ @log_exceptions
+ def run(host, host_info):
+ # type: (str, orchestrator.OutdatableData) -> orchestrator.InventoryNode
- if last_update < cutoff or refresh:
- self.log.info("refresh stale inventory for '{}'".format(host))
- host_info["inventory"] = self._get_device_inventory(host)
- updated = True
- else:
- self.log.info("reading cached inventory for '{}'".format(host))
- pass
+ timeout_min = int(self.get_module_option(
+ "inventory_cache_timeout_min",
+ self._DEFAULT_INVENTORY_CACHE_TIMEOUT_MIN))
- if updated:
- now = datetime.datetime.utcnow()
- now = now.strftime(DATEFMT)
- host_info["last_inventory_refresh"] = now
- self.set_store(key, json.dumps(host_info))
-
- devices = list(map(lambda di:
- orchestrator.InventoryDevice.from_ceph_volume_inventory(di),
- host_info["inventory"]))
+ if host_info.outdated(timeout_min) or refresh:
+ self.log.info("refresh stale inventory for '{}'".format(host))
+ data = self._get_device_inventory(host)
+ host_info = orchestrator.OutdatableData(data)
+ self.inventory_cache[host] = host_info
+ else:
+ self.log.debug("reading cached inventory for '{}'".format(host))
+ devices = orchestrator.InventoryDevice.from_ceph_volume_inventory_list(host_info.data)
return orchestrator.InventoryNode(host, devices)
results = []
return SSHReadCompletion(results)
+ @log_exceptions
def _create_osd(self, host, drive_group):
conn = self._get_connection(host)
try:
raise NotImplementedError("Removing managers is not supported")
# check that all the hosts are registered
- hosts = list(set(hosts))
self._require_hosts(hosts)
# we assume explicit placement by which there are the same number of