# -*- coding: utf-8 -*-
-from __future__ import absolute_import
import json
+import logging
import cherrypy
-from . import ApiController, BaseController, RESTController, Endpoint, \
- ReadPermission
-from .. import logger
from ..exceptions import DashboardException
from ..rest_client import RequestException
-from ..security import Scope
+from ..security import Permission, Scope
+from ..services.auth import AuthManager, JwtManager
from ..services.ceph_service import CephService
-from ..services.rgw_client import RgwClient
-
-
-@ApiController('/rgw', Scope.RGW)
+from ..services.rgw_client import NoRgwDaemonsException, RgwClient
+from ..tools import json_str_to_object, str_to_bool
+from . import APIDoc, APIRouter, BaseController, Endpoint, EndpointDoc, \
+ ReadPermission, RESTController, UIRouter, allow_empty_body
+from ._version import APIVersion
+
+try:
+ from typing import Any, Dict, List, Optional, Union
+except ImportError: # pragma: no cover
+ pass # Just for type checking
+
+logger = logging.getLogger("controllers.rgw")
+
+RGW_SCHEMA = {
+ "available": (bool, "Is RGW available?"),
+ "message": (str, "Descriptions")
+}
+
+RGW_DAEMON_SCHEMA = {
+ "id": (str, "Daemon ID"),
+ "version": (str, "Ceph Version"),
+ "server_hostname": (str, ""),
+ "zonegroup_name": (str, "Zone Group"),
+ "zone_name": (str, "Zone")
+}
+
+RGW_USER_SCHEMA = {
+ "list_of_users": ([str], "list of rgw users")
+}
+
+
+@UIRouter('/rgw', Scope.RGW)
+@APIDoc("RGW Management API", "Rgw")
class Rgw(BaseController):
-
@Endpoint()
@ReadPermission
- def status(self):
+ @EndpointDoc("Display RGW Status",
+ responses={200: RGW_SCHEMA})
+ def status(self) -> dict:
status = {'available': False, 'message': None}
try:
instance = RgwClient.admin_instance()
# Check if the service is online.
- if not instance.is_service_online():
+ try:
+ is_online = instance.is_service_online()
+ except RequestException as e:
+ # Drop this instance because the RGW client seems not to
+ # exist anymore (maybe removed via orchestrator). Removing
+ # the instance from the cache will result in the correct
+ # error message next time when the backend tries to
+ # establish a new connection (-> 'No RGW found' instead
+ # of 'RGW REST API failed request ...').
+ # Note, this only applies to auto-detected RGW clients.
+ RgwClient.drop_instance(instance)
+ raise e
+ if not is_online:
msg = 'Failed to connect to the Object Gateway\'s Admin Ops API.'
raise RequestException(msg)
- # Ensure the API user ID is known by the RGW.
- if not instance.user_exists():
- msg = 'The user "{}" is unknown to the Object Gateway.'.format(
- instance.userid)
- raise RequestException(msg)
# Ensure the system flag is set for the API user ID.
- if not instance.is_system_user():
+ if not instance.is_system_user(): # pragma: no cover - no complexity there
msg = 'The system flag is not set for user "{}".'.format(
instance.userid)
raise RequestException(msg)
status['available'] = True
- except (RequestException, LookupError) as ex:
- status['message'] = str(ex)
+ except (DashboardException, RequestException, NoRgwDaemonsException) as ex:
+ status['message'] = str(ex) # type: ignore
return status
-@ApiController('/rgw/daemon', Scope.RGW)
+@APIRouter('/rgw/daemon', Scope.RGW)
+@APIDoc("RGW Daemon Management API", "RgwDaemon")
class RgwDaemon(RESTController):
+ @EndpointDoc("Display RGW Daemons",
+ responses={200: [RGW_DAEMON_SCHEMA]})
+ def list(self) -> List[dict]:
+ daemons: List[dict] = []
+ try:
+ instance = RgwClient.admin_instance()
+ except NoRgwDaemonsException:
+ return daemons
- def list(self):
- daemons = []
for hostname, server in CephService.get_service_map('rgw').items():
for service in server['services']:
metadata = service['metadata']
# extract per-daemon service data and health
daemon = {
- 'id': service['id'],
+ 'id': metadata['id'],
+ 'service_map_id': service['id'],
'version': metadata['ceph_version'],
- 'server_hostname': hostname
+ 'server_hostname': hostname,
+ 'realm_name': metadata['realm_name'],
+ 'zonegroup_name': metadata['zonegroup_name'],
+ 'zone_name': metadata['zone_name'],
+ 'default': instance.daemon.name == metadata['id']
}
daemons.append(daemon)
return sorted(daemons, key=lambda k: k['id'])
def get(self, svc_id):
+ # type: (str) -> dict
daemon = {
'rgw_metadata': [],
'rgw_id': svc_id,
class RgwRESTController(RESTController):
-
- def proxy(self, method, path, params=None, json_response=True):
+ def proxy(self, daemon_name, method, path, params=None, json_response=True):
try:
- instance = RgwClient.admin_instance()
+ instance = RgwClient.admin_instance(daemon_name=daemon_name)
result = instance.proxy(method, path, params, None)
- if json_response and result != '':
- result = json.loads(result.decode('utf-8'))
+ if json_response:
+ result = json_str_to_object(result)
return result
except (DashboardException, RequestException) as e:
- raise DashboardException(e, http_status_code=500, component='rgw')
+ http_status_code = e.status if isinstance(e, DashboardException) else 500
+ raise DashboardException(e, http_status_code=http_status_code, component='rgw')
-@ApiController('/rgw/bucket', Scope.RGW)
-class RgwBucket(RgwRESTController):
+@APIRouter('/rgw/site', Scope.RGW)
+@APIDoc("RGW Site Management API", "RgwSite")
+class RgwSite(RgwRESTController):
+ def list(self, query=None, daemon_name=None):
+ if query == 'placement-targets':
+ return RgwClient.admin_instance(daemon_name=daemon_name).get_placement_targets()
+ if query == 'realms':
+ return RgwClient.admin_instance(daemon_name=daemon_name).get_realms()
+ if query == 'default-realm':
+ return RgwClient.admin_instance(daemon_name=daemon_name).get_default_realm()
+
+ # @TODO: for multisite: by default, retrieve cluster topology/map.
+ raise DashboardException(http_status_code=501, component='rgw', msg='Not Implemented')
+
+@APIRouter('/rgw/bucket', Scope.RGW)
+@APIDoc("RGW Bucket Management API", "RgwBucket")
+class RgwBucket(RgwRESTController):
def _append_bid(self, bucket):
"""
Append the bucket identifier that looks like [<tenant>/]<bucket>.
if bucket['tenant'] else bucket['bucket']
return bucket
+ def _get_versioning(self, owner, daemon_name, bucket_name):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ return rgw_client.get_bucket_versioning(bucket_name)
+
+ def _set_versioning(self, owner, daemon_name, bucket_name, versioning_state, mfa_delete,
+ mfa_token_serial, mfa_token_pin):
+ bucket_versioning = self._get_versioning(owner, daemon_name, bucket_name)
+ if versioning_state != bucket_versioning['Status']\
+ or (mfa_delete and mfa_delete != bucket_versioning['MfaDelete']):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ rgw_client.set_bucket_versioning(bucket_name, versioning_state, mfa_delete,
+ mfa_token_serial, mfa_token_pin)
+
+ def _get_locking(self, owner, daemon_name, bucket_name):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ return rgw_client.get_bucket_locking(bucket_name)
+
+ def _set_locking(self, owner, daemon_name, bucket_name, mode,
+ retention_period_days, retention_period_years):
+ rgw_client = RgwClient.instance(owner, daemon_name)
+ return rgw_client.set_bucket_locking(bucket_name, mode,
+ retention_period_days,
+ retention_period_years)
+
@staticmethod
- def strip_tenant_from_bucket_name(bucket_name, uid):
- # type (str, str) => str
+ def strip_tenant_from_bucket_name(bucket_name):
+ # type (str) -> str
"""
- When linking a bucket to a new user belonging to same tenant
- as the previous owner, tenant must be removed from the bucket name.
- >>> RgwBucket.strip_tenant_from_bucket_name('tenant/bucket-name', 'tenant$user1')
+ >>> RgwBucket.strip_tenant_from_bucket_name('tenant/bucket-name')
'bucket-name'
- >>> RgwBucket.strip_tenant_from_bucket_name('tenant/bucket-name', 'tenant2$user2')
- 'tenant/bucket-name'
- >>> RgwBucket.strip_tenant_from_bucket_name('bucket-name', 'user1')
+ >>> RgwBucket.strip_tenant_from_bucket_name('bucket-name')
'bucket-name'
"""
- bucket_tenant = bucket_name[:bucket_name.find('/')] if bucket_name.find('/') >= 0 else None
- uid_tenant = uid[:uid.find('$')] if uid.find('$') >= 0 else None
- if bucket_tenant and uid_tenant and bucket_tenant == uid_tenant:
- return bucket_name[bucket_name.find('/') + 1:]
+ return bucket_name[bucket_name.find('/') + 1:]
+ @staticmethod
+ def get_s3_bucket_name(bucket_name, tenant=None):
+ # type (str, str) -> str
+ """
+ >>> RgwBucket.get_s3_bucket_name('bucket-name', 'tenant')
+ 'tenant:bucket-name'
+ >>> RgwBucket.get_s3_bucket_name('tenant/bucket-name', 'tenant')
+ 'tenant:bucket-name'
+ >>> RgwBucket.get_s3_bucket_name('bucket-name')
+ 'bucket-name'
+ """
+ bucket_name = RgwBucket.strip_tenant_from_bucket_name(bucket_name)
+ if tenant:
+ bucket_name = '{}:{}'.format(tenant, bucket_name)
return bucket_name
- def list(self):
- return self.proxy('GET', 'bucket')
+ @RESTController.MethodMap(version=APIVersion(1, 1)) # type: ignore
+ def list(self, stats: bool = False, daemon_name: Optional[str] = None,
+ uid: Optional[str] = None) -> List[Union[str, Dict[str, Any]]]:
+ query_params = f'?stats={str_to_bool(stats)}'
+ if uid and uid.strip():
+ query_params = f'{query_params}&uid={uid.strip()}'
+ result = self.proxy(daemon_name, 'GET', 'bucket{}'.format(query_params))
+
+ if stats:
+ result = [self._append_bid(bucket) for bucket in result]
+
+ return result
+
+ def get(self, bucket, daemon_name=None):
+ # type: (str, Optional[str]) -> dict
+ result = self.proxy(daemon_name, 'GET', 'bucket', {'bucket': bucket})
+ bucket_name = RgwBucket.get_s3_bucket_name(result['bucket'],
+ result['tenant'])
+
+ # Append the versioning configuration.
+ versioning = self._get_versioning(result['owner'], daemon_name, bucket_name)
+ result['versioning'] = versioning['Status']
+ result['mfa_delete'] = versioning['MfaDelete']
+
+ # Append the locking configuration.
+ locking = self._get_locking(result['owner'], daemon_name, bucket_name)
+ result.update(locking)
- def get(self, bucket):
- result = self.proxy('GET', 'bucket', {'bucket': bucket})
return self._append_bid(result)
- def create(self, bucket, uid):
+ @allow_empty_body
+ def create(self, bucket, uid, zonegroup=None, placement_target=None,
+ lock_enabled='false', lock_mode=None,
+ lock_retention_period_days=None,
+ lock_retention_period_years=None, daemon_name=None):
+ lock_enabled = str_to_bool(lock_enabled)
try:
- rgw_client = RgwClient.instance(uid)
- return rgw_client.create_bucket(bucket)
- except RequestException as e:
+ rgw_client = RgwClient.instance(uid, daemon_name)
+ result = rgw_client.create_bucket(bucket, zonegroup,
+ placement_target,
+ lock_enabled)
+ if lock_enabled:
+ self._set_locking(uid, daemon_name, bucket, lock_mode,
+ lock_retention_period_days,
+ lock_retention_period_years)
+ return result
+ except RequestException as e: # pragma: no cover - handling is too obvious
raise DashboardException(e, http_status_code=500, component='rgw')
- def set(self, bucket, bucket_id, uid):
- result = self.proxy('PUT', 'bucket', {
- 'bucket': RgwBucket.strip_tenant_from_bucket_name(bucket, uid),
- 'bucket-id': bucket_id,
- 'uid': uid
- }, json_response=False)
+ @allow_empty_body
+ def set(self, bucket, bucket_id, uid, versioning_state=None,
+ mfa_delete=None, mfa_token_serial=None, mfa_token_pin=None,
+ lock_mode=None, lock_retention_period_days=None,
+ lock_retention_period_years=None, daemon_name=None):
+ # When linking a non-tenant-user owned bucket to a tenanted user, we
+ # need to prefix bucket name with '/'. e.g. photos -> /photos
+ if '$' in uid and '/' not in bucket:
+ bucket = '/{}'.format(bucket)
+
+ # Link bucket to new user:
+ result = self.proxy(daemon_name,
+ 'PUT',
+ 'bucket', {
+ 'bucket': bucket,
+ 'bucket-id': bucket_id,
+ 'uid': uid
+ },
+ json_response=False)
+
+ uid_tenant = uid[:uid.find('$')] if uid.find('$') >= 0 else None
+ bucket_name = RgwBucket.get_s3_bucket_name(bucket, uid_tenant)
+
+ locking = self._get_locking(uid, daemon_name, bucket_name)
+ if versioning_state:
+ if versioning_state == 'Suspended' and locking['lock_enabled']:
+ raise DashboardException(msg='Bucket versioning cannot be disabled/suspended '
+ 'on buckets with object lock enabled ',
+ http_status_code=409, component='rgw')
+ self._set_versioning(uid, daemon_name, bucket_name, versioning_state,
+ mfa_delete, mfa_token_serial, mfa_token_pin)
+
+ # Update locking if it is enabled.
+ if locking['lock_enabled']:
+ self._set_locking(uid, daemon_name, bucket_name, lock_mode,
+ lock_retention_period_days,
+ lock_retention_period_years)
+
return self._append_bid(result)
- def delete(self, bucket, purge_objects='true'):
- return self.proxy('DELETE', 'bucket', {
+ def delete(self, bucket, purge_objects='true', daemon_name=None):
+ return self.proxy(daemon_name, 'DELETE', 'bucket', {
'bucket': bucket,
'purge-objects': purge_objects
}, json_response=False)
-@ApiController('/rgw/user', Scope.RGW)
+@APIRouter('/rgw/user', Scope.RGW)
+@APIDoc("RGW User Management API", "RgwUser")
class RgwUser(RgwRESTController):
-
def _append_uid(self, user):
"""
Append the user identifier that looks like [<tenant>$]<user>.
if user['tenant'] else user['user_id']
return user
- def list(self):
- users = []
+ @staticmethod
+ def _keys_allowed():
+ permissions = AuthManager.get_user(JwtManager.get_username()).permissions_dict()
+ edit_permissions = [Permission.CREATE, Permission.UPDATE, Permission.DELETE]
+ return Scope.RGW in permissions and Permission.READ in permissions[Scope.RGW] \
+ and len(set(edit_permissions).intersection(set(permissions[Scope.RGW]))) > 0
+
+ @EndpointDoc("Display RGW Users",
+ responses={200: RGW_USER_SCHEMA})
+ def list(self, daemon_name=None):
+ # type: (Optional[str]) -> List[str]
+ users = [] # type: List[str]
marker = None
while True:
- params = {}
+ params = {} # type: dict
if marker:
params['marker'] = marker
- result = self.proxy('GET', 'user?list', params)
+ result = self.proxy(daemon_name, 'GET', 'user?list', params)
users.extend(result['keys'])
if not result['truncated']:
break
marker = result['marker']
return users
- def get(self, uid):
- result = self.proxy('GET', 'user', {'uid': uid})
+ def get(self, uid, daemon_name=None, stats=True) -> dict:
+ query_params = '?stats' if stats else ''
+ result = self.proxy(daemon_name, 'GET', 'user{}'.format(query_params),
+ {'uid': uid, 'stats': stats})
+ if not self._keys_allowed():
+ del result['keys']
+ del result['swift_keys']
return self._append_uid(result)
@Endpoint()
@ReadPermission
- def get_emails(self):
+ def get_emails(self, daemon_name=None):
+ # type: (Optional[str]) -> List[str]
emails = []
- for uid in json.loads(self.list()):
- user = json.loads(self.get(uid))
+ for uid in json.loads(self.list(daemon_name)): # type: ignore
+ user = json.loads(self.get(uid, daemon_name)) # type: ignore
if user["email"]:
emails.append(user["email"])
return emails
+ @allow_empty_body
def create(self, uid, display_name, email=None, max_buckets=None,
suspended=None, generate_key=None, access_key=None,
- secret_key=None):
+ secret_key=None, daemon_name=None):
params = {'uid': uid}
if display_name is not None:
params['display-name'] = display_name
params['access-key'] = access_key
if secret_key is not None:
params['secret-key'] = secret_key
- result = self.proxy('PUT', 'user', params)
+ result = self.proxy(daemon_name, 'PUT', 'user', params)
return self._append_uid(result)
+ @allow_empty_body
def set(self, uid, display_name=None, email=None, max_buckets=None,
- suspended=None):
+ suspended=None, daemon_name=None):
params = {'uid': uid}
if display_name is not None:
params['display-name'] = display_name
params['max-buckets'] = max_buckets
if suspended is not None:
params['suspended'] = suspended
- result = self.proxy('POST', 'user', params)
+ result = self.proxy(daemon_name, 'POST', 'user', params)
return self._append_uid(result)
- def delete(self, uid):
+ def delete(self, uid, daemon_name=None):
try:
- instance = RgwClient.admin_instance()
+ instance = RgwClient.admin_instance(daemon_name=daemon_name)
# Ensure the user is not configured to access the RGW Object Gateway.
if instance.userid == uid:
raise DashboardException(msg='Unable to delete "{}" - this user '
'account is required for managing the '
'Object Gateway'.format(uid))
# Finally redirect request to the RGW proxy.
- return self.proxy('DELETE', 'user', {'uid': uid}, json_response=False)
- except (DashboardException, RequestException) as e:
+ return self.proxy(daemon_name, 'DELETE', 'user', {'uid': uid}, json_response=False)
+ except (DashboardException, RequestException) as e: # pragma: no cover
raise DashboardException(e, component='rgw')
# pylint: disable=redefined-builtin
@RESTController.Resource(method='POST', path='/capability', status=201)
- def create_cap(self, uid, type, perm):
- return self.proxy('PUT', 'user?caps', {
+ @allow_empty_body
+ def create_cap(self, uid, type, perm, daemon_name=None):
+ return self.proxy(daemon_name, 'PUT', 'user?caps', {
'uid': uid,
'user-caps': '{}={}'.format(type, perm)
})
# pylint: disable=redefined-builtin
@RESTController.Resource(method='DELETE', path='/capability', status=204)
- def delete_cap(self, uid, type, perm):
- return self.proxy('DELETE', 'user?caps', {
+ def delete_cap(self, uid, type, perm, daemon_name=None):
+ return self.proxy(daemon_name, 'DELETE', 'user?caps', {
'uid': uid,
'user-caps': '{}={}'.format(type, perm)
})
@RESTController.Resource(method='POST', path='/key', status=201)
+ @allow_empty_body
def create_key(self, uid, key_type='s3', subuser=None, generate_key='true',
- access_key=None, secret_key=None):
+ access_key=None, secret_key=None, daemon_name=None):
params = {'uid': uid, 'key-type': key_type, 'generate-key': generate_key}
if subuser is not None:
params['subuser'] = subuser
params['access-key'] = access_key
if secret_key is not None:
params['secret-key'] = secret_key
- return self.proxy('PUT', 'user?key', params)
+ return self.proxy(daemon_name, 'PUT', 'user?key', params)
@RESTController.Resource(method='DELETE', path='/key', status=204)
- def delete_key(self, uid, key_type='s3', subuser=None, access_key=None):
+ def delete_key(self, uid, key_type='s3', subuser=None, access_key=None, daemon_name=None):
params = {'uid': uid, 'key-type': key_type}
if subuser is not None:
params['subuser'] = subuser
if access_key is not None:
params['access-key'] = access_key
- return self.proxy('DELETE', 'user?key', params, json_response=False)
+ return self.proxy(daemon_name, 'DELETE', 'user?key', params, json_response=False)
@RESTController.Resource(method='GET', path='/quota')
- def get_quota(self, uid):
- return self.proxy('GET', 'user?quota', {'uid': uid})
+ def get_quota(self, uid, daemon_name=None):
+ return self.proxy(daemon_name, 'GET', 'user?quota', {'uid': uid})
@RESTController.Resource(method='PUT', path='/quota')
- def set_quota(self, uid, quota_type, enabled, max_size_kb, max_objects):
- return self.proxy('PUT', 'user?quota', {
+ @allow_empty_body
+ def set_quota(self, uid, quota_type, enabled, max_size_kb, max_objects, daemon_name=None):
+ return self.proxy(daemon_name, 'PUT', 'user?quota', {
'uid': uid,
'quota-type': quota_type,
'enabled': enabled,
}, json_response=False)
@RESTController.Resource(method='POST', path='/subuser', status=201)
+ @allow_empty_body
def create_subuser(self, uid, subuser, access, key_type='s3',
generate_secret='true', access_key=None,
- secret_key=None):
- return self.proxy('PUT', 'user', {
+ secret_key=None, daemon_name=None):
+ return self.proxy(daemon_name, 'PUT', 'user', {
'uid': uid,
'subuser': subuser,
'key-type': key_type,
})
@RESTController.Resource(method='DELETE', path='/subuser/{subuser}', status=204)
- def delete_subuser(self, uid, subuser, purge_keys='true'):
+ def delete_subuser(self, uid, subuser, purge_keys='true', daemon_name=None):
"""
:param purge_keys: Set to False to do not purge the keys.
Note, this only works for s3 subusers.
"""
- return self.proxy('DELETE', 'user', {
+ return self.proxy(daemon_name, 'DELETE', 'user', {
'uid': uid,
'subuser': subuser,
'purge-keys': purge_keys