+import errno
import json
import re
import os
import threading
import functools
-import uuid
+import itertools
from subprocess import check_output, CalledProcessError
-from mgr_module import MgrModule, PersistentStoreDict
-
-import orchestrator
-
-
-
-
-class TestCompletionMixin(object):
- all_completions = [] # Hacky global
-
- def __init__(self, cb, message, *args, **kwargs):
- super(TestCompletionMixin, self).__init__(*args, **kwargs)
- self.cb = cb
- self._result = None
- self._complete = False
-
- self.message = message
- self.id = str(uuid.uuid4())
-
- TestCompletionMixin.all_completions.append(self)
-
- @property
- def result(self):
- return self._result
-
- @property
- def is_complete(self):
- return self._complete
-
- def execute(self):
- self._result = self.cb()
- self.executed = True
- self._complete = True
-
- def __str__(self):
- return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
- self.message, self.exception)
+from ceph.deployment.service_spec import NFSServiceSpec, ServiceSpec
+try:
+ from typing import Callable, List, Sequence, Tuple
+except ImportError:
+ pass # type checking
-class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
- def __init__(self, cb):
- super(TestReadCompletion, self).__init__(cb, "<read op>")
+import six
+from ceph.deployment import inventory
+from ceph.deployment.drive_group import DriveGroupSpec
+from mgr_module import CLICommand, HandleCommandResult
+from mgr_module import MgrModule
-class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
- def __init__(self, cb, message):
- super(TestWriteCompletion, self).__init__(cb, message)
+import orchestrator
- @property
- def is_persistent(self):
- return (not self.is_errored) and self.executed
- @property
- def is_effective(self):
- return self._complete
-
-
-def deferred_write(message):
- def wrapper(f):
- @functools.wraps(f)
- def inner(*args, **kwargs):
- return TestWriteCompletion(lambda: f(*args, **kwargs),
- '{}, args={}, kwargs={}'.format(message, args, kwargs))
- return inner
- return wrapper
+class TestCompletion(orchestrator.Completion):
+ def evaluate(self):
+ self.finalize(None)
def deferred_read(f):
+ # type: (Callable) -> Callable[..., TestCompletion]
"""
- Decorator to make TestOrchestrator methods return
+ Decorator to make methods return
a completion object that executes themselves.
"""
@functools.wraps(f)
def wrapper(*args, **kwargs):
- return TestReadCompletion(lambda: f(*args, **kwargs))
+ return TestCompletion(on_complete=lambda _: f(*args, **kwargs))
return wrapper
+def deferred_write(message):
+ def inner(f):
+ # type: (Callable) -> Callable[..., TestCompletion]
+
+ @functools.wraps(f)
+ def wrapper(self, *args, **kwargs):
+ return TestCompletion.with_progress(
+ message=message,
+ mgr=self,
+ on_complete=lambda _: f(self, *args, **kwargs),
+ )
+
+ return wrapper
+ return inner
+
+
class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
"""
This is an orchestrator implementation used for internal testing. It's meant for
The implementation is similar to the Rook orchestrator, but simpler.
"""
- def wait(self, completions):
- self.log.info("wait: completions={0}".format(completions))
-
- # Our `wait` implementation is very simple because everything's
- # just an API call.
- for c in completions:
- if not isinstance(c, TestReadCompletion) and \
- not isinstance(c, TestWriteCompletion):
- raise TypeError(
- "wait() requires list of completions, not {0}".format(
- c.__class__
- ))
+ def process(self, completions):
+ # type: (List[TestCompletion]) -> None
+ if completions:
+ self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
- if c.is_complete:
- continue
+ for p in completions:
+ p.evaluate()
- try:
- c.execute()
- except Exception as e:
- self.log.exception("Completion {0} threw an exception:".format(
- c.message
- ))
- c.exception = e
- c._complete = True
-
- return all(c.is_complete for c in completions)
+ @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
+ def _load_data(self, inbuf):
+ try:
+ data = json.loads(inbuf)
+ self._init_data(data)
+ return HandleCommandResult()
+ except json.decoder.JSONDecodeError as e:
+ msg = 'Invalid JSON file: {}'.format(e)
+ return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
+ except orchestrator.OrchestratorValidationError as e:
+ return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
def available(self):
return True, ""
self._initialized = threading.Event()
self._shutdown = threading.Event()
+ self._init_data({})
+ self.all_progress_references = list() # type: List[orchestrator.ProgressReference]
def shutdown(self):
self._shutdown.set()
# in case we had a caller that wait()'ed on them long enough
# to get persistence but not long enough to get completion
- self.wait(TestCompletionMixin.all_completions)
- TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
- not c.is_complete]
+ self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+ for p in self.all_progress_references:
+ p.update()
self._shutdown.wait(5)
+ def _init_data(self, data=None):
+ self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
+ for inventory_host in data.get('inventory', [])]
+ self._services = [orchestrator.ServiceDescription.from_json(service)
+ for service in data.get('services', [])]
+ self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
+ for daemon in data.get('daemons', [])]
+
@deferred_read
- def get_inventory(self, node_filter=None, refresh=False):
+ def get_inventory(self, host_filter=None, refresh=False):
"""
There is no guarantee which devices are returned by get_inventory.
"""
- if node_filter and node_filter.nodes is not None:
- assert isinstance(node_filter.nodes, list)
+ if host_filter and host_filter.hosts is not None:
+ assert isinstance(host_filter.hosts, list)
+
+ if self._inventory:
+ if host_filter:
+ return list(filter(lambda host: host.name in host_filter.hosts,
+ self._inventory))
+ return self._inventory
+
try:
c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
except OSError:
for out in c_v_out.splitlines():
self.log.error(out)
- devs = []
- for device in json.loads(out):
- dev = orchestrator.InventoryDevice.from_ceph_volume_inventory(device)
- devs.append(dev)
- return [orchestrator.InventoryNode('localhost', devs)]
+ devs = inventory.Devices.from_json(json.loads(out))
+ return [orchestrator.InventoryHost('localhost', devs)]
self.log.error('c-v failed: ' + str(c_v_out))
raise Exception('c-v failed')
+ def _get_ceph_daemons(self):
+ # type: () -> List[orchestrator.DaemonDescription]
+ """ Return ceph daemons on the running host."""
+ types = ("mds", "osd", "mon", "rgw", "mgr")
+ out = map(str, check_output(['ps', 'aux']).splitlines())
+ processes = [p for p in out if any(
+ [('ceph-' + t in p) for t in types])]
+
+ daemons = []
+ for p in processes:
+ daemon = orchestrator.DaemonDescription()
+ # parse daemon type
+ m = re.search('ceph-([^ ]+)', p)
+ if m:
+ _daemon_type = m.group(1)
+ else:
+ raise AssertionError('Fail to determine daemon type from {}'.format(p))
+
+ # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
+ patterns = ['-i\s(\w+)', '--id[\s=](\w+)']
+ daemon_id = None
+ for pattern in patterns:
+ m = re.search(pattern, p)
+ if m:
+ daemon_id = m.group(1)
+ break
+ else:
+ raise AssertionError('Fail to determine daemon ID from {}'.format(p))
+ daemon = orchestrator.DaemonDescription(
+ daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
+ daemons.append(daemon)
+ return daemons
+
@deferred_read
- def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
+ def describe_service(self, service_type=None, service_name=None, refresh=False):
+ if self._services:
+ # Dummy data
+ services = self._services
+ # Can't deduce service type from dummy data (no daemons).
+ # Assume service_type is service_name.
+ if service_type is not None:
+ services = list(filter(lambda s: s.service_name == service_type, services))
+ else:
+ # Deduce services from daemons running on localhost
+ all_daemons = self._get_ceph_daemons()
+ services = []
+ for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type):
+ if service_type is not None and service_type != daemon_type:
+ continue
+ daemon_size = len(list(daemons))
+ services.append(orchestrator.ServiceDescription(
+ service_name=daemon_type, size=daemon_size, running=daemon_size))
+
+ def _filter_func(svc):
+ if service_name is not None and service_name != svc.service_name:
+ return False
+ return True
+
+ return list(filter(_filter_func, services))
+
+ @deferred_read
+ def list_daemons(self, daemon_type=None, daemon_id=None, host=None, refresh=False):
"""
There is no guarantee which daemons are returned by describe_service, except that
it returns the mgr we're running in.
"""
- if service_type:
- assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
+ if daemon_type:
+ daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash")
+ assert daemon_type in daemon_types, daemon_type + " unsupported"
- out = map(str, check_output(['ps', 'aux']).splitlines())
- types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
- processes = [p for p in out if any([('ceph-' + t in p) for t in types])]
+ daemons = self._daemons if self._daemons else self._get_ceph_daemons()
- result = []
- for p in processes:
- sd = orchestrator.ServiceDescription()
- sd.nodename = 'localhost'
- sd.service_instance = re.search('ceph-[^ ]+', p).group()
- result.append(sd)
+ def _filter_func(d):
+ if daemon_type is not None and daemon_type != d.daemon_type:
+ return False
+ if daemon_id is not None and daemon_id != d.daemon_id:
+ return False
+ if host is not None and host != d.hostname:
+ return False
+ return True
- return result
+ return list(filter(_filter_func, daemons))
- @deferred_write("Adding stateless service")
- def add_stateless_service(self, service_type, spec):
- pass
+ def create_osds(self, drive_group):
+ # type: (DriveGroupSpec) -> TestCompletion
+ """ Creates OSDs from a drive group specification.
- @deferred_write("create_osds")
- def create_osds(self, drive_group, all_hosts):
- drive_group.validate(all_hosts)
+ $: ceph orch osd create -i <dg.file>
+
+ The drivegroup file must only contain one spec at a time.
+ """
- @deferred_write("remove_osds")
- def remove_osds(self, osd_ids):
- assert isinstance(osd_ids, list)
+ def run(all_hosts):
+ # type: (List[orchestrator.HostSpec]) -> None
+ drive_group.validate()
+ if drive_group.placement.host_pattern:
+ if not drive_group.placement.pattern_matches_hosts([h.hostname for h in all_hosts]):
+ raise orchestrator.OrchestratorValidationError('failed to match')
+ return self.get_hosts().then(run).then(
+ on_complete=orchestrator.ProgressReference(
+ message='create_osds',
+ mgr=self,
+ )
+ )
+
+ def apply_drivegroups(self, specs):
+ # type: (List[DriveGroupSpec]) -> TestCompletion
+ drive_group = specs[0]
+ def run(all_hosts):
+ # type: (List[orchestrator.HostSpec]) -> None
+ drive_group.validate()
+ if drive_group.placement.host_pattern:
+ if not drive_group.placement.pattern_matches_hosts([h.hostname for h in all_hosts]):
+ raise orchestrator.OrchestratorValidationError('failed to match')
+ return self.get_hosts().then(run).then(
+ on_complete=orchestrator.ProgressReference(
+ message='apply_drivesgroups',
+ mgr=self,
+ )
+ )
+
+ @deferred_write("remove_daemons")
+ def remove_daemons(self, names):
+ assert isinstance(names, list)
+
+ @deferred_write("remove_service")
+ def remove_service(self, service_name):
+ assert isinstance(service_name, str)
+
+ @deferred_write("blink_device_light")
+ def blink_device_light(self, ident_fault, on, locations):
+ assert ident_fault in ("ident", "fault")
+ assert len(locations)
+ return ''
@deferred_write("service_action")
- def service_action(self, action, service_type, service_name=None, service_id=None):
+ def service_action(self, action, service_name):
pass
- @deferred_write("remove_stateless_service")
- def remove_stateless_service(self, service_type, id_):
+ @deferred_write("daemon_action")
+ def daemon_action(self, action, daemon_type, daemon_id):
+ pass
+
+ @deferred_write("Adding NFS service")
+ def add_nfs(self, spec):
+ # type: (NFSServiceSpec) -> None
+ assert isinstance(spec.pool, str)
+
+ @deferred_write("apply_nfs")
+ def apply_nfs(self, spec):
pass
- @deferred_write("update_stateless_service")
- def update_stateless_service(self, service_type, spec):
+ @deferred_write("add_mds")
+ def add_mds(self, spec):
+ pass
+
+ @deferred_write("add_rgw")
+ def add_rgw(self, spec):
pass
@deferred_read
def get_hosts(self):
- return [orchestrator.InventoryNode('localhost', [])]
+ if self._inventory:
+ return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
+ return [orchestrator.HostSpec('localhost')]
@deferred_write("add_host")
- def add_host(self, host):
+ def add_host(self, spec):
+ # type: (orchestrator.HostSpec) -> None
+ host = spec.hostname
if host == 'raise_no_support':
raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
if host == 'raise_bug':
raise orchestrator.NoOrchestrator()
if host == 'raise_import_error':
raise ImportError("test_orchestrator not enabled")
- assert isinstance(host, str)
+ assert isinstance(host, six.string_types)
@deferred_write("remove_host")
def remove_host(self, host):
- assert isinstance(host, str)
-
- @deferred_write("update_mgrs")
- def update_mgrs(self, num, hosts):
- assert not hosts or len(hosts) == num
- assert all([isinstance(h, str) for h in hosts])
-
- @deferred_write("update_mons")
- def update_mons(self, num, hosts):
- assert not hosts or len(hosts) == num
- assert all([isinstance(h[0], str) for h in hosts])
- assert all([isinstance(h[1], str) or h[1] is None for h in hosts])
+ assert isinstance(host, six.string_types)
+
+ @deferred_write("apply_mgr")
+ def apply_mgr(self, spec):
+ # type: (ServiceSpec) -> None
+
+ assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
+ assert all([isinstance(h, str) for h in spec.placement.hosts])
+
+ @deferred_write("apply_mon")
+ def apply_mon(self, spec):
+ # type: (ServiceSpec) -> None
+
+ assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
+ assert all([isinstance(h[0], str) for h in spec.placement.hosts])
+ assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])