]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/diskprediction_cloud/module.py
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / pybind / mgr / diskprediction_cloud / module.py
1 """
2 diskprediction with cloud predictor
3 """
4 from __future__ import absolute_import
5
6 import base64
7 from datetime import datetime
8 import errno
9 import json
10 from mgr_module import MgrModule
11 import os
12 from threading import Event
13
14 try:
15 from string import maketrans
16 except ImportError:
17 maketrans = str.maketrans
18
19 from .common import DP_MGR_STAT_ENABLED, DP_MGR_STAT_DISABLED
20 from .task import MetricsRunner, SmartRunner, PredictRunner, TestRunner
21
# Durations expressed in seconds.
TIME_DAYS = 24*60*60
TIME_WEEK = TIME_DAYS * 7
# Agent runner classes instantiated by Module.start_cloud_disk_prediction().
DP_AGENTS = [MetricsRunner, SmartRunner, PredictRunner]
# 64-character substitution alphabet: encode_string() maps each character of
# its base64 output from ORIGIN_ALPHABET to the character at the same
# position here.
CUSTOMER_ALPHABET = "ABCDEFG&HIJKLMN@OQRS.TUV(WXYZabcd)efghijlmn-opqrstu*vwxyz0123=45"
# The standard base64 alphabet (A-Z, a-z, 0-9, '+', '/').
ORIGIN_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
27
28
def get_transtable():
    """Return the translation table mapping ORIGIN_ALPHABET to CUSTOMER_ALPHABET."""
    return maketrans(ORIGIN_ALPHABET, CUSTOMER_ALPHABET)
32
33
def get_reverse_transtable():
    """Return the translation table mapping CUSTOMER_ALPHABET back to ORIGIN_ALPHABET."""
    return maketrans(CUSTOMER_ALPHABET, ORIGIN_ALPHABET)
37
38
def encode_string(value):
    """Return *value* base64-encoded, unpadded and remapped to the custom alphabet.

    The text form of *value* is base64-encoded, trailing ``=`` padding is
    stripped, and every remaining character is substituted through the
    table returned by get_transtable().

    :param value: sized value to encode; its ``str()`` form is what gets encoded
    :return: the encoded string, or ``""`` when *value* has zero length
    """
    if len(value) == 0:
        return ""
    raw_bytes = str(value).encode()
    b64_text = str(base64.b64encode(raw_bytes).decode("utf-8"))
    unpadded = b64_text.rstrip("=")
    return unpadded.translate(get_transtable())
46
47
class Module(MgrModule):
    """Manager module for disk failure prediction with the cloud predictor.

    While the ceph option ``device_failure_prediction_mode`` equals
    ``'cloud'`` the module keeps one instance of every runner class listed
    in ``DP_AGENTS`` started; serve() starts, supervises and stops them.
    """

    # Module options; refresh_config() mirrors each one onto an attribute of
    # the same name.  All defaults are strings.
    MODULE_OPTIONS = [
        {
            'name': 'diskprediction_server',
            'default': ''
        },
        {
            'name': 'diskprediction_port',
            'default': '31400'
        },
        {
            'name': 'diskprediction_user',
            'default': ''
        },
        {
            'name': 'diskprediction_password',
            'default': ''
        },
        {
            'name': 'diskprediction_upload_metrics_interval',
            'default': '600'
        },
        {
            'name': 'diskprediction_upload_smart_interval',
            'default': '43200'
        },
        {
            'name': 'diskprediction_retrieve_prediction_interval',
            'default': '43200'
        },
        {
            'name': 'diskprediction_cert_context',
            'default': ''
        },
        {
            'name': 'diskprediction_ssl_target_name_override',
            'default': 'localhost'
        },
        {
            'name': 'diskprediction_default_authority',
            'default': 'localhost'
        },
        {
            'name': 'sleep_interval',
            'default': str(600),
        }
    ]

    # Commands handled by handle_command(); the handler method name is
    # derived from the command words (see handle_command()).
    COMMANDS = [
        {
            'cmd': 'device show-prediction-config',
            'desc': 'Prints diskprediction configuration',
            'perm': 'r'
        },
        {
            'cmd': 'device set-cloud-prediction-config '
                   'name=server,type=CephString,req=true '
                   'name=user,type=CephString,req=true '
                   'name=password,type=CephString,req=true '
                   'name=certfile,type=CephString,req=true '
                   'name=port,type=CephString,req=false ',
            'desc': 'Configure Disk Prediction service',
            'perm': 'rw'
        },
        {
            'cmd': 'device debug metrics-forced',
            'desc': 'Run metrics agent forced',
            'perm': 'r'
        },
        {
            'cmd': 'device debug smart-forced',
            'desc': 'Run smart agent forced',
            'perm': 'r'
        },
        {
            'cmd': 'diskprediction_cloud status',
            'desc': 'Check diskprediction_cloud status',
            'perm': 'r'
        }
    ]

    def __init__(self, *args, **kwargs):
        super(Module, self).__init__(*args, **kwargs)
        self.status = {'status': DP_MGR_STAT_DISABLED}
        # Wakes the serve() loop early (config change, shutdown).
        self._event = Event()
        # Set by _agent_call_back() to signal refreshed prediction results.
        self._predict_event = Event()
        # Runner instances that are currently started.
        self._agents = []
        self._activated_cloud = False
        # Looked up by device id in predict_life_expectancy() and
        # predict_all_devices(); not populated anywhere in this class.
        self.prediction_result = {}
        self.config = dict()
        self._run = True

    def config_notify(self):
        """Reload the module options and wake serve() on a mode change.

        The serve loop is woken whenever the requested prediction mode
        ('cloud' or anything else) disagrees with the current activation
        state, so it can start or stop the agents promptly.
        """
        self.refresh_config()
        cloud_requested = (
            self.get_ceph_option('device_failure_prediction_mode') == 'cloud')
        if bool(self._activated_cloud) != cloud_requested:
            self._event.set()

    @property
    def config_keys(self):
        """Mapping of every module option name to its default value."""
        return dict((o['name'], o.get('default', None)) for o in self.MODULE_OPTIONS)

    def set_config_option(self, option, value):
        """Validate and store a module option.

        :param option: option name; must be listed in MODULE_OPTIONS
        :param value: new value; the port and the three interval options
            must consist of digits only
        :return: True on success
        :raises RuntimeError: for an unknown option or a non-numeric value
            given to a numeric option
        """
        if option not in self.config_keys:
            raise RuntimeError('{0} is a unknown configuration '
                               'option'.format(option))

        if option in ['diskprediction_port',
                      'diskprediction_upload_metrics_interval',
                      'diskprediction_upload_smart_interval',
                      'diskprediction_retrieve_prediction_interval']:
            if not str(value).isdigit():
                raise RuntimeError('invalid {} configured. Please specify '
                                   'a valid integer {}'.format(option, value))

        self.log.debug('Setting in-memory config option %s to: %s', option,
                       value)
        self.set_module_option(option, value)
        self.config[option] = value

        return True

    def get_configuration(self, key):
        """Return the stored value of option *key*, or its declared default."""
        return self.get_module_option(key, self.config_keys[key])

    @staticmethod
    def _convert_timestamp(predicted_timestamp, life_expectancy_day):
        """
        :param predicted_timestamp: unit is nanoseconds
        :param life_expectancy_day: unit is seconds
        :return:
            date format '%Y-%m-%d' ex. 2018-01-01
        """
        return datetime.fromtimestamp(
            predicted_timestamp / (1000 ** 3) + life_expectancy_day).strftime('%Y-%m-%d')

    def _show_prediction_config(self, cmd):
        """Handle 'device show-prediction-config': dump the options as JSON."""
        self.show_module_config()
        return 0, json.dumps(self.config, indent=4), ''

    def _set_ssl_target_name(self, cmd):
        """Store ``cmd['ssl_target_name']`` as the SSL target name override."""
        str_ssl_target = cmd.get('ssl_target_name', '')
        try:
            self.set_module_option('diskprediction_ssl_target_name_override', str_ssl_target)
            # Third element is the error text; keep it a string like every
            # other command handler in this class.
            return 0, 'success to config ssl target name', ''
        except Exception as e:
            return -errno.EINVAL, '', str(e)

    def _set_ssl_default_authority(self, cmd):
        """Store ``cmd['ssl_authority']`` as the default SSL authority."""
        str_ssl_authority = cmd.get('ssl_authority', '')
        try:
            self.set_module_option('diskprediction_default_authority', str_ssl_authority)
            return 0, 'success to config ssl default authority', ''
        except Exception as e:
            return -errno.EINVAL, '', str(e)

    def _set_cloud_prediction_config(self, cmd):
        """Handle 'device set-cloud-prediction-config'.

        Reads the certificate file named by ``cmd['certfile']``, stores its
        content plus server, user, password and (optionally) port as module
        options, and wakes every running agent.

        :return: ``(0, message, '')`` on success, or ``(-EINVAL, '', message)``
            when the certificate file does not exist
        """
        str_cert_path = cmd.get('certfile', '')
        if not os.path.exists(str_cert_path):
            return -errno.EINVAL, '', 'certification file not existed'

        with open(str_cert_path, 'rb') as f:
            trusted_certs = f.read()
        self.set_config_option(
            'diskprediction_cert_context', trusted_certs)
        for _agent in self._agents:
            _agent.event.set()
        self.set_module_option('diskprediction_server', cmd['server'])
        self.set_module_option('diskprediction_user', cmd['user'])
        # The password is stored obfuscated (base64 + alphabet substitution),
        # not encrypted.
        self.set_module_option('diskprediction_password', encode_string(cmd['password']))
        if cmd.get('port'):
            self.set_module_option('diskprediction_port', cmd['port'])
        return 0, 'succeed to config cloud mode connection', ''

    def _debug_metrics_forced(self, cmd):
        """Handle 'device debug metrics-forced': wake every MetricsRunner."""
        msg = ''
        for _agent in self._agents:
            if isinstance(_agent, MetricsRunner):
                msg = 'run metrics agent successfully'
                _agent.event.set()
        return 0, msg, ''

    def _debug_smart_forced(self, cmd):
        """Handle 'device debug smart-forced': wake every SmartRunner."""
        msg = ''
        for _agent in self._agents:
            if isinstance(_agent, SmartRunner):
                msg = 'run smart agent successfully'
                _agent.event.set()
        return 0, msg, ''

    def refresh_config(self):
        """Copy every module option (or its default) onto ``self.<name>``."""
        for opt in self.MODULE_OPTIONS:
            setattr(self,
                    opt['name'],
                    self.get_module_option(opt['name']) or opt['default'])
            self.log.debug(' %s = %s', opt['name'], getattr(self, opt['name']))

    def _status(self, cmd):
        """Handle 'diskprediction_cloud status': return the status as JSON."""
        return 0, json.dumps(self.status), ''

    def _refresh_cloud_prediction_result(self):
        """Wake each PredictRunner and wait for its completion callback.

        Waits up to 300 seconds per agent and stops at the first agent that
        reports back through _agent_call_back().
        """
        for _agent in self._agents:
            if isinstance(_agent, PredictRunner):
                self._predict_event.clear()
                _agent.event.set()
                self._predict_event.wait(300)
                if self._predict_event.is_set():
                    self._predict_event.clear()
                    break

    def predict_life_expectancy(self, devid):
        """Return the predicted life expectancy class of device *devid*.

        :return: ``(0, label, '')`` where label is '>6w', '>=2w and <=6w',
            '<2w' or 'unknown'; ``(-1, '', message)`` when the device is
            not found or no prediction data is available for it
        """
        assert devid
        result = self.get('device {}'.format(devid))
        if not result:
            return -1, '', 'device {} not found'.format(devid)
        dev_info = result.get('device', {})
        if not dev_info:
            return -1, '', 'device {} not found'.format(devid)
        self._refresh_cloud_prediction_result()
        prediction_data = self.prediction_result.get(devid)
        if not prediction_data:
            return -1, '', 'device {} prediction data not ready'.format(devid)

        labels = {
            'good': '>6w',
            'warning': '>=2w and <=6w',
            'bad': '<2w',
        }
        # `or ''` also tolerates an explicit None stored under 'near_failure'.
        state = (prediction_data.get('near_failure') or '').lower()
        return 0, labels.get(state, 'unknown'), ''

    def _update_device_life_expectancy_day(self, devid, prediction):
        """Store (or reset) the life expectancy window of device *devid*.

        :param devid: device id
        :param prediction: dict read for the keys 'predicted' (timestamp,
            see _convert_timestamp) and 'near_failure' ('good', 'warning'
            or 'bad', case-insensitive)

        The window is derived from 'near_failure'.  When the timestamp is
        missing or the state is absent or unrecognised, the stored life
        expectancy of the device is reset instead.
        """
        # Update osd life-expectancy
        from .common.clusterdata import ClusterAPI

        # near_failure state -> (min, max) offsets in seconds from the
        # predicted timestamp; max None means no upper bound.
        windows = {
            'good': ((TIME_WEEK * 6) + TIME_DAYS, None),
            'warning': ((TIME_WEEK * 2), (TIME_WEEK * 6)),
            'bad': (0, (TIME_WEEK * 2) - TIME_DAYS),
        }

        predicted = None
        life_expectancy_day_min = None
        life_expectancy_day_max = None
        if prediction.get('predicted'):
            predicted = int(prediction['predicted'])
        if prediction.get('near_failure'):
            window = windows.get(prediction['near_failure'].lower())
            if window is None:
                # Near failure state is unknown.
                predicted = None
            else:
                life_expectancy_day_min, life_expectancy_day_max = window

        obj_api = ClusterAPI(self)
        if predicted and devid and life_expectancy_day_min is not None:
            from_date = None
            to_date = None
            try:
                from_date = self._convert_timestamp(predicted, life_expectancy_day_min)

                if life_expectancy_day_max is not None:
                    to_date = self._convert_timestamp(predicted, life_expectancy_day_max)

                obj_api.set_device_life_expectancy(devid, from_date, to_date)
                self.log.info(
                    'succeed to set device {} life expectancy from: {}, to: {}'.format(
                        devid, from_date, to_date))
            except Exception as e:
                self.log.error(
                    'failed to set device {} life expectancy from: {}, to: {}, {}'.format(
                        devid, from_date, to_date, str(e)))
        else:
            obj_api.reset_device_life_expectancy(devid)

    def predict_all_devices(self):
        """Refresh the predictions and update every known device.

        Every device that has prediction data gets its life expectancy
        updated; devices without a ``devid`` are skipped.

        :return: ``(0, '', '')`` when all devices with an id were updated;
            ``(-1, '', message)`` when cloud mode is inactive, the device
            list cannot be read, or at least one device has no prediction
            data (the message names the first such device)
        """
        if not self._activated_cloud:
            return -1, '', 'diskprecition_cloud not ready'
        self.refresh_config()
        result = self.get('devices')
        if not result:
            return -1, '', 'unable to get all devices for prediction'
        self._refresh_cloud_prediction_result()

        first_missing = None
        for dev in result.get('devices', []):
            devid = dev.get('devid')
            if not devid:
                continue
            prediction_data = self.prediction_result.get(devid)
            if not prediction_data:
                # Remember the failure but keep processing the other devices.
                if first_missing is None:
                    first_missing = devid
                continue
            self._update_device_life_expectancy_day(devid, prediction_data)

        if first_missing is not None:
            return -1, '', 'device {} prediction data not ready'.format(first_missing)
        return 0, '', ''

    def handle_command(self, _, cmd):
        """Dispatch a command to the matching ``_<words>`` handler method.

        The handler name is built from the words of the matching COMMANDS
        entry: the leading 'device' / 'diskprediction_cloud' word and the
        argument descriptors are dropped, the remaining words are joined
        with '_' and dashes become underscores (e.g.
        'device debug smart-forced' -> ``_debug_smart_forced``).

        :return: the handler's result, or ``(-EINVAL, '', 'cmd not found')``
        """
        prefix = cmd['prefix']
        for o_cmd in self.COMMANDS:
            if not o_cmd['cmd'].startswith(prefix):
                continue
            fun_name = ''
            for avg in o_cmd['cmd'].split(' '):
                if avg.lower() in ('diskprediction_cloud', 'device'):
                    continue
                if '=' in avg or ',' in avg or not avg:
                    continue
                fun_name += '_%s' % avg.replace('-', '_')
            if fun_name:
                # Default of None so a missing handler falls through to the
                # 'cmd not found' reply instead of raising AttributeError.
                fun = getattr(self, fun_name, None)
                if fun:
                    return fun(cmd)
        return -errno.EINVAL, '', 'cmd not found'

    def show_module_config(self):
        """Load every option (stored value or default) into ``self.config``."""
        for key, default in self.config_keys.items():
            self.set_config_option(key, self.get_module_option(key, default))

    def serve(self):
        """Main loop: follow the prediction mode and supervise the agents.

        Runs until shutdown() clears ``self._run``.  Each iteration starts
        or stops the agents according to ``device_failure_prediction_mode``,
        stops all agents when one of them reports a timeout (they are
        started again on the next iteration while the mode is still
        'cloud'), and otherwise sleeps for ``sleep_interval`` seconds or
        until woken through ``self._event``.
        """
        self.log.info('Starting diskprediction module')
        self.config_notify()
        self.status = {'status': DP_MGR_STAT_ENABLED}

        while self._run:
            self.refresh_config()
            mode = self.get_ceph_option('device_failure_prediction_mode')
            if mode == 'cloud':
                if not self._activated_cloud:
                    self.start_cloud_disk_prediction()
            else:
                if self._activated_cloud:
                    self.stop_disk_prediction()

            # Check whether any agent hangs.
            restart_agent = False
            try:
                for dp_agent in self._agents:
                    if dp_agent.is_timeout():
                        self.log.error('agent name: {} timeout'.format(dp_agent.task_name))
                        restart_agent = True
                        break
            except Exception as e:
                self.log.error(
                    'disk prediction plugin failed to started and try to restart: {}'.format(e))
                restart_agent = True

            if restart_agent:
                self.stop_disk_prediction()
            else:
                sleep_interval = int(self.sleep_interval) or 60
                self._event.wait(sleep_interval)
                self._event.clear()

        # stop_disk_prediction() asserts that cloud mode is active, so only
        # call it when there is something to stop.
        if self._activated_cloud:
            self.stop_disk_prediction()

    def _agent_call_back(self):
        """Callback handed to PredictRunner; signals refreshed results."""
        self.log.debug('notify refresh devices prediction result')
        self._predict_event.set()

    def start_cloud_disk_prediction(self):
        """Instantiate and start one agent per class in DP_AGENTS.

        Each runner receives this module and the value 300 (presumably an
        interval in seconds -- confirm against the runner classes);
        PredictRunner additionally receives _agent_call_back.

        :raises Exception: when a runner instance evaluates as false
        """
        assert not self._activated_cloud
        for dp_agent in DP_AGENTS:
            if dp_agent == PredictRunner:
                obj_agent = dp_agent(self, 300, self._agent_call_back)
            else:
                obj_agent = dp_agent(self, 300)
            if obj_agent:
                obj_agent.start()
            else:
                # Name the class: the falsy instance cannot be relied on to
                # expose task_name.
                raise Exception('failed to start task %s' % dp_agent.__name__)
            self._agents.append(obj_agent)
        self._activated_cloud = True
        self.log.info('start cloud disk prediction')

    def stop_disk_prediction(self):
        """Terminate all agents and mark cloud prediction as inactive.

        Each agent is terminated and joined with a 5 second timeout.  On
        failure the error is logged and ``_activated_cloud`` stays set.
        """
        assert self._activated_cloud
        try:
            self.status = {'status': DP_MGR_STAT_DISABLED}
            while self._agents:
                dp_agent = self._agents.pop()
                self.log.info('agent name: {}'.format(dp_agent.task_name))
                dp_agent.terminate()
                dp_agent.join(5)
            self._activated_cloud = False
            self.log.info('stop disk prediction')
        except Exception as e:
            self.log.error(
                'failed to stop disk prediction clould plugin: {}'.format(e))

    def shutdown(self):
        """Ask serve() to exit and wake it up."""
        self._run = False
        self._event.set()
        super(Module, self).shutdown()

    def self_test(self):
        """Run the TestRunner task once."""
        obj_test = TestRunner(self)
        obj_test.run()
        self.log.info('self test completed')