+++ /dev/null
-from __future__ import absolute_import\r
-\r
-import time\r
-from threading import Event, Thread\r
-\r
-from .agent.predictor import PredictAgent\r
-from .agent.metrics.ceph_cluster import CephClusterAgent\r
-from .agent.metrics.ceph_mon_osd import CephMonOsdAgent\r
-from .agent.metrics.ceph_pool import CephPoolAgent\r
-from .agent.metrics.db_relay import DBRelayAgent\r
-from .agent.metrics.sai_agent import SAIAgent\r
-from .agent.metrics.sai_cluster import SAICluserAgent\r
-from .agent.metrics.sai_disk import SAIDiskAgent\r
-from .agent.metrics.sai_disk_smart import SAIDiskSmartAgent\r
-from .agent.metrics.sai_host import SAIHostAgent\r
-from .common import DP_MGR_STAT_FAILED, DP_MGR_STAT_OK, DP_MGR_STAT_WARNING\r
-\r
-\r
-class AgentRunner(Thread):\r
-\r
- task_name = ''\r
- interval_key = ''\r
- agents = []\r
-\r
- def __init__(self, mgr_module, agent_timeout=60, call_back=None):\r
- """\r
-\r
- :param mgr_module: parent ceph mgr module\r
- :param agent_timeout: (unit seconds) agent execute timeout value, default: 60 secs\r
- """\r
- Thread.__init__(self)\r
- self._agent_timeout = agent_timeout\r
- self._module_inst = mgr_module\r
- self._log = mgr_module.log\r
- self._start_time = time.time()\r
- self._th = None\r
- self._call_back = call_back\r
- self.exit = False\r
- self.event = Event()\r
- self.task_interval = \\r
- int(self._module_inst.get_configuration(self.interval_key))\r
-\r
- def terminate(self):\r
- self.exit = True\r
- self.event.set()\r
- self._log.info('PDS terminate %s complete' % self.task_name)\r
-\r
- def run(self):\r
- self._start_time = time.time()\r
- self._log.info(\r
- 'start %s, interval: %s'\r
- % (self.task_name, self.task_interval))\r
- while not self.exit:\r
- self.run_agents()\r
- if self._call_back:\r
- self._call_back()\r
- if self.event:\r
- self.event.wait(int(self.task_interval))\r
- self.event.clear()\r
- self._log.info(\r
- 'completed %s(%s)' % (self.task_name, time.time()-self._start_time))\r
-\r
- def run_agents(self):\r
- obj_sender = None\r
- try:\r
- self._log.debug('run_agents %s' % self.task_name)\r
- from .common.grpcclient import GRPcClient, gen_configuration\r
- conf = gen_configuration(\r
- host=self._module_inst.get_configuration('diskprediction_server'),\r
- user=self._module_inst.get_configuration('diskprediction_user'),\r
- password=self._module_inst.get_configuration(\r
- 'diskprediction_password'),\r
- port=self._module_inst.get_configuration('diskprediction_port'),\r
- cert_context=self._module_inst.get_configuration('diskprediction_cert_context'),\r
- mgr_inst=self._module_inst,\r
- ssl_target_name=self._module_inst.get_configuration('diskprediction_ssl_target_name_override'),\r
- default_authority=self._module_inst.get_configuration('diskprediction_default_authority'))\r
- obj_sender = GRPcClient(conf)\r
- if not obj_sender:\r
- self._log.error('invalid diskprediction sender')\r
- self._module_inst.status = \\r
- {'status': DP_MGR_STAT_FAILED,\r
- 'reason': 'invalid diskprediction sender'}\r
- raise Exception('invalid diskprediction sender')\r
- if obj_sender.test_connection():\r
- self._module_inst.status = {'status': DP_MGR_STAT_OK}\r
- self._log.debug('succeed to test connection')\r
- self._run(self._module_inst, obj_sender)\r
- else:\r
- self._log.error('failed to test connection')\r
- self._module_inst.status = \\r
- {'status': DP_MGR_STAT_FAILED,\r
- 'reason': 'failed to test connection'}\r
- raise Exception('failed to test connection')\r
- except Exception as e:\r
- self._module_inst.status = \\r
- {'status': DP_MGR_STAT_FAILED,\r
- 'reason': 'failed to start %s agents, %s'\r
- % (self.task_name, str(e))}\r
- self._log.error(\r
- 'failed to start %s agents, %s' % (self.task_name, str(e)))\r
- raise\r
- finally:\r
- if obj_sender:\r
- obj_sender.close()\r
-\r
- def is_timeout(self):\r
- now = time.time()\r
- if (now - self._start_time) > self._agent_timeout:\r
- return True\r
- else:\r
- return False\r
-\r
- def _run(self, module_inst, sender):\r
- self._log.debug('%s run' % self.task_name)\r
- for agent in self.agents:\r
- self._start_time = time.time()\r
- retry_count = 3\r
- while retry_count:\r
- retry_count -= 1\r
- try:\r
- obj_agent = agent(module_inst, sender, self._agent_timeout)\r
- obj_agent.run()\r
- del obj_agent\r
- break\r
- except Exception as e:\r
- if str(e).find('configuring') >= 0:\r
- self._log.debug(\r
- 'failed to execute {}, {}, retry again.'.format(\r
- agent.measurement, str(e)))\r
- time.sleep(1)\r
- continue\r
- else:\r
- module_inst.status = \\r
- {'status': DP_MGR_STAT_WARNING,\r
- 'reason': 'failed to execute {}, {}'.format(\r
- agent.measurement, ';'.join(str(e).split('\n\t')))}\r
- self._log.warning(\r
- 'failed to execute {}, {}'.format(\r
- agent.measurement, ';'.join(str(e).split('\n\t'))))\r
- break\r
-\r
-\r
-class MetricsRunner(AgentRunner):\r
-\r
- task_name = 'Metrics Agent'\r
- interval_key = 'diskprediction_upload_metrics_interval'\r
- agents = [CephClusterAgent, CephMonOsdAgent, CephPoolAgent,\r
- SAICluserAgent, SAIDiskAgent, SAIHostAgent, DBRelayAgent,\r
- SAIAgent]\r
-\r
-\r
-class PredictRunner(AgentRunner):\r
-\r
- task_name = 'Predictor Agent'\r
- interval_key = 'diskprediction_retrieve_prediction_interval'\r
- agents = [PredictAgent]\r
-\r
-\r
-class SmartRunner(AgentRunner):\r
-\r
- task_name = 'Smart data Agent'\r
- interval_key = 'diskprediction_upload_smart_interval'\r
- agents = [SAIDiskSmartAgent]\r
-\r
-\r
-class TestRunner(object):\r
- task_name = 'Test Agent'\r
- interval_key = 'diskprediction_upload_metrics_interval'\r
- agents = [CephClusterAgent, CephMonOsdAgent, CephPoolAgent,\r
- SAICluserAgent, SAIDiskAgent, SAIHostAgent, DBRelayAgent,\r
- SAIAgent, SAIDiskSmartAgent]\r
-\r
- def __init__(self, mgr_module):\r
- self._module_inst = mgr_module\r
-\r
- def run(self):\r
- for agent in self.agents:\r
- obj_agent = agent(self._module_inst, None)\r
- obj_agent.run()\r
- del obj_agent\r