from ..settings import Settings, Options
from ..rest_client import RestClient, RequestException
from ..tools import build_url, dict_contains_path, json_str_to_object,\
- partial_dict, dict_get
+ dict_get
from .. import mgr
try:
- from typing import Dict, List, Optional # pylint: disable=unused-import
+ from typing import Any, Dict, List, Optional # pylint: disable=unused-import
except ImportError:
pass # For typing only
'the dashboard.')
-def _determine_rgw_addr():
+def _get_daemon_info() -> Dict[str, Any]:
"""
- Get a RGW daemon to determine the configured host (IP address) and port.
+ Retrieve RGW daemon info from MGR.
Note, the service id of the RGW daemons may differ depending on the setup.
Example 1:
{
if daemon is None:
raise LookupError('No RGW daemon found')
+ return daemon
+
+
+def _determine_rgw_addr():
+ """
+ Parse RGW daemon info to determine the configured host (IP address) and port.
+ """
+ daemon = _get_daemon_info()
+
addr = _parse_addr(daemon['addr'])
port, ssl = _parse_frontend_config(daemon['metadata']['frontend_config#0'])
def _get_daemon_zone_info(self): # type: () -> dict
return json_str_to_object(self.proxy('GET', 'config?type=zone', None, None))
- def _get_daemon_zonegroup_map(self): # type: () -> List[dict]
- zonegroups = json_str_to_object(
- self.proxy('GET', 'config?type=zonegroup-map', None, None)
- )
-
- return [partial_dict(
- zonegroup['val'],
- ['api_name', 'zones']
- ) for zonegroup in zonegroups['zonegroups']]
-
def _get_realms_info(self): # type: () -> dict
return json_str_to_object(self.proxy('GET', 'realm?list', None, None))
# If user ID is not set, then try to get it via the RGW Admin Ops API.
self.userid = userid if userid else self._get_user_id(self.admin_path) # type: str
+ self._zonegroup_name: str = _get_daemon_info()['metadata']['zonegroup_name']
+
logger.info("Created new connection: user=%s, host=%s, port=%s, ssl=%d, sslverify=%d",
self.userid, host, port, ssl, ssl_verify)
def get_placement_targets(self): # type: () -> dict
zone = self._get_daemon_zone_info()
- # A zone without realm id can only belong to default zonegroup.
- zonegroup_name = 'default'
- if zone['realm_id']:
- zonegroup_map = self._get_daemon_zonegroup_map()
- for zonegroup in zonegroup_map:
- for realm_zone in zonegroup['zones']:
- if realm_zone['id'] == zone['id']:
- zonegroup_name = zonegroup['api_name']
- break
-
placement_targets = [] # type: List[Dict]
for placement_pool in zone['placement_pools']:
placement_targets.append(
}
)
- return {'zonegroup': zonegroup_name, 'placement_targets': placement_targets}
+ return {'zonegroup': self._zonegroup_name, 'placement_targets': placement_targets}
def get_realms(self): # type: () -> List
realms_info = self._get_realms_info()