from __future__ import absolute_import
import re
+import logging
+import ipaddress
from distutils.util import strtobool
+import xml.etree.ElementTree as ET # noqa: N814
+import six
from ..awsauth import S3Auth
+from ..exceptions import DashboardException
from ..settings import Settings, Options
from ..rest_client import RestClient, RequestException
-from ..tools import build_url, dict_contains_path, is_valid_ip_address
-from .. import mgr, logger
+from ..tools import build_url, dict_contains_path, json_str_to_object,\
+ partial_dict, dict_get
+from .. import mgr
+
+try:
+ from typing import Dict, List, Optional # pylint: disable=unused-import
+except ImportError:
+ pass # For typing only
+
+logger = logging.getLogger('rgw_client')
class NoCredentialsException(RequestException):
# Group 1: [
# Group 2: 2001:db8:85a3::8a2e:370:7334
addr = match.group(3) if match.group(3) else match.group(2)
- if not is_valid_ip_address(addr):
+ try:
+ ipaddress.ip_address(six.u(addr))
+ return addr
+ except ValueError:
raise LookupError('Invalid RGW address \'{}\' found'.format(addr))
- return addr
raise LookupError('Failed to determine RGW address')
Get the port the RGW is running on. Due the complexity of the
syntax not all variations are supported.
+ If there are multiple (ssl_)ports/(ssl_)endpoints options, then
+ the first found option will be returned.
+
Get more details about the configuration syntax here:
http://docs.ceph.com/docs/master/radosgw/frontends/
https://civetweb.github.io/civetweb/UserManual.html
- >>> _parse_frontend_config('beast port=8000')
- (8000, False)
-
- >>> _parse_frontend_config('civetweb port=8000s')
- (8000, True)
-
- >>> _parse_frontend_config('beast port=192.0.2.3:80')
- (80, False)
-
- >>> _parse_frontend_config('civetweb port=172.5.2.51:8080s')
- (8080, True)
-
- >>> _parse_frontend_config('civetweb port=[::]:8080')
- (8080, False)
-
- >>> _parse_frontend_config('civetweb port=ip6-localhost:80s')
- (80, True)
-
- >>> _parse_frontend_config('civetweb port=[2001:0db8::1234]:80')
- (80, False)
-
- >>> _parse_frontend_config('civetweb port=[::1]:8443s')
- (8443, True)
-
- >>> _parse_frontend_config('civetweb port=xyz')
- Traceback (most recent call last):
- ...
- LookupError: Failed to determine RGW port
-
- >>> _parse_frontend_config('civetweb')
- Traceback (most recent call last):
- ...
- LookupError: Failed to determine RGW port
-
:param config: The configuration string to parse.
:type config: str
:raises LookupError if parsing fails to determine the port.
whether SSL is used.
:rtype: (int, boolean)
"""
- match = re.search(r'port=(.*:)?(\d+)(s)?', config)
+ match = re.search(r'^(beast|civetweb)\s+.+$', config)
if match:
- port = int(match.group(2))
- ssl = match.group(3) == 's'
- return port, ssl
- raise LookupError('Failed to determine RGW port')
+ if match.group(1) == 'beast':
+ match = re.search(r'(port|ssl_port|endpoint|ssl_endpoint)=(.+)',
+ config)
+ if match:
+ option_name = match.group(1)
+ if option_name in ['port', 'ssl_port']:
+ match = re.search(r'(\d+)', match.group(2))
+ if match:
+ port = int(match.group(1))
+ ssl = option_name == 'ssl_port'
+ return port, ssl
+ if option_name in ['endpoint', 'ssl_endpoint']:
+ match = re.search(r'([\d.]+|\[.+\])(:(\d+))?',
+ match.group(2)) # type: ignore
+ if match:
+ port = int(match.group(3)) if \
+ match.group(2) is not None else 443 if \
+ option_name == 'ssl_endpoint' else \
+ 80
+ ssl = option_name == 'ssl_endpoint'
+ return port, ssl
+ if match.group(1) == 'civetweb': # type: ignore
+ match = re.search(r'port=(.*:)?(\d+)(s)?', config)
+ if match:
+ port = int(match.group(2))
+ ssl = match.group(3) == 's'
+ return port, ssl
+ raise LookupError('Failed to determine RGW port from "{}"'.format(config))
class RgwClient(RestClient):
_host = None
_port = None
_ssl = None
- _user_instances = {}
+ _user_instances = {} # type: Dict[str, RgwClient]
_rgw_settings_snapshot = None
@staticmethod
# Append the instance to the internal map.
RgwClient._user_instances[RgwClient._SYSTEM_USERID] = instance
+ def _get_daemon_zone_info(self): # type: () -> dict
+ return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None))
+
+ def _get_daemon_zonegroup_map(self): # type: () -> List[dict]
+ zonegroups = json_str_to_object(
+ self.proxy('GET', 'config?type=zonegroup-map', None, None)
+ )
+
+ return [partial_dict(
+ zonegroup['val'],
+ ['api_name', 'zones']
+ ) for zonegroup in zonegroups['zonegroups']]
+
@staticmethod
def _rgw_settings():
return (Settings.RGW_API_HOST,
@staticmethod
def instance(userid):
+ # type: (Optional[str]) -> RgwClient
# Discard all cached instances if any rgw setting has changed
if RgwClient._rgw_settings_snapshot != RgwClient._rgw_settings():
RgwClient._rgw_settings_snapshot = RgwClient._rgw_settings()
userid))
# Create an instance and append it to the internal map.
- RgwClient._user_instances[userid] = RgwClient(userid,
+ RgwClient._user_instances[userid] = RgwClient(userid, # type: ignore
keys['access_key'],
keys['secret_key'])
- return RgwClient._user_instances[userid]
+ return RgwClient._user_instances[userid] # type: ignore
@staticmethod
def admin_instance():
super(RgwClient, self).__init__(host, port, 'RGW', ssl, s3auth, ssl_verify=ssl_verify)
# If user ID is not set, then try to get it via the RGW Admin Ops API.
- self.userid = userid if userid else self._get_user_id(self.admin_path)
+ self.userid = userid if userid else self._get_user_id(self.admin_path) # type: str
logger.info("Created new connection for user: %s", self.userid)
return self._admin_get_user_keys(self.admin_path, userid)
@RestClient.api('/{admin_path}/{path}')
- def _proxy_request(self, # pylint: disable=too-many-arguments
- admin_path,
- path,
- method,
- params,
- data,
- request=None):
+ def _proxy_request(
+ self, # pylint: disable=too-many-arguments
+ admin_path,
+ path,
+ method,
+ params,
+ data,
+ request=None):
# pylint: disable=unused-argument
- return request(
- method=method, params=params, data=data, raw_content=True)
+ return request(method=method, params=params, data=data,
+ raw_content=True)
def proxy(self, method, path, params, data):
- logger.debug("proxying method=%s path=%s params=%s data=%s", method,
- path, params, data)
- return self._proxy_request(self.admin_path, path, method, params, data)
+ logger.debug("proxying method=%s path=%s params=%s data=%s",
+ method, path, params, data)
+ return self._proxy_request(self.admin_path, path, method,
+ params, data)
@RestClient.api_get('/', resp_structure='[1][*] > Name')
def get_buckets(self, request=None):
raise e
@RestClient.api_put('/{bucket_name}')
- def create_bucket(self, bucket_name, request=None):
- logger.info("Creating bucket: %s", bucket_name)
- return request()
+ def create_bucket(self, bucket_name, zonegroup=None,
+ placement_target=None, lock_enabled=False,
+ request=None):
+ logger.info("Creating bucket: %s, zonegroup: %s, placement_target: %s",
+ bucket_name, zonegroup, placement_target)
+ data = None
+ if zonegroup and placement_target:
+ create_bucket_configuration = ET.Element('CreateBucketConfiguration')
+ location_constraint = ET.SubElement(create_bucket_configuration, 'LocationConstraint')
+ location_constraint.text = '{}:{}'.format(zonegroup, placement_target)
+ data = ET.tostring(create_bucket_configuration, encoding='unicode')
+
+ headers = None # type: Optional[dict]
+ if lock_enabled:
+ headers = {'x-amz-bucket-object-lock-enabled': 'true'}
+
+ return request(data=data, headers=headers)
+
+ def get_placement_targets(self): # type: () -> dict
+ zone = self._get_daemon_zone_info()
+ # A zone without realm id can only belong to default zonegroup.
+ zonegroup_name = 'default'
+ if zone['realm_id']:
+ zonegroup_map = self._get_daemon_zonegroup_map()
+ for zonegroup in zonegroup_map:
+ for realm_zone in zonegroup['zones']:
+ if realm_zone['id'] == zone['id']:
+ zonegroup_name = zonegroup['api_name']
+ break
+
+ placement_targets = [] # type: List[Dict]
+ for placement_pool in zone['placement_pools']:
+ placement_targets.append(
+ {
+ 'name': placement_pool['key'],
+ 'data_pool': placement_pool['val']['storage_classes']['STANDARD']['data_pool']
+ }
+ )
+
+ return {'zonegroup': zonegroup_name, 'placement_targets': placement_targets}
+
+ @RestClient.api_get('/{bucket_name}?versioning')
+ def get_bucket_versioning(self, bucket_name, request=None):
+ """
+ Get bucket versioning.
+ :param str bucket_name: the name of the bucket.
+ :return: versioning info
+ :rtype: Dict
+ """
+ # pylint: disable=unused-argument
+ result = request()
+ if 'Status' not in result:
+ result['Status'] = 'Suspended'
+ if 'MfaDelete' not in result:
+ result['MfaDelete'] = 'Disabled'
+ return result
+
+ @RestClient.api_put('/{bucket_name}?versioning')
+ def set_bucket_versioning(self, bucket_name, versioning_state, mfa_delete,
+ mfa_token_serial, mfa_token_pin, request=None):
+ """
+ Set bucket versioning.
+ :param str bucket_name: the name of the bucket.
+ :param str versioning_state:
+ https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html
+ :param str mfa_delete: MFA Delete state.
+ :param str mfa_token_serial:
+ https://docs.ceph.com/docs/master/radosgw/mfa/
+ :param str mfa_token_pin: value of a TOTP token at a certain time (auth code)
+ :return: None
+ """
+ # pylint: disable=unused-argument
+ versioning_configuration = ET.Element('VersioningConfiguration')
+ status_element = ET.SubElement(versioning_configuration, 'Status')
+ status_element.text = versioning_state
+
+ headers = {}
+ if mfa_delete and mfa_token_serial and mfa_token_pin:
+ headers['x-amz-mfa'] = '{} {}'.format(mfa_token_serial, mfa_token_pin)
+ mfa_delete_element = ET.SubElement(versioning_configuration, 'MfaDelete')
+ mfa_delete_element.text = mfa_delete
+
+ data = ET.tostring(versioning_configuration, encoding='unicode')
+
+ try:
+ request(data=data, headers=headers)
+ except RequestException as error:
+ msg = str(error)
+ if error.status_code == 403:
+ msg = 'Bad MFA credentials: {}'.format(msg)
+ # Avoid dashboard GUI redirections caused by status code (403, ...):
+ http_status_code = 400 if 400 <= error.status_code < 500 else error.status_code
+ raise DashboardException(msg=msg,
+ http_status_code=http_status_code,
+ component='rgw')
+
+ @RestClient.api_get('/{bucket_name}?object-lock')
+ def get_bucket_locking(self, bucket_name, request=None):
+ # type: (str, Optional[object]) -> dict
+ """
+ Gets the locking configuration for a bucket. The locking
+ configuration will be applied by default to every new object
+ placed in the specified bucket.
+ :param bucket_name: The name of the bucket.
+ :type bucket_name: str
+ :return: The locking configuration.
+ :rtype: Dict
+ """
+ # pylint: disable=unused-argument
+
+ # Try to get the Object Lock configuration. If there is none,
+ # then return default values.
+ try:
+ result = request() # type: ignore
+ return {
+ 'lock_enabled': dict_get(result, 'ObjectLockEnabled') == 'Enabled',
+ 'lock_mode': dict_get(result, 'Rule.DefaultRetention.Mode'),
+ 'lock_retention_period_days': dict_get(result, 'Rule.DefaultRetention.Days', 0),
+ 'lock_retention_period_years': dict_get(result, 'Rule.DefaultRetention.Years', 0)
+ }
+ except RequestException as e:
+ if e.content:
+ content = json_str_to_object(e.content)
+ if content.get(
+ 'Code') == 'ObjectLockConfigurationNotFoundError':
+ return {
+ 'lock_enabled': False,
+ 'lock_mode': 'compliance',
+ 'lock_retention_period_days': None,
+ 'lock_retention_period_years': None
+ }
+ raise e
+
+ @RestClient.api_put('/{bucket_name}?object-lock')
+ def set_bucket_locking(self,
+ bucket_name,
+ mode,
+ retention_period_days,
+ retention_period_years,
+ request=None):
+ # type: (str, str, int, int, Optional[object]) -> None
+ """
+ Places the locking configuration on the specified bucket. The
+ locking configuration will be applied by default to every new
+ object placed in the specified bucket.
+ :param bucket_name: The name of the bucket.
+ :type bucket_name: str
+ :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`.
+ :type mode: str
+ :param retention_period_days:
+ :type retention_period_days: int
+ :param retention_period_years:
+ :type retention_period_years: int
+ :rtype: None
+ """
+ # pylint: disable=unused-argument
+
+ # Do some validations.
+ if retention_period_days and retention_period_years:
+ # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html
+ msg = "Retention period requires either Days or Years. "\
+ "You can't specify both at the same time."
+ raise DashboardException(msg=msg, component='rgw')
+ if not retention_period_days and not retention_period_years:
+ msg = "Retention period requires either Days or Years. "\
+ "You must specify at least one."
+ raise DashboardException(msg=msg, component='rgw')
+
+ # Generate the XML data like this:
+ # <ObjectLockConfiguration>
+ # <ObjectLockEnabled>string</ObjectLockEnabled>
+ # <Rule>
+ # <DefaultRetention>
+ # <Days>integer</Days>
+ # <Mode>string</Mode>
+ # <Years>integer</Years>
+ # </DefaultRetention>
+ # </Rule>
+ # </ObjectLockConfiguration>
+ locking_configuration = ET.Element('ObjectLockConfiguration')
+ enabled_element = ET.SubElement(locking_configuration,
+ 'ObjectLockEnabled')
+ enabled_element.text = 'Enabled' # Locking can't be disabled.
+ rule_element = ET.SubElement(locking_configuration, 'Rule')
+ default_retention_element = ET.SubElement(rule_element,
+ 'DefaultRetention')
+ mode_element = ET.SubElement(default_retention_element, 'Mode')
+ mode_element.text = mode.upper()
+ if retention_period_days:
+ days_element = ET.SubElement(default_retention_element, 'Days')
+ days_element.text = str(retention_period_days)
+ if retention_period_years:
+ years_element = ET.SubElement(default_retention_element, 'Years')
+ years_element.text = str(retention_period_years)
+
+ data = ET.tostring(locking_configuration, encoding='unicode')
+
+ try:
+ _ = request(data=data) # type: ignore
+ except RequestException as e:
+ raise DashboardException(msg=str(e), component='rgw')