]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/osd.py
import ceph quincy 17.2.4
[ceph.git] / ceph / src / pybind / mgr / dashboard / controllers / osd.py
1 # -*- coding: utf-8 -*-
2
3 import json
4 import logging
5 import time
6 from typing import Any, Dict, List, Optional, Union
7
8 from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError # type: ignore
9 from mgr_util import get_most_recent_rate
10
11 from .. import mgr
12 from ..exceptions import DashboardException
13 from ..security import Scope
14 from ..services.ceph_service import CephService, SendCommandError
15 from ..services.exception import handle_orchestrator_error, handle_send_command_error
16 from ..services.orchestrator import OrchClient, OrchFeature
17 from ..services.osd import HostStorageSummary, OsdDeploymentOptions
18 from ..tools import str_to_bool
19 from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
20 EndpointDoc, ReadPermission, RESTController, Task, UIRouter, \
21 UpdatePermission, allow_empty_body
22 from ._version import APIVersion
23 from .orchestrator import raise_if_no_orchestrator
24
logger = logging.getLogger('controllers.osd')

# Response schema for the `safe_to_destroy` endpoint (Osd.safe_to_destroy).
SAFE_TO_DESTROY_SCHEMA = {
    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
    "active": ([int], ""),
    "missing_stats": ([str], ""),
    "stored_pgs": ([str], "Stored Pool groups in Osd"),
    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
}

# Response schema for listing the cluster-wide OSD flags.
EXPORT_FLAGS_SCHEMA = {
    "list_of_flags": ([str], "")
}

# Response schema returned after updating flags on individual OSDs.
EXPORT_INDIV_FLAGS_SCHEMA = {
    "added": ([str], "List of added flags"),
    "removed": ([str], "List of removed flags"),
    "ids": ([int], "List of updated OSDs")
}

# Response schema describing the currently active flags of a single OSD.
EXPORT_INDIV_FLAGS_GET_SCHEMA = {
    "osd": (int, "OSD ID"),
    "flags": ([str], "List of active flags")
}
49
50
class DeploymentOptions:
    """Container for the OSD deployment profiles offered by the dashboard.

    Holds the three predefined deployment profiles (cost/capacity-,
    throughput- and IOPS-optimized) plus the profile recommended for the
    current cluster inventory.
    """

    def __init__(self):
        self.options = {
            OsdDeploymentOptions.COST_CAPACITY:
                HostStorageSummary(OsdDeploymentOptions.COST_CAPACITY,
                                   title='Cost/Capacity-optimized',
                                   desc='All the available HDDs are selected'),
            OsdDeploymentOptions.THROUGHPUT:
                HostStorageSummary(OsdDeploymentOptions.THROUGHPUT,
                                   title='Throughput-optimized',
                                   # Fixed missing space: the implicitly
                                   # concatenated literals previously rendered
                                   # as "...selected for datadevices...".
                                   desc="HDDs/SSDs are selected for data "
                                        "devices and SSDs/NVMes for DB/WAL devices"),
            OsdDeploymentOptions.IOPS:
                HostStorageSummary(OsdDeploymentOptions.IOPS,
                                   title='IOPS-optimized',
                                   desc='All the available NVMes are selected'),
        }
        # Filled in later (e.g. by OsdUi.deployment_options) once the
        # cluster's device inventory has been inspected.
        self.recommended_option = None

    def as_dict(self):
        """Return a JSON-serializable representation of all options."""
        return {
            'options': {k: v.as_dict() for k, v in self.options.items()},
            'recommended_option': self.recommended_option
        }
76
# Predefined DriveGroup specs for the three OSD deployment profiles.
# The 'encrypted' value is overwritten per request in
# Osd._create_predefined_drive_group().
predefined_drive_groups = {
    OsdDeploymentOptions.COST_CAPACITY: {
        'service_type': 'osd',
        'service_id': 'cost_capacity',
        'placement': {
            'host_pattern': '*'
        },
        # Rotational devices only (HDDs) for data.
        'data_devices': {
            'rotational': 1
        },
        'encrypted': False
    },
    OsdDeploymentOptions.THROUGHPUT: {
        'service_type': 'osd',
        'service_id': 'throughput_optimized',
        'placement': {
            'host_pattern': '*'
        },
        # HDDs for data, non-rotational devices (SSD/NVMe) for DB/WAL.
        'data_devices': {
            'rotational': 1
        },
        'db_devices': {
            'rotational': 0
        },
        'encrypted': False
    },
    OsdDeploymentOptions.IOPS: {
        'service_type': 'osd',
        'service_id': 'iops_optimized',
        'placement': {
            'host_pattern': '*'
        },
        # Non-rotational devices only (SSDs/NVMes) for data.
        'data_devices': {
            'rotational': 0
        },
        'encrypted': False
    },
}
115
116
def osd_task(name, metadata, wait_for=2.0):
    """Build a Task decorator for an OSD operation named ``osd/<name>``."""
    return Task(f"osd/{name}", metadata, wait_for)
120
@APIRouter('/osd', Scope.OSD)
@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
    """REST controller for listing, inspecting and managing OSDs."""

    def list(self):
        """Return all OSDs, each enriched with stats, tree node, host and
        operational-status information."""
        osds = self.get_osd_map()

        # Extending by osd stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extending by osd node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extending by osd parent node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                # Negative IDs in 'children' are non-OSD tree items (buckets).
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        removing_osd_ids = self.get_removing_osds()

        # Extending by osd histogram and orchestrator data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            # Counter stats: store both the most recent rate and its history.
            for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
                prop = stat.split('.')[1]
                rates = CephService.get_rates('osd', osd_spec, stat)
                osd['stats'][prop] = get_most_recent_rate(rates)
                osd['stats_history'][prop] = rates
            # Gauge stats
            for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)
            osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
        return list(osds.values())

    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
    @ReadPermission
    def settings(self):
        """Return the cluster-wide `nearfull` and `full` ratios."""
        result = CephService.send_command('mon', 'osd dump')
        return {
            'nearfull_ratio': result['nearfull_ratio'],
            'full_ratio': result['full_ratio']
        }

    def _get_operational_status(self, osd_id: int, removing_osd_ids: Optional[List[int]]):
        # 'unmanaged' when no orchestrator removal status is available,
        # 'deleting' while the orchestrator removes this OSD, else 'working'.
        if removing_osd_ids is None:
            return 'unmanaged'
        if osd_id in removing_osd_ids:
            return 'deleting'
        return 'working'

    @staticmethod
    def get_removing_osds() -> Optional[List[int]]:
        """Return the IDs of OSDs currently being removed, or ``None`` if the
        orchestrator cannot report removal status."""
        orch = OrchClient.instance()
        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
            return [osd.osd_id for osd in orch.osds.removing_status()]
        return None

    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        """Return all OSD-map entries keyed by OSD ID, or a single entry
        when `svc_id` is given."""
        def add_id(osd):
            # Mirror the 'osd' field under 'id' for consumer convenience.
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }
        return resp if svc_id is None else resp[int(svc_id)]

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T data for the given OSD ID."""
        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        """Return S.M.A.R.T data for the given OSD."""
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data.
        """
        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'operational_status': self._get_operational_status(int(svc_id),
                                                               self.get_removing_osds())
        }

    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """
        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))

        return histogram

    def set(self, svc_id, device_class):  # pragma: no cover
        """Change the CRUSH device class of an OSD (no-op if unchanged)."""
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            # The old class has to be removed before a new one can be set.
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary contains the following attributes:
            `safe`: bool, indicate if it's safe to remove OSDs.
            `message`: str, help message if it's not safe to remove OSDs.
        """
        # NOTE: osd_ids is currently unused; safety is judged from
        # cluster-wide health checks only.
        _ = osd_ids
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }

    @DeletePermission
    @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        """Remove an OSD via the orchestrator and wait for its removal.

        :param preserve_id: truthy string keeps the OSD ID reusable
            ('replace' semantics).
        :param force: truthy string skips the cluster health safety check.
        :raises DashboardException: on invalid parameters or when the
            safety check fails.
        """
        replace = False
        check: Union[Dict[str, Any], bool] = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')
        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
        # Poll the orchestrator until this OSD no longer shows up as removing.
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(60)

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        """Initiate a scrub (or deep scrub when `deep` is truthy) of an OSD."""
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)

    @RESTController.Resource('PUT')
    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
                 parameters={'svc_id': (str, 'SVC ID')})
    def mark(self, svc_id, action):
        """
        Note: osd must be marked `down` before marking lost.
        """
        valid_actions = ['out', 'in', 'down', 'lost']
        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
        if action.lower() in valid_actions:
            if action == 'lost':
                # `osd lost` takes a single ID and needs explicit confirmation.
                args['id'] = int(svc_id)
                args['yes_i_really_mean_it'] = True
            else:
                args['ids'] = [svc_id]

            CephService.send_command(**args)
        else:
            # Invalid actions are deliberately ignored (logged only).
            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)

    @RESTController.Resource('POST')
    @allow_empty_body
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this ‘ceph osd reweight’ is a temporary solution. You should
        only use it to keep your cluster running while you’re ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))

    def _create_predefined_drive_group(self, data):
        """Deploy OSDs from one of the predefined drive-group templates."""
        orch = OrchClient.instance()
        option = OsdDeploymentOptions(data[0]['option'])
        if option in list(OsdDeploymentOptions):
            try:
                # Per-request encryption choice overrides the template default.
                predefined_drive_groups[
                    option]['encrypted'] = data[0]['encrypted']
                orch.osds.create([DriveGroupSpec.from_json(
                    predefined_drive_groups[option])])
            except (ValueError, TypeError, DriveGroupValidationError) as e:
                raise DashboardException(e, component='osd')

    def _create_bare(self, data):
        """Create a OSD container that has no associated device.

        :param data: contain attributes to create a bare OSD.
        : `uuid`: will be set automatically if the OSD starts up
        : `svc_id`: the ID is only used if a valid uuid is given.
        :raises DashboardException: when `uuid` or `svc_id` are missing/invalid.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }

    @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=unused-argument
        """Dispatch OSD creation to the handler matching `method`
        ('bare', 'drive_groups' or 'predefined')."""
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        if method == 'predefined':
            return self._create_predefined_drive_group(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))

    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Remove the OSD from the cluster map entirely.

        Note: osd must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering data
        permanently unreadable.

        The osd must be marked down before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    @EndpointDoc("Check If OSD is Safe to Destroy",
                 parameters={
                     'ids': (str, 'OSD Service Identifier'),
                 },
                 responses={200: SAFE_TO_DESTROY_SCHEMA})
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        """

        # Accept a JSON-encoded single ID or list of IDs.
        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            # Safe only when *every* requested OSD is in the safe list.
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result

        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator()
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        :type ids: int|[int]
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # (str) -> dict
        """Return the physical devices backing the given OSD daemon."""
        return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
487
488
@UIRouter('/osd', Scope.OSD)
@APIDoc("Dashboard UI helper function; not part of the public API", "OsdUI")
class OsdUi(Osd):
    @Endpoint('GET')
    @ReadPermission
    @raise_if_no_orchestrator([OrchFeature.DAEMON_LIST])
    @handle_orchestrator_error('host')
    def deployment_options(self):
        """Summarize which OSD deployment profiles the current device
        inventory supports, and which one is recommended."""
        orch = OrchClient.instance()
        counts = {'hdd': 0, 'ssd': 0, 'nvme': 0}
        res = DeploymentOptions()

        # Tally the available devices across every host in the inventory.
        for inventory_host in orch.inventory.list(hosts=None, refresh=True):
            for device in inventory_host.devices.devices:
                if not device.available:
                    continue
                if device.human_readable_type == 'hdd':
                    counts['hdd'] += 1
                # SSDs and NVMe are both counted as 'ssd'
                # so differentiating nvme using its path
                elif '/dev/nvme' in device.path:
                    counts['nvme'] += 1
                else:
                    counts['ssd'] += 1

        # Later assignments win: throughput beats cost/capacity when both fit.
        if counts['hdd']:
            res.options[OsdDeploymentOptions.COST_CAPACITY].available = True
            res.recommended_option = OsdDeploymentOptions.COST_CAPACITY
        if counts['hdd'] and counts['ssd']:
            res.options[OsdDeploymentOptions.THROUGHPUT].available = True
            res.recommended_option = OsdDeploymentOptions.THROUGHPUT
        if counts['nvme']:
            res.options[OsdDeploymentOptions.IOPS].available = True

        return res.as_dict()
525
526
@APIRouter('/osd/flags', Scope.OSD)
@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
    """REST controller for cluster-wide and per-OSD flags."""

    @staticmethod
    def _osd_flags():
        """Return the sorted list of flags currently set in the OSD map."""
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `set osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)

    @staticmethod
    def _update_flags(action, flags, ids=None):
        """Run `osd <action>` for `flags`, optionally scoped to OSD `ids`.

        :param action: mon command suffix, e.g. 'set', 'unset', 'set-group'.
        :param flags: iterable of flag names.
        :param ids: optional list of OSD IDs for the group variants.
        """
        if ids:
            if flags:
                ids = list(map(str, ids))
                CephService.send_command('mon', 'osd ' + action, who=ids,
                                         flags=','.join(flags))
        else:
            # Cluster-wide flags must be set/unset one at a time.
            for flag in flags:
                CephService.send_command('mon', 'osd ' + action, '', key=flag)

    @EndpointDoc("Display OSD Flags",
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
        """Return the flags currently set cluster-wide."""
        return self._osd_flags()

    @EndpointDoc('Sets OSD flags for the entire cluster.',
                 parameters={
                     'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
                                      '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
                                      'Additionally `purged_snapshots` cannot even be set.')
                 },
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to at
        least include those four flags for a successful operation.
        """
        assert isinstance(flags, list)

        # Diff the requested set against the currently enabled set.
        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data

        self._update_flags('set', added)
        self._update_flags('unset', removed)

        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted(enabled_flags - removed | added)

    @Endpoint('PUT', 'individual')
    @UpdatePermission
    @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
                 parameters={
                     'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
                                'noin': (bool, 'Sets/unsets `noin`', True, None),
                                'noup': (bool, 'Sets/unsets `noup`', True, None),
                                'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
                               'Directory of flags to set or unset. The flags '
                               '`noin`, `noout`, `noup` and `nodown` are going to '
                               'be considered only.'),
                     'ids': ([int], 'List of OSD ids the flags should be applied '
                                    'to.')
                 },
                 responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
    def set_individual(self, flags, ids):
        """
        Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
        subset of OSDs.
        """
        assert isinstance(flags, dict)
        assert isinstance(ids, list)
        # Avoid shadowing the `id` builtin in the validation check.
        assert all(isinstance(osd_id, int) for osd_id in ids)

        # These are the only flags that can be applied to an OSD individually.
        all_flags = {'noin', 'noout', 'nodown', 'noup'}
        added = set()
        removed = set()
        for flag, activated in flags.items():
            # `None` means "leave this flag untouched".
            if flag in all_flags and activated is not None:
                if activated:
                    added.add(flag)
                else:
                    removed.add(flag)

        self._update_flags('set-group', added, ids)
        self._update_flags('unset-group', removed, ids)

        # This is a success-path progress message; log at info level for
        # consistency with bulk_set().
        logger.info('Changed individual OSD flags: added=%s removed=%s for ids=%s',
                    added, removed, ids)

        return {'added': sorted(added),
                'removed': sorted(removed),
                'ids': ids}

    @Endpoint('GET', 'individual')
    @ReadPermission
    @EndpointDoc('Displays individual OSD flags',
                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
    def get_individual(self):
        """Return the per-OSD flag state of every OSD in the OSD map."""
        return [{'osd': osd['osd'], 'flags': osd['state']}
                for osd in mgr.get('osd_map')['osds']]