]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/pybind/mgr/dashboard/tests/__init__.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / __init__.py
index ba005c480d52ae8eea2d9094da0d96d88f1a9e83..2859e89a2599815cdf0a888711a0bc5714a5cee9 100644 (file)
@@ -1,18 +1,20 @@
 # -*- coding: utf-8 -*-
 # pylint: disable=too-many-arguments
-from __future__ import absolute_import
 
+import contextlib
 import json
 import logging
 import threading
 import time
-from typing import Any, Dict
+from typing import Any, Dict, List, Optional
+from unittest import mock
 from unittest.mock import Mock
 
 import cherrypy
 from cherrypy._cptools import HandlerWrapperTool
 from cherrypy.test import helper
 from mgr_module import HandleCommandResult
+from orchestrator import HostSpec, InventoryHost
 from pyfakefs import fake_filesystem
 
 from .. import mgr
@@ -210,33 +212,7 @@ class ControllerTestCase(helper.CPWebCase):
         task_name = res['name']
         task_metadata = res['metadata']
 
-        # pylint: disable=protected-access
-        class Waiter(threading.Thread):
-            def __init__(self, task_name, task_metadata, tc):
-                super(Waiter, self).__init__()
-                self.task_name = task_name
-                self.task_metadata = task_metadata
-                self.ev = threading.Event()
-                self.abort = False
-                self.res_task = None
-                self.tc = tc
-
-            def run(self):
-                running = True
-                while running and not self.abort:
-                    logger.info("task (%s, %s) is still executing", self.task_name,
-                                self.task_metadata)
-                    time.sleep(1)
-                    self.tc._get('/api/task?name={}'.format(self.task_name), version=version)
-                    res = self.tc.json_body()
-                    for task in res['finished_tasks']:
-                        if task['metadata'] == self.task_metadata:
-                            # task finished
-                            running = False
-                            self.res_task = task
-                            self.ev.set()
-
-        thread = Waiter(task_name, task_metadata, self)
+        thread = Waiter(task_name, task_metadata, self, version)
         thread.start()
         status = thread.ev.wait(timeout)
         if not status:
@@ -248,19 +224,21 @@ class ControllerTestCase(helper.CPWebCase):
         logger.info("task (%s, %s) finished", task_name, task_metadata)
         if thread.res_task['success']:
             self.body = json.dumps(thread.res_task['ret_value'])
-            if method == 'POST':
-                self.status = '201 Created'
-            elif method == 'PUT':
-                self.status = '200 OK'
-            elif method == 'DELETE':
-                self.status = '204 No Content'
-            return
-
-        if 'status' in thread.res_task['exception']:
-            self.status = thread.res_task['exception']['status']
+            self._set_success_status(method)
         else:
-            self.status = 500
-        self.body = json.dumps(thread.res_task['exception'])
+            if 'status' in thread.res_task['exception']:
+                self.status = thread.res_task['exception']['status']
+            else:
+                self.status = 500
+            self.body = json.dumps(thread.res_task['exception'])
+
+    def _set_success_status(self, method):
+        if method == 'POST':
+            self.status = '201 Created'
+        elif method == 'PUT':
+            self.status = '200 OK'
+        elif method == 'DELETE':
+            self.status = '204 No Content'
 
     def _task_post(self, url, data=None, timeout=60, version=APIVersion.DEFAULT):
         self._task_request('POST', url, data, timeout, version=version)
@@ -334,3 +312,56 @@ class RgwStub(Stub):
             'RGW_API_SECRET_KEY': 'fake-secret-key',
         }
         mgr.get_module_option = Mock(side_effect=settings.get)
+
+
+# pylint: disable=protected-access
+class Waiter(threading.Thread):
+    def __init__(self, task_name, task_metadata, tc, version):
+        super(Waiter, self).__init__()
+        self.task_name = task_name
+        self.task_metadata = task_metadata
+        self.ev = threading.Event()
+        self.abort = False
+        self.res_task = None
+        self.tc = tc
+        self.version = version
+
+    def run(self):
+        running = True
+        while running and not self.abort:
+            logger.info("task (%s, %s) is still executing", self.task_name,
+                        self.task_metadata)
+            time.sleep(1)
+            self.tc._get('/api/task?name={}'.format(self.task_name), version=self.version)
+            res = self.tc.json_body()
+            for task in res['finished_tasks']:
+                if task['metadata'] == self.task_metadata:
+                    # task finished
+                    running = False
+                    self.res_task = task
+                    self.ev.set()
+
+
+@contextlib.contextmanager
+def patch_orch(available: bool, missing_features: Optional[List[str]] = None,
+               hosts: Optional[List[HostSpec]] = None,
+               inventory: Optional[List[dict]] = None):
+    with mock.patch('dashboard.controllers.orchestrator.OrchClient.instance') as instance:
+        fake_client = mock.Mock()
+        fake_client.available.return_value = available
+        fake_client.get_missing_features.return_value = missing_features
+
+        if hosts is not None:
+            fake_client.hosts.list.return_value = hosts
+
+        if inventory is not None:
+            def _list_inventory(hosts=None, refresh=False):  # pylint: disable=unused-argument
+                inv_hosts = []
+                for inv_host in inventory:
+                    if hosts is None or inv_host['name'] in hosts:
+                        inv_hosts.append(InventoryHost.from_json(inv_host))
+                return inv_hosts
+            fake_client.inventory.list.side_effect = _list_inventory
+
+        instance.return_value = fake_client
+        yield fake_client