# -*- coding: utf-8 -*-
# pylint: disable=too-many-arguments
-from __future__ import absolute_import
+import contextlib
import json
import logging
import threading
import time
-from typing import Any, Dict
+from typing import Any, Dict, List, Optional
+from unittest import mock
from unittest.mock import Mock
import cherrypy
from cherrypy._cptools import HandlerWrapperTool
from cherrypy.test import helper
from mgr_module import HandleCommandResult
+from orchestrator import HostSpec, InventoryHost
from pyfakefs import fake_filesystem
from .. import mgr
task_name = res['name']
task_metadata = res['metadata']
- # pylint: disable=protected-access
- class Waiter(threading.Thread):
- def __init__(self, task_name, task_metadata, tc):
- super(Waiter, self).__init__()
- self.task_name = task_name
- self.task_metadata = task_metadata
- self.ev = threading.Event()
- self.abort = False
- self.res_task = None
- self.tc = tc
-
- def run(self):
- running = True
- while running and not self.abort:
- logger.info("task (%s, %s) is still executing", self.task_name,
- self.task_metadata)
- time.sleep(1)
- self.tc._get('/api/task?name={}'.format(self.task_name), version=version)
- res = self.tc.json_body()
- for task in res['finished_tasks']:
- if task['metadata'] == self.task_metadata:
- # task finished
- running = False
- self.res_task = task
- self.ev.set()
-
- thread = Waiter(task_name, task_metadata, self)
+ thread = Waiter(task_name, task_metadata, self, version)
thread.start()
status = thread.ev.wait(timeout)
if not status:
logger.info("task (%s, %s) finished", task_name, task_metadata)
if thread.res_task['success']:
self.body = json.dumps(thread.res_task['ret_value'])
- if method == 'POST':
- self.status = '201 Created'
- elif method == 'PUT':
- self.status = '200 OK'
- elif method == 'DELETE':
- self.status = '204 No Content'
- return
-
- if 'status' in thread.res_task['exception']:
- self.status = thread.res_task['exception']['status']
+ self._set_success_status(method)
else:
- self.status = 500
- self.body = json.dumps(thread.res_task['exception'])
+ if 'status' in thread.res_task['exception']:
+ self.status = thread.res_task['exception']['status']
+ else:
+ self.status = 500
+ self.body = json.dumps(thread.res_task['exception'])
+
+ def _set_success_status(self, method):
+ if method == 'POST':
+ self.status = '201 Created'
+ elif method == 'PUT':
+ self.status = '200 OK'
+ elif method == 'DELETE':
+ self.status = '204 No Content'
def _task_post(self, url, data=None, timeout=60, version=APIVersion.DEFAULT):
self._task_request('POST', url, data, timeout, version=version)
'RGW_API_SECRET_KEY': 'fake-secret-key',
}
mgr.get_module_option = Mock(side_effect=settings.get)
+
+
+# pylint: disable=protected-access
+class Waiter(threading.Thread):
+ def __init__(self, task_name, task_metadata, tc, version):
+ super(Waiter, self).__init__()
+ self.task_name = task_name
+ self.task_metadata = task_metadata
+ self.ev = threading.Event()
+ self.abort = False
+ self.res_task = None
+ self.tc = tc
+ self.version = version
+
+ def run(self):
+ running = True
+ while running and not self.abort:
+ logger.info("task (%s, %s) is still executing", self.task_name,
+ self.task_metadata)
+ time.sleep(1)
+ self.tc._get('/api/task?name={}'.format(self.task_name), version=self.version)
+ res = self.tc.json_body()
+ for task in res['finished_tasks']:
+ if task['metadata'] == self.task_metadata:
+ # task finished
+ running = False
+ self.res_task = task
+ self.ev.set()
+
+
+@contextlib.contextmanager
+def patch_orch(available: bool, missing_features: Optional[List[str]] = None,
+ hosts: Optional[List[HostSpec]] = None,
+ inventory: Optional[List[dict]] = None):
+ with mock.patch('dashboard.controllers.orchestrator.OrchClient.instance') as instance:
+ fake_client = mock.Mock()
+ fake_client.available.return_value = available
+ fake_client.get_missing_features.return_value = missing_features
+
+ if hosts is not None:
+ fake_client.hosts.list.return_value = hosts
+
+ if inventory is not None:
+ def _list_inventory(hosts=None, refresh=False): # pylint: disable=unused-argument
+ inv_hosts = []
+ for inv_host in inventory:
+ if hosts is None or inv_host['name'] in hosts:
+ inv_hosts.append(InventoryHost.from_json(inv_host))
+ return inv_hosts
+ fake_client.inventory.list.side_effect = _list_inventory
+
+ instance.return_value = fake_client
+ yield fake_client