1 # -*- coding: utf-8 -*-
7 import xml
.etree
.ElementTree
as ET
# noqa: N814
8 from distutils
.util
import strtobool
9 from subprocess
import SubprocessError
11 from mgr_util
import build_url
14 from ..awsauth
import S3Auth
15 from ..exceptions
import DashboardException
16 from ..rest_client
import RequestException
, RestClient
17 from ..settings
import Settings
18 from ..tools
import dict_contains_path
, dict_get
, json_str_to_object
21 from typing
import Any
, Dict
, List
, Optional
, Tuple
, Union
23 pass # For typing only
25 logger
= logging
.getLogger('rgw_client')
28 class NoRgwDaemonsException(Exception):
30 super().__init
__('No RGW service is running.')
33 class NoCredentialsException(Exception):
35 super(NoCredentialsException
, self
).__init
__(
36 'No RGW credentials found, '
37 'please consult the documentation on how to enable RGW for '
41 class RgwAdminException(Exception):
46 """Simple representation of a daemon."""
56 def _get_daemons() -> Dict
[str, RgwDaemon
]:
58 Retrieve RGW daemon info from MGR.
60 service_map
= mgr
.get('service_map')
61 if not dict_contains_path(service_map
, ['services', 'rgw', 'daemons']):
62 raise NoRgwDaemonsException
65 daemon_map
= service_map
['services']['rgw']['daemons']
66 for key
in daemon_map
.keys():
67 if dict_contains_path(daemon_map
[key
], ['metadata', 'frontend_config#0']):
68 daemon
= _determine_rgw_addr(daemon_map
[key
])
69 daemon
.name
= daemon_map
[key
]['metadata']['id']
70 daemon
.realm_name
= daemon_map
[key
]['metadata']['realm_name']
71 daemon
.zonegroup_name
= daemon_map
[key
]['metadata']['zonegroup_name']
72 daemon
.zone_name
= daemon_map
[key
]['metadata']['zone_name']
73 daemons
[daemon
.name
] = daemon
74 logger
.info('Found RGW daemon with configuration: host=%s, port=%d, ssl=%s',
75 daemon
.host
, daemon
.port
, str(daemon
.ssl
))
77 raise NoRgwDaemonsException
82 def _determine_rgw_addr(daemon_info
: Dict
[str, Any
]) -> RgwDaemon
:
84 Parse RGW daemon info to determine the configured host (IP address) and port.
87 daemon
.host
= _parse_addr(daemon_info
['addr'])
88 daemon
.port
, daemon
.ssl
= _parse_frontend_config(daemon_info
['metadata']['frontend_config#0'])
93 def _parse_addr(value
) -> str:
95 Get the IP address the RGW is running on.
97 >>> _parse_addr('192.168.178.3:49774/1534999298')
100 >>> _parse_addr('[2001:db8:85a3::8a2e:370:7334]:49774/1534999298')
101 '2001:db8:85a3::8a2e:370:7334'
103 >>> _parse_addr('xyz')
104 Traceback (most recent call last):
106 LookupError: Failed to determine RGW address
108 >>> _parse_addr('192.168.178.a:8080/123456789')
109 Traceback (most recent call last):
111 LookupError: Invalid RGW address '192.168.178.a' found
113 >>> _parse_addr('[2001:0db8:1234]:443/123456789')
114 Traceback (most recent call last):
116 LookupError: Invalid RGW address '2001:0db8:1234' found
118 >>> _parse_addr('2001:0db8::1234:49774/1534999298')
119 Traceback (most recent call last):
121 LookupError: Failed to determine RGW address
123 :param value: The string to process. The syntax is '<HOST>:<PORT>/<NONCE>'.
125 :raises LookupError if parsing fails to determine the IP address.
126 :return: The IP address.
129 match
= re
.search(r
'^(\[)?(?(1)([^\]]+)\]|([^:]+)):\d+/\d+?', value
)
132 # Group 0: 192.168.178.3:49774/1534999298
133 # Group 3: 192.168.178.3
135 # Group 0: [2001:db8:85a3::8a2e:370:7334]:49774/1534999298
137 # Group 2: 2001:db8:85a3::8a2e:370:7334
138 addr
= match
.group(3) if match
.group(3) else match
.group(2)
140 ipaddress
.ip_address(addr
)
143 raise LookupError('Invalid RGW address \'{}\' found'.format(addr
))
144 raise LookupError('Failed to determine RGW address')
147 def _parse_frontend_config(config
) -> Tuple
[int, bool]:
149 Get the port the RGW is running on. Due the complexity of the
150 syntax not all variations are supported.
152 If there are multiple (ssl_)ports/(ssl_)endpoints options, then
153 the first found option will be returned.
155 Get more details about the configuration syntax here:
156 http://docs.ceph.com/en/latest/radosgw/frontends/
157 https://civetweb.github.io/civetweb/UserManual.html
159 :param config: The configuration string to parse.
161 :raises LookupError if parsing fails to determine the port.
162 :return: A tuple containing the port number and the information
164 :rtype: (int, boolean)
166 match
= re
.search(r
'^(beast|civetweb)\s+.+$', config
)
168 if match
.group(1) == 'beast':
169 match
= re
.search(r
'(port|ssl_port|endpoint|ssl_endpoint)=(.+)',
172 option_name
= match
.group(1)
173 if option_name
in ['port', 'ssl_port']:
174 match
= re
.search(r
'(\d+)', match
.group(2))
176 port
= int(match
.group(1))
177 ssl
= option_name
== 'ssl_port'
179 if option_name
in ['endpoint', 'ssl_endpoint']:
180 match
= re
.search(r
'([\d.]+|\[.+\])(:(\d+))?',
181 match
.group(2)) # type: ignore
183 port
= int(match
.group(3)) if \
184 match
.group(2) is not None else 443 if \
185 option_name
== 'ssl_endpoint' else \
187 ssl
= option_name
== 'ssl_endpoint'
189 if match
.group(1) == 'civetweb': # type: ignore
190 match
= re
.search(r
'port=(.*:)?(\d+)(s)?', config
)
192 port
= int(match
.group(2))
193 ssl
= match
.group(3) == 's'
195 raise LookupError('Failed to determine RGW port from "{}"'.format(config
))
198 def _parse_secrets(user
: str, data
: dict) -> Tuple
[str, str]:
199 for key
in data
.get('keys', []):
200 if key
.get('user') == user
and data
.get('system') in ['true', True]:
201 access_key
= key
.get('access_key')
202 secret_key
= key
.get('secret_key')
203 return access_key
, secret_key
207 def _get_user_keys(user
: str, realm
: Optional
[str] = None) -> Tuple
[str, str]:
210 rgw_user_info_cmd
= ['user', 'info', '--uid', user
]
211 cmd_realm_option
= ['--rgw-realm', realm
] if realm
else []
213 rgw_user_info_cmd
+= cmd_realm_option
215 _
, out
, err
= mgr
.send_rgwadmin_command(rgw_user_info_cmd
)
217 access_key
, secret_key
= _parse_secrets(user
, out
)
219 rgw_create_user_cmd
= [
222 '--display-name', 'Ceph Dashboard',
225 _
, out
, err
= mgr
.send_rgwadmin_command(rgw_create_user_cmd
)
227 access_key
, secret_key
= _parse_secrets(user
, out
)
229 logger
.error('Unable to create rgw user "%s": %s', user
, err
)
230 except SubprocessError
as error
:
231 logger
.exception(error
)
233 return access_key
, secret_key
236 def configure_rgw_credentials():
237 logger
.info('Configuring dashboard RGW credentials')
243 _
, out
, err
= mgr
.send_rgwadmin_command(['realm', 'list'])
245 realms
= out
.get('realms', [])
247 logger
.error('Unable to list RGW realms: %s', err
)
249 realm_access_keys
= {}
250 realm_secret_keys
= {}
252 realm_access_key
, realm_secret_key
= _get_user_keys(user
, realm
)
254 realm_access_keys
[realm
] = realm_access_key
255 realm_secret_keys
[realm
] = realm_secret_key
256 if realm_access_keys
:
257 access_key
= json
.dumps(realm_access_keys
)
258 secret_key
= json
.dumps(realm_secret_keys
)
260 access_key
, secret_key
= _get_user_keys(user
)
262 assert access_key
and secret_key
263 Settings
.RGW_API_ACCESS_KEY
= access_key
264 Settings
.RGW_API_SECRET_KEY
= secret_key
265 except (AssertionError, SubprocessError
) as error
:
266 logger
.exception(error
)
267 raise NoCredentialsException
270 class RgwClient(RestClient
):
274 _user_instances
= {} # type: Dict[str, Dict[str, RgwClient]]
275 _config_instances
= {} # type: Dict[str, RgwClient]
276 _rgw_settings_snapshot
= None
277 _daemons
: Dict
[str, RgwDaemon
] = {}
279 got_keys_from_config
: bool
283 def _handle_response_status_code(status_code
: int) -> int:
284 # Do not return auth error codes (so they are not handled as ceph API user auth errors).
285 return 404 if status_code
in [401, 403] else status_code
288 def _get_daemon_connection_info(daemon_name
: str) -> dict:
290 realm_name
= RgwClient
._daemons
[daemon_name
].realm_name
291 access_key
= Settings
.RGW_API_ACCESS_KEY
[realm_name
]
292 secret_key
= Settings
.RGW_API_SECRET_KEY
[realm_name
]
294 # Legacy string values.
295 access_key
= Settings
.RGW_API_ACCESS_KEY
296 secret_key
= Settings
.RGW_API_SECRET_KEY
297 except KeyError as error
:
298 raise DashboardException(msg
='Credentials not found for RGW Daemon: {}'.format(error
),
299 http_status_code
=404,
302 return {'access_key': access_key
, 'secret_key': secret_key
}
304 def _get_daemon_zone_info(self
): # type: () -> dict
305 return json_str_to_object(self
.proxy('GET', 'config?type=zone', None, None))
307 def _get_realms_info(self
): # type: () -> dict
308 return json_str_to_object(self
.proxy('GET', 'realm?list', None, None))
310 def _get_realm_info(self
, realm_id
: str) -> Dict
[str, Any
]:
311 return json_str_to_object(self
.proxy('GET', f
'realm?id={realm_id}', None, None))
315 return (Settings
.RGW_API_ACCESS_KEY
,
316 Settings
.RGW_API_SECRET_KEY
,
317 Settings
.RGW_API_ADMIN_RESOURCE
,
318 Settings
.RGW_API_SSL_VERIFY
)
321 def instance(userid
: Optional
[str] = None,
322 daemon_name
: Optional
[str] = None) -> 'RgwClient':
323 # pylint: disable=too-many-branches
325 RgwClient
._daemons
= _get_daemons()
327 # The API access key and secret key are mandatory for a minimal configuration.
328 if not (Settings
.RGW_API_ACCESS_KEY
and Settings
.RGW_API_SECRET_KEY
):
329 configure_rgw_credentials()
333 daemon_name
= next(iter(RgwClient
._daemons
.keys()))
335 # Discard all cached instances if any rgw setting has changed
336 if RgwClient
._rgw
_settings
_snapshot
!= RgwClient
._rgw
_settings
():
337 RgwClient
._rgw
_settings
_snapshot
= RgwClient
._rgw
_settings
()
338 RgwClient
.drop_instance()
340 if daemon_name
not in RgwClient
._config
_instances
:
341 connection_info
= RgwClient
._get
_daemon
_connection
_info
(daemon_name
)
342 RgwClient
._config
_instances
[daemon_name
] = RgwClient(connection_info
['access_key'],
343 connection_info
['secret_key'],
346 if not userid
or userid
== RgwClient
._config
_instances
[daemon_name
].userid
:
347 return RgwClient
._config
_instances
[daemon_name
]
349 if daemon_name
not in RgwClient
._user
_instances \
350 or userid
not in RgwClient
._user
_instances
[daemon_name
]:
351 # Get the access and secret keys for the specified user.
352 keys
= RgwClient
._config
_instances
[daemon_name
].get_user_keys(userid
)
354 raise RequestException(
355 "User '{}' does not have any keys configured.".format(
357 instance
= RgwClient(keys
['access_key'],
361 RgwClient
._user
_instances
.update({daemon_name
: {userid
: instance
}})
363 return RgwClient
._user
_instances
[daemon_name
][userid
]
366 def admin_instance(daemon_name
: Optional
[str] = None) -> 'RgwClient':
367 return RgwClient
.instance(daemon_name
=daemon_name
)
370 def drop_instance(instance
: Optional
['RgwClient'] = None):
372 Drop a cached instance or all.
375 if instance
.got_keys_from_config
:
376 del RgwClient
._config
_instances
[instance
.daemon
.name
]
378 del RgwClient
._user
_instances
[instance
.daemon
.name
][instance
.userid
]
380 RgwClient
._config
_instances
.clear()
381 RgwClient
._user
_instances
.clear()
383 def _reset_login(self
):
384 if self
.got_keys_from_config
:
385 raise RequestException('Authentication failed for the "{}" user: wrong credentials'
386 .format(self
.userid
), status_code
=401)
387 logger
.info("Fetching new keys for user: %s", self
.userid
)
388 keys
= RgwClient
.admin_instance(daemon_name
=self
.daemon
.name
).get_user_keys(self
.userid
)
389 self
.auth
= S3Auth(keys
['access_key'], keys
['secret_key'],
390 service_url
=self
.service_url
)
396 user_id
: Optional
[str] = None) -> None:
398 daemon
= RgwClient
._daemons
[daemon_name
]
399 except KeyError as error
:
400 raise DashboardException(msg
='RGW Daemon not found: {}'.format(error
),
401 http_status_code
=404,
403 ssl_verify
= Settings
.RGW_API_SSL_VERIFY
404 self
.admin_path
= Settings
.RGW_API_ADMIN_RESOURCE
405 self
.service_url
= build_url(host
=daemon
.host
, port
=daemon
.port
)
407 self
.auth
= S3Auth(access_key
, secret_key
, service_url
=self
.service_url
)
408 super(RgwClient
, self
).__init
__(daemon
.host
,
413 ssl_verify
=ssl_verify
)
414 self
.got_keys_from_config
= not user_id
416 self
.userid
= self
._get
_user
_id
(self
.admin_path
) if self
.got_keys_from_config \
418 except RequestException
as error
:
419 logger
.exception(error
)
420 msg
= 'Error connecting to Object Gateway'
421 if error
.status_code
== 404:
422 msg
= '{}: {}'.format(msg
, str(error
))
423 raise DashboardException(msg
=msg
,
424 http_status_code
=error
.status_code
,
428 logger
.info("Created new connection: daemon=%s, host=%s, port=%s, ssl=%d, sslverify=%d",
429 daemon
.name
, daemon
.host
, daemon
.port
, daemon
.ssl
, ssl_verify
)
431 @RestClient.api_get('/', resp_structure
='[0] > (ID & DisplayName)')
432 def is_service_online(self
, request
=None) -> bool:
434 Consider the service as online if the response contains the
435 specified keys. Nothing more is checked here.
437 _
= request({'format': 'json'})
440 @RestClient.api_get('/{admin_path}/metadata/user?myself',
441 resp_structure
='data > user_id')
442 def _get_user_id(self
, admin_path
, request
=None):
443 # pylint: disable=unused-argument
445 Get the user ID of the user that is used to communicate with the
448 :return: The user ID of the user that is used to sign the
449 RGW Admin Ops API calls.
452 return response
['data']['user_id']
454 @RestClient.api_get('/{admin_path}/metadata/user', resp_structure
='[+]')
455 def _user_exists(self
, admin_path
, user_id
, request
=None):
456 # pylint: disable=unused-argument
459 return user_id
in response
460 return self
.userid
in response
462 def user_exists(self
, user_id
=None):
463 return self
._user
_exists
(self
.admin_path
, user_id
)
465 @RestClient.api_get('/{admin_path}/metadata/user?key={userid}',
466 resp_structure
='data > system')
467 def _is_system_user(self
, admin_path
, userid
, request
=None) -> bool:
468 # pylint: disable=unused-argument
470 return strtobool(response
['data']['system'])
472 def is_system_user(self
) -> bool:
473 return self
._is
_system
_user
(self
.admin_path
, self
.userid
)
476 '/{admin_path}/user',
477 resp_structure
='tenant & user_id & email & keys[*] > '
478 ' (user & access_key & secret_key)')
479 def _admin_get_user_keys(self
, admin_path
, userid
, request
=None):
480 # pylint: disable=unused-argument
481 colon_idx
= userid
.find(':')
482 user
= userid
if colon_idx
== -1 else userid
[:colon_idx
]
483 response
= request({'uid': user
})
484 for key
in response
['keys']:
485 if key
['user'] == userid
:
487 'access_key': key
['access_key'],
488 'secret_key': key
['secret_key']
492 def get_user_keys(self
, userid
):
493 return self
._admin
_get
_user
_keys
(self
.admin_path
, userid
)
495 @RestClient.api('/{admin_path}/{path}')
497 self
, # pylint: disable=too-many-arguments
504 # pylint: disable=unused-argument
505 return request(method
=method
, params
=params
, data
=data
,
508 def proxy(self
, method
, path
, params
, data
):
509 logger
.debug("proxying method=%s path=%s params=%s data=%s",
510 method
, path
, params
, data
)
511 return self
._proxy
_request
(self
.admin_path
, path
, method
,
514 @RestClient.api_get('/', resp_structure
='[1][*] > Name')
515 def get_buckets(self
, request
=None):
517 Get a list of names from all existing buckets of this user.
518 :return: Returns a list of bucket names.
520 response
= request({'format': 'json'})
521 return [bucket
['Name'] for bucket
in response
[1]]
523 @RestClient.api_get('/{bucket_name}')
524 def bucket_exists(self
, bucket_name
, userid
, request
=None):
526 Check if the specified bucket exists for this user.
527 :param bucket_name: The name of the bucket.
528 :return: Returns True if the bucket exists, otherwise False.
530 # pylint: disable=unused-argument
533 my_buckets
= self
.get_buckets()
534 if bucket_name
not in my_buckets
:
535 raise RequestException(
536 'Bucket "{}" belongs to other user'.format(bucket_name
),
539 except RequestException
as e
:
540 if e
.status_code
== 404:
545 @RestClient.api_put('/{bucket_name}')
546 def create_bucket(self
, bucket_name
, zonegroup
=None,
547 placement_target
=None, lock_enabled
=False,
549 logger
.info("Creating bucket: %s, zonegroup: %s, placement_target: %s",
550 bucket_name
, zonegroup
, placement_target
)
552 if zonegroup
and placement_target
:
553 create_bucket_configuration
= ET
.Element('CreateBucketConfiguration')
554 location_constraint
= ET
.SubElement(create_bucket_configuration
, 'LocationConstraint')
555 location_constraint
.text
= '{}:{}'.format(zonegroup
, placement_target
)
556 data
= ET
.tostring(create_bucket_configuration
, encoding
='unicode')
558 headers
= None # type: Optional[dict]
560 headers
= {'x-amz-bucket-object-lock-enabled': 'true'}
562 return request(data
=data
, headers
=headers
)
564 def get_placement_targets(self
): # type: () -> dict
565 zone
= self
._get
_daemon
_zone
_info
()
566 placement_targets
= [] # type: List[Dict]
567 for placement_pool
in zone
['placement_pools']:
568 placement_targets
.append(
570 'name': placement_pool
['key'],
571 'data_pool': placement_pool
['val']['storage_classes']['STANDARD']['data_pool']
575 return {'zonegroup': self
.daemon
.zonegroup_name
,
576 'placement_targets': placement_targets
}
578 def get_realms(self
): # type: () -> List
579 realms_info
= self
._get
_realms
_info
()
580 if 'realms' in realms_info
and realms_info
['realms']:
581 return realms_info
['realms']
585 def get_default_realm(self
) -> str:
586 realms_info
= self
._get
_realms
_info
()
587 if 'default_info' in realms_info
and realms_info
['default_info']:
588 realm_info
= self
._get
_realm
_info
(realms_info
['default_info'])
589 if 'name' in realm_info
and realm_info
['name']:
590 return realm_info
['name']
591 raise DashboardException(msg
='Default realm not found.',
592 http_status_code
=404,
595 @RestClient.api_get('/{bucket_name}?versioning')
596 def get_bucket_versioning(self
, bucket_name
, request
=None):
598 Get bucket versioning.
599 :param str bucket_name: the name of the bucket.
600 :return: versioning info
603 # pylint: disable=unused-argument
605 if 'Status' not in result
:
606 result
['Status'] = 'Suspended'
607 if 'MfaDelete' not in result
:
608 result
['MfaDelete'] = 'Disabled'
611 @RestClient.api_put('/{bucket_name}?versioning')
612 def set_bucket_versioning(self
, bucket_name
, versioning_state
, mfa_delete
,
613 mfa_token_serial
, mfa_token_pin
, request
=None):
615 Set bucket versioning.
616 :param str bucket_name: the name of the bucket.
617 :param str versioning_state:
618 https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketPUTVersioningStatus.html
619 :param str mfa_delete: MFA Delete state.
620 :param str mfa_token_serial:
621 https://docs.ceph.com/docs/master/radosgw/mfa/
622 :param str mfa_token_pin: value of a TOTP token at a certain time (auth code)
625 # pylint: disable=unused-argument
626 versioning_configuration
= ET
.Element('VersioningConfiguration')
627 status_element
= ET
.SubElement(versioning_configuration
, 'Status')
628 status_element
.text
= versioning_state
631 if mfa_delete
and mfa_token_serial
and mfa_token_pin
:
632 headers
['x-amz-mfa'] = '{} {}'.format(mfa_token_serial
, mfa_token_pin
)
633 mfa_delete_element
= ET
.SubElement(versioning_configuration
, 'MfaDelete')
634 mfa_delete_element
.text
= mfa_delete
636 data
= ET
.tostring(versioning_configuration
, encoding
='unicode')
639 request(data
=data
, headers
=headers
)
640 except RequestException
as error
:
642 if mfa_delete
and mfa_token_serial
and mfa_token_pin \
643 and 'AccessDenied' in error
.content
.decode():
644 msg
= 'Bad MFA credentials: {}'.format(msg
)
645 raise DashboardException(msg
=msg
,
646 http_status_code
=error
.status_code
,
649 @RestClient.api_get('/{bucket_name}?object-lock')
650 def get_bucket_locking(self
, bucket_name
, request
=None):
651 # type: (str, Optional[object]) -> dict
653 Gets the locking configuration for a bucket. The locking
654 configuration will be applied by default to every new object
655 placed in the specified bucket.
656 :param bucket_name: The name of the bucket.
657 :type bucket_name: str
658 :return: The locking configuration.
661 # pylint: disable=unused-argument
663 # Try to get the Object Lock configuration. If there is none,
664 # then return default values.
666 result
= request() # type: ignore
668 'lock_enabled': dict_get(result
, 'ObjectLockEnabled') == 'Enabled',
669 'lock_mode': dict_get(result
, 'Rule.DefaultRetention.Mode'),
670 'lock_retention_period_days': dict_get(result
, 'Rule.DefaultRetention.Days', 0),
671 'lock_retention_period_years': dict_get(result
, 'Rule.DefaultRetention.Years', 0)
673 except RequestException
as e
:
675 content
= json_str_to_object(e
.content
)
677 'Code') == 'ObjectLockConfigurationNotFoundError':
679 'lock_enabled': False,
680 'lock_mode': 'compliance',
681 'lock_retention_period_days': None,
682 'lock_retention_period_years': None
686 @RestClient.api_put('/{bucket_name}?object-lock')
687 def set_bucket_locking(self
,
690 retention_period_days
: Optional
[Union
[int, str]] = None,
691 retention_period_years
: Optional
[Union
[int, str]] = None,
692 request
: Optional
[object] = None) -> None:
694 Places the locking configuration on the specified bucket. The
695 locking configuration will be applied by default to every new
696 object placed in the specified bucket.
697 :param bucket_name: The name of the bucket.
698 :type bucket_name: str
699 :param mode: The lock mode, e.g. `COMPLIANCE` or `GOVERNANCE`.
701 :param retention_period_days:
702 :type retention_period_days: int
703 :param retention_period_years:
704 :type retention_period_years: int
707 # pylint: disable=unused-argument
709 # Do some validations.
711 retention_period_days
= int(retention_period_days
) if retention_period_days
else 0
712 retention_period_years
= int(retention_period_years
) if retention_period_years
else 0
713 if retention_period_days
< 0 or retention_period_years
< 0:
715 except (TypeError, ValueError):
716 msg
= "Retention period must be a positive integer."
717 raise DashboardException(msg
=msg
, component
='rgw')
718 if retention_period_days
and retention_period_years
:
719 # https://docs.aws.amazon.com/AmazonS3/latest/API/archive-RESTBucketPUTObjectLockConfiguration.html
720 msg
= "Retention period requires either Days or Years. "\
721 "You can't specify both at the same time."
722 raise DashboardException(msg
=msg
, component
='rgw')
723 if not retention_period_days
and not retention_period_years
:
724 msg
= "Retention period requires either Days or Years. "\
725 "You must specify at least one."
726 raise DashboardException(msg
=msg
, component
='rgw')
727 if not isinstance(mode
, str) or mode
.upper() not in ['COMPLIANCE', 'GOVERNANCE']:
728 msg
= "Retention mode must be either COMPLIANCE or GOVERNANCE."
729 raise DashboardException(msg
=msg
, component
='rgw')
731 # Generate the XML data like this:
732 # <ObjectLockConfiguration>
733 # <ObjectLockEnabled>string</ObjectLockEnabled>
736 # <Days>integer</Days>
737 # <Mode>string</Mode>
738 # <Years>integer</Years>
739 # </DefaultRetention>
741 # </ObjectLockConfiguration>
742 locking_configuration
= ET
.Element('ObjectLockConfiguration')
743 enabled_element
= ET
.SubElement(locking_configuration
,
745 enabled_element
.text
= 'Enabled' # Locking can't be disabled.
746 rule_element
= ET
.SubElement(locking_configuration
, 'Rule')
747 default_retention_element
= ET
.SubElement(rule_element
,
749 mode_element
= ET
.SubElement(default_retention_element
, 'Mode')
750 mode_element
.text
= mode
.upper()
751 if retention_period_days
:
752 days_element
= ET
.SubElement(default_retention_element
, 'Days')
753 days_element
.text
= str(retention_period_days
)
754 if retention_period_years
:
755 years_element
= ET
.SubElement(default_retention_element
, 'Years')
756 years_element
.text
= str(retention_period_years
)
758 data
= ET
.tostring(locking_configuration
, encoding
='unicode')
761 _
= request(data
=data
) # type: ignore
762 except RequestException
as e
:
763 raise DashboardException(msg
=str(e
), component
='rgw')