+from mgr_util import get_most_recent_rate
+
from dashboard.services.ceph_service import CephService
from .. import mgr
+try:
+ from typing import Dict
+except ImportError:
+ pass # Just for type checking
+
+
SERVICE_TYPE = 'tcmu-runner'
# pylint: disable=too-many-nested-blocks
@staticmethod
def get_iscsi_info():
- daemons = {}
- images = {}
+ daemons = {} # type: Dict[str, dict]
+ images = {} # type: Dict[str, dict]
for service in CephService.get_service_list(SERVICE_TYPE):
metadata = service['metadata']
if metadata is None:
image['stats_history'] = {}
for s in ['rd', 'wr', 'rd_bytes', 'wr_bytes']:
perf_key = "{}{}".format(perf_key_prefix, s)
- image['stats'][s] = CephService.get_rate(
- 'tcmu-runner', service_id, perf_key)
- image['stats_history'][s] = CephService.get_rates(
- 'tcmu-runner', service_id, perf_key)
+ rates = CephService.get_rates('tcmu-runner', service_id, perf_key)
+ image['stats'][s] = get_most_recent_rate(rates)
+ image['stats_history'][s] = rates
else:
daemon['non_optimized_paths'] += 1
image['non_optimized_paths'].append(hostname)