# Extracted from the git.proxmox.com gitweb view of ceph.git, blob
# ceph/src/pybind/mgr/dashboard/tests/test_rgw.py
from unittest.mock import Mock, call, patch

# `mgr` is used by the daemon tests further down, but no import for it is
# present in this listing.
# NOTE(review): presumably the dashboard package-level object -- confirm
# against the upstream file.
from .. import mgr
from ..controllers.rgw import Rgw, RgwDaemon, RgwUser
from ..rest_client import RequestException
from ..services.rgw_client import RgwClient
from . import ControllerTestCase, RgwStub  # pylint: disable=no-name-in-module
class RgwControllerTestCase(ControllerTestCase):
    """Tests for ``GET /test/api/rgw/status`` served by the ``Rgw`` controller.

    Each test patches selected ``RgwClient`` attributes (or swaps in a stub)
    and then checks the HTTP status and the JSON body of the response.
    """

    @classmethod
    def setup_server(cls):
        """Mount ``Rgw`` under ``/test`` with ``tools.authenticate.on`` off."""
        Rgw._cp_config['tools.authenticate.on'] = False  # pylint: disable=protected-access
        cls.setup_controllers([Rgw], '/test')

    def setUp(self) -> None:
        """Call ``RgwStub.get_settings()`` before every test."""
        # NOTE(review): the numbered listing this was recovered from skips a
        # line just before this call; confirm against upstream whether more
        # stub setup belongs here.
        RgwStub.get_settings()

    @patch.object(RgwClient, '_get_user_id', Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', Mock(return_value=True))
    @patch.object(RgwClient, '_is_system_user', Mock(return_value=True))
    def test_status_available(self):
        """Service online and system user: available, with no message."""
        self._get('/test/api/rgw/status')
        self.assertStatus(200)
        self.assertJsonBody({'available': True, 'message': None})

    @patch.object(RgwClient, '_get_user_id', Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', Mock(
        side_effect=RequestException('My test error')))
    def test_status_online_check_error(self):
        """A ``RequestException`` from the online check becomes the message."""
        self._get('/test/api/rgw/status')
        self.assertStatus(200)
        self.assertJsonBody({'available': False,
                             'message': 'My test error'})

    @patch.object(RgwClient, '_get_user_id', Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', Mock(return_value=False))
    def test_status_not_online(self):
        """An offline service is reported as a connection failure."""
        self._get('/test/api/rgw/status')
        self.assertStatus(200)
        self.assertJsonBody({'available': False,
                             'message': "Failed to connect to the Object Gateway's Admin Ops API."})

    @patch.object(RgwClient, '_get_user_id', Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', Mock(return_value=True))
    @patch.object(RgwClient, '_is_system_user', Mock(return_value=False))
    def test_status_not_system_user(self):
        """Online, but the user is not a system user: unavailable."""
        self._get('/test/api/rgw/status')
        self.assertStatus(200)
        self.assertJsonBody({'available': False,
                             'message': 'The system flag is not set for user "fake-user".'})

    def test_status_no_service(self):
        """After ``RgwStub.get_mgr_no_services()``: no RGW service is running."""
        RgwStub.get_mgr_no_services()
        self._get('/test/api/rgw/status')
        self.assertStatus(200)
        self.assertJsonBody({'available': False, 'message': 'No RGW service is running.'})
class RgwDaemonControllerTestCase(ControllerTestCase):
    """Tests for the ``RgwDaemon`` controller, mounted under ``/test``."""

    @classmethod
    def setup_server(cls):
        """Mount ``RgwDaemon`` under ``/test`` with ``tools.authenticate.on`` off."""
        RgwDaemon._cp_config['tools.authenticate.on'] = False  # pylint: disable=protected-access
        cls.setup_controllers([RgwDaemon], '/test')
67 @patch('dashboard.services.rgw_client.RgwClient._get_user_id', Mock(
68 return_value
='dummy_admin'))
71 RgwStub
.get_settings()
72 mgr
.list_servers
.return_value
= [{
74 'services': [{'id': 'daemon1', 'type': 'rgw'}, {'id': 'daemon2', 'type': 'rgw'}]
76 mgr
.get_metadata
.side_effect
= [
78 'ceph_version': 'ceph version master (dev)',
80 'zonegroup_name': 'zg1',
84 'ceph_version': 'ceph version master (dev)',
86 'zonegroup_name': 'zg2',
89 self
._get
('/test/api/rgw/daemon')
90 self
.assertStatus(200)
91 self
.assertJsonBody([{
93 'version': 'ceph version master (dev)',
94 'server_hostname': 'host1',
95 'zonegroup_name': 'zg1',
96 'zone_name': 'zone1', 'default': True
100 'version': 'ceph version master (dev)',
101 'server_hostname': 'host1',
102 'zonegroup_name': 'zg2',
103 'zone_name': 'zone2',
# NOTE(review): this is a method of RgwDaemonControllerTestCase. The recovered
# listing has no indentation, so it sits at column 0 here; re-indent it under
# the class when the surrounding text is restored.
def test_list_empty(self):
    """After ``RgwStub.get_mgr_no_services()``, the daemon list is ``[]``."""
    RgwStub.get_mgr_no_services()
    self._get('/test/api/rgw/daemon')
    self.assertStatus(200)
    self.assertJsonBody([])
class RgwUserControllerTestCase(ControllerTestCase):
    """Tests for the ``RgwUser`` controller, mounted under ``/test``."""

    @classmethod
    def setup_server(cls):
        """Mount ``RgwUser`` under ``/test`` with ``tools.authenticate.on`` off."""
        RgwUser._cp_config['tools.authenticate.on'] = False  # pylint: disable=protected-access
        cls.setup_controllers([RgwUser], '/test')
120 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
121 def test_user_list(self
, mock_proxy
):
122 mock_proxy
.side_effect
= [{
124 'keys': ['test1', 'test2', 'test3'],
127 self
._get
('/test/api/rgw/user?daemon_name=dummy-daemon')
128 self
.assertStatus(200)
129 mock_proxy
.assert_has_calls([
130 call('dummy-daemon', 'GET', 'user?list', {})
132 self
.assertJsonBody(['test1', 'test2', 'test3'])
134 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
135 def test_user_list_marker(self
, mock_proxy
):
136 mock_proxy
.side_effect
= [{
138 'keys': ['test1', 'test2', 'test3'],
146 self
._get
('/test/api/rgw/user')
147 self
.assertStatus(200)
148 mock_proxy
.assert_has_calls([
149 call(None, 'GET', 'user?list', {}),
150 call(None, 'GET', 'user?list', {'marker': 'foo:bar'})
152 self
.assertJsonBody(['test1', 'test2', 'test3', 'admin'])
154 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
155 def test_user_list_duplicate_marker(self
, mock_proxy
):
156 mock_proxy
.side_effect
= [{
158 'keys': ['test1', 'test2', 'test3'],
163 'keys': ['test4', 'test5', 'test6'],
171 self
._get
('/test/api/rgw/user')
172 self
.assertStatus(500)
174 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
175 def test_user_list_invalid_marker(self
, mock_proxy
):
176 mock_proxy
.side_effect
= [{
178 'keys': ['test1', 'test2', 'test3'],
183 'keys': ['test4', 'test5', 'test6'],
191 self
._get
('/test/api/rgw/user')
192 self
.assertStatus(500)
194 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
195 @patch.object(RgwUser
, '_keys_allowed')
196 def test_user_get_with_keys(self
, keys_allowed
, mock_proxy
):
197 keys_allowed
.return_value
= True
198 mock_proxy
.return_value
= {
200 'user_id': 'my_user_id',
204 self
._get
('/test/api/rgw/user/testuser')
205 self
.assertStatus(200)
206 self
.assertInJsonBody('keys')
207 self
.assertInJsonBody('swift_keys')
209 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
210 @patch.object(RgwUser
, '_keys_allowed')
211 def test_user_get_without_keys(self
, keys_allowed
, mock_proxy
):
212 keys_allowed
.return_value
= False
213 mock_proxy
.return_value
= {
215 'user_id': 'my_user_id',
219 self
._get
('/test/api/rgw/user/testuser')
220 self
.assertStatus(200)
221 self
.assertNotIn('keys', self
.json_body())
222 self
.assertNotIn('swift_keys', self
.json_body())