# COPYING file in the root directory) and Apache 2.0 License
# (found in the LICENSE.Apache file in the root directory).
+import math
from abc import abstractmethod
-from advisor.db_log_parser import DataSource
from enum import Enum
-import math
+from typing import Dict
+
+from advisor.db_log_parser import DataSource
-NO_ENTITY = 'ENTITY_PLACEHOLDER'
+NO_ENTITY = "ENTITY_PLACEHOLDER"
class TimeSeriesData(DataSource):
pass
def fetch_burst_epochs(
- self, entities, statistic, window_sec, threshold, percent
- ):
- # type: (str, int, float, bool) -> Dict[str, Dict[int, float]]
+ self,
+ entities: str,
+ statistic: int,
+ window_sec: float,
+ threshold: bool,
+ percent: bool,
+ ) -> Dict[str, Dict[int, float]]:
# this method calculates the (percent) rate change in the 'statistic'
# for each entity (over 'window_sec' seconds) and returns the epochs
# where this rate change is greater than or equal to the 'threshold'
return burst_epochs
def fetch_aggregated_values(self, entity, statistics, aggregation_op):
- # type: (str, AggregationOperator) -> Dict[str, float]
# this method performs the aggregation specified by 'aggregation_op'
# on the timeseries of 'statistics' for 'entity' and returns:
# Dict[statistic, aggregated_value]
complete_keys[0], # there should be only one key
cond.window_sec,
cond.rate_threshold,
- True
+ True,
)
# Trigger in this case is:
# Dict[entity_name, Dict[timestamp, rate_change]]
cond.set_trigger(result)
elif cond.behavior is self.Behavior.evaluate_expression:
self.handle_evaluate_expression(
- cond,
- complete_keys,
- entities_with_stats
+ cond, complete_keys, entities_with_stats
)
def handle_evaluate_expression(self, condition, statistics, entities):
trigger = {}
# check 'condition' for each of these entities
for entity in entities:
- if hasattr(condition, 'aggregation_op'):
+ if hasattr(condition, "aggregation_op"):
# in this case, the aggregation operation is performed on each
# of the condition's 'keys' and then with aggregated values
# condition's 'expression' is evaluated; if it evaluates to
# True, then list of the keys values is added to the
# condition's trigger: Dict[entity_name, List[stats]]
result = self.fetch_aggregated_values(
- entity, statistics, condition.aggregation_op
+ entity, statistics, condition.aggregation_op
)
keys = [result[key] for key in statistics]
try:
if eval(condition.expression):
trigger[entity] = keys
except Exception as e:
- print(
- 'WARNING(TimeSeriesData) check_and_trigger: ' + str(e)
- )
+ print("WARNING(TimeSeriesData) check_and_trigger: " + str(e))
else:
# assumption: all stats have same series of timestamps
# this is similar to the above but 'expression' is evaluated at
# 'expression' evaluated to true; so trigger is:
# Dict[entity, Dict[timestamp, List[stats]]]
for epoch in self.keys_ts[entity][statistics[0]].keys():
- keys = [
- self.keys_ts[entity][key][epoch]
- for key in statistics
- ]
+ keys = [self.keys_ts[entity][key][epoch] for key in statistics]
try:
if eval(condition.expression):
if entity not in trigger:
trigger[entity] = {}
trigger[entity][epoch] = keys
except Exception as e:
- print(
- 'WARNING(TimeSeriesData) check_and_trigger: ' +
- str(e)
- )
+ print("WARNING(TimeSeriesData) check_and_trigger: " + str(e))
if trigger:
condition.set_trigger(trigger)