]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/diskprediction_cloud/module.py
2 diskprediction with cloud predictor
4 from __future__
import absolute_import
7 from datetime
import datetime
10 from mgr_module
import MgrModule
12 from threading
import Event
15 from string
import maketrans
17 maketrans
= str.maketrans
19 from .common
import DP_MGR_STAT_ENABLED
, DP_MGR_STAT_DISABLED
20 from .task
import MetricsRunner
, SmartRunner
, PredictRunner
, TestRunner
23 TIME_WEEK
= TIME_DAYS
* 7
24 DP_AGENTS
= [MetricsRunner
, SmartRunner
, PredictRunner
]
25 CUSTOMER_ALPHABET
= "ABCDEFG&HIJKLMN@OQRS.TUV(WXYZabcd)efghijlmn-opqrstu*vwxyz0123=45"
26 ORIGIN_ALPHABET
= "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
30 transtable
= maketrans(ORIGIN_ALPHABET
, CUSTOMER_ALPHABET
)
def get_reverse_transtable():
    """Build the translation table that undoes the custom-alphabet mapping.

    Returns:
        A table usable with ``translate()`` that maps each character of
        ``CUSTOMER_ALPHABET`` to the character at the same position in
        ``ORIGIN_ALPHABET``.
    """
    # The table was previously bound to a local and then dropped, which made
    # this getter return None; hand the table back to the caller.
    return maketrans(CUSTOMER_ALPHABET, ORIGIN_ALPHABET)
39 def encode_string(value
):
42 transtable
= get_transtable()
43 e
= str((base64
.b64encode(str(value
).encode())).decode("utf-8"))
45 return e
.translate(transtable
)
48 class Module(MgrModule
):
52 'name': 'diskprediction_server',
56 'name': 'diskprediction_port',
60 'name': 'diskprediction_user',
64 'name': 'diskprediction_password',
68 'name': 'diskprediction_upload_metrics_interval',
72 'name': 'diskprediction_upload_smart_interval',
76 'name': 'diskprediction_retrieve_prediction_interval',
80 'name': 'diskprediction_cert_context',
84 'name': 'diskprediction_ssl_target_name_override',
85 'default': 'localhost'
88 'name': 'diskprediction_default_authority',
89 'default': 'localhost'
92 'name': 'sleep_interval',
99 'cmd': 'device show-prediction-config',
100 'desc': 'Prints diskprediction configuration',
104 'cmd': 'device set-cloud-prediction-config '
105 'name=server,type=CephString,req=true '
106 'name=user,type=CephString,req=true '
107 'name=password,type=CephString,req=true '
108 'name=certfile,type=CephString,req=true '
109 'name=port,type=CephString,req=false ',
110 'desc': 'Configure Disk Prediction service',
114 'cmd': 'device debug metrics-forced',
115 'desc': 'Run metrics agent forced',
119 'cmd': 'device debug smart-forced',
120 'desc': 'Run smart agent forced',
124 'cmd': 'diskprediction_cloud status',
125 'desc': 'Check diskprediction_cloud status',
130 def __init__(self
, *args
, **kwargs
):
131 super(Module
, self
).__init
__(*args
, **kwargs
)
132 self
.status
= {'status': DP_MGR_STAT_DISABLED
}
133 self
._event
= Event()
134 self
._predict
_event
= Event()
136 self
._activated
_cloud
= False
137 self
.prediction_result
= {}
141 def config_notify(self
):
142 for opt
in self
.MODULE_OPTIONS
:
145 self
.get_module_option(opt
['name']))
146 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
147 if not self
._activated
_cloud
and self
.get_ceph_option('device_failure_prediction_mode') == 'cloud':
149 if self
._activated
_cloud
and self
.get_ceph_option('device_failure_prediction_mode') != 'cloud':
@property
def config_keys(self):
    """Map every module option name to its declared default.

    Exposed as a property because the rest of the class reads it as an
    attribute (``self.config_keys[key]``, ``self.config_keys.items()``,
    ``self.config_keys.keys()``) and never calls it.

    Returns:
        dict: ``{option name: default}`` built from ``MODULE_OPTIONS``;
        options that declare no ``'default'`` map to ``None``.
    """
    return {opt['name']: opt.get('default') for opt in self.MODULE_OPTIONS}
156 def set_config_option(self
, option
, value
):
157 if option
not in self
.config_keys
.keys():
158 raise RuntimeError('{0} is a unknown configuration '
159 'option'.format(option
))
161 if option
in ['diskprediction_port',
162 'diskprediction_upload_metrics_interval',
163 'diskprediction_upload_smart_interval',
164 'diskprediction_retrieve_prediction_interval']:
165 if not str(value
).isdigit():
166 raise RuntimeError('invalid {} configured. Please specify '
167 'a valid integer {}'.format(option
, value
))
169 self
.log
.debug('Setting in-memory config option %s to: %s', option
,
171 self
.set_module_option(option
, value
)
172 self
.config
[option
] = value
def get_configuration(self, key):
    """Look up the current value of a module option.

    Args:
        key: name of an option present in ``config_keys``.

    Returns:
        Whatever ``get_module_option`` yields for ``key``, with the
        option's declared default passed as the fallback.

    Raises:
        KeyError: if ``key`` is not a known option name.
    """
    fallback = self.config_keys[key]
    return self.get_module_option(key, fallback)
180 def _convert_timestamp(predicted_timestamp
, life_expectancy_day
):
182 :param predicted_timestamp: unit is nanoseconds
183 :param life_expectancy_day: unit is seconds
185 date format '%Y-%m-%d' ex. 2018-01-01
187 return datetime
.fromtimestamp(
188 predicted_timestamp
/ (1000 ** 3) + life_expectancy_day
).strftime('%Y-%m-%d')
190 def _show_prediction_config(self
, cmd
):
191 self
.show_module_config()
192 return 0, json
.dumps(self
.config
, indent
=4, sort_keys
=True), ''
194 def _set_ssl_target_name(self
, cmd
):
195 str_ssl_target
= cmd
.get('ssl_target_name', '')
197 self
.set_module_option('diskprediction_ssl_target_name_override', str_ssl_target
)
199 'success to config ssl target name', 0)
200 except Exception as e
:
201 return -errno
.EINVAL
, '', str(e
)
203 def _set_ssl_default_authority(self
, cmd
):
204 str_ssl_authority
= cmd
.get('ssl_authority', '')
206 self
.set_module_option('diskprediction_default_authority', str_ssl_authority
)
207 return 0, 'success to config ssl default authority', 0
208 except Exception as e
:
209 return -errno
.EINVAL
, '', str(e
)
211 def _set_cloud_prediction_config(self
, cmd
):
212 str_cert_path
= cmd
.get('certfile', '')
213 if os
.path
.exists(str_cert_path
):
214 with
open(str_cert_path
, 'rb') as f
:
215 trusted_certs
= f
.read()
216 self
.set_config_option(
217 'diskprediction_cert_context', trusted_certs
)
218 for _agent
in self
._agents
:
220 self
.set_module_option('diskprediction_server', cmd
['server'])
221 self
.set_module_option('diskprediction_user', cmd
['user'])
222 self
.set_module_option('diskprediction_password', encode_string(cmd
['password']))
224 self
.set_module_option('diskprediction_port', cmd
['port'])
225 return 0, 'succeed to config cloud mode connection', ''
227 return -errno
.EINVAL
, '', 'certification file not existed'
229 def _debug_metrics_forced(self
, cmd
):
231 for _agent
in self
._agents
:
232 if isinstance(_agent
, MetricsRunner
):
233 msg
= 'run metrics agent successfully'
237 def _debug_smart_forced(self
, cmd
):
239 for _agent
in self
._agents
:
240 if isinstance(_agent
, SmartRunner
):
241 msg
= 'run smart agent successfully'
245 def refresh_config(self
):
246 for opt
in self
.MODULE_OPTIONS
:
249 self
.get_module_option(opt
['name']))
250 self
.log
.debug(' %s = %s', opt
['name'], getattr(self
, opt
['name']))
252 def _status(self
, cmd
):
253 return 0, json
.dumps(self
.status
), ''
255 def _refresh_cloud_prediction_result(self
):
256 for _agent
in self
._agents
:
257 if isinstance(_agent
, PredictRunner
):
258 self
._predict
_event
.clear()
260 self
._predict
_event
.wait(300)
261 if self
._predict
_event
.is_set():
262 self
._predict
_event
.clear()
265 def predict_life_expectancy(self
, devid
):
267 result
= self
.get('device {}'.format(devid
))
269 return -1, '', 'device {} not found'.format(devid
)
270 dev_info
= result
.get('device', {})
272 return -1, '', 'device {} not found'.format(devid
)
273 self
._refresh
_cloud
_prediction
_result
()
274 prediction_data
= self
.prediction_result
.get(devid
)
275 if not prediction_data
:
276 return -1, '', 'device {} prediction data not ready'.format(devid
)
277 elif prediction_data
.get('near_failure', '').lower() == 'good':
279 elif prediction_data
.get('near_failure', '').lower() == 'warning':
280 return 0, '>=2w and <=6w', ''
281 elif prediction_data
.get('near_failure', '').lower() == 'bad':
284 return 0, 'unknown', ''
286 def _update_device_life_expectancy_day(self
, devid
, prediction
):
287 # Update osd life-expectancy
288 from .common
.clusterdata
import ClusterAPI
290 life_expectancy_day_min
= None
291 life_expectancy_day_max
= None
292 if prediction
.get('predicted'):
293 predicted
= int(prediction
['predicted'])
294 if prediction
.get('near_failure'):
295 if prediction
['near_failure'].lower() == 'good':
296 life_expectancy_day_min
= (TIME_WEEK
* 6) + TIME_DAYS
297 life_expectancy_day_max
= None
298 elif prediction
['near_failure'].lower() == 'warning':
299 life_expectancy_day_min
= (TIME_WEEK
* 2)
300 life_expectancy_day_max
= (TIME_WEEK
* 6)
301 elif prediction
['near_failure'].lower() == 'bad':
302 life_expectancy_day_min
= 0
303 life_expectancy_day_max
= (TIME_WEEK
* 2) - TIME_DAYS
305 # Near failure state is unknown.
307 life_expectancy_day_min
= None
308 life_expectancy_day_max
= None
310 obj_api
= ClusterAPI(self
)
311 if predicted
and devid
and life_expectancy_day_min
is not None:
315 if life_expectancy_day_min
is not None:
316 from_date
= self
._convert
_timestamp
(predicted
, life_expectancy_day_min
)
318 if life_expectancy_day_max
is not None:
319 to_date
= self
._convert
_timestamp
(predicted
, life_expectancy_day_max
)
321 obj_api
.set_device_life_expectancy(devid
, from_date
, to_date
)
323 'succeed to set device {} life expectancy from: {}, to: {}'.format(
324 devid
, from_date
, to_date
))
325 except Exception as e
:
327 'failed to set device {} life expectancy from: {}, to: {}, {}'.format(
328 devid
, from_date
, to_date
, str(e
)))
330 obj_api
.reset_device_life_expectancy(devid
)
332 def predict_all_devices(self
):
333 if not self
._activated
_cloud
:
334 return -1, '', 'diskprecition_cloud not ready'
335 self
.refresh_config()
336 result
= self
.get('devices')
338 return -1, '', 'unable to get all devices for prediction'
339 self
._refresh
_cloud
_prediction
_result
()
340 for dev
in result
.get('devices', []):
341 devid
= dev
.get('devid')
344 prediction_data
= self
.prediction_result
.get(devid
)
347 if not prediction_data
:
348 return -1, '', 'device {} prediction data not ready'.format(dev
.get('devid'))
350 self
._update
_device
_life
_expectancy
_day
(dev
.get('devid'), prediction_data
)
353 def handle_command(self
, _
, cmd
):
354 for o_cmd
in self
.COMMANDS
:
355 if cmd
['prefix'] == o_cmd
['cmd'][:len(cmd
['prefix'])]:
357 avgs
= o_cmd
['cmd'].split(' ')
359 if avg
.lower() == 'diskprediction_cloud':
361 if avg
.lower() == 'device':
363 if '=' in avg
or ',' in avg
or not avg
:
365 fun_name
+= '_%s' % avg
.replace('-', '_')
371 return -errno
.EINVAL
, '', 'cmd not found'
def show_module_config(self):
    """Push every stored module option through ``set_config_option``.

    For each entry of ``config_keys`` the stored value is fetched, with the
    declared default as the fallback, and handed to ``set_config_option``.
    """
    for name, fallback in self.config_keys.items():
        current = self.get_module_option(name, fallback)
        self.set_config_option(name, current)
378 self
.log
.info('Starting diskprediction module')
380 self
.status
= {'status': DP_MGR_STAT_ENABLED
}
383 self
.refresh_config()
384 mode
= self
.get_ceph_option('device_failure_prediction_mode')
386 if not self
._activated
_cloud
:
387 self
.start_cloud_disk_prediction()
389 if self
._activated
_cloud
:
390 self
.stop_disk_prediction()
392 # Check agent hang is?
393 restart_agent
= False
395 for dp_agent
in self
._agents
:
396 if dp_agent
.is_timeout():
397 self
.log
.error('agent name: {] timeout'.format(dp_agent
.task_name
))
400 except Exception as IOError:
401 self
.log
.error('disk prediction plugin failed to started and try to restart')
405 self
.stop_disk_prediction()
407 sleep_interval
= int(self
.sleep_interval
) or 60
408 self
._event
.wait(sleep_interval
)
410 self
.stop_disk_prediction()
412 def _agent_call_back(self
):
413 self
.log
.debug('notify refresh devices prediction result')
414 self
._predict
_event
.set()
416 def start_cloud_disk_prediction(self
):
417 assert not self
._activated
_cloud
418 for dp_agent
in DP_AGENTS
:
419 if dp_agent
== PredictRunner
:
420 obj_agent
= dp_agent(self
, 300, self
._agent
_call
_back
)
422 obj_agent
= dp_agent(self
, 300)
426 raise Exception('failed to start task %s' % obj_agent
.task_name
)
427 self
._agents
.append(obj_agent
)
428 self
._activated
_cloud
= True
429 self
.log
.info('start cloud disk prediction')
431 def stop_disk_prediction(self
):
432 assert self
._activated
_cloud
434 self
.status
= {'status': DP_MGR_STAT_DISABLED
}
436 dp_agent
= self
._agents
.pop()
437 self
.log
.info('agent name: {}'.format(dp_agent
.task_name
))
441 self
._activated
_cloud
= False
442 self
.log
.info('stop disk prediction')
443 except Exception as IOError:
444 self
.log
.error('failed to stop disk prediction clould plugin')
449 super(Module
, self
).shutdown()
452 obj_test
= TestRunner(self
)
454 self
.log
.info('self test completed')