# -*- coding: utf-8 -*-
# pylint: disable=too-many-public-methods
-import unittest
+from unittest import TestCase
+from unittest.mock import Mock, patch
-try:
- from unittest.mock import patch
-except ImportError:
- from mock import patch # type: ignore
-
-from ..services.rgw_client import RgwClient, _parse_frontend_config
+from ..exceptions import DashboardException
+from ..services.rgw_client import NoCredentialsException, \
+ NoRgwDaemonsException, RgwClient, _parse_frontend_config
from ..settings import Settings
-from . import KVStoreMockMixin
-
-
-def _dummy_daemon_info():
- return {
- 'addr': '172.20.0.2:0/256594694',
- 'metadata': {
- 'zonegroup_name': 'zonegroup2-realm1'
- }
- }
+from . import KVStoreMockMixin, RgwStub # pylint: disable=no-name-in-module
-@patch('dashboard.services.rgw_client._get_daemon_info', _dummy_daemon_info)
-class RgwClientTest(unittest.TestCase, KVStoreMockMixin):
+@patch('dashboard.services.rgw_client.RgwClient._get_user_id', Mock(
+ return_value='dummy_admin'))
+class RgwClientTest(TestCase, KVStoreMockMixin):
def setUp(self):
- RgwClient._user_instances.clear() # pylint: disable=protected-access
+ RgwStub.get_daemons()
self.mock_kv_store()
self.CONFIG_KEY_DICT.update({
'RGW_API_ACCESS_KEY': 'klausmustermann',
'RGW_API_SECRET_KEY': 'supergeheim',
- 'RGW_API_HOST': 'localhost',
- 'RGW_API_USER_ID': 'rgwadmin'
})
def test_ssl_verify(self):
instance = RgwClient.admin_instance()
self.assertFalse(instance.session.verify)
+ def test_no_daemons(self):
+ RgwStub.get_mgr_no_services()
+ with self.assertRaises(NoRgwDaemonsException) as cm:
+ RgwClient.admin_instance()
+ self.assertIn('No RGW service is running.', str(cm.exception))
+
+ def test_no_credentials(self):
+ self.CONFIG_KEY_DICT.update({
+ 'RGW_API_ACCESS_KEY': '',
+ 'RGW_API_SECRET_KEY': '',
+ })
+ with self.assertRaises(NoCredentialsException) as cm:
+ RgwClient.admin_instance()
+ self.assertIn('No RGW credentials found', str(cm.exception))
+
+ def test_default_daemon_wrong_settings(self):
+ self.CONFIG_KEY_DICT.update({
+ 'RGW_API_HOST': '172.20.0.2',
+ 'RGW_API_PORT': '7990',
+ })
+ with self.assertRaises(DashboardException) as cm:
+ RgwClient.admin_instance()
+ self.assertIn('No RGW daemon found with user-defined host:', str(cm.exception))
+
@patch.object(RgwClient, '_get_daemon_zone_info')
def test_get_placement_targets_from_zone(self, zone_info):
zone_info.return_value = {
instance = RgwClient.admin_instance()
expected_result = {
- 'zonegroup': 'zonegroup2-realm1',
+ 'zonegroup': 'zonegroup1',
'placement_targets': [
{
'name': 'default-placement',
self.assertEqual([], instance.get_realms())
-class RgwClientHelperTest(unittest.TestCase):
+class RgwClientHelperTest(TestCase):
def test_parse_frontend_config_1(self):
self.assertEqual(_parse_frontend_config('beast port=8000'), (8000, False))