# -*- coding: utf-8 -*-
# pylint: disable=too-many-arguments
-from __future__ import absolute_import
+import contextlib
import json
import logging
import threading
-import sys
import time
+from typing import Any, Dict, List, Optional
+from unittest import mock
+from unittest.mock import Mock
import cherrypy
from cherrypy._cptools import HandlerWrapperTool
from cherrypy.test import helper
+from mgr_module import HandleCommandResult
+from orchestrator import DaemonDescription, HostSpec, InventoryHost
from pyfakefs import fake_filesystem
-from mgr_module import CLICommand
-
from .. import mgr
-from ..controllers import json_error_page, generate_controller_routes
+from ..controllers import generate_controller_routes, json_error_page
+from ..controllers._version import APIVersion
+from ..module import Module
+from ..plugins import PLUGIN_MANAGER, debug, feature_toggles # noqa
from ..services.auth import AuthManagerTool
from ..services.exception import dashboard_exception_handler
-
-from ..plugins import PLUGIN_MANAGER
-from ..plugins import feature_toggles, debug # noqa # pylint: disable=unused-import
-
+from ..tools import RequestLoggingTool
PLUGIN_MANAGER.hook.init()
PLUGIN_MANAGER.hook.register_commands()
logger = logging.getLogger('tests')
+class ModuleTestClass(Module):
+ """Dashboard module subclass for testing the module methods."""
+
+ def __init__(self) -> None:
+ pass
+
+ def _unconfigure_logging(self) -> None:
+ pass
+
+
class CmdException(Exception):
def __init__(self, retcode, message):
super(CmdException, self).__init__(message)
self.retcode = retcode
-def exec_dashboard_cmd(command_handler, cmd, **kwargs):
- cmd_dict = {'prefix': 'dashboard {}'.format(cmd)}
- cmd_dict.update(kwargs)
- if cmd_dict['prefix'] not in CLICommand.COMMANDS:
- ret, out, err = command_handler(cmd_dict)
- if ret < 0:
- raise CmdException(ret, err)
- try:
- return json.loads(out)
- except ValueError:
- return out
-
- ret, out, err = CLICommand.COMMANDS[cmd_dict['prefix']].call(mgr, cmd_dict,
- None)
- if ret < 0:
- raise CmdException(ret, err)
- try:
- return json.loads(out)
- except ValueError:
- return out
-
-
class KVStoreMockMixin(object):
CONFIG_KEY_DICT = {}
return cls.CONFIG_KEY_DICT.get(key, None)
+# pylint: disable=protected-access
class CLICommandTestMixin(KVStoreMockMixin):
+ _dashboard_module = ModuleTestClass()
+
@classmethod
def exec_cmd(cls, cmd, **kwargs):
- return exec_dashboard_cmd(None, cmd, **kwargs)
+ inbuf = kwargs['inbuf'] if 'inbuf' in kwargs else None
+ cmd_dict = {'prefix': 'dashboard {}'.format(cmd)}
+ cmd_dict.update(kwargs)
+
+ result = HandleCommandResult(*cls._dashboard_module._handle_command(inbuf, cmd_dict))
+
+ if result.retval < 0:
+ raise CmdException(result.retval, result.stderr)
+ try:
+ return json.loads(result.stdout)
+ except ValueError:
+ return result.stdout
class FakeFsMixin(object):
fs = fake_filesystem.FakeFilesystem()
f_open = fake_filesystem.FakeFileOpen(fs)
f_os = fake_filesystem.FakeOsModule(fs)
-
- if sys.version_info > (3, 0):
- builtins_open = 'builtins.open'
- else:
- builtins_open = '__builtin__.open'
+ builtins_open = 'builtins.open'
class ControllerTestCase(helper.CPWebCase):
_endpoints_cache = {}
@classmethod
- def setup_controllers(cls, ctrl_classes, base_url=''):
+ def setup_controllers(cls, ctrl_classes, base_url='', cp_config: Dict[str, Any] = None):
if not isinstance(ctrl_classes, list):
ctrl_classes = [ctrl_classes]
mapper = cherrypy.dispatch.RoutesDispatcher()
endpoint_list = []
for ctrl in ctrl_classes:
+ ctrl._cp_config = {
+ 'tools.dashboard_exception_handler.on': True,
+ 'tools.authenticate.on': False
+ }
+ if cp_config:
+ ctrl._cp_config.update(cp_config)
inst = ctrl()
# We need to cache the controller endpoints because
cherrypy.tree.mount(None, config={
base_url: {'request.dispatch': mapper}})
- def __init__(self, *args, **kwargs):
+ _request_logging = False
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
cherrypy.tools.authenticate = AuthManagerTool()
cherrypy.tools.dashboard_exception_handler = HandlerWrapperTool(dashboard_exception_handler,
priority=31)
'tools.json_in.force': False
})
PLUGIN_MANAGER.hook.configure_cherrypy(config=cherrypy.config)
- super(ControllerTestCase, self).__init__(*args, **kwargs)
- def _request(self, url, method, data=None, headers=None):
+ if cls._request_logging:
+ cherrypy.tools.request_logging = RequestLoggingTool()
+ cherrypy.config.update({'tools.request_logging.on': True})
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls._request_logging:
+ cherrypy.config.update({'tools.request_logging.on': False})
+
+ def _request(self, url, method, data=None, headers=None, version=APIVersion.DEFAULT):
if not data:
b = None
- h = None
+ if version:
+ h = [('Accept', version.to_mime_type()),
+ ('Content-Length', '0')]
+ else:
+ h = None
else:
b = json.dumps(data)
- h = [('Content-Type', 'application/json'),
- ('Content-Length', str(len(b)))]
+ if version is not None:
+ h = [('Accept', version.to_mime_type()),
+ ('Content-Type', 'application/json'),
+ ('Content-Length', str(len(b)))]
+
+ else:
+ h = [('Content-Type', 'application/json'),
+ ('Content-Length', str(len(b)))]
+
if headers:
h = headers
self.getPage(url, method=method, body=b, headers=h)
- def _get(self, url, headers=None):
- self._request(url, 'GET', headers=headers)
+ def _get(self, url, headers=None, version=APIVersion.DEFAULT):
+ self._request(url, 'GET', headers=headers, version=version)
- def _post(self, url, data=None):
- self._request(url, 'POST', data)
+ def _post(self, url, data=None, version=APIVersion.DEFAULT):
+ self._request(url, 'POST', data, version=version)
- def _delete(self, url, data=None):
- self._request(url, 'DELETE', data)
+ def _delete(self, url, data=None, version=APIVersion.DEFAULT):
+ self._request(url, 'DELETE', data, version=version)
- def _put(self, url, data=None):
- self._request(url, 'PUT', data)
+ def _put(self, url, data=None, version=APIVersion.DEFAULT):
+ self._request(url, 'PUT', data, version=version)
- def _task_request(self, method, url, data, timeout):
- self._request(url, method, data)
+ def _task_request(self, method, url, data, timeout, version=APIVersion.DEFAULT):
+ self._request(url, method, data, version=version)
if self.status != '202 Accepted':
logger.info("task finished immediately")
return
task_name = res['name']
task_metadata = res['metadata']
- # pylint: disable=protected-access
- class Waiter(threading.Thread):
- def __init__(self, task_name, task_metadata, tc):
- super(Waiter, self).__init__()
- self.task_name = task_name
- self.task_metadata = task_metadata
- self.ev = threading.Event()
- self.abort = False
- self.res_task = None
- self.tc = tc
-
- def run(self):
- running = True
- while running and not self.abort:
- logger.info("task (%s, %s) is still executing", self.task_name,
- self.task_metadata)
- time.sleep(1)
- self.tc._get('/api/task?name={}'.format(self.task_name))
- res = self.tc.json_body()
- for task in res['finished_tasks']:
- if task['metadata'] == self.task_metadata:
- # task finished
- running = False
- self.res_task = task
- self.ev.set()
-
- thread = Waiter(task_name, task_metadata, self)
+ thread = Waiter(task_name, task_metadata, self, version)
thread.start()
status = thread.ev.wait(timeout)
if not status:
logger.info("task (%s, %s) finished", task_name, task_metadata)
if thread.res_task['success']:
self.body = json.dumps(thread.res_task['ret_value'])
- if method == 'POST':
- self.status = '201 Created'
- elif method == 'PUT':
- self.status = '200 OK'
- elif method == 'DELETE':
- self.status = '204 No Content'
- return
-
- if 'status' in thread.res_task['exception']:
- self.status = thread.res_task['exception']['status']
+ self._set_success_status(method)
else:
- self.status = 500
- self.body = json.dumps(thread.res_task['exception'])
+ if 'status' in thread.res_task['exception']:
+ self.status = thread.res_task['exception']['status']
+ else:
+ self.status = 500
+ self.body = json.dumps(thread.res_task['exception'])
- def _task_post(self, url, data=None, timeout=60):
- self._task_request('POST', url, data, timeout)
+ def _set_success_status(self, method):
+ if method == 'POST':
+ self.status = '201 Created'
+ elif method == 'PUT':
+ self.status = '200 OK'
+ elif method == 'DELETE':
+ self.status = '204 No Content'
- def _task_delete(self, url, timeout=60):
- self._task_request('DELETE', url, None, timeout)
+ def _task_post(self, url, data=None, timeout=60, version=APIVersion.DEFAULT):
+ self._task_request('POST', url, data, timeout, version=version)
- def _task_put(self, url, data=None, timeout=60):
- self._task_request('PUT', url, data, timeout)
+ def _task_delete(self, url, timeout=60, version=APIVersion.DEFAULT):
+ self._task_request('DELETE', url, None, timeout, version=version)
+
+ def _task_put(self, url, data=None, timeout=60, version=APIVersion.DEFAULT):
+ self._task_request('PUT', url, data, timeout, version=version)
def json_body(self):
body_str = self.body.decode('utf-8') if isinstance(self.body, bytes) else self.body
if msg is None:
msg = 'expected %r to be in %r' % (data, json_body)
self._handlewebError(msg)
+
+
+class Stub:
+ """Test class for returning predefined values"""
+
+ @classmethod
+ def get_mgr_no_services(cls):
+ mgr.get = Mock(return_value={})
+
+
+class RgwStub(Stub):
+
+ @classmethod
+ def get_daemons(cls):
+ mgr.get = Mock(return_value={'services': {'rgw': {'daemons': {
+ '5297': {
+ 'addr': '192.168.178.3:49774/1534999298',
+ 'metadata': {
+ 'frontend_config#0': 'beast port=8000',
+ 'id': 'daemon1',
+ 'realm_name': 'realm1',
+ 'zonegroup_name': 'zonegroup1',
+ 'zone_name': 'zone1',
+ 'hostname': 'daemon1.server.lan'
+ }
+ },
+ '5398': {
+ 'addr': '[2001:db8:85a3::8a2e:370:7334]:49774/1534999298',
+ 'metadata': {
+ 'frontend_config#0': 'civetweb port=8002',
+ 'id': 'daemon2',
+ 'realm_name': 'realm2',
+ 'zonegroup_name': 'zonegroup2',
+ 'zone_name': 'zone2',
+ 'hostname': 'daemon2.server.lan'
+ }
+ }
+ }}}})
+
+ @classmethod
+ def get_settings(cls):
+ settings = {
+ 'RGW_API_ACCESS_KEY': 'fake-access-key',
+ 'RGW_API_SECRET_KEY': 'fake-secret-key',
+ }
+ mgr.get_module_option = Mock(side_effect=settings.get)
+
+
+# pylint: disable=protected-access
+class Waiter(threading.Thread):
+ def __init__(self, task_name, task_metadata, tc, version):
+ super(Waiter, self).__init__()
+ self.task_name = task_name
+ self.task_metadata = task_metadata
+ self.ev = threading.Event()
+ self.abort = False
+ self.res_task = None
+ self.tc = tc
+ self.version = version
+
+ def run(self):
+ running = True
+ while running and not self.abort:
+ logger.info("task (%s, %s) is still executing", self.task_name,
+ self.task_metadata)
+ time.sleep(1)
+ self.tc._get('/api/task?name={}'.format(self.task_name), version=self.version)
+ res = self.tc.json_body()
+ for task in res['finished_tasks']:
+ if task['metadata'] == self.task_metadata:
+ # task finished
+ running = False
+ self.res_task = task
+ self.ev.set()
+
+
+@contextlib.contextmanager
+def patch_orch(available: bool, missing_features: Optional[List[str]] = None,
+ hosts: Optional[List[HostSpec]] = None,
+ inventory: Optional[List[dict]] = None,
+ daemons: Optional[List[DaemonDescription]] = None):
+ with mock.patch('dashboard.controllers.orchestrator.OrchClient.instance') as instance:
+ fake_client = mock.Mock()
+ fake_client.available.return_value = available
+ fake_client.get_missing_features.return_value = missing_features
+
+ if not daemons:
+ daemons = [
+ DaemonDescription(
+ daemon_type='mon',
+ daemon_id='a',
+ hostname='node0'
+ )
+ ]
+ fake_client.services.list_daemons.return_value = daemons
+ if hosts is not None:
+ fake_client.hosts.list.return_value = hosts
+
+ if inventory is not None:
+ def _list_inventory(hosts=None, refresh=False): # pylint: disable=unused-argument
+ inv_hosts = []
+ for inv_host in inventory:
+ if hosts is None or inv_host['name'] in hosts:
+ inv_hosts.append(InventoryHost.from_json(inv_host))
+ return inv_hosts
+ fake_client.inventory.list.side_effect = _list_inventory
+
+ instance.return_value = fake_client
+ yield fake_client