import 15.2.0 Octopus source

diff --git a/ceph/src/pybind/mgr/test_orchestrator/module.py b/ceph/src/pybind/mgr/test_orchestrator/module.py
index 0112ba4c51a6dec516aac447d21ae447a8171fef..276d0894655ae1362246baf6ec8effc22161eecc 100644
--- a/ceph/src/pybind/mgr/test_orchestrator/module.py
+++ b/ceph/src/pybind/mgr/test_orchestrator/module.py
@@ -1,91 +1,64 @@
+import errno
 import json
 import re
 import os
 import threading
 import functools
-import uuid
+import itertools
 from subprocess import check_output, CalledProcessError
 
-from mgr_module import MgrModule, PersistentStoreDict
-
-import orchestrator
-
-
-
-
-class TestCompletionMixin(object):
-    all_completions = []  # Hacky global
-
-    def __init__(self, cb, message, *args, **kwargs):
-        super(TestCompletionMixin, self).__init__(*args, **kwargs)
-        self.cb = cb
-        self._result = None
-        self._complete = False
-
-        self.message = message
-        self.id = str(uuid.uuid4())
-
-        TestCompletionMixin.all_completions.append(self)
-
-    @property
-    def result(self):
-        return self._result
-
-    @property
-    def is_complete(self):
-        return self._complete
-
-    def execute(self):
-        self._result = self.cb()
-        self.executed = True
-        self._complete = True
-
-    def __str__(self):
-        return "{}(result={} message={}, exception={})".format(self.__class__.__name__, self.result,
-                                                               self.message, self.exception)
+from ceph.deployment.service_spec import NFSServiceSpec, ServiceSpec
 
+try:
+    from typing import Callable, List, Sequence, Tuple
+except ImportError:
+    pass  # type checking
 
-class TestReadCompletion(TestCompletionMixin, orchestrator.ReadCompletion):
-    def __init__(self, cb):
-        super(TestReadCompletion, self).__init__(cb, "<read op>")
+import six
 
+from ceph.deployment import inventory
+from ceph.deployment.drive_group import DriveGroupSpec
+from mgr_module import CLICommand, HandleCommandResult
+from mgr_module import MgrModule
 
-class TestWriteCompletion(TestCompletionMixin, orchestrator.WriteCompletion):
-    def __init__(self, cb, message):
-        super(TestWriteCompletion, self).__init__(cb, message)
+import orchestrator
 
-    @property
-    def is_persistent(self):
-        return (not self.is_errored) and self.executed
 
-    @property
-    def is_effective(self):
-        return self._complete
-
-
-def deferred_write(message):
-    def wrapper(f):
-        @functools.wraps(f)
-        def inner(*args, **kwargs):
-            return TestWriteCompletion(lambda: f(*args, **kwargs),
-                                       '{}, args={}, kwargs={}'.format(message, args, kwargs))
-        return inner
-    return wrapper
+class TestCompletion(orchestrator.Completion):
+    def evaluate(self):
+        self.finalize(None)
 
 
 def deferred_read(f):
+    # type: (Callable) -> Callable[..., TestCompletion]
     """
-    Decorator to make TestOrchestrator methods return
+    Decorator to make methods return
     a completion object that executes the wrapped method.
     """
 
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
-        return TestReadCompletion(lambda: f(*args, **kwargs))
+        return TestCompletion(on_complete=lambda _: f(*args, **kwargs))
 
     return wrapper
 
 
+def deferred_write(message):
+    def inner(f):
+        # type: (Callable) -> Callable[..., TestCompletion]
+
+        @functools.wraps(f)
+        def wrapper(self, *args, **kwargs):
+            return TestCompletion.with_progress(
+                message=message,
+                mgr=self,
+                on_complete=lambda _: f(self, *args, **kwargs),
+            )
+
+        return wrapper
+    return inner
+
+
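To illustrate what the two decorators above hand back to callers, here is a minimal sketch (the Dummy class is hypothetical, and it assumes orchestrator.Completion exposes the finalized value through its result property, as the Octopus orchestrator interface appears to):

    class Dummy(object):
        @deferred_read
        def get_answer(self):
            return 42

    c = Dummy().get_answer()   # a TestCompletion; nothing has executed yet
    c.evaluate()               # finalize(None) fires the on_complete callback
    assert c.result == 42      # the wrapped method's return value
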
 class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     """
     This is an orchestrator implementation used for internal testing. It's meant for
@@ -96,32 +69,25 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
     The implementation is similar to the Rook orchestrator, but simpler.
     """
 
-    def wait(self, completions):
-        self.log.info("wait: completions={0}".format(completions))
-
-        # Our `wait` implementation is very simple because everything's
-        # just an API call.
-        for c in completions:
-            if not isinstance(c, TestReadCompletion) and \
-                    not isinstance(c, TestWriteCompletion):
-                raise TypeError(
-                    "wait() requires list of completions, not {0}".format(
-                        c.__class__
-                    ))
+    def process(self, completions):
+        # type: (List[TestCompletion]) -> None
+        if completions:
+            self.log.info("process: completions={0}".format(orchestrator.pretty_print(completions)))
 
-            if c.is_complete:
-                continue
+            for p in completions:
+                p.evaluate()
 
-            try:
-                c.execute()
-            except Exception as e:
-                self.log.exception("Completion {0} threw an exception:".format(
-                    c.message
-                ))
-                c.exception = e
-                c._complete = True
-
-        return all(c.is_complete for c in completions)
+    @CLICommand('test_orchestrator load_data', '', 'load dummy data into test orchestrator', 'w')
+    def _load_data(self, inbuf):
+        try:
+            data = json.loads(inbuf)
+            self._init_data(data)
+            return HandleCommandResult()
+        except json.decoder.JSONDecodeError as e:
+            msg = 'Invalid JSON file: {}'.format(e)
+            return HandleCommandResult(retval=-errno.EINVAL, stderr=msg)
+        except orchestrator.OrchestratorValidationError as e:
+            return HandleCommandResult(retval=-errno.EINVAL, stderr=str(e))
 
     def available(self):
         return True, ""
@@ -131,6 +97,8 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         self._initialized = threading.Event()
         self._shutdown = threading.Event()
+        self._init_data({})
+        self.all_progress_references = list()  # type: List[orchestrator.ProgressReference]
 
     def shutdown(self):
         self._shutdown.set()
@@ -144,19 +112,34 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
             # in case we had a caller that wait()'ed on them long enough
             # to get persistence but not long enough to get completion
 
-            self.wait(TestCompletionMixin.all_completions)
-            TestCompletionMixin.all_completions = [c for c in TestCompletionMixin.all_completions if
-                                                   not c.is_complete]
+            self.all_progress_references = [p for p in self.all_progress_references if not p.effective]
+            for p in self.all_progress_references:
+                p.update()
 
             self._shutdown.wait(5)
 
+    def _init_data(self, data=None):
+        self._inventory = [orchestrator.InventoryHost.from_json(inventory_host)
+                           for inventory_host in data.get('inventory', [])]
+        self._services = [orchestrator.ServiceDescription.from_json(service)
+                           for service in data.get('services', [])]
+        self._daemons = [orchestrator.DaemonDescription.from_json(daemon)
+                          for daemon in data.get('daemons', [])]
+
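A dummy-data payload for the load_data command above therefore has this overall shape. Only the three top-level keys are fixed by _init_data(); the per-entry fields are whatever InventoryHost.from_json, ServiceDescription.from_json and DaemonDescription.from_json accept, and the daemon fields shown here are an assumption:

    dummy_data = {
        "inventory": [],   # InventoryHost.from_json() entries
        "services": [],    # ServiceDescription.from_json() entries
        "daemons": [       # DaemonDescription.from_json() entries (fields assumed)
            {"daemon_type": "mgr", "daemon_id": "x", "hostname": "localhost"},
        ],
    }
    # Dumped to JSON, it would be loaded with something like:
    #   ceph test_orchestrator load_data -i dummy_data.json
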
     @deferred_read
-    def get_inventory(self, node_filter=None, refresh=False):
+    def get_inventory(self, host_filter=None, refresh=False):
         """
         There is no guarantee which devices are returned by get_inventory.
         """
-        if node_filter and node_filter.nodes is not None:
-            assert isinstance(node_filter.nodes, list)
+        if host_filter and host_filter.hosts is not None:
+            assert isinstance(host_filter.hosts, list)
+
+        if self._inventory:
+            if host_filter:
+                return list(filter(lambda host: host.name in host_filter.hosts,
+                                   self._inventory))
+            return self._inventory
+
         try:
             c_v_out = check_output(['ceph-volume', 'inventory', '--format', 'json'])
         except OSError:
@@ -171,66 +154,181 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
 
         for out in c_v_out.splitlines():
             self.log.error(out)
-            devs = []
-            for device in json.loads(out):
-                dev = orchestrator.InventoryDevice.from_ceph_volume_inventory(device)
-                devs.append(dev)
-            return [orchestrator.InventoryNode('localhost', devs)]
+            devs = inventory.Devices.from_json(json.loads(out))
+            return [orchestrator.InventoryHost('localhost', devs)]
         self.log.error('c-v failed: ' + str(c_v_out))
         raise Exception('c-v failed')
 
+    def _get_ceph_daemons(self):
+        # type: () -> List[orchestrator.DaemonDescription]
+        """ Return ceph daemons on the running host."""
+        types = ("mds", "osd", "mon", "rgw", "mgr")
+        out = map(str, check_output(['ps', 'aux']).splitlines())
+        processes = [p for p in out if any(
+            [('ceph-' + t in p) for t in types])]
+
+        daemons = []
+        for p in processes:
+            daemon = orchestrator.DaemonDescription()
+            # parse daemon type
+            m = re.search('ceph-([^ ]+)', p)
+            if m:
+                _daemon_type = m.group(1)
+            else:
+                raise AssertionError('Failed to determine daemon type from {}'.format(p))
+
+            # parse daemon ID. Possible options: `-i <id>`, `--id=<id>`, `--id <id>`
+            patterns = [r'-i\s(\w+)', r'--id[\s=](\w+)']
+            daemon_id = None
+            for pattern in patterns:
+                m = re.search(pattern, p)
+                if m:
+                    daemon_id = m.group(1)
+                    break
+            else:
+                raise AssertionError('Failed to determine daemon ID from {}'.format(p))
+            daemon = orchestrator.DaemonDescription(
+                daemon_type=_daemon_type, daemon_id=daemon_id, hostname='localhost')
+            daemons.append(daemon)
+        return daemons
+
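To make the ps-output parsing above concrete, this is how the two expressions behave on a fabricated process line (the command string is invented purely for illustration):

    import re

    line = 'ceph  1234  0.5  /usr/bin/ceph-osd -f --cluster ceph --id 3'
    re.search('ceph-([^ ]+)', line).group(1)      # -> 'osd'  (daemon type)
    re.search(r'--id[\s=](\w+)', line).group(1)   # -> '3'    (daemon id)
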
     @deferred_read
-    def describe_service(self, service_type=None, service_id=None, node_name=None, refresh=False):
+    def describe_service(self, service_type=None, service_name=None, refresh=False):
+        if self._services:
+            # Dummy data
+            services = self._services
+            # Can't deduce service type from dummy data (no daemons).
+            # Assume service_type is service_name.
+            if service_type is not None:
+                services = list(filter(lambda s: s.service_name == service_type, services))
+        else:
+            # Deduce services from daemons running on localhost
+            all_daemons = self._get_ceph_daemons()
+            services = []
+            for daemon_type, daemons in itertools.groupby(all_daemons, key=lambda d: d.daemon_type):
+                if service_type is not None and service_type != daemon_type:
+                    continue
+                daemon_size = len(list(daemons))
+                services.append(orchestrator.ServiceDescription(
+                    service_name=daemon_type, size=daemon_size, running=daemon_size))
+
+        def _filter_func(svc):
+            if service_name is not None and service_name != svc.service_name:
+                return False
+            return True
+
+        return list(filter(_filter_func, services))
+
+    @deferred_read
+    def list_daemons(self, daemon_type=None, daemon_id=None, host=None, refresh=False):
         """
         There is no guarantee which daemons are returned by list_daemons, except that
         it returns the mgr we're running in.
         """
-        if service_type:
-            assert service_type in ("mds", "osd", "mon", "rgw", "mgr"), service_type + " unsupported"
+        if daemon_type:
+            daemon_types = ("mds", "osd", "mon", "rgw", "mgr", "iscsi", "crash")
+            assert daemon_type in daemon_types, daemon_type + " unsupported"
 
-        out = map(str, check_output(['ps', 'aux']).splitlines())
-        types = [service_type] if service_type else ("mds", "osd", "mon", "rgw", "mgr")
-        processes = [p for p in out if any([('ceph-' + t in p) for t in types])]
+        daemons = self._daemons if self._daemons else self._get_ceph_daemons()
 
-        result = []
-        for p in processes:
-            sd = orchestrator.ServiceDescription()
-            sd.nodename = 'localhost'
-            sd.service_instance = re.search('ceph-[^ ]+', p).group()
-            result.append(sd)
+        def _filter_func(d):
+            if daemon_type is not None and daemon_type != d.daemon_type:
+                return False
+            if daemon_id is not None and daemon_id != d.daemon_id:
+                return False
+            if host is not None and host != d.hostname:
+                return False
+            return True
 
-        return result
+        return list(filter(_filter_func, daemons))
 
-    @deferred_write("Adding stateless service")
-    def add_stateless_service(self, service_type, spec):
-        pass
+    def create_osds(self, drive_group):
+        # type: (DriveGroupSpec) -> TestCompletion
+        """ Creates OSDs from a drive group specification.
 
-    @deferred_write("create_osds")
-    def create_osds(self, drive_group, all_hosts):
-        drive_group.validate(all_hosts)
+        $: ceph orch osd create -i <dg.file>
+
+        The drivegroup file must only contain one spec at a time.
+        """
 
-    @deferred_write("remove_osds")
-    def remove_osds(self, osd_ids):
-        assert isinstance(osd_ids, list)
+        def run(all_hosts):
+            # type: (List[orchestrator.HostSpec]) -> None
+            drive_group.validate()
+            if drive_group.placement.host_pattern:
+                if not drive_group.placement.pattern_matches_hosts([h.hostname for h in all_hosts]):
+                    raise orchestrator.OrchestratorValidationError('failed to match')
+        return self.get_hosts().then(run).then(
+            on_complete=orchestrator.ProgressReference(
+                message='create_osds',
+                mgr=self,
+            )
+        )
+
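Expressed in Python rather than as a drivegroup file, a single spec of the kind create_osds() expects would look roughly like this (a sketch assuming the Octopus ceph.deployment API; the PlacementSpec and DeviceSelection usage is my reading of that tree, not taken from this change):

    from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
    from ceph.deployment.service_spec import PlacementSpec

    dg = DriveGroupSpec(
        service_id='example_osd_spec',               # assumed identifier
        placement=PlacementSpec(host_pattern='*'),   # hosts the spec may match
        data_devices=DeviceSelection(all=True),      # take every available data device
    )
    completion = orch.create_osds(dg)   # 'orch' stands for this module's instance
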
+    def apply_drivegroups(self, specs):
+        # type: (List[DriveGroupSpec]) -> TestCompletion
+        drive_group = specs[0]
+        def run(all_hosts):
+            # type: (List[orchestrator.HostSpec]) -> None
+            drive_group.validate()
+            if drive_group.placement.host_pattern:
+                if not drive_group.placement.pattern_matches_hosts([h.hostname for h in all_hosts]):
+                    raise orchestrator.OrchestratorValidationError('failed to match')
+        return self.get_hosts().then(run).then(
+            on_complete=orchestrator.ProgressReference(
+                message='apply_drivegroups',
+                mgr=self,
+            )
+        )
+
+    @deferred_write("remove_daemons")
+    def remove_daemons(self, names):
+        assert isinstance(names, list)
+
+    @deferred_write("remove_service")
+    def remove_service(self, service_name):
+        assert isinstance(service_name, str)
+
+    @deferred_write("blink_device_light")
+    def blink_device_light(self, ident_fault, on, locations):
+        assert ident_fault in ("ident", "fault")
+        assert len(locations)
+        return ''
 
     @deferred_write("service_action")
-    def service_action(self, action, service_type, service_name=None, service_id=None):
+    def service_action(self, action, service_name):
         pass
 
-    @deferred_write("remove_stateless_service")
-    def remove_stateless_service(self, service_type, id_):
+    @deferred_write("daemon_action")
+    def daemon_action(self, action, daemon_type, daemon_id):
+        pass
+
+    @deferred_write("Adding NFS service")
+    def add_nfs(self, spec):
+        # type: (NFSServiceSpec) -> None
+        assert isinstance(spec.pool, str)
+
+    @deferred_write("apply_nfs")
+    def apply_nfs(self, spec):
         pass
 
-    @deferred_write("update_stateless_service")
-    def update_stateless_service(self, service_type, spec):
+    @deferred_write("add_mds")
+    def add_mds(self, spec):
+        pass
+
+    @deferred_write("add_rgw")
+    def add_rgw(self, spec):
         pass
 
     @deferred_read
     def get_hosts(self):
-        return [orchestrator.InventoryNode('localhost', [])]
+        if self._inventory:
+            return [orchestrator.HostSpec(i.name, i.addr, i.labels) for i in self._inventory]
+        return [orchestrator.HostSpec('localhost')]
 
     @deferred_write("add_host")
-    def add_host(self, host):
+    def add_host(self, spec):
+        # type: (orchestrator.HostSpec) -> None
+        host = spec.hostname
         if host == 'raise_no_support':
             raise orchestrator.OrchestratorValidationError("MON count must be either 1, 3 or 5")
         if host == 'raise_bug':
@@ -241,19 +339,23 @@ class TestOrchestrator(MgrModule, orchestrator.Orchestrator):
             raise orchestrator.NoOrchestrator()
         if host == 'raise_import_error':
             raise ImportError("test_orchestrator not enabled")
-        assert isinstance(host, str)
+        assert isinstance(host, six.string_types)
 
     @deferred_write("remove_host")
     def remove_host(self, host):
-        assert isinstance(host, str)
-
-    @deferred_write("update_mgrs")
-    def update_mgrs(self, num, hosts):
-        assert not hosts or len(hosts) == num
-        assert all([isinstance(h, str) for h in hosts])
-
-    @deferred_write("update_mons")
-    def update_mons(self, num, hosts):
-        assert not hosts or len(hosts) == num
-        assert all([isinstance(h[0], str) for h in hosts])
-        assert all([isinstance(h[1], str) or h[1] is None for h in hosts])
+        assert isinstance(host, six.string_types)
+
+    @deferred_write("apply_mgr")
+    def apply_mgr(self, spec):
+        # type: (ServiceSpec) -> None
+
+        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
+        assert all([isinstance(h, str) for h in spec.placement.hosts])
+
+    @deferred_write("apply_mon")
+    def apply_mon(self, spec):
+        # type: (ServiceSpec) -> None
+
+        assert not spec.placement.hosts or len(spec.placement.hosts) == spec.placement.count
+        assert all([isinstance(h[0], str) for h in spec.placement.hosts])
+        assert all([isinstance(h[1], str) or h[1] is None for h in spec.placement.hosts])