1 # -*- coding: utf-8 -*-
2 from __future__
import absolute_import
7 from distutils
.util
import strtobool
8 import xml
.etree
.ElementTree
as ET
# noqa: N814
10 from ..awsauth
import S3Auth
11 from ..exceptions
import DashboardException
12 from ..settings
import Settings
, Options
13 from ..rest_client
import RestClient
, RequestException
14 from ..tools
import build_url
, dict_contains_path
, json_str_to_object
,\
19 from typing
import Any
, Dict
, List
, Optional
# pylint: disable=unused-import
21 pass # For typing only
23 logger
= logging
.getLogger('rgw_client')
26 class NoCredentialsException(RequestException
):
28 super(NoCredentialsException
, self
).__init
__(
29 'No RGW credentials found, '
30 'please consult the documentation on how to enable RGW for '
34 def _get_daemon_info() -> Dict
[str, Any
]:
36 Retrieve RGW daemon info from MGR.
37 Note, the service id of the RGW daemons may differ depending on the setup.
47 'addr': '[2001:db8:85a3::8a2e:370:7334]:49774/1534999298',
49 'frontend_config#0': 'civetweb port=7280',
66 'addr': '192.168.178.3:49774/1534999298',
68 'frontend_config#0': 'civetweb port=8000',
77 service_map
= mgr
.get('service_map')
78 if not dict_contains_path(service_map
, ['services', 'rgw', 'daemons']):
79 raise LookupError('No RGW found')
81 daemons
= service_map
['services']['rgw']['daemons']
82 for key
in daemons
.keys():
83 if dict_contains_path(daemons
[key
], ['metadata', 'frontend_config#0']):
87 raise LookupError('No RGW daemon found')
92 def _determine_rgw_addr():
94 Parse RGW daemon info to determine the configured host (IP address) and port.
96 daemon
= _get_daemon_info()
98 addr
= _parse_addr(daemon
['addr'])
99 port
, ssl
= _parse_frontend_config(daemon
['metadata']['frontend_config#0'])
101 return addr
, port
, ssl
104 def _parse_addr(value
):
106 Get the IP address the RGW is running on.
108 >>> _parse_addr('192.168.178.3:49774/1534999298')
111 >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298')
112 '2001:db8:85a3::8a2e:370:7334'
114 >>> _parse_addr('xyz')
115 Traceback (most recent call last):
117 LookupError: Failed to determine RGW address
119 >>> _parse_addr('192.168.178.a:8080/123456789')
120 Traceback (most recent call last):
122 LookupError: Invalid RGW address '192.168.178.a' found
124 >>> _parse_addr('[2001:0db8:1234]:443/123456789')
125 Traceback (most recent call last):
127 LookupError: Invalid RGW address '2001:0db8:1234' found
129 >>> _parse_addr('2001:0db8::1234:49774/1534999298')
130 Traceback (most recent call last):
132 LookupError: Failed to determine RGW address
134 :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'.
136 :raises LookupError if parsing fails to determine the IP address.
137 :return: The IP address.
140 match
= re
.search(r
'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value
)
143 # Group 0: 192.168.178.3:49774/1534999298
144 # Group 3: 192.168.178.3
146 # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298
148 # Group 2: 2001:db8:85a3::8a2e:370:7334
149 addr
= match
.group(3) if match
.group(3) else match
.group(2)
151 ipaddress
.ip_address(six
.u(addr
))
154 raise LookupError('Invalid RGW address \'{}\' found'.format(addr
))
155 raise LookupError('Failed to determine RGW address')
158 def _parse_frontend_config(config
):
160 Get the port the RGW is running on. Due the complexity of the
161 syntax not all variations are supported.
163 If there are multiple (ssl_)ports/(ssl_)endpoints options, then
164 the first found option will be returned.
166 Get more details about the configuration syntax here:
167 http://docs.ceph.com/docs/master/radosgw/frontends/
168 https://civetweb.github.io/civetweb/UserManual.html
170 :param config: The configuration string to parse.
172 :raises LookupError if parsing fails to determine the port.
173 :return: A tuple containing the port number and the information
175 :rtype: (int, boolean)
177 match
= re
.search(r
'^(beast|civetweb)\s+.+$', config
)
179 if match
.group(1) == 'beast':
180 match
= re
.search(r
'(port|ssl_port|endpoint|ssl_endpoint)=(.+)',
183 option_name
= match
.group(1)
184 if option_name
in ['port', 'ssl_port']:
185 match
= re
.search(r
'(\d+)', match
.group(2))
187 port
= int(match
.group(1))
188 ssl
= option_name
== 'ssl_port'
190 if option_name
in ['endpoint', 'ssl_endpoint']:
191 match
= re
.search(r
'([\d.]+|\[.+\])(:(\d+))?',
192 match
.group(2)) # type: ignore
194 port
= int(match
.group(3)) if \
195 match
.group(2) is not None else 443 if \
196 option_name
== 'ssl_endpoint' else \
198 ssl
= option_name
== 'ssl_endpoint'
200 if match
.group(1) == 'civetweb': # type: ignore
201 match
= re
.search(r
'port=(.*:)?(\d+)(s)?', config
)
203 port
= int(match
.group(2))
204 ssl
= match
.group(3) == 's'
206 raise LookupError('Failed to determine RGW port from "{}"'.format(config
))
209 class RgwClient(RestClient
):
210 _SYSTEM_USERID
= None
215 _user_instances
= {} # type: Dict[str, RgwClient]
216 _rgw_settings_snapshot
= None
219 def _load_settings():
220 # The API access key and secret key are mandatory for a minimal configuration.
221 if not (Settings
.RGW_API_ACCESS_KEY
and Settings
.RGW_API_SECRET_KEY
):
222 logger
.warning('No credentials found, please consult the '
223 'documentation about how to enable RGW for the '
225 raise NoCredentialsException()
227 if Options
.has_default_value('RGW_API_HOST') and \
228 Options
.has_default_value('RGW_API_PORT') and \
229 Options
.has_default_value('RGW_API_SCHEME'):
230 host
, port
, ssl
= _determine_rgw_addr()
232 host
= Settings
.RGW_API_HOST
233 port
= Settings
.RGW_API_PORT
234 ssl
= Settings
.RGW_API_SCHEME
== 'https'
236 RgwClient
._host
= host
237 RgwClient
._port
= port
239 RgwClient
._ADMIN
_PATH
= Settings
.RGW_API_ADMIN_RESOURCE
241 # Create an instance using the configured settings.
242 instance
= RgwClient(Settings
.RGW_API_USER_ID
,
243 Settings
.RGW_API_ACCESS_KEY
,
244 Settings
.RGW_API_SECRET_KEY
)
246 RgwClient
._SYSTEM
_USERID
= instance
.userid
248 # Append the instance to the internal map.
249 RgwClient
._user
_instances
[RgwClient
._SYSTEM
_USERID
] = instance
251 def _get_daemon_zone_info(self
): # type: () -> dict
252 return json_str_to_object(self
.proxy('GET', 'config?type=zone', None, None))
254 def _get_realms_info(self
): # type: () -> dict
255 return json_str_to_object(self
.proxy('GET', 'realm?list', None, None))
259 return (Settings
.RGW_API_HOST
,
260 Settings
.RGW_API_PORT
,
261 Settings
.RGW_API_ACCESS_KEY
,
262 Settings
.RGW_API_SECRET_KEY
,
263 Settings
.RGW_API_ADMIN_RESOURCE
,
264 Settings
.RGW_API_SCHEME
,
265 Settings
.RGW_API_USER_ID
,
266 Settings
.RGW_API_SSL_VERIFY
)
269 def instance(userid
):
270 # type: (Optional[str]) -> RgwClient
271 # Discard all cached instances if any rgw setting has changed
272 if RgwClient
._rgw
_settings
_snapshot
!= RgwClient
._rgw
_settings
():
273 RgwClient
._rgw
_settings
_snapshot
= RgwClient
._rgw
_settings
()
274 RgwClient
._user
_instances
.clear()
276 if not RgwClient
._user
_instances
:
277 RgwClient
._load
_settings
()
280 userid
= RgwClient
._SYSTEM
_USERID
282 if userid
not in RgwClient
._user
_instances
:
283 # Get the access and secret keys for the specified user.
284 keys
= RgwClient
.admin_instance().get_user_keys(userid
)
286 raise RequestException(
287 "User '{}' does not have any keys configured.".format(
290 # Create an instance and append it to the internal map.
291 RgwClient
._user
_instances
[userid
] = RgwClient(userid
, # type: ignore
295 return RgwClient
._user
_instances
[userid
] # type: ignore
298 def admin_instance():
299 return RgwClient
.instance(RgwClient
._SYSTEM
_USERID
)
301 def _reset_login(self
):
302 if self
.userid
!= RgwClient
._SYSTEM
_USERID
:
303 logger
.info("Fetching new keys for user: %s", self
.userid
)
304 keys
= RgwClient
.admin_instance().get_user_keys(self
.userid
)
305 self
.auth
= S3Auth(keys
['access_key'], keys
['secret_key'],
306 service_url
=self
.service_url
)
308 raise RequestException('Authentication failed for the "{}" user: wrong credentials'
309 .format(self
.userid
), status_code
=401)
311 def __init__(self
, # pylint: disable-msg=R0913
320 if not host
and not RgwClient
._host
:
321 RgwClient
._load
_settings
()
322 host
= host
if host
else RgwClient
._host
323 port
= port
if port
else RgwClient
._port
324 admin_path
= admin_path
if admin_path
else RgwClient
._ADMIN
_PATH
325 ssl
= ssl
if ssl
else RgwClient
._ssl
326 ssl_verify
= Settings
.RGW_API_SSL_VERIFY
328 self
.service_url
= build_url(host
=host
, port
=port
)
329 self
.admin_path
= admin_path
331 s3auth
= S3Auth(access_key
, secret_key
, service_url
=self
.service_url
)
332 super(RgwClient
, self
).__init
__(host
, port
, 'RGW', ssl
, s3auth
, ssl_verify
=ssl_verify
)
334 # If user ID is not set, then try to get it via the RGW Admin Ops API.
335 self
.userid
= userid
if userid
else self
._get
_user
_id
(self
.admin_path
) # type: str
337 self
._zonegroup
_name
: str = _get_daemon_info()['metadata']['zonegroup_name']
339 logger
.info("Created new connection: user=%s, host=%s, port=%s, ssl=%d, sslverify=%d",
340 self
.userid
, host
, port
, ssl
, ssl_verify
)
342 @RestClient.api_get('/', resp_structure
='[0] > (ID & DisplayName)')
343 def is_service_online(self
, request
=None):
345 Consider the service as online if the response contains the
346 specified keys. Nothing more is checked here.
348 _
= request({'format': 'json'})
351 @RestClient.api_get('/{admin_path}/metadata/user?myself',
352 resp_structure
='data > user_id')
353 def _get_user_id(self
, admin_path
, request
=None):
354 # pylint: disable=unused-argument
356 Get the user ID of the user that is used to communicate with the
359 :return: The user ID of the user that is used to sign the
360 RGW Admin Ops API calls.
363 return response
['data']['user_id']
365 @RestClient.api_get('/{admin_path}/metadata/user', resp_structure
='[+]')
366 def _user_exists(self
, admin_path
, user_id
, request
=None):
367 # pylint: disable=unused-argument
370 return user_id
in response
371 return self
.userid
in response
373 def user_exists(self
, user_id
=None):
374 return self
._user
_exists
(self
.admin_path
, user_id
)
376 @RestClient.api_get('/{admin_path}/metadata/user?key={userid}',
377 resp_structure
='data > system')
378 def _is_system_user(self
, admin_path
, userid
, request
=None):
379 # pylint: disable=unused-argument
381 return strtobool(response
['data']['system'])
383 def is_system_user(self
):
384 return self
._is
_system
_user
(self
.admin_path
, self
.userid
)
387 '/{admin_path}/user',
388 resp_structure
='tenant & user_id & email & keys[*] > '
389 ' (user & access_key & secret_key)')
390 def _admin_get_user_keys(self
, admin_path
, userid
, request
=None):
391 # pylint: disable=unused-argument
392 colon_idx
= userid
.find(':')
393 user
= userid
if colon_idx
== -1 else userid
[:colon_idx
]
394 response
= request({'uid': user
})
395 for key
in response
['keys']:
396 if key
['user'] == userid
:
398 'access_key': key
['access_key'],
399 'secret_key': key
['secret_key']
403 def get_user_keys(self
, userid
):
404 return self
._admin
_get
_user
_keys
(self
.admin_path
, userid
)
406 @RestClient.api('/{admin_path}/{path}')
408 self
, # pylint: disable=too-many-arguments
415 # pylint: disable=unused-argument
416 return request(method
=method
, params
=params
, data
=data
,
419 def proxy(self
, method
, path
, params
, data
):
420 logger
.debug("proxying method=%s path=%s params=%s data=%s",
421 method
, path
, params
, data
)
422 return self
._proxy
_request
(self
.admin_path
, path
, method
,
425 @RestClient.api_get('/', resp_structure
='[1][*] > Name')
426 def get_buckets(self
, request
=None):
428 Get a list of names from all existing buckets of this user.
429 :return: Returns a list of bucket names.
431 response
= request({'format': 'json'})
432 return [bucket
['Name'] for bucket
in response
[1]]
434 @RestClient.api_get('/{bucket_name}')
435 def bucket_exists(self
, bucket_name
, userid
, request
=None):
437 Check if the specified bucket exists for this user.
438 :param bucket_name: The name of the bucket.
439 :return: Returns True if the bucket exists, otherwise False.
441 # pylint: disable=unused-argument
444 my_buckets
= self
.get_buckets()
445 if bucket_name
not in my_buckets
:
446 raise RequestException(
447 'Bucket "{}" belongs to other user'.format(bucket_name
),
450 except RequestException
as e
:
451 if e
.status_code
== 404:
456 @RestClient.api_put('/{bucket_name}')
457 def create_bucket(self
, bucket_name
, zonegroup
=None,
458 placement_target
=None, lock_enabled
=False,
460 logger
.info("Creating bucket: %s, zonegroup: %s, placement_target: %s",
461 bucket_name
, zonegroup
, placement_target
)
463 if zonegroup
and placement_target
:
464 create_bucket_configuration
= ET
.Element('CreateBucketConfiguration')
465 location_constraint
= ET
.SubElement(create_bucket_configuration
, 'LocationConstraint')
466 location_constraint
.text
= '{}:{}'.format(zonegroup
, placement_target
)
467 data
= ET
.tostring(create_bucket_configuration
, encoding
='unicode')
469 headers
= None # type: Optional[dict]
471 headers
= {'x-amz-bucket-object-lock-enabled': 'true'}
473 return request(data
=data
, headers
=headers
)
475 def get_placement_targets(self
): # type: () -> dict
476 zone
= self
._get
_daemon
_zone
_info
()
477 placement_targets
= [] # type: List[Dict]
478 for placement_pool
in zone
['placement_pools']:
479 placement_targets
.append(
481 'name': placement_pool
['key'],
482 'data_pool': placement_pool
['val']['storage_classes']['STANDARD']['data_pool']
486 return {'zonegroup': self
._zonegroup
_name
, 'placement_targets': placement_targets
}
488 def get_realms(self
): # type: () -> List
489 realms_info
= self
._get
_realms
_info
()
490 if 'realms' in realms_info
and realms_info
['realms']:
491 return realms_info
['realms']
495 @RestClient.api_get('/{bucket_name}?versioning')
496 def get_bucket_versioning(self
, bucket_name
, request
=None):
498 Get bucket versioning.
499 :param str bucket_name: the name of the bucket.
500 :return: versioning info
503 # pylint: disable=unused-argument
505 if 'Status' not in result
:
506 result
['Status'] = 'Suspended'
507 if 'MfaDelete' not in result
:
508 result
['MfaDelete'] = 'Disabled'
511 @RestClient.api_put('/{bucket_name}?versioning')
512 def set_bucket_versioning(self
, bucket_name
, versioning_state
, mfa_delete
,
513 mfa_token_serial
, mfa_token_pin
, request
=None):
515 Set bucket versioning.
516 :param str bucket_name: the name of the bucket.
517 :param str versioning_state:
518 https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html
519 :param str mfa_delete: MFA Delete state.
520 :param str mfa_token_serial:
521 https://docs.ceph.com/docs/master/radosgw/mfa/
522 :param str mfa_token_pin: value of a TOTP token at a certain time (auth code)
525 # pylint: disable=unused-argument
526 versioning_configuration
= ET
.Element('VersioningConfiguration')
527 status_element
= ET
.SubElement(versioning_configuration
, 'Status')
528 status_element
.text
= versioning_state
531 if mfa_delete
and mfa_token_serial
and mfa_token_pin
:
532 headers
['x-amz-mfa'] = '{} {}'.format(mfa_token_serial
, mfa_token_pin
)
533 mfa_delete_element
= ET
.SubElement(versioning_configuration
, 'MfaDelete')
534 mfa_delete_element
.text
= mfa_delete
536 data
= ET
.tostring(versioning_configuration
, encoding
='unicode')
539 request(data
=data
, headers
=headers
)
540 except RequestException
as error
:
542 if error
.status_code
== 403:
543 msg
= 'Bad MFA credentials: {}'.format(msg
)
544 # Avoid dashboard GUI redirections caused by status code (403, ...):
545 http_status_code
= 400 if 400 <= error
.status_code
< 500 else error
.status_code
546 raise DashboardException(msg
=msg
,
547 http_status_code
=http_status_code
,
550 @RestClient.api_get('/{bucket_name}?object-lock')
551 def get_bucket_locking(self
, bucket_name
, request
=None):
552 # type: (str, Optional[object]) -> dict
554 Gets the locking configuration for a bucket. The locking
555 configuration will be applied by default to every new object
556 placed in the specified bucket.
557 :param bucket_name: The name of the bucket.
558 :type bucket_name: str
559 :return: The locking configuration.
562 # pylint: disable=unused-argument
564 # Try to get the Object Lock configuration. If there is none,
565 # then return default values.
567 result
= request() # type: ignore
569 'lock_enabled': dict_get(result
, 'ObjectLockEnabled') == 'Enabled',
570 'lock_mode': dict_get(result
, 'Rule.DefaultRetention.Mode'),
571 'lock_retention_period_days': dict_get(result
, 'Rule.DefaultRetention.Days', 0),
572 'lock_retention_period_years': dict_get(result
, 'Rule.DefaultRetention.Years', 0)
574 except RequestException
as e
:
576 content
= json_str_to_object(e
.content
)
578 'Code') == 'ObjectLockConfigurationNotFoundError':
580 'lock_enabled': False,
581 'lock_mode': 'compliance',
582 'lock_retention_period_days': None,
583 'lock_retention_period_years': None
587 @RestClient.api_put('/{bucket_name}?object-lock')
588 def set_bucket_locking(self
,
591 retention_period_days
,
592 retention_period_years
,
594 # type: (str, str, int, int, Optional[object]) -> None
596 Places the locking configuration on the specified bucket. The
597 locking configuration will be applied by default to every new
598 object placed in the specified bucket.
599 :param bucket_name: The name of the bucket.
600 :type bucket_name: str
601 :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`.
603 :param retention_period_days:
604 :type retention_period_days: int
605 :param retention_period_years:
606 :type retention_period_years: int
609 # pylint: disable=unused-argument
611 # Do some validations.
612 if retention_period_days
and retention_period_years
:
613 # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html
614 msg
= "Retention period requires either Days or Years. "\
615 "You can't specify both at the same time."
616 raise DashboardException(msg
=msg
, component
='rgw')
617 if not retention_period_days
and not retention_period_years
:
618 msg
= "Retention period requires either Days or Years. "\
619 "You must specify at least one."
620 raise DashboardException(msg
=msg
, component
='rgw')
622 # Generate the XML data like this:
623 # <ObjectLockConfiguration>
624 # <ObjectLockEnabled>string</ObjectLockEnabled>
627 # <Days>integer</Days>
628 # <Mode>string</Mode>
629 # <Years>integer</Years>
630 # </DefaultRetention>
632 # </ObjectLockConfiguration>
633 locking_configuration
= ET
.Element('ObjectLockConfiguration')
634 enabled_element
= ET
.SubElement(locking_configuration
,
636 enabled_element
.text
= 'Enabled' # Locking can't be disabled.
637 rule_element
= ET
.SubElement(locking_configuration
, 'Rule')
638 default_retention_element
= ET
.SubElement(rule_element
,
640 mode_element
= ET
.SubElement(default_retention_element
, 'Mode')
641 mode_element
.text
= mode
.upper()
642 if retention_period_days
:
643 days_element
= ET
.SubElement(default_retention_element
, 'Days')
644 days_element
.text
= str(retention_period_days
)
645 if retention_period_years
:
646 years_element
= ET
.SubElement(default_retention_element
, 'Years')
647 years_element
.text
= str(retention_period_years
)
649 data
= ET
.tostring(locking_configuration
, encoding
='unicode')
652 _
= request(data
=data
) # type: ignore
653 except RequestException
as e
:
654 raise DashboardException(msg
=str(e
), component
='rgw')