]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/services/ceph_service.py
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / pybind / mgr / dashboard / services / ceph_service.py
1 # -*- coding: utf-8 -*-
2
3 import json
4 import logging
5
6 import rados
7 from mgr_module import CommandResult
8 from mgr_util import get_most_recent_rate, get_time_series_rates, name_to_config_section
9
10 from .. import mgr
11
12 try:
13 from typing import Any, Dict, Optional, Union
14 except ImportError:
15 pass # For typing only
16
17 logger = logging.getLogger('ceph_service')
18
19
class SendCommandError(rados.Error):
    """Raised when a command dispatched via ``CephService.send_command`` fails.

    Carries the failed command's prefix and argument dict so callers can
    report exactly what was attempted.
    """

    def __init__(self, err, prefix, argdict, errno):
        self.prefix = prefix
        self.argdict = argdict
        super().__init__(err, errno)
25
26
# pylint: disable=too-many-public-methods
class CephService(object):
    """Stateless helpers for querying and configuring the Ceph cluster via mgr."""

    # OSD map flags that disable (deep-)scrubbing cluster-wide.
    OSD_FLAG_NO_SCRUB = 'noscrub'
    OSD_FLAG_NO_DEEP_SCRUB = 'nodeep-scrub'

    # Substrings looked for in pg_summary status keys to detect running scrubs.
    PG_STATUS_SCRUBBING = 'scrubbing'
    PG_STATUS_DEEP_SCRUBBING = 'deep'

    # Values returned by get_scrub_status().
    SCRUB_STATUS_DISABLED = 'Disabled'
    SCRUB_STATUS_ACTIVE = 'Active'
    SCRUB_STATUS_INACTIVE = 'Inactive'
39
40 @classmethod
41 def get_service_map(cls, service_name):
42 service_map = {} # type: Dict[str, dict]
43 for server in mgr.list_servers():
44 for service in server['services']:
45 if service['type'] == service_name:
46 if server['hostname'] not in service_map:
47 service_map[server['hostname']] = {
48 'server': server,
49 'services': []
50 }
51 inst_id = service['id']
52 metadata = mgr.get_metadata(service_name, inst_id)
53 status = mgr.get_daemon_status(service_name, inst_id)
54 service_map[server['hostname']]['services'].append({
55 'id': inst_id,
56 'type': service_name,
57 'hostname': server['hostname'],
58 'metadata': metadata,
59 'status': status
60 })
61 return service_map
62
63 @classmethod
64 def get_service_list(cls, service_name):
65 service_map = cls.get_service_map(service_name)
66 return [svc for _, svcs in service_map.items() for svc in svcs['services']]
67
68 @classmethod
69 def get_service_data_by_metadata_id(cls,
70 service_type: str,
71 metadata_id: str) -> Optional[Dict[str, Any]]:
72 for server in mgr.list_servers():
73 for service in server['services']:
74 if service['type'] == service_type:
75 metadata = mgr.get_metadata(service_type, service['id'])
76 if metadata_id == metadata['id']:
77 return {
78 'id': metadata['id'],
79 'service_map_id': str(service['id']),
80 'type': service_type,
81 'hostname': server['hostname'],
82 'metadata': metadata
83 }
84 return None
85
86 @classmethod
87 def get_service(cls, service_type: str, metadata_id: str) -> Optional[Dict[str, Any]]:
88 svc_data = cls.get_service_data_by_metadata_id(service_type, metadata_id)
89 if svc_data:
90 svc_data['status'] = mgr.get_daemon_status(svc_data['type'], svc_data['service_map_id'])
91 return svc_data
92
93 @classmethod
94 def get_service_perf_counters(cls, service_type: str, service_id: str) -> Dict[str, Any]:
95 schema_dict = mgr.get_perf_schema(service_type, service_id)
96 schema = schema_dict["{}.{}".format(service_type, service_id)]
97 counters = []
98 for key, value in sorted(schema.items()):
99 counter = {'name': str(key), 'description': value['description']}
100 # pylint: disable=W0212
101 if mgr._stattype_to_str(value['type']) == 'counter':
102 counter['value'] = cls.get_rate(
103 service_type, service_id, key)
104 counter['unit'] = mgr._unit_to_str(value['units'])
105 else:
106 counter['value'] = mgr.get_latest(
107 service_type, service_id, key)
108 counter['unit'] = ''
109 counters.append(counter)
110
111 return {
112 'service': {
113 'type': service_type,
114 'id': str(service_id)
115 },
116 'counters': counters
117 }
118
119 @classmethod
120 def get_pool_list(cls, application=None):
121 osd_map = mgr.get('osd_map')
122 if not application:
123 return osd_map['pools']
124 return [pool for pool in osd_map['pools']
125 if application in pool.get('application_metadata', {})]
126
127 @classmethod
128 def get_pool_list_with_stats(cls, application=None):
129 # pylint: disable=too-many-locals
130 pools = cls.get_pool_list(application)
131
132 pools_w_stats = []
133
134 pg_summary = mgr.get("pg_summary")
135 pool_stats = mgr.get_updated_pool_stats()
136
137 for pool in pools:
138 pool['pg_status'] = pg_summary['by_pool'][pool['pool'].__str__()]
139 stats = pool_stats[pool['pool']]
140 s = {}
141
142 for stat_name, stat_series in stats.items():
143 rates = get_time_series_rates(stat_series)
144 s[stat_name] = {
145 'latest': stat_series[0][1],
146 'rate': get_most_recent_rate(rates),
147 'rates': rates
148 }
149 pool['stats'] = s
150 pools_w_stats.append(pool)
151 return pools_w_stats
152
153 @classmethod
154 def get_erasure_code_profiles(cls):
155 def _serialize_ecp(name, ecp):
156 def serialize_numbers(key):
157 value = ecp.get(key)
158 if value is not None:
159 ecp[key] = int(value)
160
161 ecp['name'] = name
162 serialize_numbers('k')
163 serialize_numbers('m')
164 return ecp
165
166 ret = []
167 for name, ecp in mgr.get('osd_map').get('erasure_code_profiles', {}).items():
168 ret.append(_serialize_ecp(name, ecp))
169 return ret
170
171 @classmethod
172 def get_pool_name_from_id(cls, pool_id):
173 # type: (int) -> Union[str, None]
174 return mgr.rados.pool_reverse_lookup(pool_id)
175
176 @classmethod
177 def get_pool_by_attribute(cls, attribute, value):
178 # type: (str, Any) -> Union[dict, None]
179 pool_list = cls.get_pool_list()
180 for pool in pool_list:
181 if attribute in pool and pool[attribute] == value:
182 return pool
183 return None
184
185 @classmethod
186 def get_encryption_config(cls, daemon_name):
187 kms_vault_configured = False
188 s3_vault_configured = False
189 kms_backend: str = ''
190 sse_s3_backend: str = ''
191 vault_stats = []
192 full_daemon_name = 'rgw.' + daemon_name
193
194 kms_backend = CephService.send_command('mon', 'config get',
195 who=name_to_config_section(full_daemon_name),
196 key='rgw_crypt_s3_kms_backend')
197 sse_s3_backend = CephService.send_command('mon', 'config get',
198 who=name_to_config_section(full_daemon_name),
199 key='rgw_crypt_sse_s3_backend')
200
201 if kms_backend.strip() == 'vault':
202 kms_vault_auth: str = CephService.send_command('mon', 'config get',
203 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
204 key='rgw_crypt_vault_auth')
205 kms_vault_engine: str = CephService.send_command('mon', 'config get',
206 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
207 key='rgw_crypt_vault_secret_engine')
208 kms_vault_address: str = CephService.send_command('mon', 'config get',
209 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
210 key='rgw_crypt_vault_addr')
211 kms_vault_token: str = CephService.send_command('mon', 'config get',
212 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
213 key='rgw_crypt_vault_token_file') # noqa E501 #pylint: disable=line-too-long
214 if (kms_vault_auth.strip() != "" and kms_vault_engine.strip() != "" and kms_vault_address.strip() != ""): # noqa E501 #pylint: disable=line-too-long
215 if(kms_vault_auth == 'token' and kms_vault_token.strip() == ""):
216 kms_vault_configured = False
217 else:
218 kms_vault_configured = True
219
220 if sse_s3_backend.strip() == 'vault':
221 s3_vault_auth: str = CephService.send_command('mon', 'config get',
222 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
223 key='rgw_crypt_sse_s3_vault_auth')
224 s3_vault_engine: str = CephService.send_command('mon',
225 'config get',
226 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
227 key='rgw_crypt_sse_s3_vault_secret_engine') # noqa E501 #pylint: disable=line-too-long
228 s3_vault_address: str = CephService.send_command('mon', 'config get',
229 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
230 key='rgw_crypt_sse_s3_vault_addr')
231 s3_vault_token: str = CephService.send_command('mon', 'config get',
232 who=name_to_config_section(full_daemon_name), # noqa E501 #pylint: disable=line-too-long
233 key='rgw_crypt_sse_s3_vault_token_file') # noqa E501 #pylint: disable=line-too-long
234
235 if (s3_vault_auth.strip() != "" and s3_vault_engine.strip() != "" and s3_vault_address.strip() != ""): # noqa E501 #pylint: disable=line-too-long
236 if(s3_vault_auth == 'token' and s3_vault_token.strip() == ""):
237 s3_vault_configured = False
238 else:
239 s3_vault_configured = True
240
241 vault_stats.append(kms_vault_configured)
242 vault_stats.append(s3_vault_configured)
243 return vault_stats
244
245 @classmethod
246 def set_encryption_config(cls, encryption_type, kms_provider, auth_method,
247 secret_engine, secret_path, namespace, address,
248 token, daemon_name, ssl_cert, client_cert, client_key):
249 full_daemon_name = 'rgw.' + daemon_name
250 if encryption_type == 'aws:kms':
251
252 KMS_CONFIG = [
253 ['rgw_crypt_s3_kms_backend', kms_provider],
254 ['rgw_crypt_vault_auth', auth_method],
255 ['rgw_crypt_vault_prefix', secret_path],
256 ['rgw_crypt_vault_namespace', namespace],
257 ['rgw_crypt_vault_secret_engine', secret_engine],
258 ['rgw_crypt_vault_addr', address],
259 ['rgw_crypt_vault_token_file', token],
260 ['rgw_crypt_vault_ssl_cacert', ssl_cert],
261 ['rgw_crypt_vault_ssl_clientcert', client_cert],
262 ['rgw_crypt_vault_ssl_clientkey', client_key]
263 ]
264
265 for (key, value) in KMS_CONFIG:
266 if value == 'null':
267 continue
268 CephService.send_command('mon', 'config set',
269 who=name_to_config_section(full_daemon_name),
270 name=key, value=value)
271
272 if encryption_type == 'AES256':
273
274 SSE_S3_CONFIG = [
275 ['rgw_crypt_sse_s3_backend', kms_provider],
276 ['rgw_crypt_sse_s3_vault_auth', auth_method],
277 ['rgw_crypt_sse_s3_vault_prefix', secret_path],
278 ['rgw_crypt_sse_s3_vault_namespace', namespace],
279 ['rgw_crypt_sse_s3_vault_secret_engine', secret_engine],
280 ['rgw_crypt_sse_s3_vault_addr', address],
281 ['rgw_crypt_sse_s3_vault_token_file', token],
282 ['rgw_crypt_sse_s3_vault_ssl_cacert', ssl_cert],
283 ['rgw_crypt_sse_s3_vault_ssl_clientcert', client_cert],
284 ['rgw_crypt_sse_s3_vault_ssl_clientkey', client_key]
285 ]
286
287 for (key, value) in SSE_S3_CONFIG:
288 if value == 'null':
289 continue
290 CephService.send_command('mon', 'config set',
291 who=name_to_config_section(full_daemon_name),
292 name=key, value=value)
293
294 return {}
295
296 @classmethod
297 def set_multisite_config(cls, realm_name, zonegroup_name, zone_name, daemon_name):
298 full_daemon_name = 'rgw.' + daemon_name
299
300 KMS_CONFIG = [
301 ['rgw_realm', realm_name],
302 ['rgw_zonegroup', zonegroup_name],
303 ['rgw_zone', zone_name]
304 ]
305
306 for (key, value) in KMS_CONFIG:
307 if value == 'null':
308 continue
309 CephService.send_command('mon', 'config set',
310 who=name_to_config_section(full_daemon_name),
311 name=key, value=value)
312 return {}
313
314 @classmethod
315 def get_realm_tokens(cls):
316 tokens_info = mgr.remote('rgw', 'get_realm_tokens')
317 return tokens_info
318
319 @classmethod
320 def import_realm_token(cls, realm_token, zone_name, port, placement_spec):
321 tokens_info = mgr.remote('rgw', 'import_realm_token', zone_name=zone_name,
322 realm_token=realm_token, port=port, placement=placement_spec,
323 start_radosgw=True)
324 return tokens_info
325
326 @classmethod
327 def get_pool_pg_status(cls, pool_name):
328 # type: (str) -> dict
329 pool = cls.get_pool_by_attribute('pool_name', pool_name)
330 if pool is None:
331 return {}
332 return mgr.get("pg_summary")['by_pool'][pool['pool'].__str__()]
333
334 @staticmethod
335 def send_command(srv_type, prefix, srv_spec='', to_json=True, inbuf='', **kwargs):
336 # type: (str, str, Optional[str], bool, str, Any) -> Any
337 """
338 :type prefix: str
339 :param srv_type: mon |
340 :param kwargs: will be added to argdict
341 :param srv_spec: typically empty. or something like "<fs_id>:0"
342 :param to_json: if true return as json format
343
344 :raises PermissionError: See rados.make_ex
345 :raises ObjectNotFound: See rados.make_ex
346 :raises IOError: See rados.make_ex
347 :raises NoSpace: See rados.make_ex
348 :raises ObjectExists: See rados.make_ex
349 :raises ObjectBusy: See rados.make_ex
350 :raises NoData: See rados.make_ex
351 :raises InterruptedOrTimeoutError: See rados.make_ex
352 :raises TimedOut: See rados.make_ex
353 :raises ValueError: return code != 0
354 """
355 argdict = {
356 "prefix": prefix,
357 }
358 if to_json:
359 argdict["format"] = "json"
360 argdict.update({k: v for k, v in kwargs.items() if v is not None})
361 result = CommandResult("")
362 mgr.send_command(result, srv_type, srv_spec, json.dumps(argdict), "", inbuf=inbuf)
363 r, outb, outs = result.wait()
364 if r != 0:
365 logger.error("send_command '%s' failed. (r=%s, outs=\"%s\", kwargs=%s)", prefix, r,
366 outs, kwargs)
367
368 raise SendCommandError(outs, prefix, argdict, r)
369
370 try:
371 return json.loads(outb or outs)
372 except Exception: # pylint: disable=broad-except
373 return outb
374
    @staticmethod
    def _get_smart_data_by_device(device):
        # type: (dict) -> Dict[str, dict]
        """Fetch SMART data for one device record via one of its daemons.

        *device* is a record as returned by 'device ls-by-host' /
        'device ls-by-daemon'; it must carry 'devid' and may carry 'daemons'.
        Returns a mapping of device ID to SMART data, or {} when no daemon
        could provide any.
        """
        # Check whether the device is associated with daemons.
        if 'daemons' in device and device['daemons']:
            dev_smart_data: Dict[str, Any] = {}

            # Get a list of all OSD daemons on all hosts that are 'up'
            # because SMART data can not be retrieved from daemons that
            # are 'down' or 'destroyed'.
            osd_tree = CephService.send_command('mon', 'osd tree')
            osd_daemons_up = [
                node['name'] for node in osd_tree.get('nodes', {})
                if node.get('status') == 'up'
            ]

            # All daemons on the same host can deliver SMART data,
            # thus it is not relevant for us which daemon we are using.
            # NOTE: the list may contain daemons that are 'down' or 'destroyed'.
            for daemon in device['daemons']:
                svc_type, svc_id = daemon.split('.', 1)
                if 'osd' in svc_type:
                    if daemon not in osd_daemons_up:
                        # This daemon cannot serve the request; try the next
                        # one without breaking out of the loop.
                        continue
                    try:
                        dev_smart_data = CephService.send_command(
                            svc_type, 'smart', svc_id, devid=device['devid'])
                    except SendCommandError as error:
                        logger.warning(str(error))
                        # Try to retrieve SMART data from another daemon.
                        continue
                elif 'mon' in svc_type:
                    try:
                        dev_smart_data = CephService.send_command(
                            svc_type, 'device query-daemon-health-metrics', who=daemon)
                    except SendCommandError as error:
                        logger.warning(str(error))
                        # Try to retrieve SMART data from another daemon.
                        continue
                else:
                    # Unknown daemon type: nothing can be queried.
                    dev_smart_data = {}

                # Log per-device smartctl errors contained in the reply.
                CephService.log_dev_data_error(dev_smart_data)

                # First attempt that was not skipped by 'continue' wins.
                break

            return dev_smart_data
        logger.warning('[SMART] No daemons associated with device ID "%s"',
                       device['devid'])
        return {}
425
426 @staticmethod
427 def log_dev_data_error(dev_smart_data):
428 for dev_id, dev_data in dev_smart_data.items():
429 if 'error' in dev_data:
430 logger.warning(
431 '[SMART] Error retrieving smartctl data for device ID "%s": %s',
432 dev_id, dev_data)
433
434 @staticmethod
435 def get_devices_by_host(hostname):
436 # type: (str) -> dict
437 return CephService.send_command('mon',
438 'device ls-by-host',
439 host=hostname)
440
441 @staticmethod
442 def get_devices_by_daemon(daemon_type, daemon_id):
443 # type: (str, str) -> dict
444 return CephService.send_command('mon',
445 'device ls-by-daemon',
446 who='{}.{}'.format(
447 daemon_type, daemon_id))
448
449 @staticmethod
450 def get_smart_data_by_host(hostname):
451 # type: (str) -> dict
452 """
453 Get the SMART data of all devices on the given host, regardless
454 of the daemon (osd, mon, ...).
455 :param hostname: The name of the host.
456 :return: A dictionary containing the SMART data of every device
457 on the given host. The device name is used as the key in the
458 dictionary.
459 """
460 devices = CephService.get_devices_by_host(hostname)
461 smart_data = {} # type: dict
462 if devices:
463 for device in devices:
464 if device['devid'] not in smart_data:
465 smart_data.update(
466 CephService._get_smart_data_by_device(device))
467 else:
468 logger.debug('[SMART] could not retrieve device list from host %s', hostname)
469 return smart_data
470
471 @staticmethod
472 def get_smart_data_by_daemon(daemon_type, daemon_id):
473 # type: (str, str) -> Dict[str, dict]
474 """
475 Get the SMART data of the devices associated with the given daemon.
476 :param daemon_type: The daemon type, e.g. 'osd' or 'mon'.
477 :param daemon_id: The daemon identifier.
478 :return: A dictionary containing the SMART data of every device
479 associated with the given daemon. The device name is used as the
480 key in the dictionary.
481 """
482 devices = CephService.get_devices_by_daemon(daemon_type, daemon_id)
483 smart_data = {} # type: Dict[str, dict]
484 if devices:
485 for device in devices:
486 if device['devid'] not in smart_data:
487 smart_data.update(
488 CephService._get_smart_data_by_device(device))
489 else:
490 msg = '[SMART] could not retrieve device list from daemon with type %s and ' +\
491 'with ID %s'
492 logger.debug(msg, daemon_type, daemon_id)
493 return smart_data
494
495 @classmethod
496 def get_rates(cls, svc_type, svc_name, path):
497 """
498 :return: the derivative of mgr.get_counter()
499 :rtype: list[tuple[int, float]]"""
500 data = mgr.get_counter(svc_type, svc_name, path)[path]
501 return get_time_series_rates(data)
502
503 @classmethod
504 def get_rate(cls, svc_type, svc_name, path):
505 """returns most recent rate"""
506 return get_most_recent_rate(cls.get_rates(svc_type, svc_name, path))
507
508 @classmethod
509 def get_client_perf(cls):
510 pools_stats = mgr.get('osd_pool_stats')['pool_stats']
511
512 io_stats = {
513 'read_bytes_sec': 0,
514 'read_op_per_sec': 0,
515 'write_bytes_sec': 0,
516 'write_op_per_sec': 0,
517 }
518 recovery_stats = {'recovering_bytes_per_sec': 0}
519
520 for pool_stats in pools_stats:
521 client_io = pool_stats['client_io_rate']
522 for stat in list(io_stats.keys()):
523 if stat in client_io:
524 io_stats[stat] += client_io[stat]
525
526 client_recovery = pool_stats['recovery_rate']
527 for stat in list(recovery_stats.keys()):
528 if stat in client_recovery:
529 recovery_stats[stat] += client_recovery[stat]
530
531 client_perf = io_stats.copy()
532 client_perf.update(recovery_stats)
533
534 return client_perf
535
536 @classmethod
537 def get_scrub_status(cls):
538 enabled_flags = mgr.get('osd_map')['flags_set']
539 if cls.OSD_FLAG_NO_SCRUB in enabled_flags or cls.OSD_FLAG_NO_DEEP_SCRUB in enabled_flags:
540 return cls.SCRUB_STATUS_DISABLED
541
542 grouped_pg_statuses = mgr.get('pg_summary')['all']
543 for grouped_pg_status in grouped_pg_statuses.keys():
544 if len(grouped_pg_status.split(cls.PG_STATUS_SCRUBBING)) > 1 \
545 or len(grouped_pg_status.split(cls.PG_STATUS_DEEP_SCRUBBING)) > 1:
546 return cls.SCRUB_STATUS_ACTIVE
547
548 return cls.SCRUB_STATUS_INACTIVE
549
550 @classmethod
551 def get_pg_info(cls):
552 pg_summary = mgr.get('pg_summary')
553 object_stats = {stat: pg_summary['pg_stats_sum']['stat_sum'][stat] for stat in [
554 'num_objects', 'num_object_copies', 'num_objects_degraded',
555 'num_objects_misplaced', 'num_objects_unfound']}
556
557 pgs_per_osd = 0.0
558 total_osds = len(pg_summary['by_osd'])
559 if total_osds > 0:
560 total_pgs = 0.0
561 for _, osd_pg_statuses in pg_summary['by_osd'].items():
562 for _, pg_amount in osd_pg_statuses.items():
563 total_pgs += pg_amount
564
565 pgs_per_osd = total_pgs / total_osds
566
567 return {
568 'object_stats': object_stats,
569 'statuses': pg_summary['all'],
570 'pgs_per_osd': pgs_per_osd,
571 }