git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/tests/test_rgw.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / dashboard / tests / test_rgw.py
1 from unittest.mock import Mock, call, patch
2
3 from .. import mgr
4 from ..controllers.rgw import Rgw, RgwDaemon, RgwUser
5 from ..rest_client import RequestException
6 from ..services.rgw_client import RgwClient
7 from ..tests import ControllerTestCase, RgwStub
8
9
class RgwControllerTestCase(ControllerTestCase):
    """Exercise the ``/ui-api/rgw/status`` endpoint exposed by the ``Rgw`` controller.

    Each test patches selected ``RgwClient`` methods and then checks the
    ``available``/``message`` pair returned by the status endpoint.
    """

    @classmethod
    def setup_server(cls):
        """Mount the ``Rgw`` controller under the ``/test`` prefix."""
        cls.setup_controllers([Rgw], '/test')

    def setUp(self) -> None:
        """Call the ``RgwStub`` daemon and settings helpers before every test."""
        # presumably these seed the mocked mgr state -- see RgwStub for details
        RgwStub.get_daemons()
        RgwStub.get_settings()

    def _assert_rgw_status(self, available, message):
        """GET the status endpoint; expect HTTP 200 and the given JSON payload."""
        self._get('/test/ui-api/rgw/status')
        self.assertStatus(200)
        self.assertJsonBody({'available': available, 'message': message})

    @patch.object(RgwClient, '_get_user_id', new=Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', new=Mock(return_value=True))
    @patch.object(RgwClient, '_is_system_user', new=Mock(return_value=True))
    @patch('dashboard.services.ceph_service.CephService.send_command')
    def test_status_available(self, mock_send_command):
        """Service online and user flagged as system user: reported as available."""
        mock_send_command.return_value = ''
        self._assert_rgw_status(True, None)

    @patch.object(RgwClient, '_get_user_id', new=Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online',
                  new=Mock(side_effect=RequestException('My test error')))
    @patch('dashboard.services.ceph_service.CephService.send_command')
    def test_status_online_check_error(self, mock_send_command):
        """A ``RequestException`` from the online check surfaces as the message."""
        mock_send_command.return_value = ''
        self._assert_rgw_status(False, 'My test error')

    @patch.object(RgwClient, '_get_user_id', new=Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', new=Mock(return_value=False))
    @patch('dashboard.services.ceph_service.CephService.send_command')
    def test_status_not_online(self, mock_send_command):
        """An offline service yields the Admin Ops API connection failure message."""
        mock_send_command.return_value = ''
        self._assert_rgw_status(
            False, "Failed to connect to the Object Gateway's Admin Ops API.")

    @patch.object(RgwClient, '_get_user_id', new=Mock(return_value='fake-user'))
    @patch.object(RgwClient, 'is_service_online', new=Mock(return_value=True))
    @patch.object(RgwClient, '_is_system_user', new=Mock(return_value=False))
    @patch('dashboard.services.ceph_service.CephService.send_command')
    def test_status_not_system_user(self, mock_send_command):
        """A user without the system flag makes the service unavailable."""
        mock_send_command.return_value = ''
        self._assert_rgw_status(
            False, 'The system flag is not set for user "fake-user".')

    def test_status_no_service(self):
        """With the no-services stub active, the endpoint reports no running RGW."""
        RgwStub.get_mgr_no_services()
        self._assert_rgw_status(False, 'No RGW service is running.')
66
67
class RgwDaemonControllerTestCase(ControllerTestCase):
    """Tests for the daemon listing endpoint (``/api/rgw/daemon``) of ``RgwDaemon``."""

    @classmethod
    def setup_server(cls):
        """Mount the ``RgwDaemon`` controller under the ``/test`` prefix."""
        cls.setup_controllers([RgwDaemon], '/test')

    @patch('dashboard.services.rgw_client.RgwClient._get_user_id',
           new=Mock(return_value='dummy_admin'))
    @patch('dashboard.services.ceph_service.CephService.send_command')
    def test_list(self, mock_send_command):
        """Two rgw services on one host are listed with their metadata and port."""
        mock_send_command.return_value = ''
        RgwStub.get_daemons()
        RgwStub.get_settings()

        # One server carrying two rgw services; the ids show up as
        # 'service_map_id' in the expected response below.
        rgw_services = [{'id': '4832', 'type': 'rgw'}, {'id': '5356', 'type': 'rgw'}]
        mgr.list_servers.return_value = [{
            'hostname': 'host1',
            'services': rgw_services
        }]

        # get_metadata hands out one entry per call, in this order.
        first_metadata = {
            'ceph_version': 'ceph version master (dev)',
            'id': 'daemon1',
            'realm_name': 'realm1',
            'zonegroup_name': 'zg1',
            'zone_name': 'zone1',
            'frontend_config#0': 'beast port=80'
        }
        second_metadata = {
            'ceph_version': 'ceph version master (dev)',
            'id': 'daemon2',
            'realm_name': 'realm2',
            'zonegroup_name': 'zg2',
            'zone_name': 'zone2',
            'frontend_config#0': 'beast port=80 ssl_port=443 ssl_certificate=config:/config'
        }
        mgr.get_metadata.side_effect = [first_metadata, second_metadata]

        expected_daemons = [
            {
                'id': 'daemon1',
                'service_map_id': '4832',
                'version': 'ceph version master (dev)',
                'server_hostname': 'host1',
                'realm_name': 'realm1',
                'zonegroup_name': 'zg1',
                'zone_name': 'zone1',
                'default': True,
                'port': 80
            },
            {
                'id': 'daemon2',
                'service_map_id': '5356',
                'version': 'ceph version master (dev)',
                'server_hostname': 'host1',
                'realm_name': 'realm2',
                'zonegroup_name': 'zg2',
                'zone_name': 'zone2',
                'default': False,
                'port': 80
            },
        ]

        self._get('/test/api/rgw/daemon')
        self.assertStatus(200)
        self.assertJsonBody(expected_daemons)

    def test_list_empty(self):
        """With the no-services stub active, the listing is an empty JSON array."""
        RgwStub.get_mgr_no_services()
        self._get('/test/api/rgw/daemon')
        self.assertStatus(200)
        self.assertJsonBody([])
130
131
class RgwUserControllerTestCase(ControllerTestCase):
    """Tests for the ``RgwUser`` controller mounted under ``/test``.

    ``RgwRESTController.proxy`` is patched in every test, so the canned
    dictionaries below stand in for the proxied responses.
    """

    @classmethod
    def setup_server(cls):
        """Mount the ``RgwUser`` controller under the ``/test`` prefix."""
        cls.setup_controllers([RgwUser], '/test')

    @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
    def test_user_list(self, mock_proxy):
        """A single, non-truncated page is returned as-is for the named daemon."""
        mock_proxy.side_effect = [{
            'count': 3,
            'keys': ['test1', 'test2', 'test3'],
            'truncated': False
        }]
        self._get('/test/api/rgw/user?daemon_name=dummy-daemon')
        self.assertStatus(200)
        mock_proxy.assert_has_calls([
            call('dummy-daemon', 'GET', 'user?list', {})
        ])
        self.assertJsonBody(['test1', 'test2', 'test3'])

    @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
    def test_user_list_marker(self, mock_proxy):
        """A truncated page is followed up using its marker; keys are concatenated."""
        mock_proxy.side_effect = [{
            'count': 3,
            'keys': ['test1', 'test2', 'test3'],
            'marker': 'foo:bar',
            'truncated': True
        }, {
            'count': 1,
            'keys': ['admin'],
            'truncated': False
        }]
        self._get('/test/api/rgw/user')
        self.assertStatus(200)
        mock_proxy.assert_has_calls([
            call(None, 'GET', 'user?list', {}),
            call(None, 'GET', 'user?list', {'marker': 'foo:bar'})
        ])
        self.assertJsonBody(['test1', 'test2', 'test3', 'admin'])

    @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
    @patch('dashboard.services.ceph_service.CephService.send_command')
    def test_user_list_duplicate_marker(self, send_command, mock_proxy):
        """A truncated page repeating the previous marker must end in HTTP 500.

        Stacked ``patch`` decorators inject their mocks bottom-up: the
        ``send_command`` mock (innermost decorator) is the first positional
        argument and the ``proxy`` mock the second.  The parameter order
        mirrors that so the paginated responses are actually fed to
        ``proxy`` rather than to ``send_command``.
        """
        send_command.return_value = ''
        mock_proxy.side_effect = [{
            'count': 3,
            'keys': ['test1', 'test2', 'test3'],
            'marker': 'foo:bar',
            'truncated': True
        }, {
            'count': 3,
            'keys': ['test4', 'test5', 'test6'],
            'marker': 'foo:bar',
            'truncated': True
        }, {
            'count': 1,
            'keys': ['admin'],
            'truncated': False
        }]
        self._get('/test/api/rgw/user')
        self.assertStatus(500)

    @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
    def test_user_list_invalid_marker(self, mock_proxy):
        """A truncated page carrying an empty marker must end in HTTP 500."""
        mock_proxy.side_effect = [{
            'count': 3,
            'keys': ['test1', 'test2', 'test3'],
            'marker': 'foo:bar',
            'truncated': True
        }, {
            'count': 3,
            'keys': ['test4', 'test5', 'test6'],
            'marker': '',
            'truncated': True
        }, {
            'count': 1,
            'keys': ['admin'],
            'truncated': False
        }]
        self._get('/test/api/rgw/user')
        self.assertStatus(500)

    @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
    @patch.object(RgwUser, '_keys_allowed')
    def test_user_get_with_keys(self, keys_allowed, mock_proxy):
        """When ``_keys_allowed`` returns True, both key lists stay in the body."""
        keys_allowed.return_value = True
        mock_proxy.return_value = {
            'tenant': '',
            'user_id': 'my_user_id',
            'keys': [],
            'swift_keys': []
        }
        self._get('/test/api/rgw/user/testuser')
        self.assertStatus(200)
        self.assertInJsonBody('keys')
        self.assertInJsonBody('swift_keys')

    @patch('dashboard.controllers.rgw.RgwRESTController.proxy')
    @patch.object(RgwUser, '_keys_allowed')
    def test_user_get_without_keys(self, keys_allowed, mock_proxy):
        """When ``_keys_allowed`` returns False, both key lists are absent."""
        keys_allowed.return_value = False
        mock_proxy.return_value = {
            'tenant': '',
            'user_id': 'my_user_id',
            'keys': [],
            'swift_keys': []
        }
        self._get('/test/api/rgw/user/testuser')
        self.assertStatus(200)
        self.assertNotIn('keys', self.json_body())
        self.assertNotIn('swift_keys', self.json_body())
241 self.assertNotIn('swift_keys', self.json_body())