# -*- coding: utf-8 -*-

import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError  # type: ignore
from mgr_util import get_most_recent_rate

from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_orchestrator_error, handle_send_command_error
from ..services.orchestrator import OrchClient, OrchFeature
from ..services.osd import HostStorageSummary, OsdDeploymentOptions
from ..tools import str_to_bool
from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
    EndpointDoc, ReadPermission, RESTController, Task, UIRouter, \
    UpdatePermission, allow_empty_body
from ._version import APIVersion
from .orchestrator import raise_if_no_orchestrator

logger = logging.getLogger('controllers.osd')

SAFE_TO_DESTROY_SCHEMA = {
    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
    "active": ([int], ""),
    "missing_stats": ([str], ""),
    "stored_pgs": ([str], "Stored placement groups in the OSD"),
    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
}

EXPORT_FLAGS_SCHEMA = {
    "list_of_flags": ([str], "")
}

EXPORT_INDIV_FLAGS_SCHEMA = {
    "added": ([str], "List of added flags"),
    "removed": ([str], "List of removed flags"),
    "ids": ([int], "List of updated OSDs")
}

EXPORT_INDIV_FLAGS_GET_SCHEMA = {
    "osd": (int, "OSD ID"),
    "flags": ([str], "List of active flags")
}


class DeploymentOptions:
    def __init__(self):
        self.options = {
            OsdDeploymentOptions.COST_CAPACITY:
                HostStorageSummary(OsdDeploymentOptions.COST_CAPACITY,
                                   title='Cost/Capacity-optimized',
                                   desc='All the available HDDs are selected'),
            OsdDeploymentOptions.THROUGHPUT:
                HostStorageSummary(OsdDeploymentOptions.THROUGHPUT,
                                   title='Throughput-optimized',
                                   desc="HDDs/SSDs are selected for data "
                                        "devices and SSDs/NVMes for DB/WAL devices"),
            OsdDeploymentOptions.IOPS:
                HostStorageSummary(OsdDeploymentOptions.IOPS,
                                   title='IOPS-optimized',
                                   desc='All the available NVMes are selected'),
        }
        self.recommended_option = None

    def as_dict(self):
        return {
            'options': {k: v.as_dict() for k, v in self.options.items()},
            'recommended_option': self.recommended_option
        }


predefined_drive_groups = {
    OsdDeploymentOptions.COST_CAPACITY: {
        'service_type': 'osd',
        'service_id': 'cost_capacity',
        'placement': {'host_pattern': '*'},
        # all rotational (HDD) devices
        'data_devices': {'rotational': 1},
        'encrypted': False
    },
    OsdDeploymentOptions.THROUGHPUT: {
        'service_type': 'osd',
        'service_id': 'throughput_optimized',
        'placement': {'host_pattern': '*'},
        # HDDs for data devices, non-rotational devices for DB/WAL
        'data_devices': {'rotational': 1},
        'db_devices': {'rotational': 0},
        'encrypted': False
    },
    OsdDeploymentOptions.IOPS: {
        'service_type': 'osd',
        'service_id': 'iops_optimized',
        'placement': {'host_pattern': '*'},
        # all non-rotational (NVMe) devices
        'data_devices': {'rotational': 0},
        'encrypted': False
    },
}


def osd_task(name, metadata, wait_for=2.0):
    return Task("osd/{}".format(name), metadata, wait_for)
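
# Example: @osd_task('delete', {'svc_id': '{svc_id}'}) registers the wrapped
# endpoint as task 'osd/delete', filling '{svc_id}' from the call arguments;
# if the task is still running after roughly `wait_for` seconds, the request
# is reported as an ongoing background task instead of blocking.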


@APIRouter('/osd', Scope.OSD)
@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
    def list(self):
        osds = self.get_osd_map()

        # Extending by osd stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extending by osd node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extending by osd parent node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        removing_osd_ids = self.get_removing_osds()

        # Extending by osd histogram and orchestrator data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            self.gauge_stats(osd, osd_spec)
            osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
        return list(osds.values())

    @staticmethod
    def gauge_stats(osd, osd_spec):
        for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
            prop = stat.split('.')[1]
            rates = CephService.get_rates('osd', osd_spec, stat)
            osd['stats'][prop] = get_most_recent_rate(rates)
            osd['stats_history'][prop] = rates
        # Gauge stats (absolute values rather than rates)
        for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
            osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)

    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
    @ReadPermission
    def settings(self):
        result = CephService.send_command('mon', 'osd dump')
        return {
            'nearfull_ratio': result['nearfull_ratio'],
            'full_ratio': result['full_ratio']
        }

    def _get_operational_status(self, osd_id: int,
                                removing_osd_ids: Optional[List[int]]):
        if removing_osd_ids is None:
            return 'unmanaged'
        if osd_id in removing_osd_ids:
            return 'deleting'
        return 'working'

    @staticmethod
    def get_removing_osds() -> Optional[List[int]]:
        orch = OrchClient.instance()
        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
            return [osd.osd_id for osd in orch.osds.removing_status()]
        return None

    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        def add_id(osd):
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }
        return resp if svc_id is None else resp[int(svc_id)]

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T. data for the given OSD ID."""
        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data.
        """
        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'operational_status': self._get_operational_status(int(svc_id),
                                                               self.get_removing_osds())
        }

    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """
        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))
        return histogram

    def set(self, svc_id, device_class):  # pragma: no cover
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary containing the following attributes:
            `safe`: bool, indicates whether it's safe to remove the OSDs.
            `message`: str, help message if it's not safe to remove the OSDs.
        """
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }

    @DeletePermission
    @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        replace = False
        check: Union[Dict[str, Any], bool] = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')
        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(5)

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)
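
    # Illustrative request (following the dashboard's REST conventions):
    #   POST /api/osd/0/scrub?deep=true  ->  runs 'osd deep-scrub' on osd.0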

    @RESTController.Resource('PUT')
    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
                 parameters={'svc_id': (str, 'SVC ID')})
    def mark(self, svc_id, action):
        """
        Note: the OSD must be marked `down` before it can be marked `lost`.
        """
        valid_actions = ['out', 'in', 'down', 'lost']
        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
        if action.lower() in valid_actions:
            if action == 'lost':
                # 'osd lost' expects a single ID and a safety acknowledgement
                args['id'] = int(svc_id)
                args['yes_i_really_mean_it'] = True
            else:
                args['ids'] = [svc_id]

            CephService.send_command(**args)
        else:
            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)

    @RESTController.Resource('POST')
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that 'ceph osd reweight' is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this 'ceph osd reweight' is a temporary solution. You should
        only use it to keep your cluster running while you're ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))
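
    # Illustrative request: POST /api/osd/0/reweight with body
    # {"weight": "0.8"} sets the override weight of osd.0 to 0.8.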

    def _create_predefined_drive_group(self, data):
        orch = OrchClient.instance()
        option = OsdDeploymentOptions(data[0]['option'])
        if option in list(OsdDeploymentOptions):
            try:
                predefined_drive_groups[
                    option]['encrypted'] = data[0]['encrypted']
                orch.osds.create([DriveGroupSpec.from_json(
                    predefined_drive_groups[option])])
            except (ValueError, TypeError, KeyError, DriveGroupValidationError) as e:
                raise DashboardException(e, component='osd')

    def _create_bare(self, data):
        """Create an OSD container that has no associated device.

        :param data: contains attributes to create a bare OSD.
        : `uuid`: will be set automatically if the OSD starts up
        : `svc_id`: the ID is only used if a valid uuid is given.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return result

    @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=unused-argument
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        if method == 'predefined':
            return self._create_predefined_drive_group(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))
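
    # Illustrative payload for method='drive_groups' (DriveGroupSpec JSON):
    #   {"method": "drive_groups", "tracking_id": "all_hdd", "data": [
    #       {"service_type": "osd", "service_id": "all_hdd",
    #        "placement": {"host_pattern": "*"},
    #        "data_devices": {"rotational": 1}}]}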

    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Note: the OSD must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering data
        permanently unreadable.

        The OSD must be marked down before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    @EndpointDoc("Check If OSD is Safe to Destroy",
                 parameters={
                     'ids': (str, 'OSD Service Identifier'),
                 },
                 responses={200: SAFE_TO_DESTROY_SCHEMA})
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        """
        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result
        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }
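
    # Illustrative response for ids='[0, 1]' when both OSDs are drained:
    #   {"safe_to_destroy": [0, 1], "active": [], "missing_stats": [],
    #    "stored_pgs": [], "is_safe_to_destroy": true}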

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator()
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> Union[list, str]
        devices: Union[list, str] = CephService.send_command(
            'mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
        mgr_map = mgr.get('mgr_map')
        available_modules = [m['name'] for m in mgr_map['available_modules']]

        life_expectancy_enabled = any(
            item.startswith('diskprediction_') for item in available_modules)
        for device in devices:
            device['life_expectancy_enabled'] = life_expectancy_enabled
        return devices


@UIRouter('/osd', Scope.OSD)
@APIDoc("Dashboard UI helper function; not part of the public API", "OsdUI")
class OsdUi(Osd):
    @Endpoint('GET')
    @ReadPermission
    @raise_if_no_orchestrator([OrchFeature.DAEMON_LIST])
    @handle_orchestrator_error('host')
    def deployment_options(self):
        orch = OrchClient.instance()
        hdds = 0
        ssds = 0
        nvmes = 0
        res = DeploymentOptions()

        for inventory_host in orch.inventory.list(hosts=None, refresh=True):
            for device in inventory_host.devices.devices:
                if device.available:
                    if device.human_readable_type == 'hdd':
                        hdds += 1
                    # SSDs and NVMes are both reported as 'ssd',
                    # so differentiate NVMe using its path
                    elif '/dev/nvme' in device.path:
                        nvmes += 1
                    else:
                        ssds += 1

        if hdds:
            res.options[OsdDeploymentOptions.COST_CAPACITY].available = True
            res.recommended_option = OsdDeploymentOptions.COST_CAPACITY
        if hdds and ssds:
            res.options[OsdDeploymentOptions.THROUGHPUT].available = True
            res.recommended_option = OsdDeploymentOptions.THROUGHPUT
        if nvmes:
            res.options[OsdDeploymentOptions.IOPS].available = True

        return res.as_dict()


@APIRouter('/osd/flags', Scope.OSD)
class OsdFlagsController(RESTController):
    @staticmethod
    def _osd_flags():
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `ceph osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)

    @staticmethod
    def _update_flags(action, flags, ids=None):
        if ids:
            if flags:
                ids = list(map(str, ids))
                CephService.send_command('mon', 'osd ' + action, who=ids,
                                         flags=','.join(flags))
        else:
            for flag in flags:
                CephService.send_command('mon', 'osd ' + action, '', key=flag)

    @EndpointDoc("Display OSD Flags",
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
        return self._osd_flags()

    @EndpointDoc('Sets OSD flags for the entire cluster.',
                 parameters={
                     'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
                                      '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
                                      'Additionally `purged_snapshots` cannot even be set.')
                 },
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to
        include those four flags in every request for it to succeed.
        """
        assert isinstance(flags, list)

        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data

        self._update_flags('set', added)
        self._update_flags('unset', removed)

        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted(enabled_flags - removed | added)
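
    # Illustrative request body that sets `noout` while preserving the
    # mandatory flags:
    #   {"flags": ["noout", "sortbitwise", "recovery_deletes",
    #              "pglog_hardlimit", "purged_snapshots"]}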

    @Endpoint('PUT', 'individual')
    @UpdatePermission
    @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
                 parameters={
                     'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
                                'noin': (bool, 'Sets/unsets `noin`', True, None),
                                'noup': (bool, 'Sets/unsets `noup`', True, None),
                                'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
                               'Dictionary of flags to set or unset. Only the flags '
                               '`noin`, `noout`, `noup` and `nodown` are taken into '
                               'account.'),
                     'ids': ([int], 'List of OSD ids the flags should be applied to.')
                 },
                 responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
    def set_individual(self, flags, ids):
        """
        Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
        subset of OSDs.
        """
        assert isinstance(flags, dict)
        assert isinstance(ids, list)
        assert all(isinstance(id, int) for id in ids)

        # These are the only flags that can be applied to an OSD individually.
        all_flags = {'noin', 'noout', 'nodown', 'noup'}
        added = set()
        removed = set()
        for flag, activated in flags.items():
            if flag in all_flags:
                if activated is not None:
                    if activated:
                        added.add(flag)
                    else:
                        removed.add(flag)

        self._update_flags('set-group', added, ids)
        self._update_flags('unset-group', removed, ids)

        logger.info('Changed individual OSD flags: added=%s removed=%s for ids=%s',
                    added, removed, ids)

        return {'added': sorted(added),
                'removed': sorted(removed),
                'ids': ids}
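
    # Illustrative request body: {"flags": {"noout": true, "noin": false},
    # "ids": [0, 1]} sets `noout` and unsets `noin` on osd.0 and osd.1.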

    @Endpoint('GET', 'individual')
    @ReadPermission
    @EndpointDoc('Displays individual OSD flags',
                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
    def get_individual(self):
        osd_map = mgr.get('osd_map')['osds']
        resp = []
        for osd in osd_map:
            resp.append({
                'osd': osd['osd'],
                # array of the daemon's currently active flags
                'flags': osd['state']
            })
        return resp
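
    # Illustrative response: [{"osd": 0, "flags": ["exists", "up", "noout"]}]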