# -*- source provenance -*-
# Scraped from: git.proxmox.com Git mirror — ceph.git
# Path: ceph/src/pybind/mgr/dashboard/services/ceph_service.py
1 # -*- coding: utf-8 -*-
7 from mgr_module
import CommandResult
8 from mgr_util
import get_most_recent_rate
, get_time_series_rates
, name_to_config_section
13 from typing
import Any
, Dict
, Optional
, Union
15 pass # For typing only
17 logger
= logging
.getLogger('ceph_service')
20 class SendCommandError(rados
.Error
):
21 def __init__(self
, err
, prefix
, argdict
, errno
):
23 self
.argdict
= argdict
24 super(SendCommandError
, self
).__init
__(err
, errno
)
27 # pylint: disable=too-many-public-methods
28 class CephService(object):
30 OSD_FLAG_NO_SCRUB
= 'noscrub'
31 OSD_FLAG_NO_DEEP_SCRUB
= 'nodeep-scrub'
33 PG_STATUS_SCRUBBING
= 'scrubbing'
34 PG_STATUS_DEEP_SCRUBBING
= 'deep'
36 SCRUB_STATUS_DISABLED
= 'Disabled'
37 SCRUB_STATUS_ACTIVE
= 'Active'
38 SCRUB_STATUS_INACTIVE
= 'Inactive'
41 def get_service_map(cls
, service_name
):
42 service_map
= {} # type: Dict[str, dict]
43 for server
in mgr
.list_servers():
44 for service
in server
['services']:
45 if service
['type'] == service_name
:
46 if server
['hostname'] not in service_map
:
47 service_map
[server
['hostname']] = {
51 inst_id
= service
['id']
52 metadata
= mgr
.get_metadata(service_name
, inst_id
)
53 status
= mgr
.get_daemon_status(service_name
, inst_id
)
54 service_map
[server
['hostname']]['services'].append({
57 'hostname': server
['hostname'],
64 def get_service_list(cls
, service_name
):
65 service_map
= cls
.get_service_map(service_name
)
66 return [svc
for _
, svcs
in service_map
.items() for svc
in svcs
['services']]
69 def get_service_data_by_metadata_id(cls
,
71 metadata_id
: str) -> Optional
[Dict
[str, Any
]]:
72 for server
in mgr
.list_servers():
73 for service
in server
['services']:
74 if service
['type'] == service_type
:
75 metadata
= mgr
.get_metadata(service_type
, service
['id'])
76 if metadata_id
== metadata
['id']:
79 'service_map_id': str(service
['id']),
81 'hostname': server
['hostname'],
87 def get_service(cls
, service_type
: str, metadata_id
: str) -> Optional
[Dict
[str, Any
]]:
88 svc_data
= cls
.get_service_data_by_metadata_id(service_type
, metadata_id
)
90 svc_data
['status'] = mgr
.get_daemon_status(svc_data
['type'], svc_data
['service_map_id'])
94 def get_service_perf_counters(cls
, service_type
: str, service_id
: str) -> Dict
[str, Any
]:
95 schema_dict
= mgr
.get_perf_schema(service_type
, service_id
)
96 schema
= schema_dict
["{}.{}".format(service_type
, service_id
)]
98 for key
, value
in sorted(schema
.items()):
99 counter
= {'name': str(key
), 'description': value
['description']}
100 # pylint: disable=W0212
101 if mgr
._stattype
_to
_str
(value
['type']) == 'counter':
102 counter
['value'] = cls
.get_rate(
103 service_type
, service_id
, key
)
104 counter
['unit'] = mgr
._unit
_to
_str
(value
['units'])
106 counter
['value'] = mgr
.get_latest(
107 service_type
, service_id
, key
)
109 counters
.append(counter
)
113 'type': service_type
,
114 'id': str(service_id
)
120 def get_pool_list(cls
, application
=None):
121 osd_map
= mgr
.get('osd_map')
123 return osd_map
['pools']
124 return [pool
for pool
in osd_map
['pools']
125 if application
in pool
.get('application_metadata', {})]
128 def get_pool_list_with_stats(cls
, application
=None):
129 # pylint: disable=too-many-locals
130 pools
= cls
.get_pool_list(application
)
134 pg_summary
= mgr
.get("pg_summary")
135 pool_stats
= mgr
.get_updated_pool_stats()
138 pool
['pg_status'] = pg_summary
['by_pool'][pool
['pool'].__str
__()]
139 stats
= pool_stats
[pool
['pool']]
142 for stat_name
, stat_series
in stats
.items():
143 rates
= get_time_series_rates(stat_series
)
145 'latest': stat_series
[0][1],
146 'rate': get_most_recent_rate(rates
),
150 pools_w_stats
.append(pool
)
154 def get_erasure_code_profiles(cls
):
155 def _serialize_ecp(name
, ecp
):
156 def serialize_numbers(key
):
158 if value
is not None:
159 ecp
[key
] = int(value
)
162 serialize_numbers('k')
163 serialize_numbers('m')
167 for name
, ecp
in mgr
.get('osd_map').get('erasure_code_profiles', {}).items():
168 ret
.append(_serialize_ecp(name
, ecp
))
172 def get_pool_name_from_id(cls
, pool_id
):
173 # type: (int) -> Union[str, None]
174 return mgr
.rados
.pool_reverse_lookup(pool_id
)
177 def get_pool_by_attribute(cls
, attribute
, value
):
178 # type: (str, Any) -> Union[dict, None]
179 pool_list
= cls
.get_pool_list()
180 for pool
in pool_list
:
181 if attribute
in pool
and pool
[attribute
] == value
:
186 def get_encryption_config(cls
, daemon_name
):
187 kms_vault_configured
= False
188 s3_vault_configured
= False
189 kms_backend
: str = ''
190 sse_s3_backend
: str = ''
192 full_daemon_name
= 'rgw.' + daemon_name
194 kms_backend
= CephService
.send_command('mon', 'config get',
195 who
=name_to_config_section(full_daemon_name
),
196 key
='rgw_crypt_s3_kms_backend')
197 sse_s3_backend
= CephService
.send_command('mon', 'config get',
198 who
=name_to_config_section(full_daemon_name
),
199 key
='rgw_crypt_sse_s3_backend')
201 if kms_backend
.strip() == 'vault':
202 kms_vault_auth
: str = CephService
.send_command('mon', 'config get',
203 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
204 key
='rgw_crypt_vault_auth')
205 kms_vault_engine
: str = CephService
.send_command('mon', 'config get',
206 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
207 key
='rgw_crypt_vault_secret_engine')
208 kms_vault_address
: str = CephService
.send_command('mon', 'config get',
209 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
210 key
='rgw_crypt_vault_addr')
211 kms_vault_token
: str = CephService
.send_command('mon', 'config get',
212 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
213 key
='rgw_crypt_vault_token_file') # noqa E501 #pylint: disable=line-too-long
214 if (kms_vault_auth
.strip() != "" and kms_vault_engine
.strip() != "" and kms_vault_address
.strip() != ""): # noqa E501 #pylint: disable=line-too-long
215 if(kms_vault_auth
== 'token' and kms_vault_token
.strip() == ""):
216 kms_vault_configured
= False
218 kms_vault_configured
= True
220 if sse_s3_backend
.strip() == 'vault':
221 s3_vault_auth
: str = CephService
.send_command('mon', 'config get',
222 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
223 key
='rgw_crypt_sse_s3_vault_auth')
224 s3_vault_engine
: str = CephService
.send_command('mon',
226 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
227 key
='rgw_crypt_sse_s3_vault_secret_engine') # noqa E501 #pylint: disable=line-too-long
228 s3_vault_address
: str = CephService
.send_command('mon', 'config get',
229 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
230 key
='rgw_crypt_sse_s3_vault_addr')
231 s3_vault_token
: str = CephService
.send_command('mon', 'config get',
232 who
=name_to_config_section(full_daemon_name
), # noqa E501 #pylint: disable=line-too-long
233 key
='rgw_crypt_sse_s3_vault_token_file') # noqa E501 #pylint: disable=line-too-long
235 if (s3_vault_auth
.strip() != "" and s3_vault_engine
.strip() != "" and s3_vault_address
.strip() != ""): # noqa E501 #pylint: disable=line-too-long
236 if(s3_vault_auth
== 'token' and s3_vault_token
.strip() == ""):
237 s3_vault_configured
= False
239 s3_vault_configured
= True
241 vault_stats
.append(kms_vault_configured
)
242 vault_stats
.append(s3_vault_configured
)
246 def set_encryption_config(cls
, encryption_type
, kms_provider
, auth_method
,
247 secret_engine
, secret_path
, namespace
, address
,
248 token
, daemon_name
, ssl_cert
, client_cert
, client_key
):
249 full_daemon_name
= 'rgw.' + daemon_name
250 if encryption_type
== 'aws:kms':
253 ['rgw_crypt_s3_kms_backend', kms_provider
],
254 ['rgw_crypt_vault_auth', auth_method
],
255 ['rgw_crypt_vault_prefix', secret_path
],
256 ['rgw_crypt_vault_namespace', namespace
],
257 ['rgw_crypt_vault_secret_engine', secret_engine
],
258 ['rgw_crypt_vault_addr', address
],
259 ['rgw_crypt_vault_token_file', token
],
260 ['rgw_crypt_vault_ssl_cacert', ssl_cert
],
261 ['rgw_crypt_vault_ssl_clientcert', client_cert
],
262 ['rgw_crypt_vault_ssl_clientkey', client_key
]
265 for (key
, value
) in KMS_CONFIG
:
268 CephService
.send_command('mon', 'config set',
269 who
=name_to_config_section(full_daemon_name
),
270 name
=key
, value
=value
)
272 if encryption_type
== 'AES256':
275 ['rgw_crypt_sse_s3_backend', kms_provider
],
276 ['rgw_crypt_sse_s3_vault_auth', auth_method
],
277 ['rgw_crypt_sse_s3_vault_prefix', secret_path
],
278 ['rgw_crypt_sse_s3_vault_namespace', namespace
],
279 ['rgw_crypt_sse_s3_vault_secret_engine', secret_engine
],
280 ['rgw_crypt_sse_s3_vault_addr', address
],
281 ['rgw_crypt_sse_s3_vault_token_file', token
],
282 ['rgw_crypt_sse_s3_vault_ssl_cacert', ssl_cert
],
283 ['rgw_crypt_sse_s3_vault_ssl_clientcert', client_cert
],
284 ['rgw_crypt_sse_s3_vault_ssl_clientkey', client_key
]
287 for (key
, value
) in SSE_S3_CONFIG
:
290 CephService
.send_command('mon', 'config set',
291 who
=name_to_config_section(full_daemon_name
),
292 name
=key
, value
=value
)
297 def set_multisite_config(cls
, realm_name
, zonegroup_name
, zone_name
, daemon_name
):
298 full_daemon_name
= 'rgw.' + daemon_name
301 ['rgw_realm', realm_name
],
302 ['rgw_zonegroup', zonegroup_name
],
303 ['rgw_zone', zone_name
]
306 for (key
, value
) in KMS_CONFIG
:
309 CephService
.send_command('mon', 'config set',
310 who
=name_to_config_section(full_daemon_name
),
311 name
=key
, value
=value
)
315 def get_realm_tokens(cls
):
316 tokens_info
= mgr
.remote('rgw', 'get_realm_tokens')
320 def import_realm_token(cls
, realm_token
, zone_name
, port
, placement_spec
):
321 tokens_info
= mgr
.remote('rgw', 'import_realm_token', zone_name
=zone_name
,
322 realm_token
=realm_token
, port
=port
, placement
=placement_spec
,
327 def get_pool_pg_status(cls
, pool_name
):
328 # type: (str) -> dict
329 pool
= cls
.get_pool_by_attribute('pool_name', pool_name
)
332 return mgr
.get("pg_summary")['by_pool'][pool
['pool'].__str
__()]
335 def send_command(srv_type
, prefix
, srv_spec
='', to_json
=True, inbuf
='', **kwargs
):
336 # type: (str, str, Optional[str], bool, str, Any) -> Any
339 :param srv_type: mon |
340 :param kwargs: will be added to argdict
341 :param srv_spec: typically empty. or something like "<fs_id>:0"
342 :param to_json: if true return as json format
344 :raises PermissionError: See rados.make_ex
345 :raises ObjectNotFound: See rados.make_ex
346 :raises IOError: See rados.make_ex
347 :raises NoSpace: See rados.make_ex
348 :raises ObjectExists: See rados.make_ex
349 :raises ObjectBusy: See rados.make_ex
350 :raises NoData: See rados.make_ex
351 :raises InterruptedOrTimeoutError: See rados.make_ex
352 :raises TimedOut: See rados.make_ex
353 :raises ValueError: return code != 0
359 argdict
["format"] = "json"
360 argdict
.update({k
: v
for k
, v
in kwargs
.items() if v
is not None})
361 result
= CommandResult("")
362 mgr
.send_command(result
, srv_type
, srv_spec
, json
.dumps(argdict
), "", inbuf
=inbuf
)
363 r
, outb
, outs
= result
.wait()
365 logger
.error("send_command '%s' failed. (r=%s, outs=\"%s\", kwargs=%s)", prefix
, r
,
368 raise SendCommandError(outs
, prefix
, argdict
, r
)
371 return json
.loads(outb
or outs
)
372 except Exception: # pylint: disable=broad-except
376 def _get_smart_data_by_device(device
):
377 # type: (dict) -> Dict[str, dict]
378 # Check whether the device is associated with daemons.
379 if 'daemons' in device
and device
['daemons']:
380 dev_smart_data
: Dict
[str, Any
] = {}
382 # Get a list of all OSD daemons on all hosts that are 'up'
383 # because SMART data can not be retrieved from daemons that
384 # are 'down' or 'destroyed'.
385 osd_tree
= CephService
.send_command('mon', 'osd tree')
387 node
['name'] for node
in osd_tree
.get('nodes', {})
388 if node
.get('status') == 'up'
391 # All daemons on the same host can deliver SMART data,
392 # thus it is not relevant for us which daemon we are using.
393 # NOTE: the list may contain daemons that are 'down' or 'destroyed'.
394 for daemon
in device
['daemons']:
395 svc_type
, svc_id
= daemon
.split('.', 1)
396 if 'osd' in svc_type
:
397 if daemon
not in osd_daemons_up
:
400 dev_smart_data
= CephService
.send_command(
401 svc_type
, 'smart', svc_id
, devid
=device
['devid'])
402 except SendCommandError
as error
:
403 logger
.warning(str(error
))
404 # Try to retrieve SMART data from another daemon.
406 elif 'mon' in svc_type
:
408 dev_smart_data
= CephService
.send_command(
409 svc_type
, 'device query-daemon-health-metrics', who
=daemon
)
410 except SendCommandError
as error
:
411 logger
.warning(str(error
))
412 # Try to retrieve SMART data from another daemon.
417 CephService
.log_dev_data_error(dev_smart_data
)
421 return dev_smart_data
422 logger
.warning('[SMART] No daemons associated with device ID "%s"',
427 def log_dev_data_error(dev_smart_data
):
428 for dev_id
, dev_data
in dev_smart_data
.items():
429 if 'error' in dev_data
:
431 '[SMART] Error retrieving smartctl data for device ID "%s": %s',
435 def get_devices_by_host(hostname
):
436 # type: (str) -> dict
437 return CephService
.send_command('mon',
442 def get_devices_by_daemon(daemon_type
, daemon_id
):
443 # type: (str, str) -> dict
444 return CephService
.send_command('mon',
445 'device ls-by-daemon',
447 daemon_type
, daemon_id
))
450 def get_smart_data_by_host(hostname
):
451 # type: (str) -> dict
453 Get the SMART data of all devices on the given host, regardless
454 of the daemon (osd, mon, ...).
455 :param hostname: The name of the host.
456 :return: A dictionary containing the SMART data of every device
457 on the given host. The device name is used as the key in the
460 devices
= CephService
.get_devices_by_host(hostname
)
461 smart_data
= {} # type: dict
463 for device
in devices
:
464 if device
['devid'] not in smart_data
:
466 CephService
._get
_smart
_data
_by
_device
(device
))
468 logger
.debug('[SMART] could not retrieve device list from host %s', hostname
)
472 def get_smart_data_by_daemon(daemon_type
, daemon_id
):
473 # type: (str, str) -> Dict[str, dict]
475 Get the SMART data of the devices associated with the given daemon.
476 :param daemon_type: The daemon type, e.g. 'osd' or 'mon'.
477 :param daemon_id: The daemon identifier.
478 :return: A dictionary containing the SMART data of every device
479 associated with the given daemon. The device name is used as the
480 key in the dictionary.
482 devices
= CephService
.get_devices_by_daemon(daemon_type
, daemon_id
)
483 smart_data
= {} # type: Dict[str, dict]
485 for device
in devices
:
486 if device
['devid'] not in smart_data
:
488 CephService
._get
_smart
_data
_by
_device
(device
))
490 msg
= '[SMART] could not retrieve device list from daemon with type %s and ' +\
492 logger
.debug(msg
, daemon_type
, daemon_id
)
496 def get_rates(cls
, svc_type
, svc_name
, path
):
498 :return: the derivative of mgr.get_counter()
499 :rtype: list[tuple[int, float]]"""
500 data
= mgr
.get_counter(svc_type
, svc_name
, path
)[path
]
501 return get_time_series_rates(data
)
504 def get_rate(cls
, svc_type
, svc_name
, path
):
505 """returns most recent rate"""
506 return get_most_recent_rate(cls
.get_rates(svc_type
, svc_name
, path
))
509 def get_client_perf(cls
):
510 pools_stats
= mgr
.get('osd_pool_stats')['pool_stats']
514 'read_op_per_sec': 0,
515 'write_bytes_sec': 0,
516 'write_op_per_sec': 0,
518 recovery_stats
= {'recovering_bytes_per_sec': 0}
520 for pool_stats
in pools_stats
:
521 client_io
= pool_stats
['client_io_rate']
522 for stat
in list(io_stats
.keys()):
523 if stat
in client_io
:
524 io_stats
[stat
] += client_io
[stat
]
526 client_recovery
= pool_stats
['recovery_rate']
527 for stat
in list(recovery_stats
.keys()):
528 if stat
in client_recovery
:
529 recovery_stats
[stat
] += client_recovery
[stat
]
531 client_perf
= io_stats
.copy()
532 client_perf
.update(recovery_stats
)
537 def get_scrub_status(cls
):
538 enabled_flags
= mgr
.get('osd_map')['flags_set']
539 if cls
.OSD_FLAG_NO_SCRUB
in enabled_flags
or cls
.OSD_FLAG_NO_DEEP_SCRUB
in enabled_flags
:
540 return cls
.SCRUB_STATUS_DISABLED
542 grouped_pg_statuses
= mgr
.get('pg_summary')['all']
543 for grouped_pg_status
in grouped_pg_statuses
.keys():
544 if len(grouped_pg_status
.split(cls
.PG_STATUS_SCRUBBING
)) > 1 \
545 or len(grouped_pg_status
.split(cls
.PG_STATUS_DEEP_SCRUBBING
)) > 1:
546 return cls
.SCRUB_STATUS_ACTIVE
548 return cls
.SCRUB_STATUS_INACTIVE
551 def get_pg_info(cls
):
552 pg_summary
= mgr
.get('pg_summary')
553 object_stats
= {stat
: pg_summary
['pg_stats_sum']['stat_sum'][stat
] for stat
in [
554 'num_objects', 'num_object_copies', 'num_objects_degraded',
555 'num_objects_misplaced', 'num_objects_unfound']}
558 total_osds
= len(pg_summary
['by_osd'])
561 for _
, osd_pg_statuses
in pg_summary
['by_osd'].items():
562 for _
, pg_amount
in osd_pg_statuses
.items():
563 total_pgs
+= pg_amount
565 pgs_per_osd
= total_pgs
/ total_osds
568 'object_stats': object_stats
,
569 'statuses': pg_summary
['all'],
570 'pgs_per_osd': pgs_per_osd
,