# -*- coding: utf-8 -*-
-from __future__ import absolute_import
import json
import logging
from ..services.ceph_service import CephService
from ..services.rgw_client import NoRgwDaemonsException, RgwClient
from ..tools import json_str_to_object, str_to_bool
-from . import ApiController, BaseController, ControllerDoc, Endpoint, \
- EndpointDoc, ReadPermission, RESTController, allow_empty_body
+from . import APIDoc, APIRouter, BaseController, Endpoint, EndpointDoc, \
+ ReadPermission, RESTController, UIRouter, allow_empty_body
+from ._version import APIVersion
try:
- from typing import Any, List, Optional
+ from typing import Any, Dict, List, Optional, Union
except ImportError: # pragma: no cover
pass # Just for type checking
}
-@ApiController('/rgw', Scope.RGW)
-@ControllerDoc("RGW Management API", "Rgw")
+@UIRouter('/rgw', Scope.RGW)
+@APIDoc("RGW Management API", "Rgw")
class Rgw(BaseController):
@Endpoint()
@ReadPermission
return status
-@ApiController('/rgw/daemon', Scope.RGW)
-@ControllerDoc("RGW Daemon Management API", "RgwDaemon")
+@APIRouter('/rgw/daemon', Scope.RGW)
+@APIDoc("RGW Daemon Management API", "RgwDaemon")
class RgwDaemon(RESTController):
@EndpointDoc("Display RGW Daemons",
responses={200: [RGW_DAEMON_SCHEMA]})
# extract per-daemon service data and health
daemon = {
'id': metadata['id'],
+ 'service_map_id': service['id'],
'version': metadata['ceph_version'],
'server_hostname': hostname,
+ 'realm_name': metadata['realm_name'],
'zonegroup_name': metadata['zonegroup_name'],
'zone_name': metadata['zone_name'],
'default': instance.daemon.name == metadata['id']
raise DashboardException(e, http_status_code=http_status_code, component='rgw')
-@ApiController('/rgw/site', Scope.RGW)
-@ControllerDoc("RGW Site Management API", "RgwSite")
+@APIRouter('/rgw/site', Scope.RGW)
+@APIDoc("RGW Site Management API", "RgwSite")
class RgwSite(RgwRESTController):
def list(self, query=None, daemon_name=None):
if query == 'placement-targets':
return RgwClient.admin_instance(daemon_name=daemon_name).get_placement_targets()
if query == 'realms':
return RgwClient.admin_instance(daemon_name=daemon_name).get_realms()
+ if query == 'default-realm':
+ return RgwClient.admin_instance(daemon_name=daemon_name).get_default_realm()
# @TODO: for multisite: by default, retrieve cluster topology/map.
raise DashboardException(http_status_code=501, component='rgw', msg='Not Implemented')
-@ApiController('/rgw/bucket', Scope.RGW)
-@ControllerDoc("RGW Bucket Management API", "RgwBucket")
+@APIRouter('/rgw/bucket', Scope.RGW)
+@APIDoc("RGW Bucket Management API", "RgwBucket")
class RgwBucket(RgwRESTController):
def _append_bid(self, bucket):
"""
retention_period_days, retention_period_years):
rgw_client = RgwClient.instance(owner, daemon_name)
return rgw_client.set_bucket_locking(bucket_name, mode,
- int(retention_period_days),
- int(retention_period_years))
+ retention_period_days,
+ retention_period_years)
@staticmethod
def strip_tenant_from_bucket_name(bucket_name):
bucket_name = '{}:{}'.format(tenant, bucket_name)
return bucket_name
- def list(self, stats=False, daemon_name=None):
- # type: (bool, Optional[str]) -> List[Any]
- query_params = '?stats' if stats else ''
+ @RESTController.MethodMap(version=APIVersion(1, 1)) # type: ignore
+ def list(self, stats: bool = False, daemon_name: Optional[str] = None,
+ uid: Optional[str] = None) -> List[Union[str, Dict[str, Any]]]:
+ query_params = f'?stats={str_to_bool(stats)}'
+ if uid and uid.strip():
+ query_params = f'{query_params}&uid={uid.strip()}'
result = self.proxy(daemon_name, 'GET', 'bucket{}'.format(query_params))
if stats:
uid_tenant = uid[:uid.find('$')] if uid.find('$') >= 0 else None
bucket_name = RgwBucket.get_s3_bucket_name(bucket, uid_tenant)
+ locking = self._get_locking(uid, daemon_name, bucket_name)
if versioning_state:
+ if versioning_state == 'Suspended' and locking['lock_enabled']:
+ raise DashboardException(msg='Bucket versioning cannot be disabled/suspended '
+ 'on buckets with object lock enabled ',
+ http_status_code=409, component='rgw')
self._set_versioning(uid, daemon_name, bucket_name, versioning_state,
mfa_delete, mfa_token_serial, mfa_token_pin)
# Update locking if it is enabled.
- locking = self._get_locking(uid, daemon_name, bucket_name)
if locking['lock_enabled']:
self._set_locking(uid, daemon_name, bucket_name, lock_mode,
lock_retention_period_days,
}, json_response=False)
-@ApiController('/rgw/user', Scope.RGW)
-@ControllerDoc("RGW User Management API", "RgwUser")
+@APIRouter('/rgw/user', Scope.RGW)
+@APIDoc("RGW User Management API", "RgwUser")
class RgwUser(RgwRESTController):
def _append_uid(self, user):
"""