]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_rgw.py
1 from unittest
.mock
import Mock
, call
, patch
4 from ..controllers
.rgw
import Rgw
, RgwDaemon
, RgwUser
5 from ..rest_client
import RequestException
6 from ..services
.rgw_client
import RgwClient
7 from ..tests
import ControllerTestCase
, RgwStub
10 class RgwControllerTestCase(ControllerTestCase
):
12 def setup_server(cls
):
13 cls
.setup_controllers([Rgw
], '/test')
15 def setUp(self
) -> None:
17 RgwStub
.get_settings()
19 @patch.object(RgwClient
, '_get_user_id', Mock(return_value
='fake-user'))
20 @patch.object(RgwClient
, 'is_service_online', Mock(return_value
=True))
21 @patch.object(RgwClient
, '_is_system_user', Mock(return_value
=True))
22 @patch('dashboard.services.ceph_service.CephService.send_command')
23 def test_status_available(self
, send_command
):
24 send_command
.return_value
= ''
25 self
._get
('/test/ui-api/rgw/status')
26 self
.assertStatus(200)
27 self
.assertJsonBody({'available': True, 'message': None})
29 @patch.object(RgwClient
, '_get_user_id', Mock(return_value
='fake-user'))
30 @patch.object(RgwClient
, 'is_service_online', Mock(
31 side_effect
=RequestException('My test error')))
32 @patch('dashboard.services.ceph_service.CephService.send_command')
33 def test_status_online_check_error(self
, send_command
):
34 send_command
.return_value
= ''
35 self
._get
('/test/ui-api/rgw/status')
36 self
.assertStatus(200)
37 self
.assertJsonBody({'available': False,
38 'message': 'My test error'})
40 @patch.object(RgwClient
, '_get_user_id', Mock(return_value
='fake-user'))
41 @patch.object(RgwClient
, 'is_service_online', Mock(return_value
=False))
42 @patch('dashboard.services.ceph_service.CephService.send_command')
43 def test_status_not_online(self
, send_command
):
44 send_command
.return_value
= ''
45 self
._get
('/test/ui-api/rgw/status')
46 self
.assertStatus(200)
47 self
.assertJsonBody({'available': False,
48 'message': "Failed to connect to the Object Gateway's Admin Ops API."})
50 @patch.object(RgwClient
, '_get_user_id', Mock(return_value
='fake-user'))
51 @patch.object(RgwClient
, 'is_service_online', Mock(return_value
=True))
52 @patch.object(RgwClient
, '_is_system_user', Mock(return_value
=False))
53 @patch('dashboard.services.ceph_service.CephService.send_command')
54 def test_status_not_system_user(self
, send_command
):
55 send_command
.return_value
= ''
56 self
._get
('/test/ui-api/rgw/status')
57 self
.assertStatus(200)
58 self
.assertJsonBody({'available': False,
59 'message': 'The system flag is not set for user "fake-user".'})
61 def test_status_no_service(self
):
62 RgwStub
.get_mgr_no_services()
63 self
._get
('/test/ui-api/rgw/status')
64 self
.assertStatus(200)
65 self
.assertJsonBody({'available': False, 'message': 'No RGW service is running.'})
68 class RgwDaemonControllerTestCase(ControllerTestCase
):
70 def setup_server(cls
):
71 cls
.setup_controllers([RgwDaemon
], '/test')
73 @patch('dashboard.services.rgw_client.RgwClient._get_user_id', Mock(
74 return_value
='dummy_admin'))
75 @patch('dashboard.services.ceph_service.CephService.send_command')
76 def test_list(self
, send_command
):
77 send_command
.return_value
= ''
79 RgwStub
.get_settings()
80 mgr
.list_servers
.return_value
= [{
82 'services': [{'id': '4832', 'type': 'rgw'}, {'id': '5356', 'type': 'rgw'}]
84 mgr
.get_metadata
.side_effect
= [
86 'ceph_version': 'ceph version master (dev)',
88 'realm_name': 'realm1',
89 'zonegroup_name': 'zg1',
91 'frontend_config#0': 'beast port=80'
94 'ceph_version': 'ceph version master (dev)',
96 'realm_name': 'realm2',
97 'zonegroup_name': 'zg2',
99 'frontend_config#0': 'beast port=80 ssl_port=443 ssl_certificate=config:/config'
101 self
._get
('/test/api/rgw/daemon')
102 self
.assertStatus(200)
103 self
.assertJsonBody([{
105 'service_map_id': '4832',
106 'version': 'ceph version master (dev)',
107 'server_hostname': 'host1',
108 'realm_name': 'realm1',
109 'zonegroup_name': 'zg1',
110 'zone_name': 'zone1', 'default': True,
115 'service_map_id': '5356',
116 'version': 'ceph version master (dev)',
117 'server_hostname': 'host1',
118 'realm_name': 'realm2',
119 'zonegroup_name': 'zg2',
120 'zone_name': 'zone2',
125 def test_list_empty(self
):
126 RgwStub
.get_mgr_no_services()
127 self
._get
('/test/api/rgw/daemon')
128 self
.assertStatus(200)
129 self
.assertJsonBody([])
132 class RgwUserControllerTestCase(ControllerTestCase
):
134 def setup_server(cls
):
135 cls
.setup_controllers([RgwUser
], '/test')
137 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
138 def test_user_list(self
, mock_proxy
):
139 mock_proxy
.side_effect
= [{
141 'keys': ['test1', 'test2', 'test3'],
144 self
._get
('/test/api/rgw/user?daemon_name=dummy-daemon')
145 self
.assertStatus(200)
146 mock_proxy
.assert_has_calls([
147 call('dummy-daemon', 'GET', 'user?list', {})
149 self
.assertJsonBody(['test1', 'test2', 'test3'])
151 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
152 def test_user_list_marker(self
, mock_proxy
):
153 mock_proxy
.side_effect
= [{
155 'keys': ['test1', 'test2', 'test3'],
163 self
._get
('/test/api/rgw/user')
164 self
.assertStatus(200)
165 mock_proxy
.assert_has_calls([
166 call(None, 'GET', 'user?list', {}),
167 call(None, 'GET', 'user?list', {'marker': 'foo:bar'})
169 self
.assertJsonBody(['test1', 'test2', 'test3', 'admin'])
171 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
172 @patch('dashboard.services.ceph_service.CephService.send_command')
173 def test_user_list_duplicate_marker(self
, mock_proxy
, send_command
):
174 send_command
.return_value
= ''
175 mock_proxy
.side_effect
= [{
177 'keys': ['test1', 'test2', 'test3'],
182 'keys': ['test4', 'test5', 'test6'],
190 self
._get
('/test/api/rgw/user')
191 self
.assertStatus(500)
193 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
194 def test_user_list_invalid_marker(self
, mock_proxy
):
195 mock_proxy
.side_effect
= [{
197 'keys': ['test1', 'test2', 'test3'],
202 'keys': ['test4', 'test5', 'test6'],
210 self
._get
('/test/api/rgw/user')
211 self
.assertStatus(500)
213 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
214 @patch.object(RgwUser
, '_keys_allowed')
215 def test_user_get_with_keys(self
, keys_allowed
, mock_proxy
):
216 keys_allowed
.return_value
= True
217 mock_proxy
.return_value
= {
219 'user_id': 'my_user_id',
223 self
._get
('/test/api/rgw/user/testuser')
224 self
.assertStatus(200)
225 self
.assertInJsonBody('keys')
226 self
.assertInJsonBody('swift_keys')
228 @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
229 @patch.object(RgwUser
, '_keys_allowed')
230 def test_user_get_without_keys(self
, keys_allowed
, mock_proxy
):
231 keys_allowed
.return_value
= False
232 mock_proxy
.return_value
= {
234 'user_id': 'my_user_id',
238 self
._get
('/test/api/rgw/user/testuser')
239 self
.assertStatus(200)
240 self
.assertNotIn('keys', self
.json_body())
241 self
.assertNotIn('swift_keys', self
.json_body())