# -*- coding: utf-8 -*-
import json
import logging
import time

from typing import Any, Dict, List, Optional, Union

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError  # type: ignore
from mgr_util import get_most_recent_rate

from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_orchestrator_error, handle_send_command_error
from ..services.orchestrator import OrchClient, OrchFeature
from ..services.osd import HostStorageSummary, OsdDeploymentOptions
from ..tools import str_to_bool
from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
    EndpointDoc, ReadPermission, RESTController, Task, UIRouter, \
    UpdatePermission, allow_empty_body
from ._version import APIVersion
from .orchestrator import raise_if_no_orchestrator

logger = logging.getLogger('controllers.osd')

SAFE_TO_DESTROY_SCHEMA = {
    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
    "active": ([int], ""),
    "missing_stats": ([str], ""),
    "stored_pgs": ([str], "Stored placement groups in the OSD"),
    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
}

EXPORT_FLAGS_SCHEMA = {
    "list_of_flags": ([str], "")
}

EXPORT_INDIV_FLAGS_SCHEMA = {
    "added": ([str], "List of added flags"),
    "removed": ([str], "List of removed flags"),
    "ids": ([int], "List of updated OSDs")
}

EXPORT_INDIV_FLAGS_GET_SCHEMA = {
    "osd": (int, "OSD ID"),
    "flags": ([str], "List of active flags")
}


class DeploymentOptions:
    def __init__(self):
        self.options = {
            OsdDeploymentOptions.COST_CAPACITY:
                HostStorageSummary(OsdDeploymentOptions.COST_CAPACITY,
                                   title='Cost/Capacity-optimized',
                                   desc='All the available HDDs are selected'),
            OsdDeploymentOptions.THROUGHPUT:
                HostStorageSummary(OsdDeploymentOptions.THROUGHPUT,
                                   title='Throughput-optimized',
                                   desc='HDDs/SSDs are selected for data '
                                        'devices and SSDs/NVMes for DB/WAL devices'),
            OsdDeploymentOptions.IOPS:
                HostStorageSummary(OsdDeploymentOptions.IOPS,
                                   title='IOPS-optimized',
                                   desc='All the available NVMes are selected'),
        }
        self.recommended_option = None

    def as_dict(self):
        return {
            'options': {k: v.as_dict() for k, v in self.options.items()},
            'recommended_option': self.recommended_option
        }
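
# Illustrative output of DeploymentOptions().as_dict(), assuming the
# OsdDeploymentOptions values serialize as strings such as 'cost_capacity'
# (the exact fields come from HostStorageSummary.as_dict()):
#   {'options': {'cost_capacity': {'title': 'Cost/Capacity-optimized', ...},
#                'throughput_optimized': {...}, 'iops_optimized': {...}},
#    'recommended_option': None}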


predefined_drive_groups = {
    OsdDeploymentOptions.COST_CAPACITY: {
        'service_type': 'osd',
        'service_id': 'cost_capacity',
        'placement': {'host_pattern': '*'},
        'data_devices': {'rotational': 1},
        'encrypted': False
    },
    OsdDeploymentOptions.THROUGHPUT: {
        'service_type': 'osd',
        'service_id': 'throughput_optimized',
        'placement': {'host_pattern': '*'},
        'data_devices': {'rotational': 1},
        'db_devices': {'rotational': 0},
        'encrypted': False
    },
    OsdDeploymentOptions.IOPS: {
        'service_type': 'osd',
        'service_id': 'iops_optimized',
        'placement': {'host_pattern': '*'},
        'data_devices': {'rotational': 0},
        'encrypted': False
    },
}
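
# These specs are passed to DriveGroupSpec.from_json() in
# _create_predefined_drive_group() below; the device filters mirror the option
# descriptions above (HDDs only, HDD data + fast DB/WAL devices, NVMes only).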


def osd_task(name, metadata, wait_for=2.0):
    return Task("osd/{}".format(name), metadata, wait_for)
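
# Usage, as seen on the handlers below: @osd_task('delete', {'svc_id': '{svc_id}'})
# wraps the handler in a Task named 'osd/delete'; `wait_for` is how long the
# request waits for completion before the task is reported as still running.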


@APIRouter('/osd', Scope.OSD)
@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
    def list(self):
        osds = self.get_osd_map()

        # Extending by osd stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extending by osd node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extending by osd parent node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        removing_osd_ids = self.get_removing_osds()

        # Extending by osd histogram and orchestrator data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
                prop = stat.split('.')[1]
                rates = CephService.get_rates('osd', osd_spec, stat)
                osd['stats'][prop] = get_most_recent_rate(rates)
                osd['stats_history'][prop] = rates
            for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)
            osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
        return list(osds.values())

    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
    @ReadPermission
    def settings(self):
        result = CephService.send_command('mon', 'osd dump')
        return {
            'nearfull_ratio': result['nearfull_ratio'],
            'full_ratio': result['full_ratio']
        }

    def _get_operational_status(self, osd_id: int, removing_osd_ids: Optional[List[int]]):
        if removing_osd_ids is None:
            return 'unmanaged'
        if osd_id in removing_osd_ids:
            return 'deleting'
        return 'working'

    @staticmethod
    def get_removing_osds() -> Optional[List[int]]:
        orch = OrchClient.instance()
        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
            return [osd.osd_id for osd in orch.osds.removing_status()]
        return None

    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        def add_id(osd):
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }
        return resp if svc_id is None else resp[int(svc_id)]
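
    # Illustrative return value of get_osd_map() (IDs and fields depend on the
    # cluster's osd_map):
    #   {0: {'osd': 0, 'id': 0, 'up': 1, ...}, 1: {...}}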

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T. data for the given OSD ID."""
        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data.
        """
        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'operational_status': self._get_operational_status(int(svc_id),
                                                               self.get_removing_osds())
        }

    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """
        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))
        return histogram

    def set(self, svc_id, device_class):  # pragma: no cover
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary containing the following attributes:
            `safe`: bool, indicates if it's safe to remove the OSDs.
            `message`: str, help message if it's not safe to remove the OSDs.
        """
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }
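
    # Illustrative _check_delete() result when the cluster reports OSD_NEARFULL:
    #   {'safe': False,
    #    'message': 'Removing OSD(s) is not recommended because of these '
    #               'failed health check(s): OSD_NEARFULL.'}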

    @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        replace = False
        check: Union[Dict[str, Any], bool] = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')
        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(60)

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)

    @RESTController.Resource('PUT')
    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
                 parameters={'svc_id': (str, 'SVC ID')})
    def mark(self, svc_id, action):
        """
        Note: the OSD must be marked `down` before it can be marked `lost`.
        """
        valid_actions = ['out', 'in', 'down', 'lost']
        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
        if action.lower() in valid_actions:
            if action == 'lost':
                args['id'] = int(svc_id)
                args['yes_i_really_mean_it'] = True
            else:
                args['ids'] = [svc_id]

            CephService.send_command(**args)
        else:
            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)
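
    # Example: mark(svc_id='0', action='out') sends the equivalent of
    # `ceph osd out 0`; only `lost` requires the extra yes_i_really_mean_it flag.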

    @RESTController.Resource('POST')
    @allow_empty_body
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this ‘ceph osd reweight’ is a temporary solution. You should
        only use it to keep your cluster running while you’re ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))
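
    # Equivalent CLI call: `ceph osd reweight <svc_id> <weight>`, where the
    # weight is expected to be within 0.0 and 1.0.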

    def _create_predefined_drive_group(self, data):
        orch = OrchClient.instance()
        option = OsdDeploymentOptions(data[0]['option'])
        if option in list(OsdDeploymentOptions):
            try:
                predefined_drive_groups[
                    option]['encrypted'] = data[0]['encrypted']
                orch.osds.create([DriveGroupSpec.from_json(
                    predefined_drive_groups[option])])
            except (ValueError, TypeError, DriveGroupValidationError) as e:
                raise DashboardException(e, component='osd')

    def _create_bare(self, data):
        """Create an OSD container that has no associated device.

        :param data: contains the attributes to create a bare OSD.
        : `uuid`: will be set automatically if the OSD starts up
        : `svc_id`: the ID is only used if a valid uuid is given.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }
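
    # Illustrative input for _create_bare() (values are made up):
    #   {'uuid': '13429c6a-3bd1-4b2c-9a7b-0e9c2d1f5a3e', 'svc_id': 3}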

    @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=unused-argument
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        if method == 'predefined':
            return self._create_predefined_drive_group(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))
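
    # Illustrative request body for the 'drive_groups' method (the spec fields
    # are one example of what DriveGroupSpec.from_json() accepts):
    #   {'method': 'drive_groups',
    #    'data': [{'service_type': 'osd', 'service_id': 'example_group',
    #              'placement': {'host_pattern': '*'},
    #              'data_devices': {'all': True}}],
    #    'tracking_id': 'example_group'}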

    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Note: the OSD must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark the OSD as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering the data
        permanently unreadable.

        The OSD must be marked `down` before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    @EndpointDoc("Check If OSD is Safe to Destroy",
                 parameters={
                     'ids': (str, 'OSD Service Identifier'),
                 },
                 responses={200: SAFE_TO_DESTROY_SCHEMA})
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        """
        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result
        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }
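
    # Illustrative success response (matching SAFE_TO_DESTROY_SCHEMA):
    #   {'safe_to_destroy': [0], 'active': [], 'missing_stats': [],
    #    'stored_pgs': [], 'is_safe_to_destroy': True}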

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator()
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        :type svc_ids: int|[int]
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> dict
        return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))


@UIRouter('/osd', Scope.OSD)
@APIDoc("Dashboard UI helper function; not part of the public API", "OsdUI")
class OsdUi(Osd):
    @Endpoint('GET')
    @ReadPermission
    @raise_if_no_orchestrator([OrchFeature.DAEMON_LIST])
    @handle_orchestrator_error('host')
    def deployment_options(self):
        orch = OrchClient.instance()
        hdds = 0
        ssds = 0
        nvmes = 0
        res = DeploymentOptions()

        for inventory_host in orch.inventory.list(hosts=None, refresh=True):
            for device in inventory_host.devices.devices:
                if device.available:
                    if device.human_readable_type == 'hdd':
                        hdds += 1
                    # SSDs and NVMes are both counted as 'ssd',
                    # so differentiate NVMes by their device path
                    elif '/dev/nvme' in device.path:
                        nvmes += 1
                    else:
                        ssds += 1

        if hdds:
            res.options[OsdDeploymentOptions.COST_CAPACITY].available = True
            res.recommended_option = OsdDeploymentOptions.COST_CAPACITY
        if hdds and ssds:
            res.options[OsdDeploymentOptions.THROUGHPUT].available = True
            res.recommended_option = OsdDeploymentOptions.THROUGHPUT
        if nvmes:
            res.options[OsdDeploymentOptions.IOPS].available = True

        return res.as_dict()
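
    # Note: the checks above run in increasing priority, so when both HDDs and
    # SSDs are available THROUGHPUT overrides COST_CAPACITY as the
    # recommendation, and IOPS never becomes the recommended option here.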


@APIRouter('/osd/flags', Scope.OSD)
@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
    @staticmethod
    def _osd_flags():
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `ceph osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)

    @staticmethod
    def _update_flags(action, flags, ids=None):
        if ids:
            if flags:
                ids = list(map(str, ids))
                CephService.send_command('mon', 'osd ' + action, who=ids,
                                         flags=','.join(flags))
        else:
            for flag in flags:
                CephService.send_command('mon', 'osd ' + action, '', key=flag)

    @EndpointDoc("Display OSD Flags",
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
        return self._osd_flags()

    @EndpointDoc('Sets OSD flags for the entire cluster.',
                 parameters={
                     'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
                                      '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
                                      'Additionally `purged_snapshots` cannot even be set.')
                 },
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to at
        least include those four flags for a successful operation.
        """
        assert isinstance(flags, list)

        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data

        self._update_flags('set', added)
        self._update_flags('unset', removed)

        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted(enabled_flags - removed | added)
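
    # Illustration: if 'noout' is requested in addition to the currently
    # enabled flags, added={'noout'} and removed=set(), so only
    # `ceph osd set noout` is issued and the returned list gains 'noout'.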

    @Endpoint('PUT', 'individual')
    @UpdatePermission
    @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
                 parameters={
                     'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
                                'noin': (bool, 'Sets/unsets `noin`', True, None),
                                'noup': (bool, 'Sets/unsets `noup`', True, None),
                                'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
                               'Dictionary of flags to set or unset. Only the '
                               'flags `noin`, `noout`, `noup` and `nodown` are '
                               'taken into account.'),
                     'ids': ([int], 'List of OSD ids the flags should be applied to.')
                 },
                 responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
    def set_individual(self, flags, ids):
        """
        Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
        subset of OSDs.
        """
        assert isinstance(flags, dict)
        assert isinstance(ids, list)
        assert all(isinstance(id, int) for id in ids)

        # These are the only flags that can be applied to an OSD individually.
        all_flags = {'noin', 'noout', 'nodown', 'noup'}
        added = set()
        removed = set()
        for flag, activated in flags.items():
            if flag in all_flags:
                if activated is not None:
                    if activated:
                        added.add(flag)
                    else:
                        removed.add(flag)

        self._update_flags('set-group', added, ids)
        self._update_flags('unset-group', removed, ids)

        logger.info('Changed individual OSD flags: added=%s removed=%s for ids=%s',
                    added, removed, ids)

        return {'added': sorted(added),
                'removed': sorted(removed),
                'ids': ids}
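
    # Illustrative response (matching EXPORT_INDIV_FLAGS_SCHEMA):
    #   {'added': ['noout'], 'removed': ['noin'], 'ids': [0, 1, 2]}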

    @Endpoint('GET', 'individual')
    @ReadPermission
    @EndpointDoc('Displays individual OSD flags',
                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
    def get_individual(self):
        osd_map = mgr.get('osd_map')['osds']
        resp = []

        for osd in osd_map:
            resp.append({
                'osd': osd['osd'],
                'flags': osd['state']
            })
        return resp