# -*- coding: utf-8 -*-

import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError  # type: ignore
from mgr_util import get_most_recent_rate

from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_orchestrator_error, handle_send_command_error
from ..services.orchestrator import OrchClient, OrchFeature
from ..services.osd import HostStorageSummary, OsdDeploymentOptions
from ..tools import str_to_bool
from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
    EndpointDoc, ReadPermission, RESTController, Task, UIRouter, \
    UpdatePermission, allow_empty_body
from ._version import APIVersion
from .orchestrator import raise_if_no_orchestrator

logger = logging.getLogger('controllers.osd')

SAFE_TO_DESTROY_SCHEMA = {
    "safe_to_destroy": ([str], "IDs of OSDs that are safe to destroy"),
    "active": ([int], "IDs of OSDs that are still active"),
    "missing_stats": ([str], "IDs of OSDs without stats"),
    "stored_pgs": ([str], "Stored placement groups in the OSD"),
    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
}

EXPORT_FLAGS_SCHEMA = {
    "list_of_flags": ([str], "")
}

EXPORT_INDIV_FLAGS_SCHEMA = {
    "added": ([str], "List of added flags"),
    "removed": ([str], "List of removed flags"),
    "ids": ([int], "List of updated OSDs")
}

EXPORT_INDIV_FLAGS_GET_SCHEMA = {
    "osd": (int, "OSD ID"),
    "flags": ([str], "List of active flags")
}


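# Summarizes the predefined OSD deployment options and tracks which of them
# is recommended for the current cluster.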
class DeploymentOptions:
    def __init__(self):
        self.options = {
            OsdDeploymentOptions.COST_CAPACITY:
                HostStorageSummary(OsdDeploymentOptions.COST_CAPACITY,
                                   title='Cost/Capacity-optimized',
                                   desc='All the available HDDs are selected'),
            OsdDeploymentOptions.THROUGHPUT:
                HostStorageSummary(OsdDeploymentOptions.THROUGHPUT,
                                   title='Throughput-optimized',
                                   desc="HDDs/SSDs are selected for data "
                                        "devices and SSDs/NVMes for DB/WAL devices"),
            OsdDeploymentOptions.IOPS:
                HostStorageSummary(OsdDeploymentOptions.IOPS,
                                   title='IOPS-optimized',
                                   desc='All the available NVMes are selected'),
        }
        self.recommended_option = None

    def as_dict(self):
        return {
            'options': {k: v.as_dict() for k, v in self.options.items()},
            'recommended_option': self.recommended_option
        }


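# DriveGroupSpec templates (as JSON dicts) backing the predefined deployment
# options; the 'encrypted' key is overwritten per request before submission.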
predefined_drive_groups = {
    OsdDeploymentOptions.COST_CAPACITY: {
        'service_type': 'osd',
        'service_id': 'cost_capacity',
        'placement': {
            'host_pattern': '*'
        },
        'data_devices': {
            'rotational': 1
        },
        'encrypted': False
    },
    OsdDeploymentOptions.THROUGHPUT: {
        'service_type': 'osd',
        'service_id': 'throughput_optimized',
        'placement': {
            'host_pattern': '*'
        },
        'data_devices': {
            'rotational': 1
        },
        'db_devices': {
            'rotational': 0
        },
        'encrypted': False
    },
    OsdDeploymentOptions.IOPS: {
        'service_type': 'osd',
        'service_id': 'iops_optimized',
        'placement': {
            'host_pattern': '*'
        },
        'data_devices': {
            'rotational': 0
        },
        'encrypted': False
    },
}


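# Helper that namespaces dashboard task names under 'osd/'.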
def osd_task(name, metadata, wait_for=2.0):
    return Task("osd/{}".format(name), metadata, wait_for)


@APIRouter('/osd', Scope.OSD)
@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
    def list(self):
        osds = self.get_osd_map()

        # Extend with OSD stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extend with OSD tree node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extend with the parent host of each OSD
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        removing_osd_ids = self.get_removing_osds()

        # Extend with OSD histogram and orchestrator data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            self.gauge_stats(osd, osd_spec)
            osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
        return list(osds.values())

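    # Collects rate stats (with history) for the op counters and the latest
    # values for the gauge-style perf counters of a single OSD.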
    @staticmethod
    def gauge_stats(osd, osd_spec):
        for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
            prop = stat.split('.')[1]
            rates = CephService.get_rates('osd', osd_spec, stat)
            osd['stats'][prop] = get_most_recent_rate(rates)
            osd['stats_history'][prop] = rates
        # Gauge stats
        for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
            osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)

    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
    @ReadPermission
    def settings(self):
        result = CephService.send_command('mon', 'osd dump')
        return {
            'nearfull_ratio': result['nearfull_ratio'],
            'full_ratio': result['full_ratio']
        }

    def _get_operational_status(self, osd_id: int, removing_osd_ids: Optional[List[int]]):
        if removing_osd_ids is None:
            return 'unmanaged'
        if osd_id in removing_osd_ids:
            return 'deleting'
        return 'working'

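    # Returns the IDs of OSDs currently being removed, or None if the
    # orchestrator (or its removal-status feature) is unavailable.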
    @staticmethod
    def get_removing_osds() -> Optional[List[int]]:
        orch = OrchClient.instance()
        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
            return [osd.osd_id for osd in orch.osds.removing_status()]
        return None

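    # Maps each OSD ID to its entry in the OSD map; with svc_id given, only
    # that single OSD's entry is returned.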
    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        def add_id(osd):
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }
        return resp if svc_id is None else resp[int(svc_id)]

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T. data for the given OSD ID."""
        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: The requested data.
        """
        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'operational_status': self._get_operational_status(int(svc_id),
                                                               self.get_removing_osds())
        }

    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """
        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))

        return histogram

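    # Changes the CRUSH device class of an OSD: the old class is removed
    # first, then the new one (if any) is set.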
    def set(self, svc_id, device_class):  # pragma: no cover
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary containing the following attributes:
            `safe`: bool, indicates if it's safe to remove OSDs.
            `message`: str, help message if it's not safe to remove OSDs.
        """
        _ = osd_ids
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }

    @DeletePermission
    @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        replace = False
        check: Union[Dict[str, Any], bool] = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')
        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
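        # Poll the orchestrator once a minute until this OSD no longer shows
        # up in the removal queue.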
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(60)

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)

    @RESTController.Resource('PUT')
    @EndpointDoc("Mark an OSD (out, in, down, lost, ...)",
                 parameters={'svc_id': (str, 'SVC ID')})
    def mark(self, svc_id, action):
        """
        Note: the OSD must be marked `down` before marking it lost.
        """
        valid_actions = ['out', 'in', 'down', 'lost']
        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
        if action.lower() in valid_actions:
            if action == 'lost':
                args['id'] = int(svc_id)
                args['yes_i_really_mean_it'] = True
            else:
                args['ids'] = [svc_id]

            CephService.send_command(**args)
        else:
            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)

    @RESTController.Resource('POST')
    @allow_empty_body
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this ‘ceph osd reweight’ is a temporary solution. You should
        only use it to keep your cluster running while you’re ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))

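    # Applies the requested encryption setting to the matching predefined
    # drive group template and submits it to the orchestrator.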
    def _create_predefined_drive_group(self, data):
        orch = OrchClient.instance()
        option = OsdDeploymentOptions(data[0]['option'])
        if option in list(OsdDeploymentOptions):
            try:
                predefined_drive_groups[
                    option]['encrypted'] = data[0]['encrypted']
                orch.osds.create([DriveGroupSpec.from_json(
                    predefined_drive_groups[option])])
            except (ValueError, TypeError, KeyError, DriveGroupValidationError) as e:
                raise DashboardException(e, component='osd')

    def _create_bare(self, data):
        """Create an OSD container that has no associated device.

        :param data: contains attributes to create a bare OSD.
        : `uuid`: will be set automatically if the OSD starts up
        : `svc_id`: the ID is only used if a valid uuid is given.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }

    @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=unused-argument
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        if method == 'predefined':
            return self._create_predefined_drive_group(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))

    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Note: the OSD must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark an OSD as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering data
        permanently unreadable.

        The OSD must be marked down before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    @EndpointDoc("Check if OSD(s) are safe to destroy",
                 parameters={
                     'ids': (str, 'OSD service identifier'),
                 },
                 responses={200: SAFE_TO_DESTROY_SCHEMA})
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        """

        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result

        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator()
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        :type svc_ids: int|[int]
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> Union[list, str]
        devices: Union[list, str] = CephService.send_command(
            'mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
        mgr_map = mgr.get('mgr_map')
        available_modules = [m['name'] for m in mgr_map['available_modules']]

        life_expectancy_enabled = any(
            item.startswith('diskprediction_') for item in available_modules)
        for device in devices:
            device['life_expectancy_enabled'] = life_expectancy_enabled

        return devices


@UIRouter('/osd', Scope.OSD)
@APIDoc("Dashboard UI helper function; not part of the public API", "OsdUI")
class OsdUi(Osd):
    @Endpoint('GET')
    @ReadPermission
    @raise_if_no_orchestrator([OrchFeature.DAEMON_LIST])
    @handle_orchestrator_error('host')
    def deployment_options(self):
        orch = OrchClient.instance()
        hdds = 0
        ssds = 0
        nvmes = 0
        res = DeploymentOptions()

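        # Tally the available devices across the whole inventory by type.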
        for inventory_host in orch.inventory.list(hosts=None, refresh=True):
            for device in inventory_host.devices.devices:
                if device.available:
                    if device.human_readable_type == 'hdd':
                        hdds += 1
                    # SSDs and NVMes are both reported as 'ssd', so NVMe
                    # devices are differentiated by their path.
                    elif '/dev/nvme' in device.path:
                        nvmes += 1
                    else:
                        ssds += 1

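        # Enable the options the device mix supports; throughput-optimized
        # overrides cost/capacity as the recommendation when SSDs are present.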
        if hdds:
            res.options[OsdDeploymentOptions.COST_CAPACITY].available = True
            res.recommended_option = OsdDeploymentOptions.COST_CAPACITY
        if hdds and ssds:
            res.options[OsdDeploymentOptions.THROUGHPUT].available = True
            res.recommended_option = OsdDeploymentOptions.THROUGHPUT
        if nvmes:
            res.options[OsdDeploymentOptions.IOPS].available = True

        return res.as_dict()


@APIRouter('/osd/flags', Scope.OSD)
@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
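    # Returns the cluster-wide OSD flags, folding 'pauserd' and 'pausewr' into
    # the synthetic 'pause' flag.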
    @staticmethod
    def _osd_flags():
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `ceph osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)

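    # With `ids` given, the flags are applied per OSD via the
    # set-group/unset-group commands; otherwise each flag is set or unset
    # cluster-wide.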
    @staticmethod
    def _update_flags(action, flags, ids=None):
        if ids:
            if flags:
                ids = list(map(str, ids))
                CephService.send_command('mon', 'osd ' + action, who=ids,
                                         flags=','.join(flags))
        else:
            for flag in flags:
                CephService.send_command('mon', 'osd ' + action, '', key=flag)

    @EndpointDoc("Display OSD Flags",
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
        return self._osd_flags()

    @EndpointDoc('Sets OSD flags for the entire cluster.',
                 parameters={
                     'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
                                      '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
                                      'Additionally `purged_snapshots` cannot even be set.')
                 },
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to
        include at least those four flags for a successful operation.
        """
        assert isinstance(flags, list)

        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data

        self._update_flags('set', added)
        self._update_flags('unset', removed)

        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted(enabled_flags - removed | added)

    @Endpoint('PUT', 'individual')
    @UpdatePermission
    @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
                 parameters={
                     'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
                                'noin': (bool, 'Sets/unsets `noin`', True, None),
                                'noup': (bool, 'Sets/unsets `noup`', True, None),
                                'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
                               'Dictionary of flags to set or unset. Only the '
                               'flags `noin`, `noout`, `noup` and `nodown` are '
                               'taken into account.'),
                     'ids': ([int], 'List of OSD ids the flags should be applied '
                                    'to.')
                 },
                 responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
    def set_individual(self, flags, ids):
        """
        Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
        subset of OSDs.
        """
        assert isinstance(flags, dict)
        assert isinstance(ids, list)
        assert all(isinstance(id, int) for id in ids)

        # These are the only flags that can be applied to an OSD individually.
        all_flags = {'noin', 'noout', 'nodown', 'noup'}
        added = set()
        removed = set()
        for flag, activated in flags.items():
            if flag in all_flags:
                if activated is not None:
                    if activated:
                        added.add(flag)
                    else:
                        removed.add(flag)

        self._update_flags('set-group', added, ids)
        self._update_flags('unset-group', removed, ids)

        logger.info('Changed individual OSD flags: added=%s removed=%s for ids=%s',
                    added, removed, ids)

        return {'added': sorted(added),
                'removed': sorted(removed),
                'ids': ids}

    @Endpoint('GET', 'individual')
    @ReadPermission
    @EndpointDoc('Displays individual OSD flags',
                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
    def get_individual(self):
        osd_map = mgr.get('osd_map')['osds']
        resp = []

        for osd in osd_map:
            resp.append({
                'osd': osd['osd'],
                'flags': osd['state']
            })
        return resp