]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/services/rgw_client.py
import ceph 15.2.10
[ceph.git] / ceph / src / pybind / mgr / dashboard / services / rgw_client.py
1 # -*- coding: utf-8 -*-
2 from __future__ import absolute_import
3
4 import re
5 import logging
6 import ipaddress
7 from distutils.util import strtobool
8 import xml.etree.ElementTree as ET # noqa: N814
9 import six
10 from ..awsauth import S3Auth
11 from ..exceptions import DashboardException
12 from ..settings import Settings, Options
13 from ..rest_client import RestClient, RequestException
14 from ..tools import build_url, dict_contains_path, json_str_to_object,\
15 dict_get
16 from .. import mgr
17
18 try:
19 from typing import Any, Dict, List, Optional, Tuple # pylint: disable=unused-import
20 except ImportError:
21 pass # For typing only
22
# Module-level logger used by the helper functions and by RgwClient in this file.
logger = logging.getLogger('rgw_client')
24
25
class NoCredentialsException(RequestException):
    """
    Signals that the RGW API access key and/or secret key is not configured.
    """

    def __init__(self):
        message = ('No RGW credentials found, '
                   'please consult the documentation on how to enable RGW for '
                   'the dashboard.')
        super(NoCredentialsException, self).__init__(message)
32
33
def _get_daemon_info() -> Dict[str, Any]:
    """
    Return the first RGW daemon entry of the MGR service map that carries
    a ``frontend_config#0`` metadata value.

    The keys below ``services.rgw.daemons`` depend on the setup, and the
    mapping also contains a ``summary`` entry, so every entry is probed.

    Example 1 (daemon registered as ``0``, IPv6):
    {
        ...
        'services': {
            'rgw': {
                'daemons': {
                    'summary': '',
                    '0': {
                        ...
                        'addr': '[2001:db8:85a3::8a2e:370:7334]:49774/1534999298',
                        'metadata': {
                            'frontend_config#0': 'civetweb port=7280',
                        }
                        ...
                    }
                }
            }
        }
    }

    Example 2 (daemon registered as ``rgw``, IPv4):
    {
        ...
        'services': {
            'rgw': {
                'daemons': {
                    'summary': '',
                    'rgw': {
                        ...
                        'addr': '192.168.178.3:49774/1534999298',
                        'metadata': {
                            'frontend_config#0': 'civetweb port=8000',
                        }
                        ...
                    }
                }
            }
        }
    }

    :raises LookupError: if the service map has no RGW daemons section or
        none of its entries reports a frontend configuration.
    :return: The matching daemon entry.
    :rtype: dict
    """
    service_map = mgr.get('service_map')
    if not dict_contains_path(service_map, ['services', 'rgw', 'daemons']):
        raise LookupError('No RGW found')

    rgw_daemons = service_map['services']['rgw']['daemons']
    for candidate in rgw_daemons.values():
        # The first entry exposing a frontend configuration wins.
        if dict_contains_path(candidate, ['metadata', 'frontend_config#0']):
            return candidate

    raise LookupError('No RGW daemon found')
90
91
def _determine_rgw_addr() -> Tuple[str, int, bool]:
    """
    Auto-detect the RGW endpoint from the daemon info known to the MGR.

    :return: A tuple of the IP address, the port and a flag telling
        whether SSL is used.
    :rtype: (str, int, bool)
    """
    info = _get_daemon_info()
    host = _parse_addr(info['addr'])
    frontend_config = info['metadata']['frontend_config#0']
    rgw_port, use_ssl = _parse_frontend_config(frontend_config)

    logger.info('Auto-detected RGW configuration: addr=%s, port=%d, ssl=%s',
                host, rgw_port, str(use_ssl))

    return host, rgw_port, use_ssl
104
105
106 def _parse_addr(value) -> str:
107 """
108 Get the IP address the RGW is running on.
109
110 >>> _parse_addr('192.168.178.3:49774/1534999298')
111 '192.168.178.3'
112
113 >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298')
114 '2001:db8:85a3::8a2e:370:7334'
115
116 >>> _parse_addr('xyz')
117 Traceback (most recent call last):
118 ...
119 LookupError: Failed to determine RGW address
120
121 >>> _parse_addr('192.168.178.a:8080/123456789')
122 Traceback (most recent call last):
123 ...
124 LookupError: Invalid RGW address '192.168.178.a' found
125
126 >>> _parse_addr('[2001:0db8:1234]:443/123456789')
127 Traceback (most recent call last):
128 ...
129 LookupError: Invalid RGW address '2001:0db8:1234' found
130
131 >>> _parse_addr('2001:0db8::1234:49774/1534999298')
132 Traceback (most recent call last):
133 ...
134 LookupError: Failed to determine RGW address
135
136 :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'.
137 :type: str
138 :raises LookupError if parsing fails to determine the IP address.
139 :return: The IP address.
140 :rtype: str
141 """
142 match = re.search(r'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value)
143 if match:
144 # IPv4:
145 # Group 0: 192.168.178.3:49774/1534999298
146 # Group 3: 192.168.178.3
147 # IPv6:
148 # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298
149 # Group 1: [
150 # Group 2: 2001:db8:85a3::8a2e:370:7334
151 addr = match.group(3) if match.group(3) else match.group(2)
152 try:
153 ipaddress.ip_address(six.u(addr))
154 return addr
155 except ValueError:
156 raise LookupError('Invalid RGW address \'{}\' found'.format(addr))
157 raise LookupError('Failed to determine RGW address')
158
159
160 def _parse_frontend_config(config) -> Tuple[int, bool]:
161 """
162 Get the port the RGW is running on. Due the complexity of the
163 syntax not all variations are supported.
164
165 If there are multiple (ssl_)ports/(ssl_)endpoints options, then
166 the first found option will be returned.
167
168 Get more details about the configuration syntax here:
169 http://docs.ceph.com/docs/master/radosgw/frontends/
170 https://civetweb.github.io/civetweb/UserManual.html
171
172 :param config: The configuration string to parse.
173 :type config: str
174 :raises LookupError if parsing fails to determine the port.
175 :return: A tuple containing the port number and the information
176 whether SSL is used.
177 :rtype: (int, boolean)
178 """
179 match = re.search(r'^(beast|civetweb)\s+.+$', config)
180 if match:
181 if match.group(1) == 'beast':
182 match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)',
183 config)
184 if match:
185 option_name = match.group(1)
186 if option_name in ['port', 'ssl_port']:
187 match = re.search(r'(\d+)', match.group(2))
188 if match:
189 port = int(match.group(1))
190 ssl = option_name == 'ssl_port'
191 return port, ssl
192 if option_name in ['endpoint', 'ssl_endpoint']:
193 match = re.search(r'([\d.]+|\[.+\])(:(\d+))?',
194 match.group(2)) # type: ignore
195 if match:
196 port = int(match.group(3)) if \
197 match.group(2) is not None else 443 if \
198 option_name == 'ssl_endpoint' else \
199 80
200 ssl = option_name == 'ssl_endpoint'
201 return port, ssl
202 if match.group(1) == 'civetweb': # type: ignore
203 match = re.search(r'port=(.*:)?(\d+)(s)?', config)
204 if match:
205 port = int(match.group(2))
206 ssl = match.group(3) == 's'
207 return port, ssl
208 raise LookupError('Failed to determine RGW port from "{}"'.format(config))
209
210
class RgwClient(RestClient):
    # REST client for the RGW. The attributes below are class-level state
    # shared by all instances; they are populated by _load_settings().

    # User ID of the client created from the configured API credentials.
    _SYSTEM_USERID = None
    # Admin resource path taken from Settings.RGW_API_ADMIN_RESOURCE.
    _ADMIN_PATH = None
    # Host, port and SSL flag, either auto-detected or read from the settings.
    _host = None
    _port = None
    _ssl = None
    # Cache of client instances, keyed by user ID.
    _user_instances = {}  # type: Dict[str, RgwClient]
    # Last seen tuple of RGW settings; instance() compares against it to
    # decide whether the cached instances must be dropped.
    _rgw_settings_snapshot = None
219
@staticmethod
def _load_settings():
    """
    Populate the class-level connection parameters and create the client
    for the configured API user.

    Host, port and scheme are auto-detected when all three settings still
    have their default value; otherwise the configured values are used.

    :raises NoCredentialsException: if the access key or the secret key
        is not configured.
    """
    # The API access key and secret key are mandatory for a minimal configuration.
    has_credentials = Settings.RGW_API_ACCESS_KEY and Settings.RGW_API_SECRET_KEY
    if not has_credentials:
        logger.warning('No credentials found, please consult the '
                       'documentation about how to enable RGW for the '
                       'dashboard.')
        raise NoCredentialsException()

    endpoint_options = ('RGW_API_HOST', 'RGW_API_PORT', 'RGW_API_SCHEME')
    if all(Options.has_default_value(name) for name in endpoint_options):
        host, port, ssl = _determine_rgw_addr()
    else:
        host = Settings.RGW_API_HOST
        port = Settings.RGW_API_PORT
        ssl = Settings.RGW_API_SCHEME == 'https'

    RgwClient._host, RgwClient._port, RgwClient._ssl = host, port, ssl
    RgwClient._ADMIN_PATH = Settings.RGW_API_ADMIN_RESOURCE

    # Build the client for the configured API user ...
    system_client = RgwClient(Settings.RGW_API_USER_ID,
                              Settings.RGW_API_ACCESS_KEY,
                              Settings.RGW_API_SECRET_KEY)
    RgwClient._SYSTEM_USERID = system_client.userid

    # ... and register it in the instance cache.
    RgwClient._user_instances[RgwClient._SYSTEM_USERID] = system_client
252
def _get_daemon_zone_info(self):  # type: () -> dict
    """
    Request ``config?type=zone`` through the admin proxy and return the
    decoded JSON response.
    """
    raw_response = self.proxy('GET', 'config?type=zone', None, None)
    return json_str_to_object(raw_response)
255
def _get_realms_info(self):  # type: () -> dict
    """
    Request ``realm?list`` through the admin proxy and return the decoded
    JSON response.
    """
    raw_response = self.proxy('GET', 'realm?list', None, None)
    return json_str_to_object(raw_response)
258
@staticmethod
def _rgw_settings():
    """
    Return the current values of all RGW related settings as a tuple.
    The order of the values is fixed so that two results can be compared.
    """
    option_names = ('RGW_API_HOST',
                    'RGW_API_PORT',
                    'RGW_API_ACCESS_KEY',
                    'RGW_API_SECRET_KEY',
                    'RGW_API_ADMIN_RESOURCE',
                    'RGW_API_SCHEME',
                    'RGW_API_USER_ID',
                    'RGW_API_SSL_VERIFY')
    return tuple(getattr(Settings, name) for name in option_names)
269
@staticmethod
def instance(userid):
    # type: (Optional[str]) -> RgwClient
    """
    Return the cached client of the given user, creating it on demand.

    :param userid: The user ID; a falsy value selects the system user.
    :raises RequestException: if the user has no keys configured.
    :return: The client instance of the user.
    """
    # Discard all cached instances if any rgw setting has changed
    if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings():
        RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings()
        RgwClient.drop_instance()

    # An empty cache means the settings have to be (re)loaded first.
    if not RgwClient._user_instances:
        RgwClient._load_settings()

    userid = userid or RgwClient._SYSTEM_USERID

    if userid in RgwClient._user_instances:
        return RgwClient._user_instances[userid]  # type: ignore

    # Get the access and secret keys for the specified user.
    keys = RgwClient.admin_instance().get_user_keys(userid)
    if not keys:
        raise RequestException(
            "User '{}' does not have any keys configured.".format(
                userid))

    # Create an instance and append it to the internal map.
    client = RgwClient(userid,  # type: ignore
                       keys['access_key'],
                       keys['secret_key'])
    RgwClient._user_instances[userid] = client  # type: ignore
    return client
298
@staticmethod
def admin_instance():
    """
    Return the client instance registered for the system user ID.
    """
    system_userid = RgwClient._SYSTEM_USERID
    return RgwClient.instance(system_userid)
302
@staticmethod
def drop_instance(userid: Optional[str] = None):
    """
    Remove one cached instance, or all of them.

    :param userid: The user whose instance is removed (unknown IDs are
        ignored). A falsy value empties the whole cache.
    """
    if not userid:
        RgwClient._user_instances.clear()
        return
    RgwClient._user_instances.pop(userid, None)
312
def _reset_login(self):
    """
    Refresh the S3 credentials of this client.

    For a regular user the keys are fetched again through the admin
    instance and a new ``S3Auth`` object is installed. The system user's
    credentials come from the settings and cannot be refreshed this way.

    :raises RequestException: with status code 401 for the system user,
        or if no keys are found for the user.
    """
    if self.userid == RgwClient._SYSTEM_USERID:
        raise RequestException('Authentication failed for the "{}" user: wrong credentials'
                               .format(self.userid), status_code=401)

    logger.info("Fetching new keys for user: %s", self.userid)
    keys = RgwClient.admin_instance().get_user_keys(self.userid)
    if not keys:
        # get_user_keys() yields None when no key matches the user ID;
        # report that the same way instance() does instead of failing
        # with a TypeError on the subscript below.
        raise RequestException(
            "User '{}' does not have any keys configured.".format(
                self.userid))
    self.auth = S3Auth(keys['access_key'], keys['secret_key'],
                       service_url=self.service_url)
322
def __init__(self,  # pylint: disable-msg=R0913
             userid,
             access_key,
             secret_key,
             host=None,
             port=None,
             admin_path=None,
             ssl=False):
    """
    Create a client that signs its requests with the given key pair.

    :param userid: The user ID. If falsy, it is looked up via the RGW
        Admin Ops API using the given keys.
    :param access_key: The S3 access key.
    :param secret_key: The S3 secret key.
    :param host: The RGW host; falls back to the class-level value.
    :param port: The RGW port; falls back to the class-level value.
    :param admin_path: The admin resource path; falls back to the
        class-level value.
    :param ssl: Whether to use SSL; a falsy value falls back to the
        class-level value.
    """
    # Without any known host the settings have to be loaded first.
    if not (host or RgwClient._host):
        RgwClient._load_settings()
    host = host or RgwClient._host
    port = port or RgwClient._port
    admin_path = admin_path or RgwClient._ADMIN_PATH
    ssl = ssl or RgwClient._ssl
    ssl_verify = Settings.RGW_API_SSL_VERIFY

    self.service_url = build_url(host=host, port=port)
    self.admin_path = admin_path

    auth = S3Auth(access_key, secret_key, service_url=self.service_url)
    super(RgwClient, self).__init__(host, port, 'RGW', ssl, auth, ssl_verify=ssl_verify)

    # If user ID is not set, then try to get it via the RGW Admin Ops API.
    if userid:
        self.userid = userid  # type: str
    else:
        self.userid = self._get_user_id(self.admin_path)

    daemon_metadata = _get_daemon_info()['metadata']
    self._zonegroup_name: str = daemon_metadata['zonegroup_name']

    logger.info("Created new connection: user=%s, host=%s, port=%s, ssl=%d, sslverify=%d",
                self.userid, host, port, ssl, ssl_verify)
353
@RestClient.api_get('/', resp_structure='[0] > (ID & DisplayName)')
def is_service_online(self, request=None):
    """
    Issue a JSON request against ``/``. The response value itself is
    ignored; the structure declared on the decorator is the only check.

    :return: Always True when the request call returns.
    """
    request({'format': 'json'})
    return True
362
@RestClient.api_get('/{admin_path}/metadata/user?myself',
                    resp_structure='data > user_id')
def _get_user_id(self, admin_path, request=None):
    # pylint: disable=unused-argument
    """
    Ask the RGW Admin Ops API which user the current credentials
    belong to.

    :param admin_path: The admin resource path used in the request URL.
    :rtype: str
    :return: The user ID of the user that is used to sign the
             RGW Admin Ops API calls.
    """
    user_metadata = request()['data']
    return user_metadata['user_id']
376
@RestClient.api_get('/{admin_path}/metadata/user', resp_structure='[+]')
def _user_exists(self, admin_path, user_id, request=None):
    # pylint: disable=unused-argument
    """
    Check whether a user ID is contained in the user metadata listing.

    :param admin_path: The admin resource path used in the request URL.
    :param user_id: The user ID to look for; a falsy value checks the
        user ID of this client instead.
    :return: True if the user ID is part of the response.
    """
    known_users = request()
    wanted = user_id if user_id else self.userid
    return wanted in known_users
384
def user_exists(self, user_id=None):
    """
    Check whether the given user (or, by default, the user of this
    client) exists.
    """
    admin_path = self.admin_path
    return self._user_exists(admin_path, user_id)
387
@RestClient.api_get('/{admin_path}/metadata/user?key={userid}',
                    resp_structure='data > system')
def _is_system_user(self, admin_path, userid, request=None):
    # pylint: disable=unused-argument
    """
    Read the ``system`` flag from the metadata of the given user.

    :param admin_path: The admin resource path used in the request URL.
    :param userid: The user ID used in the request URL.
    :return: The flag converted by ``strtobool``.
    """
    system_flag = request()['data']['system']
    return strtobool(system_flag)
394
def is_system_user(self):
    """
    Tell whether the user of this client is flagged as a system user.
    """
    admin_path, userid = self.admin_path, self.userid
    return self._is_system_user(admin_path, userid)
397
@RestClient.api_get(
    '/{admin_path}/user',
    resp_structure='tenant & user_id & email & keys[*] > '
    ' (user & access_key & secret_key)')
def _admin_get_user_keys(self, admin_path, userid, request=None):
    # pylint: disable=unused-argument
    """
    Fetch the S3 key pair of a user.

    The request is made with the part of ``userid`` in front of the first
    colon (presumably the owning user of a subuser; verify against the
    callers), while the returned keys are matched against the complete
    ``userid``.

    :param admin_path: The admin resource path used in the request URL.
    :param userid: The ID whose key pair is wanted.
    :return: A dict with ``access_key`` and ``secret_key`` of the first
        matching key, or None if no key matches.
    """
    uid = userid.partition(':')[0]
    response = request({'uid': uid})
    matching_keys = (entry for entry in response['keys']
                     if entry['user'] == userid)
    found = next(matching_keys, None)
    if found is None:
        return None
    return {
        'access_key': found['access_key'],
        'secret_key': found['secret_key']
    }
414
def get_user_keys(self, userid):
    """
    Return the access/secret key pair of the given user, or None if no
    key is found for it.
    """
    admin_path = self.admin_path
    return self._admin_get_user_keys(admin_path, userid)
417
@RestClient.api('/{admin_path}/{path}')
def _proxy_request(
        self,  # pylint: disable=too-many-arguments
        admin_path,
        path,
        method,
        params,
        data,
        request=None):
    # pylint: disable=unused-argument
    """
    Forward an arbitrary request to ``/{admin_path}/{path}`` and return
    the raw content of the response.
    """
    request_args = {
        'method': method,
        'params': params,
        'data': data,
        'raw_content': True,
    }
    return request(**request_args)
430
def proxy(self, method, path, params, data):
    """
    Log and forward a request to the admin resource of the RGW.

    :param method: The HTTP method.
    :param path: The path below the admin resource.
    :param params: The query parameters.
    :param data: The request body.
    :return: The raw response content.
    """
    logger.debug("proxying method=%s path=%s params=%s data=%s",
                 method, path, params, data)
    admin_path = self.admin_path
    return self._proxy_request(admin_path, path, method, params, data)
436
@RestClient.api_get('/', resp_structure='[1][*] > Name')
def get_buckets(self, request=None):
    """
    List the buckets of this user.

    :return: The names of all buckets found in the second element of
        the JSON response.
    """
    bucket_entries = request({'format': 'json'})[1]
    names = []
    for entry in bucket_entries:
        names.append(entry['Name'])
    return names
445
@RestClient.api_get('/{bucket_name}')
def bucket_exists(self, bucket_name, userid, request=None):
    """
    Check if the specified bucket exists for this user.

    :param bucket_name: The name of the bucket.
    :param userid: Not used by this method.
    :raises RequestException: with status code 403 if the bucket exists
        but is not in this user's bucket list; any request error other
        than 404 is passed on.
    :return: Returns True if the bucket exists, otherwise False.
    """
    # pylint: disable=unused-argument
    try:
        request()
        if bucket_name not in self.get_buckets():
            raise RequestException(
                'Bucket "{}" belongs to other user'.format(bucket_name),
                403)
    except RequestException as e:
        # Only "not found" means that the bucket does not exist.
        if e.status_code == 404:
            return False

        raise e
    return True
467
@RestClient.api_put('/{bucket_name}')
def create_bucket(self, bucket_name, zonegroup=None,
                  placement_target=None, lock_enabled=False,
                  request=None):
    """
    Create a bucket.

    :param bucket_name: The name of the bucket.
    :param zonegroup: The zonegroup; only used together with
        ``placement_target``.
    :param placement_target: The placement target; only used together
        with ``zonegroup``.
    :param lock_enabled: If truthy, the object lock header is sent.
    :return: The result of the request.
    """
    logger.info("Creating bucket: %s, zonegroup: %s, placement_target: %s",
                bucket_name, zonegroup, placement_target)

    payload = None
    if zonegroup and placement_target:
        # <CreateBucketConfiguration>
        #   <LocationConstraint>zonegroup:placement_target</LocationConstraint>
        # </CreateBucketConfiguration>
        root = ET.Element('CreateBucketConfiguration')
        constraint = ET.SubElement(root, 'LocationConstraint')
        constraint.text = '{}:{}'.format(zonegroup, placement_target)
        payload = ET.tostring(root, encoding='unicode')

    if lock_enabled:
        headers = {'x-amz-bucket-object-lock-enabled': 'true'}  # type: Optional[dict]
    else:
        headers = None

    return request(data=payload, headers=headers)
486
def get_placement_targets(self):  # type: () -> dict
    """
    Collect the placement targets from the zone configuration.

    :return: A dict with the zonegroup name of this client and, for each
        placement pool of the zone, its name and the data pool of its
        ``STANDARD`` storage class.
    """
    zone_info = self._get_daemon_zone_info()
    targets = [
        {
            'name': pool['key'],
            'data_pool': pool['val']['storage_classes']['STANDARD']['data_pool']
        }
        for pool in zone_info['placement_pools']
    ]  # type: List[Dict]

    return {'zonegroup': self._zonegroup_name, 'placement_targets': targets}
499
def get_realms(self):  # type: () -> List
    """
    Return the ``realms`` value of the realm listing, or an empty list
    if it is missing or empty.
    """
    info = self._get_realms_info()
    if 'realms' not in info or not info['realms']:
        return []

    return info['realms']
506
@RestClient.api_get('/{bucket_name}?versioning')
def get_bucket_versioning(self, bucket_name, request=None):
    """
    Get the versioning state of a bucket.

    Fields missing in the response are filled in: ``Status`` defaults to
    ``Suspended`` and ``MfaDelete`` to ``Disabled``.

    :param str bucket_name: the name of the bucket.
    :return: versioning info
    :rtype: Dict
    """
    # pylint: disable=unused-argument
    versioning = request()
    defaults = (('Status', 'Suspended'), ('MfaDelete', 'Disabled'))
    for field, fallback in defaults:
        if field not in versioning:
            versioning[field] = fallback
    return versioning
522
@RestClient.api_put('/{bucket_name}?versioning')
def set_bucket_versioning(self, bucket_name, versioning_state, mfa_delete,
                          mfa_token_serial, mfa_token_pin, request=None):
    """
    Set the versioning state of a bucket.

    The MFA header and the ``MfaDelete`` element are only sent when
    ``mfa_delete``, ``mfa_token_serial`` and ``mfa_token_pin`` are all set.

    :param str bucket_name: the name of the bucket.
    :param str versioning_state:
        https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html
    :param str mfa_delete: MFA Delete state.
    :param str mfa_token_serial:
        https://docs.ceph.com/docs/master/radosgw/mfa/
    :param str mfa_token_pin: value of a TOTP token at a certain time (auth code)
    :raises DashboardException: if the request fails.
    :return: None
    """
    # pylint: disable=unused-argument
    root = ET.Element('VersioningConfiguration')
    ET.SubElement(root, 'Status').text = versioning_state

    headers = {}
    use_mfa = mfa_delete and mfa_token_serial and mfa_token_pin
    if use_mfa:
        headers['x-amz-mfa'] = '{} {}'.format(mfa_token_serial, mfa_token_pin)
        ET.SubElement(root, 'MfaDelete').text = mfa_delete

    payload = ET.tostring(root, encoding='unicode')

    try:
        request(data=payload, headers=headers)
    except RequestException as error:
        msg = str(error)
        status = error.status_code
        if status == 403:
            msg = 'Bad MFA credentials: {}'.format(msg)
        # Avoid dashboard GUI redirections caused by status code (403, ...):
        http_status_code = 400 if 400 <= status < 500 else status
        raise DashboardException(msg=msg,
                                 http_status_code=http_status_code,
                                 component='rgw')
561
@RestClient.api_get('/{bucket_name}?object-lock')
def get_bucket_locking(self, bucket_name, request=None):
    # type: (str, Optional[object]) -> dict
    """
    Get the object lock configuration of a bucket, i.e. the settings
    that are applied by default to every new object placed in it.

    :param bucket_name: The name of the bucket.
    :type bucket_name: str
    :raises RequestException: if the request fails for any reason other
        than a missing object lock configuration.
    :return: The locking configuration; default values if the bucket
        has none.
    :rtype: Dict
    """
    # pylint: disable=unused-argument

    not_configured = 'ObjectLockConfigurationNotFoundError'
    try:
        config = request()  # type: ignore
        return {
            'lock_enabled': dict_get(config, 'ObjectLockEnabled') == 'Enabled',
            'lock_mode': dict_get(config, 'Rule.DefaultRetention.Mode'),
            'lock_retention_period_days': dict_get(config, 'Rule.DefaultRetention.Days', 0),
            'lock_retention_period_years': dict_get(config, 'Rule.DefaultRetention.Years', 0)
        }
    except RequestException as e:
        # A missing Object Lock configuration is not an error here:
        # answer with default values instead.
        if e.content and json_str_to_object(e.content).get('Code') == not_configured:
            return {
                'lock_enabled': False,
                'lock_mode': 'compliance',
                'lock_retention_period_days': None,
                'lock_retention_period_years': None
            }
        raise e
598
@RestClient.api_put('/{bucket_name}?object-lock')
def set_bucket_locking(self,
                       bucket_name,
                       mode,
                       retention_period_days,
                       retention_period_years,
                       request=None):
    # type: (str, str, int, int, Optional[object]) -> None
    """
    Put the object lock configuration on a bucket. It is applied by
    default to every new object placed in the bucket.

    Exactly one of the two retention periods has to be given.

    :param bucket_name: The name of the bucket.
    :type bucket_name: str
    :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`. It is
        sent in upper case.
    :type mode: str
    :param retention_period_days:
    :type retention_period_days: int
    :param retention_period_years:
    :type retention_period_years: int
    :raises DashboardException: if the retention periods are invalid or
        the request fails.
    :rtype: None
    """
    # pylint: disable=unused-argument

    # Do some validations.
    has_days = bool(retention_period_days)
    has_years = bool(retention_period_years)
    if has_days and has_years:
        # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html
        msg = "Retention period requires either Days or Years. "\
            "You can't specify both at the same time."
        raise DashboardException(msg=msg, component='rgw')
    if not (has_days or has_years):
        msg = "Retention period requires either Days or Years. "\
            "You must specify at least one."
        raise DashboardException(msg=msg, component='rgw')

    # Generate the XML data like this:
    # <ObjectLockConfiguration>
    #    <ObjectLockEnabled>string</ObjectLockEnabled>
    #    <Rule>
    #       <DefaultRetention>
    #          <Days>integer</Days>
    #          <Mode>string</Mode>
    #          <Years>integer</Years>
    #       </DefaultRetention>
    #    </Rule>
    # </ObjectLockConfiguration>
    root = ET.Element('ObjectLockConfiguration')
    # Locking can't be disabled.
    ET.SubElement(root, 'ObjectLockEnabled').text = 'Enabled'
    rule = ET.SubElement(root, 'Rule')
    default_retention = ET.SubElement(rule, 'DefaultRetention')
    ET.SubElement(default_retention, 'Mode').text = mode.upper()
    periods = (('Days', retention_period_days),
               ('Years', retention_period_years))
    for tag, period in periods:
        if period:
            ET.SubElement(default_retention, tag).text = str(period)

    payload = ET.tostring(root, encoding='unicode')

    try:
        request(data=payload)  # type: ignore
    except RequestException as e:
        raise DashboardException(msg=str(e), component='rgw')