1 # -*- coding: utf-8 -*-
import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError  # type: ignore
from mgr_util import get_most_recent_rate

from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_orchestrator_error, handle_send_command_error
from ..services.orchestrator import OrchClient, OrchFeature
from ..tools import str_to_bool
from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
    EndpointDoc, ReadPermission, RESTController, Task, UpdatePermission, \
    allow_empty_body
from ._version import APIVersion
from .orchestrator import raise_if_no_orchestrator
logger = logging.getLogger('controllers.osd')

# Response schema for `Osd.safe_to_destroy` (mirrors the output of the
# `ceph osd safe-to-destroy` mon command plus a computed summary field).
SAFE_TO_DESTROY_SCHEMA = {
    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
    "active": ([int], ""),
    "missing_stats": ([str], ""),
    "stored_pgs": ([str], "Stored Pool groups in Osd"),
    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
}

# Response schema for listing/setting cluster-wide OSD flags.
EXPORT_FLAGS_SCHEMA = {
    "list_of_flags": ([str], "")
}

# Response schema for updating flags on individual OSDs.
EXPORT_INDIV_FLAGS_SCHEMA = {
    "added": ([str], "List of added flags"),
    "removed": ([str], "List of removed flags"),
    "ids": ([int], "List of updated OSDs")
}

# Response schema for reading flags of individual OSDs.
EXPORT_INDIV_FLAGS_GET_SCHEMA = {
    "osd": (int, "OSD ID"),
    "flags": ([str], "List of active flags")
}
def osd_task(name, metadata, wait_for=2.0):
    """Build a ``Task`` decorator for an OSD-related background action.

    :param name: action name; namespaced under ``osd/``.
    :param metadata: metadata identifying a concrete task run.
    :param wait_for: seconds to wait for the task before reporting it
        as an executing background task.
    """
    task_name = f'osd/{name}'
    return Task(task_name, metadata, wait_for)
@APIRouter('/osd', Scope.OSD)
@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
    def list(self):
        """Return all OSDs, enriched with stats, tree and host information.

        Each OSD map entry is extended with:
        - its entry from the 'osd_stats' mgr report,
        - its node from the CRUSH tree ('osd_map_tree'),
        - its parent host node from the CRUSH tree,
        - counter rates/history and gauge values from the perf counters,
        - an operational status derived from the orchestrator removal queue.
        """
        osds = self.get_osd_map()

        # Extending by osd stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extending by osd node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extending by osd parent node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                # Children may also reference non-OSD entries (negative ids).
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        removing_osd_ids = self.get_removing_osds()

        # Extending by osd histogram and orchestrator data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            # Counter stats: expose the most recent rate and the rate history.
            for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
                prop = stat.split('.')[1]
                rates = CephService.get_rates('osd', osd_spec, stat)
                osd['stats'][prop] = get_most_recent_rate(rates)
                osd['stats_history'][prop] = rates
            # Gauge stats
            for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)
            osd['operational_status'] = self._get_operational_status(osd_id,
                                                                     removing_osd_ids)
        return list(osds.values())
    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
    @ReadPermission
    def settings(self):
        """Return cluster-wide OSD fullness ratios from the OSD map dump."""
        result = CephService.send_command('mon', 'osd dump')
        return {
            'nearfull_ratio': result['nearfull_ratio'],
            'full_ratio': result['full_ratio']
        }
106 def _get_operational_status(self
, osd_id
: int, removing_osd_ids
: Optional
[List
[int]]):
107 if removing_osd_ids
is None:
109 if osd_id
in removing_osd_ids
:
    @staticmethod
    def get_removing_osds() -> Optional[List[int]]:
        """Return the ids of OSDs currently queued for removal.

        Returns ``None`` when the orchestrator backend does not provide
        OSD removal status.
        """
        orch = OrchClient.instance()
        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
            return [osd.osd_id for osd in orch.osds.removing_status()]
        return None
    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        """Return OSD map entries keyed by OSD id.

        :param svc_id: optional OSD id; when given, only that OSD's map
            entry is returned (unwrapped, not inside a dict).
        """
        def add_id(osd):
            # Duplicate the 'osd' key as 'id' for consumers of the REST API.
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }

        return resp if svc_id is None else resp[int(svc_id)]
    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T data for the given OSD ID."""
        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)
    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        """Return S.M.A.R.T. data collected for the given OSD."""
        return self._get_smart_data(svc_id)
    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data.
        """
        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'operational_status': self._get_operational_status(int(svc_id),
                                                               self.get_removing_osds())
        }
    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """Dump the perf histograms of the given OSD daemon.

        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))

        return histogram
    def set(self, svc_id, device_class):  # pragma: no cover
        """Set the CRUSH device class of an OSD, replacing the previous one."""
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            # The existing class must be removed before a new one can be set.
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })
    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary contains the following attributes:
            `safe`: bool, indicate if it's safe to remove OSDs.
            `message`: str, help message if it's not safe to remove OSDs.
        """
        # NOTE(review): only cluster-wide health checks are inspected here;
        # the given OSD ids do not influence the result.
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }
    @DeletePermission
    @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        """Remove an OSD through the orchestrator.

        :param svc_id: the OSD id.
        :param preserve_id: when truthy, preserve the OSD id for reuse.
        :param force: when truthy, skip the 'safe to remove' health check.
        """
        replace = False
        check: Union[Dict[str, Any], bool] = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')
        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
        # Poll (within this background task) until the orchestrator no longer
        # reports the OSD in its removal queue.
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(1)
    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        """Initiate a (deep) scrub of the given OSD."""
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)
    @RESTController.Resource('PUT')
    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
                 parameters={'svc_id': (str, 'SVC ID')})
    def mark(self, svc_id, action):
        """
        Note: osd must be marked `down` before marking lost.
        """
        valid_actions = ['out', 'in', 'down', 'lost']
        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
        if action.lower() in valid_actions:
            if action == 'lost':
                # 'osd lost' takes a single integer id and requires explicit
                # confirmation, unlike the other mark actions.
                args['id'] = int(svc_id)
                args['yes_i_really_mean_it'] = True
            else:
                args['ids'] = [svc_id]

            CephService.send_command(**args)
        else:
            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)
    @RESTController.Resource('POST')
    @UpdatePermission
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this ‘ceph osd reweight’ is a temporary solution. You should
        only use it to keep your cluster running while you’re ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))
    def _create_bare(self, data):
        """Create a OSD container that has no associated device.

        :param data: contain attributes to create a bare OSD.
        : `uuid`: will be set automatically if the OSD starts up
        : `svc_id`: the ID is only used if a valid uuid is given.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }
    @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            # Validation errors from the spec parser are surfaced as 400s.
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')
    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=unused-argument
        """Dispatch OSD creation to the requested method.

        :param method: either 'bare' or 'drive_groups'.
        :param data: method-specific creation payload.
        :param tracking_id: used by the task decorator to identify the run.
        """
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))
    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Note: osd must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)
    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering data
        permanently unreadable.

        The osd must be marked down before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)
    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    @EndpointDoc("Check If OSD is Safe to Destroy",
                 parameters={
                     'ids': (str, 'OSD Service Identifier'),
                 },
                 responses={200: SAFE_TO_DESTROY_SCHEMA})
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        """
        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            # Safe only if every requested OSD appears in the safe list.
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result

        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }
    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator()
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        :type svc_ids: int|[int]
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }
    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> dict
        """List the physical devices backing the given OSD daemon."""
        return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
@APIRouter('/osd/flags', Scope.OSD)
@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
    @staticmethod
    def _osd_flags():
        """Return the sorted list of cluster-wide OSD flags currently set."""
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `ceph osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)
    @staticmethod
    def _update_flags(action, flags, ids=None):
        """Set or unset the given OSD flags via the mon.

        :param action: mon command suffix, e.g. 'set', 'unset', 'set-group',
            'unset-group'.
        :param flags: iterable of flag names.
        :param ids: optional list of OSD ids for the per-OSD group variants;
            when omitted the flags are applied cluster-wide, one command
            per flag.
        """
        if ids:
            if flags:
                ids = list(map(str, ids))
                CephService.send_command('mon', 'osd ' + action, who=ids,
                                         flags=','.join(flags))
        else:
            for flag in flags:
                CephService.send_command('mon', 'osd ' + action, '', key=flag)
    @EndpointDoc("Display OSD Flags",
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
        """Return the currently enabled cluster-wide OSD flags."""
        return self._osd_flags()
    @EndpointDoc('Sets OSD flags for the entire cluster.',
                 parameters={
                     'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
                               '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
                               'Additionally `purged_snapshots` cannot even be set.')
                 },
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to at
        least include those four flags for a successful operation.
        """
        assert isinstance(flags, list)

        # Diff the requested flag set against the currently enabled one and
        # only issue the necessary set/unset commands.
        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data

        self._update_flags('set', added)
        self._update_flags('unset', removed)

        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted(enabled_flags - removed | added)
466 @Endpoint('PUT', 'individual')
468 @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
470 'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
471 'noin': (bool, 'Sets/unsets `noin`', True, None),
472 'noup': (bool, 'Sets/unsets `noup`', True, None),
473 'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
474 'Directory of flags to set or unset. The flags '
475 '`noin`, `noout`, `noup` and `nodown` are going to '
476 'be considered only.'),
477 'ids': ([int], 'List of OSD ids the flags should be applied '
480 responses
={200: EXPORT_INDIV_FLAGS_SCHEMA
})
481 def set_individual(self
, flags
, ids
):
483 Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
486 assert isinstance(flags
, dict)
487 assert isinstance(ids
, list)
488 assert all(isinstance(id, int) for id in ids
)
490 # These are to only flags that can be applied to an OSD individually.
491 all_flags
= {'noin', 'noout', 'nodown', 'noup'}
494 for flag
, activated
in flags
.items():
495 if flag
in all_flags
:
496 if activated
is not None:
502 self
._update
_flags
('set-group', added
, ids
)
503 self
._update
_flags
('unset-group', removed
, ids
)
505 logger
.error('Changed individual OSD flags: added=%s removed=%s for ids=%s',
508 return {'added': sorted(added
),
509 'removed': sorted(removed
),
    @Endpoint('GET', 'individual')
    @ReadPermission
    @EndpointDoc('Displays individual OSD flags',
                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
    def get_individual(self):
        """Return, per OSD, the flags stored in its 'state' field."""
        osd_map = mgr.get('osd_map')['osds']
        resp = []

        for osd in osd_map:
            resp.append({
                'osd': osd['osd'],
                'flags': osd['state']
            })
        return resp