1 # -*- coding: utf-8 -*-
7 import xml
.etree
.ElementTree
as ET
# noqa: N814
8 from distutils
.util
import strtobool
9 from subprocess
import SubprocessError
11 from mgr_util
import build_url
14 from ..awsauth
import S3Auth
15 from ..exceptions
import DashboardException
16 from ..rest_client
import RequestException
, RestClient
17 from ..settings
import Settings
18 from ..tools
import dict_contains_path
, dict_get
, json_str_to_object
21 from typing
import Any
, Dict
, List
, Optional
, Tuple
, Union
23 pass # For typing only
25 logger
= logging
.getLogger('rgw_client')
28 class NoRgwDaemonsException(Exception):
30 super().__init
__('No RGW service is running.')
33 class NoCredentialsException(Exception):
35 super(NoCredentialsException
, self
).__init
__(
36 'No RGW credentials found, '
37 'please consult the documentation on how to enable RGW for '
41 class RgwAdminException(Exception):
46 """Simple representation of a daemon."""
56 def _get_daemons() -> Dict
[str, RgwDaemon
]:
58 Retrieve RGW daemon info from MGR.
60 service_map
= mgr
.get('service_map')
61 if not dict_contains_path(service_map
, ['services', 'rgw', 'daemons']):
62 raise NoRgwDaemonsException
65 daemon_map
= service_map
['services']['rgw']['daemons']
66 for key
in daemon_map
.keys():
67 if dict_contains_path(daemon_map
[key
], ['metadata', 'frontend_config#0']):
68 daemon
= _determine_rgw_addr(daemon_map
[key
])
69 daemon
.name
= daemon_map
[key
]['metadata']['id']
70 daemon
.realm_name
= daemon_map
[key
]['metadata']['realm_name']
71 daemon
.zonegroup_name
= daemon_map
[key
]['metadata']['zonegroup_name']
72 daemon
.zone_name
= daemon_map
[key
]['metadata']['zone_name']
73 daemons
[daemon
.name
] = daemon
74 logger
.info('Found RGW daemon with configuration: host=%s, port=%d, ssl=%s',
75 daemon
.host
, daemon
.port
, str(daemon
.ssl
))
77 raise NoRgwDaemonsException
82 def _determine_rgw_addr(daemon_info
: Dict
[str, Any
]) -> RgwDaemon
:
84 Parse RGW daemon info to determine the configured host (IP address) and port.
87 daemon
.host
= daemon_info
['metadata']['hostname']
88 daemon
.port
, daemon
.ssl
= _parse_frontend_config(daemon_info
['metadata']['frontend_config#0'])
93 def _parse_addr(value
) -> str:
95 Get the IP address the RGW is running on.
97 >>> _parse_addr('192.168.178.3:49774/1534999298')
100 >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298')
101 '2001:db8:85a3::8a2e:370:7334'
103 >>> _parse_addr('xyz')
104 Traceback (most recent call last):
106 LookupError: Failed to determine RGW address
108 >>> _parse_addr('192.168.178.a:8080/123456789')
109 Traceback (most recent call last):
111 LookupError: Invalid RGW address '192.168.178.a' found
113 >>> _parse_addr('[2001:0db8:1234]:443/123456789')
114 Traceback (most recent call last):
116 LookupError: Invalid RGW address '2001:0db8:1234' found
118 >>> _parse_addr('2001:0db8::1234:49774/1534999298')
119 Traceback (most recent call last):
121 LookupError: Failed to determine RGW address
123 :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'.
125 :raises LookupError if parsing fails to determine the IP address.
126 :return: The IP address.
129 match
= re
.search(r
'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value
)
132 # Group 0: 192.168.178.3:49774/1534999298
133 # Group 3: 192.168.178.3
135 # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298
137 # Group 2: 2001:db8:85a3::8a2e:370:7334
138 addr
= match
.group(3) if match
.group(3) else match
.group(2)
140 ipaddress
.ip_address(addr
)
143 raise LookupError('Invalid RGW address \'{}\' found'.format(addr
))
144 raise LookupError('Failed to determine RGW address')
147 def _parse_frontend_config(config
) -> Tuple
[int, bool]:
149 Get the port the RGW is running on. Due the complexity of the
150 syntax not all variations are supported.
152 If there are multiple (ssl_)ports/(ssl_)endpoints options, then
153 the first found option will be returned.
155 Get more details about the configuration syntax here:
156 http://docs.ceph.com/en/latest/radosgw/frontends/
157 https://civetweb.github.io/civetweb/UserManual.html
159 :param config: The configuration string to parse.
161 :raises LookupError if parsing fails to determine the port.
162 :return: A tuple containing the port number and the information
164 :rtype: (int, boolean)
166 match
= re
.search(r
'^(beast|civetweb)\s+.+$', config
)
168 if match
.group(1) == 'beast':
169 match
= re
.search(r
'(port|ssl_port|endpoint|ssl_endpoint)=(.+)',
172 option_name
= match
.group(1)
173 if option_name
in ['port', 'ssl_port']:
174 match
= re
.search(r
'(\d+)', match
.group(2))
176 port
= int(match
.group(1))
177 ssl
= option_name
== 'ssl_port'
179 if option_name
in ['endpoint', 'ssl_endpoint']:
180 match
= re
.search(r
'([\d.]+|\[.+\])(:(\d+))?',
181 match
.group(2)) # type: ignore
183 port
= int(match
.group(3)) if \
184 match
.group(2) is not None else 443 if \
185 option_name
== 'ssl_endpoint' else \
187 ssl
= option_name
== 'ssl_endpoint'
189 if match
.group(1) == 'civetweb': # type: ignore
190 match
= re
.search(r
'port=(.*:)?(\d+)(s)?', config
)
192 port
= int(match
.group(2))
193 ssl
= match
.group(3) == 's'
195 raise LookupError('Failed to determine RGW port from "{}"'.format(config
))
198 def _parse_secrets(user
: str, data
: dict) -> Tuple
[str, str]:
199 for key
in data
.get('keys', []):
200 if key
.get('user') == user
and data
.get('system') in ['true', True]:
201 access_key
= key
.get('access_key')
202 secret_key
= key
.get('secret_key')
203 return access_key
, secret_key
207 def _get_user_keys(user
: str, realm
: Optional
[str] = None) -> Tuple
[str, str]:
210 rgw_user_info_cmd
= ['user', 'info', '--uid', user
]
211 cmd_realm_option
= ['--rgw-realm', realm
] if realm
else []
213 rgw_user_info_cmd
+= cmd_realm_option
215 _
, out
, err
= mgr
.send_rgwadmin_command(rgw_user_info_cmd
)
217 access_key
, secret_key
= _parse_secrets(user
, out
)
219 rgw_create_user_cmd
= [
222 '--display-name', 'Ceph Dashboard',
225 _
, out
, err
= mgr
.send_rgwadmin_command(rgw_create_user_cmd
)
227 access_key
, secret_key
= _parse_secrets(user
, out
)
229 logger
.error('Unable to create rgw user "%s": %s', user
, err
)
230 except SubprocessError
as error
:
231 logger
.exception(error
)
233 return access_key
, secret_key
236 def configure_rgw_credentials():
237 logger
.info('Configuring dashboard RGW credentials')
243 _
, out
, err
= mgr
.send_rgwadmin_command(['realm', 'list'])
245 realms
= out
.get('realms', [])
247 logger
.error('Unable to list RGW realms: %s', err
)
249 realm_access_keys
= {}
250 realm_secret_keys
= {}
252 realm_access_key
, realm_secret_key
= _get_user_keys(user
, realm
)
254 realm_access_keys
[realm
] = realm_access_key
255 realm_secret_keys
[realm
] = realm_secret_key
256 if realm_access_keys
:
257 access_key
= json
.dumps(realm_access_keys
)
258 secret_key
= json
.dumps(realm_secret_keys
)
260 access_key
, secret_key
= _get_user_keys(user
)
262 assert access_key
and secret_key
263 Settings
.RGW_API_ACCESS_KEY
= access_key
264 Settings
.RGW_API_SECRET_KEY
= secret_key
265 except (AssertionError, SubprocessError
) as error
:
266 logger
.exception(error
)
267 raise NoCredentialsException
270 # pylint: disable=R0904
271 class RgwClient(RestClient
):
275 _user_instances
= {} # type: Dict[str, Dict[str, RgwClient]]
276 _config_instances
= {} # type: Dict[str, RgwClient]
277 _rgw_settings_snapshot
= None
278 _daemons
: Dict
[str, RgwDaemon
] = {}
280 got_keys_from_config
: bool
284 def _handle_response_status_code(status_code
: int) -> int:
285 # Do not return auth error codes (so they are not handled as ceph API user auth errors).
286 return 404 if status_code
in [401, 403] else status_code
289 def _get_daemon_connection_info(daemon_name
: str) -> dict:
291 realm_name
= RgwClient
._daemons
[daemon_name
].realm_name
292 access_key
= Settings
.RGW_API_ACCESS_KEY
[realm_name
]
293 secret_key
= Settings
.RGW_API_SECRET_KEY
[realm_name
]
295 # Legacy string values.
296 access_key
= Settings
.RGW_API_ACCESS_KEY
297 secret_key
= Settings
.RGW_API_SECRET_KEY
298 except KeyError as error
:
299 raise DashboardException(msg
='Credentials not found for RGW Daemon: {}'.format(error
),
300 http_status_code
=404,
303 return {'access_key': access_key
, 'secret_key': secret_key
}
305 def _get_daemon_zone_info(self
): # type: () -> dict
306 return json_str_to_object(self
.proxy('GET', 'config?type=zone', None, None))
308 def _get_realms_info(self
): # type: () -> dict
309 return json_str_to_object(self
.proxy('GET', 'realm?list', None, None))
311 def _get_realm_info(self
, realm_id
: str) -> Dict
[str, Any
]:
312 return json_str_to_object(self
.proxy('GET', f
'realm?id={realm_id}', None, None))
316 return (Settings
.RGW_API_ACCESS_KEY
,
317 Settings
.RGW_API_SECRET_KEY
,
318 Settings
.RGW_API_ADMIN_RESOURCE
,
319 Settings
.RGW_API_SSL_VERIFY
)
322 def instance(userid
: Optional
[str] = None,
323 daemon_name
: Optional
[str] = None) -> 'RgwClient':
324 # pylint: disable=too-many-branches
326 RgwClient
._daemons
= _get_daemons()
328 # The API access key and secret key are mandatory for a minimal configuration.
329 if not (Settings
.RGW_API_ACCESS_KEY
and Settings
.RGW_API_SECRET_KEY
):
330 configure_rgw_credentials()
334 daemon_name
= next(iter(RgwClient
._daemons
.keys()))
336 # Discard all cached instances if any rgw setting has changed
337 if RgwClient
._rgw
_settings
_snapshot
!= RgwClient
._rgw
_settings
():
338 RgwClient
._rgw
_settings
_snapshot
= RgwClient
._rgw
_settings
()
339 RgwClient
.drop_instance()
341 if daemon_name
not in RgwClient
._config
_instances
:
342 connection_info
= RgwClient
._get
_daemon
_connection
_info
(daemon_name
)
343 RgwClient
._config
_instances
[daemon_name
] = RgwClient(connection_info
['access_key'],
344 connection_info
['secret_key'],
347 if not userid
or userid
== RgwClient
._config
_instances
[daemon_name
].userid
:
348 return RgwClient
._config
_instances
[daemon_name
]
350 if daemon_name
not in RgwClient
._user
_instances \
351 or userid
not in RgwClient
._user
_instances
[daemon_name
]:
352 # Get the access and secret keys for the specified user.
353 keys
= RgwClient
._config
_instances
[daemon_name
].get_user_keys(userid
)
355 raise RequestException(
356 "User '{}' does not have any keys configured.".format(
358 instance
= RgwClient(keys
['access_key'],
362 RgwClient
._user
_instances
.update({daemon_name
: {userid
: instance
}})
364 return RgwClient
._user
_instances
[daemon_name
][userid
]
367 def admin_instance(daemon_name
: Optional
[str] = None) -> 'RgwClient':
368 return RgwClient
.instance(daemon_name
=daemon_name
)
371 def drop_instance(instance
: Optional
['RgwClient'] = None):
373 Drop a cached instance or all.
376 if instance
.got_keys_from_config
:
377 del RgwClient
._config
_instances
[instance
.daemon
.name
]
379 del RgwClient
._user
_instances
[instance
.daemon
.name
][instance
.userid
]
381 RgwClient
._config
_instances
.clear()
382 RgwClient
._user
_instances
.clear()
384 def _reset_login(self
):
385 if self
.got_keys_from_config
:
386 raise RequestException('Authentication failed for the "{}" user: wrong credentials'
387 .format(self
.userid
), status_code
=401)
388 logger
.info("Fetching new keys for user: %s", self
.userid
)
389 keys
= RgwClient
.admin_instance(daemon_name
=self
.daemon
.name
).get_user_keys(self
.userid
)
390 self
.auth
= S3Auth(keys
['access_key'], keys
['secret_key'],
391 service_url
=self
.service_url
)
397 user_id
: Optional
[str] = None) -> None:
399 daemon
= RgwClient
._daemons
[daemon_name
]
400 except KeyError as error
:
401 raise DashboardException(msg
='RGW Daemon not found: {}'.format(error
),
402 http_status_code
=404,
404 ssl_verify
= Settings
.RGW_API_SSL_VERIFY
405 self
.admin_path
= Settings
.RGW_API_ADMIN_RESOURCE
406 self
.service_url
= build_url(host
=daemon
.host
, port
=daemon
.port
)
408 self
.auth
= S3Auth(access_key
, secret_key
, service_url
=self
.service_url
)
409 super(RgwClient
, self
).__init
__(daemon
.host
,
414 ssl_verify
=ssl_verify
)
415 self
.got_keys_from_config
= not user_id
417 self
.userid
= self
._get
_user
_id
(self
.admin_path
) if self
.got_keys_from_config \
419 except RequestException
as error
:
420 logger
.exception(error
)
421 msg
= 'Error connecting to Object Gateway'
422 if error
.status_code
== 404:
423 msg
= '{}: {}'.format(msg
, str(error
))
424 raise DashboardException(msg
=msg
,
425 http_status_code
=error
.status_code
,
429 logger
.info("Created new connection: daemon=%s, host=%s, port=%s, ssl=%d, sslverify=%d",
430 daemon
.name
, daemon
.host
, daemon
.port
, daemon
.ssl
, ssl_verify
)
432 @RestClient.api_get('/', resp_structure
='[0] > (ID & DisplayName)')
433 def is_service_online(self
, request
=None) -> bool:
435 Consider the service as online if the response contains the
436 specified keys. Nothing more is checked here.
438 _
= request({'format': 'json'})
441 @RestClient.api_get('/{admin_path}/metadata/user?myself',
442 resp_structure
='data > user_id')
443 def _get_user_id(self
, admin_path
, request
=None):
444 # pylint: disable=unused-argument
446 Get the user ID of the user that is used to communicate with the
449 :return: The user ID of the user that is used to sign the
450 RGW Admin Ops API calls.
453 return response
['data']['user_id']
455 @RestClient.api_get('/{admin_path}/metadata/user', resp_structure
='[+]')
456 def _user_exists(self
, admin_path
, user_id
, request
=None):
457 # pylint: disable=unused-argument
460 return user_id
in response
461 return self
.userid
in response
463 def user_exists(self
, user_id
=None):
464 return self
._user
_exists
(self
.admin_path
, user_id
)
466 @RestClient.api_get('/{admin_path}/metadata/user?key={userid}',
467 resp_structure
='data > system')
468 def _is_system_user(self
, admin_path
, userid
, request
=None) -> bool:
469 # pylint: disable=unused-argument
471 return strtobool(response
['data']['system'])
473 def is_system_user(self
) -> bool:
474 return self
._is
_system
_user
(self
.admin_path
, self
.userid
)
477 '/{admin_path}/user',
478 resp_structure
='tenant & user_id & email & keys[*] > '
479 ' (user & access_key & secret_key)')
480 def _admin_get_user_keys(self
, admin_path
, userid
, request
=None):
481 # pylint: disable=unused-argument
482 colon_idx
= userid
.find(':')
483 user
= userid
if colon_idx
== -1 else userid
[:colon_idx
]
484 response
= request({'uid': user
})
485 for key
in response
['keys']:
486 if key
['user'] == userid
:
488 'access_key': key
['access_key'],
489 'secret_key': key
['secret_key']
493 def get_user_keys(self
, userid
):
494 return self
._admin
_get
_user
_keys
(self
.admin_path
, userid
)
496 @RestClient.api('/{admin_path}/{path}')
498 self
, # pylint: disable=too-many-arguments
505 # pylint: disable=unused-argument
506 return request(method
=method
, params
=params
, data
=data
,
509 def proxy(self
, method
, path
, params
, data
):
510 logger
.debug("proxying method=%s path=%s params=%s data=%s",
511 method
, path
, params
, data
)
512 return self
._proxy
_request
(self
.admin_path
, path
, method
,
515 @RestClient.api_get('/', resp_structure
='[1][*] > Name')
516 def get_buckets(self
, request
=None):
518 Get a list of names from all existing buckets of this user.
519 :return: Returns a list of bucket names.
521 response
= request({'format': 'json'})
522 return [bucket
['Name'] for bucket
in response
[1]]
524 @RestClient.api_get('/{bucket_name}')
525 def bucket_exists(self
, bucket_name
, userid
, request
=None):
527 Check if the specified bucket exists for this user.
528 :param bucket_name: The name of the bucket.
529 :return: Returns True if the bucket exists, otherwise False.
531 # pylint: disable=unused-argument
534 my_buckets
= self
.get_buckets()
535 if bucket_name
not in my_buckets
:
536 raise RequestException(
537 'Bucket "{}" belongs to other user'.format(bucket_name
),
540 except RequestException
as e
:
541 if e
.status_code
== 404:
546 @RestClient.api_put('/{bucket_name}')
547 def create_bucket(self
, bucket_name
, zonegroup
=None,
548 placement_target
=None, lock_enabled
=False,
550 logger
.info("Creating bucket: %s, zonegroup: %s, placement_target: %s",
551 bucket_name
, zonegroup
, placement_target
)
553 if zonegroup
and placement_target
:
554 create_bucket_configuration
= ET
.Element('CreateBucketConfiguration')
555 location_constraint
= ET
.SubElement(create_bucket_configuration
, 'LocationConstraint')
556 location_constraint
.text
= '{}:{}'.format(zonegroup
, placement_target
)
557 data
= ET
.tostring(create_bucket_configuration
, encoding
='unicode')
559 headers
= None # type: Optional[dict]
561 headers
= {'x-amz-bucket-object-lock-enabled': 'true'}
563 return request(data
=data
, headers
=headers
)
565 def get_placement_targets(self
): # type: () -> dict
566 zone
= self
._get
_daemon
_zone
_info
()
567 placement_targets
= [] # type: List[Dict]
568 for placement_pool
in zone
['placement_pools']:
569 placement_targets
.append(
571 'name': placement_pool
['key'],
572 'data_pool': placement_pool
['val']['storage_classes']['STANDARD']['data_pool']
576 return {'zonegroup': self
.daemon
.zonegroup_name
,
577 'placement_targets': placement_targets
}
579 def get_realms(self
): # type: () -> List
580 realms_info
= self
._get
_realms
_info
()
581 if 'realms' in realms_info
and realms_info
['realms']:
582 return realms_info
['realms']
586 def get_default_realm(self
) -> str:
587 realms_info
= self
._get
_realms
_info
()
588 if 'default_info' in realms_info
and realms_info
['default_info']:
589 realm_info
= self
._get
_realm
_info
(realms_info
['default_info'])
590 if 'name' in realm_info
and realm_info
['name']:
591 return realm_info
['name']
592 raise DashboardException(msg
='Default realm not found.',
593 http_status_code
=404,
596 @RestClient.api_get('/{bucket_name}?versioning')
597 def get_bucket_versioning(self
, bucket_name
, request
=None):
599 Get bucket versioning.
600 :param str bucket_name: the name of the bucket.
601 :return: versioning info
604 # pylint: disable=unused-argument
606 if 'Status' not in result
:
607 result
['Status'] = 'Suspended'
608 if 'MfaDelete' not in result
:
609 result
['MfaDelete'] = 'Disabled'
612 @RestClient.api_put('/{bucket_name}?versioning')
613 def set_bucket_versioning(self
, bucket_name
, versioning_state
, mfa_delete
,
614 mfa_token_serial
, mfa_token_pin
, request
=None):
616 Set bucket versioning.
617 :param str bucket_name: the name of the bucket.
618 :param str versioning_state:
619 https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html
620 :param str mfa_delete: MFA Delete state.
621 :param str mfa_token_serial:
622 https://docs.ceph.com/docs/master/radosgw/mfa/
623 :param str mfa_token_pin: value of a TOTP token at a certain time (auth code)
626 # pylint: disable=unused-argument
627 versioning_configuration
= ET
.Element('VersioningConfiguration')
628 status_element
= ET
.SubElement(versioning_configuration
, 'Status')
629 status_element
.text
= versioning_state
632 if mfa_delete
and mfa_token_serial
and mfa_token_pin
:
633 headers
['x-amz-mfa'] = '{} {}'.format(mfa_token_serial
, mfa_token_pin
)
634 mfa_delete_element
= ET
.SubElement(versioning_configuration
, 'MfaDelete')
635 mfa_delete_element
.text
= mfa_delete
637 data
= ET
.tostring(versioning_configuration
, encoding
='unicode')
640 request(data
=data
, headers
=headers
)
641 except RequestException
as error
:
643 if mfa_delete
and mfa_token_serial
and mfa_token_pin \
644 and 'AccessDenied' in error
.content
.decode():
645 msg
= 'Bad MFA credentials: {}'.format(msg
)
646 raise DashboardException(msg
=msg
,
647 http_status_code
=error
.status_code
,
650 @RestClient.api_get('/{bucket_name}?encryption')
651 def get_bucket_encryption(self
, bucket_name
, request
=None):
652 # pylint: disable=unused-argument
654 result
= request() # type: ignore
655 result
['Status'] = 'Enabled'
657 except RequestException
as e
:
659 content
= json_str_to_object(e
.content
)
661 'Code') == 'ServerSideEncryptionConfigurationNotFoundError':
663 'Status': 'Disabled',
667 @RestClient.api_delete('/{bucket_name}?encryption')
668 def delete_bucket_encryption(self
, bucket_name
, request
=None):
669 # pylint: disable=unused-argument
670 result
= request() # type: ignore
673 @RestClient.api_put('/{bucket_name}?encryption')
674 def set_bucket_encryption(self
, bucket_name
, key_id
,
675 sse_algorithm
, request
: Optional
[object] = None):
676 # pylint: disable=unused-argument
677 encryption_configuration
= ET
.Element('ServerSideEncryptionConfiguration')
678 rule_element
= ET
.SubElement(encryption_configuration
, 'Rule')
679 default_encryption_element
= ET
.SubElement(rule_element
,
680 'ApplyServerSideEncryptionByDefault')
681 sse_algo_element
= ET
.SubElement(default_encryption_element
,
683 sse_algo_element
.text
= sse_algorithm
684 if sse_algorithm
== 'aws:kms':
685 kms_master_key_element
= ET
.SubElement(default_encryption_element
,
687 kms_master_key_element
.text
= key_id
688 data
= ET
.tostring(encryption_configuration
, encoding
='unicode')
690 _
= request(data
=data
) # type: ignore
691 except RequestException
as e
:
692 raise DashboardException(msg
=str(e
), component
='rgw')
694 @RestClient.api_get('/{bucket_name}?object-lock')
695 def get_bucket_locking(self
, bucket_name
, request
=None):
696 # type: (str, Optional[object]) -> dict
698 Gets the locking configuration for a bucket. The locking
699 configuration will be applied by default to every new object
700 placed in the specified bucket.
701 :param bucket_name: The name of the bucket.
702 :type bucket_name: str
703 :return: The locking configuration.
706 # pylint: disable=unused-argument
708 # Try to get the Object Lock configuration. If there is none,
709 # then return default values.
711 result
= request() # type: ignore
713 'lock_enabled': dict_get(result
, 'ObjectLockEnabled') == 'Enabled',
714 'lock_mode': dict_get(result
, 'Rule.DefaultRetention.Mode'),
715 'lock_retention_period_days': dict_get(result
, 'Rule.DefaultRetention.Days', 0),
716 'lock_retention_period_years': dict_get(result
, 'Rule.DefaultRetention.Years', 0)
718 except RequestException
as e
:
720 content
= json_str_to_object(e
.content
)
722 'Code') == 'ObjectLockConfigurationNotFoundError':
724 'lock_enabled': False,
725 'lock_mode': 'compliance',
726 'lock_retention_period_days': None,
727 'lock_retention_period_years': None
731 @RestClient.api_put('/{bucket_name}?object-lock')
732 def set_bucket_locking(self
,
735 retention_period_days
: Optional
[Union
[int, str]] = None,
736 retention_period_years
: Optional
[Union
[int, str]] = None,
737 request
: Optional
[object] = None) -> None:
739 Places the locking configuration on the specified bucket. The
740 locking configuration will be applied by default to every new
741 object placed in the specified bucket.
742 :param bucket_name: The name of the bucket.
743 :type bucket_name: str
744 :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`.
746 :param retention_period_days:
747 :type retention_period_days: int
748 :param retention_period_years:
749 :type retention_period_years: int
752 # pylint: disable=unused-argument
754 # Do some validations.
756 retention_period_days
= int(retention_period_days
) if retention_period_days
else 0
757 retention_period_years
= int(retention_period_years
) if retention_period_years
else 0
758 if retention_period_days
< 0 or retention_period_years
< 0:
760 except (TypeError, ValueError):
761 msg
= "Retention period must be a positive integer."
762 raise DashboardException(msg
=msg
, component
='rgw')
763 if retention_period_days
and retention_period_years
:
764 # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html
765 msg
= "Retention period requires either Days or Years. "\
766 "You can't specify both at the same time."
767 raise DashboardException(msg
=msg
, component
='rgw')
768 if not retention_period_days
and not retention_period_years
:
769 msg
= "Retention period requires either Days or Years. "\
770 "You must specify at least one."
771 raise DashboardException(msg
=msg
, component
='rgw')
772 if not isinstance(mode
, str) or mode
.upper() not in ['COMPLIANCE', 'GOVERNANCE']:
773 msg
= "Retention mode must be either COMPLIANCE or GOVERNANCE."
774 raise DashboardException(msg
=msg
, component
='rgw')
776 # Generate the XML data like this:
777 # <ObjectLockConfiguration>
778 # <ObjectLockEnabled>string</ObjectLockEnabled>
781 # <Days>integer</Days>
782 # <Mode>string</Mode>
783 # <Years>integer</Years>
784 # </DefaultRetention>
786 # </ObjectLockConfiguration>
787 locking_configuration
= ET
.Element('ObjectLockConfiguration')
788 enabled_element
= ET
.SubElement(locking_configuration
,
790 enabled_element
.text
= 'Enabled' # Locking can't be disabled.
791 rule_element
= ET
.SubElement(locking_configuration
, 'Rule')
792 default_retention_element
= ET
.SubElement(rule_element
,
794 mode_element
= ET
.SubElement(default_retention_element
, 'Mode')
795 mode_element
.text
= mode
.upper()
796 if retention_period_days
:
797 days_element
= ET
.SubElement(default_retention_element
, 'Days')
798 days_element
.text
= str(retention_period_days
)
799 if retention_period_years
:
800 years_element
= ET
.SubElement(default_retention_element
, 'Years')
801 years_element
.text
= str(retention_period_years
)
803 data
= ET
.tostring(locking_configuration
, encoding
='unicode')
806 _
= request(data
=data
) # type: ignore
807 except RequestException
as e
:
808 raise DashboardException(msg
=str(e
), component
='rgw')