1 """Machine learning model for disk failure prediction.
The classes defined here provide the disk failure prediction module.
4 RHDiskFailurePredictor uses the models developed at the AICoE in the
5 Office of the CTO at Red Hat. These models were built using the open
6 source Backblaze SMART metrics dataset.
PSDiskFailurePredictor uses the models developed by ProphetStor as an
alternative prediction module.
10 An instance of the predictor is initialized by providing the path to trained
11 models. Then, to predict hard drive health and deduce time to failure, the
12 predict function is called with 6 days worth of SMART data from the hard drive.
It will return a string to indicate disk failure status: "Good", "Warning",
"Bad", or "Unknown".
16 An example code is as follows:
18 >>> model = RHDiskFailurePredictor()
19 >>> model.initialize(get_diskfailurepredictor_path() + "/models/redhat")
20 >>> vendor = list(RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.keys())[0]
21 >>> disk_days = [{'vendor': vendor}]
22 >>> model.predict(disk_days)
import json
import logging
import os
import pickle
from typing import Any, Dict, List, Optional, Sequence, Tuple

import numpy as np
def get_diskfailurepredictor_path() -> str:
    """Return the absolute path of the directory containing this module.

    Callers use this to locate the bundled trained-model directories,
    e.g. ``get_diskfailurepredictor_path() + "/models/redhat"``.
    """
    path = os.path.abspath(__file__)
    dir_path = os.path.dirname(path)
    # the visible original computed dir_path but never returned it,
    # despite the declared `-> str` return type
    return dir_path
# Type aliases used throughout this module.
# DevSmartT: one day's worth of smartctl output for a single device,
# keyed by attribute name (e.g. 'smart_1_raw', 'user_capacity').
DevSmartT = Dict[str, Any]
# AttrNamesT: a plain list of SMART attribute names.
AttrNamesT = List[str]
# AttrDiffsT: per-day dicts of integer differences between consecutive
# days' SMART attribute values.
AttrDiffsT = List[Dict[str, int]]
def create(cls, name: str) -> Optional['Predictor']:
    """Factory: return a predictor instance for the given implementation name.

    Known names are 'prophetstor' and 'redhat'; any other name yields None.
    """
    implementations = {
        'prophetstor': PSDiskFailurePredictor,
        'redhat': RHDiskFailurePredictor,
    }
    impl = implementations.get(name)
    return impl() if impl is not None else None
def initialize(self, model_dir: str) -> None:
    """Load trained models from *model_dir*.

    Abstract hook: concrete predictors must override this.
    """
    raise NotImplementedError()
def predict(self, dataset: Sequence[DevSmartT]) -> str:
    """Predict disk health from a sequence of per-day SMART samples.

    Abstract hook: concrete predictors must override this.
    """
    raise NotImplementedError()
class RHDiskFailurePredictor(Predictor):
    """Disk failure prediction module developed at Red Hat

    This class implements a disk failure prediction module.
    """

    # json with manufacturer names as keys
    # and features used for prediction as values
    CONFIG_FILE = "config.json"
    PREDICTION_CLASSES = {-1: "Unknown", 0: "Good", 1: "Warning", 2: "Bad"}

    # model name prefixes to identify vendor
    MANUFACTURER_MODELNAME_PREFIXES = {
        "WDC": "WDC",
        "Toshiba": "Toshiba",  # for cases like "Toshiba xxx"
        "TOSHIBA": "Toshiba",  # for cases like "TOSHIBA xxx"
        "toshiba": "Toshiba",  # for cases like "toshiba xxx"
        "S": "Seagate",  # for cases like "STxxxx" and "Seagate BarraCuda ZAxxx"
        "ZA": "Seagate",  # for cases like "ZAxxxx"
        "Hitachi": "Hitachi",
        "HGST": "HGST",
    }

    LOGGER = logging.getLogger()

    def __init__(self) -> None:
        """
        This function may throw exception due to wrong file operation.
        """
        # set by initialize(); empty until then
        self.model_dirpath = ""
        # manufacturer -> ordered list of SMART attributes the model was trained on
        self.model_context: Dict[str, List[str]] = {}

    def initialize(self, model_dirpath: str) -> None:
        """Initialize all models. Save paths of all trained model files to list

        Arguments:
            model_dirpath {str} -- path to directory of trained models

        Raises:
            Exception -- if the config file or any per-manufacturer model or
                         scaler file is missing from model_dirpath
        """
        # read config file as json, if it exists
        config_path = os.path.join(model_dirpath, self.CONFIG_FILE)
        if not os.path.isfile(config_path):
            raise Exception("Missing config file: " + config_path)
        with open(config_path) as f_conf:
            self.model_context = json.load(f_conf)

        # ensure all manufacturers whose context is defined in config file
        # have models and scalers saved inside model_dirpath
        for manufacturer in self.model_context:
            scaler_path = os.path.join(model_dirpath, manufacturer + "_scaler.pkl")
            if not os.path.isfile(scaler_path):
                raise Exception(f"Missing scaler file: {scaler_path}")
            model_path = os.path.join(model_dirpath, manufacturer + "_predictor.pkl")
            if not os.path.isfile(model_path):
                raise Exception(f"Missing model file: {model_path}")

        self.model_dirpath = model_dirpath

    def __preprocess(self, disk_days: Sequence[DevSmartT], manufacturer: str) -> Optional[np.ndarray]:
        """Scales and transforms input dataframe to feed it to prediction model

        Arguments:
            disk_days {list} -- list in which each element is a dictionary with key,val
                                as feature name,value respectively.
                                e.g.[{'smart_1_raw': 0, 'user_capacity': 512 ...}, ...]
            manufacturer {str} -- manufacturer of the hard drive

        Returns:
            numpy.ndarray -- (n, d) shaped array of n days worth of data and d
                             features, scaled. None if preprocessing fails.
        """
        # get the attributes that were used to train model for current manufacturer
        try:
            model_smart_attr = self.model_context[manufacturer]
        except KeyError:
            RHDiskFailurePredictor.LOGGER.debug(
                "No context (SMART attributes on which model has been trained) found for manufacturer: {}".format(
                    manufacturer
                )
            )
            return None

        # convert to structured array, keeping only the required features
        # assumes all data is in float64 dtype
        try:
            struc_dtypes = [(attr, np.float64) for attr in model_smart_attr]
            values = [tuple(day[attr] for attr in model_smart_attr) for day in disk_days]
            disk_days_sa = np.array(values, dtype=struc_dtypes)
        except KeyError:
            RHDiskFailurePredictor.LOGGER.debug(
                "Mismatch in SMART attributes used to train model and SMART attributes available"
            )
            return None

        # view structured array as 2d array for applying rolling window transforms
        # do not include capacity_bytes in this. only use smart_attrs
        disk_days_attrs = disk_days_sa[[attr for attr in model_smart_attr if 'smart_' in attr]]\
            .view(np.float64).reshape(disk_days_sa.shape + (-1,))

        # featurize n (6 to 12) days data - mean,std,coefficient of variation
        # current model is trained on 6 days of data because that is what will be
        # available at runtime

        # rolling time window interval size in days
        roll_window_size = 6

        # rolling means generator
        dataset_size = disk_days_attrs.shape[0] - roll_window_size + 1
        gen = (disk_days_attrs[i: i + roll_window_size, ...].mean(axis=0)
               for i in range(dataset_size))
        means = np.vstack(gen)  # type: ignore

        # rolling stds generator
        gen = (disk_days_attrs[i: i + roll_window_size, ...].std(axis=0, ddof=1)
               for i in range(dataset_size))
        stds = np.vstack(gen)  # type: ignore

        # coefficient of variation
        cvs = stds / means
        cvs[np.isnan(cvs)] = 0
        featurized = np.hstack((means,
                                stds,
                                cvs,
                                disk_days_sa['user_capacity'][: dataset_size].reshape(-1, 1)))

        # scale features with the scaler fitted at training time
        scaler_path = os.path.join(self.model_dirpath, manufacturer + "_scaler.pkl")
        with open(scaler_path, 'rb') as f:
            scaler = pickle.load(f)
        featurized = scaler.transform(featurized)
        return featurized

    @staticmethod
    def __get_manufacturer(model_name: str) -> Optional[str]:
        """Returns the manufacturer name for a given hard drive model name

        Arguments:
            model_name {str} -- hard drive model name

        Returns:
            str -- lowercased manufacturer name, or None if it cannot be
                   inferred from the model name prefix
        """
        for prefix, manufacturer in RHDiskFailurePredictor.MANUFACTURER_MODELNAME_PREFIXES.items():
            if model_name.startswith(prefix):
                return manufacturer.lower()
        # print error message
        RHDiskFailurePredictor.LOGGER.debug(
            f"Could not infer manufacturer from model name {model_name}")
        return None

    def predict(self, disk_days: Sequence[DevSmartT]) -> str:
        """Predict disk failure status from SMART data.

        Returns one of the PREDICTION_CLASSES values: "Unknown", "Good",
        "Warning" or "Bad".
        """
        # get manufacturer preferably as a smartctl attribute
        # if not available then infer using model name
        manufacturer = disk_days[0].get("vendor")
        if manufacturer is None:
            RHDiskFailurePredictor.LOGGER.debug(
                '"vendor" field not found in smartctl output. Will try to infer manufacturer from model name.'
            )
            manufacturer = RHDiskFailurePredictor.__get_manufacturer(
                disk_days[0].get("model_name", ""))

        # print error message, return Unknown, and continue execution
        if manufacturer is None:
            RHDiskFailurePredictor.LOGGER.debug(
                "Manufacturer could not be determiend. This may be because \
                DiskPredictor has never encountered this manufacturer before, \
                or the model name is not according to the manufacturer's \
                naming conventions known to DiskPredictor"
            )
            return RHDiskFailurePredictor.PREDICTION_CLASSES[-1]

        # preprocess for feeding to model
        preprocessed_data = self.__preprocess(disk_days, manufacturer)
        if preprocessed_data is None:
            return RHDiskFailurePredictor.PREDICTION_CLASSES[-1]

        # get model for current manufacturer
        model_path = os.path.join(
            self.model_dirpath, manufacturer + "_predictor.pkl"
        )
        with open(model_path, 'rb') as f:
            model = pickle.load(f)

        # use prediction for most recent day
        # TODO: ensure that most recent day is last element and most previous day
        # is first element in input disk_days
        pred_class_id = model.predict(preprocessed_data)[-1]
        return RHDiskFailurePredictor.PREDICTION_CLASSES[pred_class_id]
class PSDiskFailurePredictor(Predictor):
    """Disk failure prediction developed at ProphetStor

    This class implements a disk failure prediction module.
    """

    # json config file mapping model names to their SMART attribute lists
    CONFIG_FILE = "config.json"
    # raw SMART attributes dropped during preprocessing
    # NOTE(review): presumably excluded because they grow monotonically with
    # usage (9 = power-on hours, 241/242 = total LBAs written/read) — confirm
    EXCLUDED_ATTRS = ["smart_9_raw", "smart_241_raw", "smart_242_raw"]
263 def __init__(self
) -> None:
265 This function may throw exception due to wrong file operation.
268 self
.model_dirpath
= ""
269 self
.model_context
: Dict
[str, List
[str]] = {}
271 def initialize(self
, model_dirpath
: str) -> None:
273 Initialize all models.
278 Error message. If all goes well, return an empty string.
283 config_path
= os
.path
.join(model_dirpath
, self
.CONFIG_FILE
)
284 if not os
.path
.isfile(config_path
):
285 raise Exception(f
"Missing config file: {config_path}")
286 with
open(config_path
) as f_conf
:
287 self
.model_context
= json
.load(f_conf
)
289 for model_name
in self
.model_context
:
290 model_path
= os
.path
.join(model_dirpath
, model_name
)
292 if not os
.path
.isfile(model_path
):
293 raise Exception(f
"Missing model file: {model_path}")
295 self
.model_dirpath
= model_dirpath
297 def __preprocess(self
, disk_days
: Sequence
[DevSmartT
]) -> Sequence
[DevSmartT
]:
299 Preprocess disk attributes.
302 disk_days: Refer to function predict(...).
305 new_disk_days: Processed disk days.
311 attr_list
= set.intersection(*[set(disk_day
.keys()) for disk_day
in disk_days
])
312 for attr
in attr_list
:
314 attr
.startswith("smart_") and attr
.endswith("_raw")
315 ) and attr
not in self
.EXCLUDED_ATTRS
:
316 req_attrs
.append(attr
)
318 for disk_day
in disk_days
:
320 for attr
in req_attrs
:
321 if float(disk_day
[attr
]) >= 0.0:
322 new_disk_day
[attr
] = disk_day
[attr
]
324 new_disk_days
.append(new_disk_day
)
329 def __get_diff_attrs(disk_days
: Sequence
[DevSmartT
]) -> Tuple
[AttrNamesT
, AttrDiffsT
]:
331 Get 5 days differential attributes.
334 disk_days: Refer to function predict(...).
337 attr_list: All S.M.A.R.T. attributes used in given disk. Here we
338 use intersection set of all disk days.
340 diff_disk_days: A list struct comprises 5 dictionaries, each
341 dictionary contains differential attributes.
344 Exceptions of wrong list/dict operations.
347 all_attrs
= [set(disk_day
.keys()) for disk_day
in disk_days
]
348 attr_list
= list(set.intersection(*all_attrs
))
349 prev_days
= disk_days
[:-1]
350 curr_days
= disk_days
[1:]
352 # TODO: ensure that this ordering is correct
353 for prev
, cur
in zip(prev_days
, curr_days
):
354 diff_disk_days
.append(
355 {attr
: (int(cur
[attr
]) - int(prev
[attr
])) for attr
in attr_list
}
358 return attr_list
, diff_disk_days
360 def __get_best_models(self
, attr_list
: AttrNamesT
) -> Optional
[Dict
[str, List
[str]]]:
362 Find the best model from model list according to given attribute list.
365 attr_list: All S.M.A.R.T. attributes used in given disk.
368 modelpath: The best model for the given attribute list.
369 model_attrlist: 'Ordered' attribute list of the returned model.
370 Must be aware that SMART attributes is in order.
375 models
= self
.model_context
.keys()
378 for model_name
in models
:
380 sum(attr
in attr_list
for attr
in self
.model_context
[model_name
])
382 max_score
= max(scores
)
384 # Skip if too few matched attributes.
386 print("Too few matched attributes")
389 best_models
: Dict
[str, List
[str]] = {}
390 best_model_indices
= [
391 idx
for idx
, score
in enumerate(scores
) if score
> max_score
- 2
393 for model_idx
in best_model_indices
:
394 model_name
= list(models
)[model_idx
]
395 model_path
= os
.path
.join(self
.model_dirpath
, model_name
)
396 model_attrlist
= self
.model_context
[model_name
]
397 best_models
[model_path
] = model_attrlist
400 # return os.path.join(self.model_dirpath, model_name), model_attrlist
403 def __get_ordered_attrs(disk_days
: Sequence
[DevSmartT
], model_attrlist
: List
[str]) -> List
[List
[float]]:
405 Return ordered attributes of given disk days.
408 disk_days: Unordered disk days.
409 model_attrlist: Model's ordered attribute list.
412 ordered_attrs: Ordered disk days.
419 for one_day
in disk_days
:
422 for attr
in model_attrlist
:
424 one_day_attrs
.append(one_day
[attr
])
426 one_day_attrs
.append(0)
428 ordered_attrs
.append(one_day_attrs
)
432 def predict(self
, disk_days
: Sequence
[DevSmartT
]) -> str:
434 Predict using given 6-days disk S.M.A.R.T. attributes.
437 disk_days: A list struct comprises 6 dictionaries. These
438 dictionaries store 'consecutive' days of disk SMART
441 A string indicates prediction result. One of following four strings
442 will be returned according to disk failure status:
443 (1) Good : Disk is health
444 (2) Warning : Disk has some symptoms but may not fail immediately
445 (3) Bad : Disk is in danger and data backup is highly recommended
446 (4) Unknown : Not enough data for prediction.
454 proc_disk_days
= self
.__preprocess
(disk_days
)
455 attr_list
, diff_data
= PSDiskFailurePredictor
.__get
_diff
_attrs
(proc_disk_days
)
456 modellist
= self
.__get
_best
_models
(attr_list
)
457 if modellist
is None:
460 for modelpath
in modellist
:
461 model_attrlist
= modellist
[modelpath
]
462 ordered_data
= PSDiskFailurePredictor
.__get
_ordered
_attrs
(
463 diff_data
, model_attrlist
467 with
open(modelpath
, "rb") as f_model
:
468 clf
= pickle
.load(f_model
)
470 except UnicodeDecodeError:
471 # Compatibility for python3
472 with
open(modelpath
, "rb") as f_model
:
473 clf
= pickle
.load(f_model
, encoding
="latin1")
475 pred
= clf
.predict(ordered_data
)
477 all_pred
.append(1 if any(pred
) else 0)
479 score
= 2 ** sum(all_pred
) - len(modellist
)