from ..exceptions import DashboardException
try:
- from typing import Dict # pylint: disable=unused-import
+ from typing import Dict, Any, Union # pylint: disable=unused-import
except ImportError:
pass # For typing only
@classmethod
def get_pool_name_from_id(cls, pool_id):
+ # type: (int) -> Union[str, None]
+ pool = cls.get_pool_by_attribute('pool', pool_id)
+ return pool['pool_name'] if pool is not None else None
+
+ @classmethod
+ def get_pool_by_attribute(cls, attribute, value):
+ # type: (str, Any) -> Union[dict, None]
pool_list = cls.get_pool_list()
for pool in pool_list:
- if pool['pool'] == pool_id:
- return pool['pool_name']
+ if attribute in pool and pool[attribute] == value:
+ return pool
return None
+ @classmethod
+ def get_pool_pg_status(cls, pool_name):
+ # type: (str) -> dict
+ pool = cls.get_pool_by_attribute('pool_name', pool_name)
+ if pool is None:
+ return {}
+ return mgr.get("pg_summary")['by_pool'][pool['pool'].__str__()]
+
@classmethod
def send_command(cls, srv_type, prefix, srv_spec='', **kwargs):
"""