# -*- coding: utf-8 -*-
from __future__ import absolute_import

import json
import logging
import time

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError
from mgr_util import get_most_recent_rate

from . import ApiController, RESTController, Endpoint, Task, allow_empty_body
from . import CreatePermission, ReadPermission, UpdatePermission, DeletePermission
from .orchestrator import raise_if_no_orchestrator
from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_send_command_error, handle_orchestrator_error
from ..services.orchestrator import OrchClient
from ..tools import str_to_bool

try:
    from typing import Dict, List, Any, Union  # noqa: F401 pylint: disable=unused-import
except ImportError:  # pragma: no cover
    pass  # For typing only
26 logger
= logging
.getLogger('controllers.osd')
29 def osd_task(name
, metadata
, wait_for
=2.0):
30 return Task("osd/{}".format(name
), metadata
, wait_for
)
33 @ApiController('/osd', Scope
.OSD
)
34 class Osd(RESTController
):
36 osds
= self
.get_osd_map()
38 # Extending by osd stats information
39 for stat
in mgr
.get('osd_stats')['osd_stats']:
40 if stat
['osd'] in osds
:
41 osds
[stat
['osd']]['osd_stats'] = stat
43 # Extending by osd node information
44 nodes
= mgr
.get('osd_map_tree')['nodes']
46 if node
['type'] == 'osd' and node
['id'] in osds
:
47 osds
[node
['id']]['tree'] = node
49 # Extending by osd parent node information
50 for host
in [n
for n
in nodes
if n
['type'] == 'host']:
51 for osd_id
in host
['children']:
52 if osd_id
>= 0 and osd_id
in osds
:
53 osds
[osd_id
]['host'] = host
55 # Extending by osd histogram data
56 for osd_id
, osd
in osds
.items():
58 osd
['stats_history'] = {}
59 osd_spec
= str(osd_id
)
61 continue # pragma: no cover - simple early continue
62 for stat
in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
63 prop
= stat
.split('.')[1]
64 rates
= CephService
.get_rates('osd', osd_spec
, stat
)
65 osd
['stats'][prop
] = get_most_recent_rate(rates
)
66 osd
['stats_history'][prop
] = rates
68 for stat
in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
69 osd
['stats'][stat
.split('.')[1]] = mgr
.get_latest('osd', osd_spec
, stat
)
71 return list(osds
.values())
74 def get_osd_map(svc_id
=None):
75 # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
77 osd
['id'] = osd
['osd']
81 osd
['osd']: add_id(osd
)
82 for osd
in mgr
.get('osd_map')['osds'] if svc_id
is None or osd
['osd'] == int(svc_id
)
84 return resp
if svc_id
is None else resp
[int(svc_id
)]
87 def _get_smart_data(osd_id
):
89 """Returns S.M.A.R.T data for the given OSD ID."""
90 return CephService
.get_smart_data_by_daemon('osd', osd_id
)
92 @RESTController.Resource('GET')
93 def smart(self
, svc_id
):
95 return self
._get
_smart
_data
(svc_id
)
97 @handle_send_command_error('osd')
98 def get(self
, svc_id
):
100 Returns collected data about an OSD.
102 :return: Returns the requested data. The `histogram` key may contain a
103 string with an error that occurred if the OSD is down.
106 histogram
= CephService
.send_command(
107 'osd', srv_spec
=svc_id
, prefix
='perf histogram dump')
108 except SendCommandError
as e
: # pragma: no cover - the handling is too obvious
109 if 'osd down' in str(e
): # pragma: no cover - no complexity there
111 else: # pragma: no cover - no complexity there
115 'osd_map': self
.get_osd_map(svc_id
),
116 'osd_metadata': mgr
.get_metadata('osd', svc_id
),
117 'histogram': histogram
,
120 @RESTController.Resource('GET')
121 @handle_send_command_error('osd')
122 def histogram(self
, svc_id
):
123 # type: (int) -> Dict[str, Any]
125 :return: Returns the histogram data.
128 histogram
= CephService
.send_command(
129 'osd', srv_spec
=svc_id
, prefix
='perf histogram dump')
130 except SendCommandError
as e
: # pragma: no cover - the handling is too obvious
131 raise DashboardException(
132 component
='osd', http_status_code
=400, msg
=str(e
))
136 def set(self
, svc_id
, device_class
): # pragma: no cover
137 old_device_class
= CephService
.send_command('mon', 'osd crush get-device-class',
139 old_device_class
= old_device_class
[0]['device_class']
140 if old_device_class
!= device_class
:
141 CephService
.send_command('mon', 'osd crush rm-device-class',
144 CephService
.send_command('mon', 'osd crush set-device-class', **{
145 'class': device_class
,
149 def _check_delete(self
, osd_ids
):
150 # type: (List[str]) -> Dict[str, Any]
152 Check if it's safe to remove OSD(s).
154 :param osd_ids: list of OSD IDs
155 :return: a dictionary contains the following attributes:
156 `safe`: bool, indicate if it's safe to remove OSDs.
157 `message`: str, help message if it's not safe to remove OSDs.
160 health_data
= mgr
.get('health') # type: ignore
161 health
= json
.loads(health_data
['json'])
162 checks
= health
['checks'].keys()
163 unsafe_checks
= set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
164 failed_checks
= checks
& unsafe_checks
165 msg
= 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
166 format(', '.join(failed_checks
)) if failed_checks
else ''
168 'safe': not bool(failed_checks
),
173 @raise_if_no_orchestrator
174 @handle_orchestrator_error('osd')
175 @osd_task('delete', {'svc_id': '{svc_id}'})
176 def delete(self
, svc_id
, preserve_id
=None, force
=None): # pragma: no cover
180 if preserve_id
is not None:
181 replace
= str_to_bool(preserve_id
)
182 if force
is not None:
183 check
= not str_to_bool(force
)
185 raise DashboardException(
186 component
='osd', http_status_code
=400, msg
='Invalid parameter(s)')
188 orch
= OrchClient
.instance()
190 logger
.info('Check for removing osd.%s...', svc_id
)
191 check
= self
._check
_delete
([svc_id
])
192 if not check
['safe']:
193 logger
.error('Unable to remove osd.%s: %s', svc_id
, check
['message'])
194 raise DashboardException(component
='osd', msg
=check
['message'])
196 logger
.info('Start removing osd.%s (replace: %s)...', svc_id
, replace
)
197 orch
.osds
.remove([svc_id
], replace
)
199 removal_osds
= orch
.osds
.removing_status()
200 logger
.info('Current removing OSDs %s', removal_osds
)
201 pending
= [osd
for osd
in removal_osds
if osd
.osd_id
== svc_id
]
204 logger
.info('Wait until osd.%s is removed...', svc_id
)
207 @RESTController.Resource('POST', query_params
=['deep'])
210 def scrub(self
, svc_id
, deep
=False):
211 api_scrub
= "osd deep-scrub" if str_to_bool(deep
) else "osd scrub"
212 CephService
.send_command("mon", api_scrub
, who
=svc_id
)
214 @RESTController.Resource('POST')
216 def mark_out(self
, svc_id
):
217 CephService
.send_command('mon', 'osd out', ids
=[svc_id
])
219 @RESTController.Resource('POST')
221 def mark_in(self
, svc_id
):
222 CephService
.send_command('mon', 'osd in', ids
=[svc_id
])
224 @RESTController.Resource('POST')
226 def mark_down(self
, svc_id
):
227 CephService
.send_command('mon', 'osd down', ids
=[svc_id
])
229 @RESTController.Resource('POST')
231 def reweight(self
, svc_id
, weight
):
233 Reweights the OSD temporarily.
235 Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
236 gets marked out, the osd weight will be set to 0. When it gets marked
237 in again, the weight will be changed to 1.
239 Because of this ‘ceph osd reweight’ is a temporary solution. You should
240 only use it to keep your cluster running while you’re ordering more
243 - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
245 CephService
.send_command(
249 weight
=float(weight
))
251 @RESTController.Resource('POST')
253 def mark_lost(self
, svc_id
):
255 Note: osd must be marked `down` before marking lost.
257 CephService
.send_command(
261 yes_i_really_mean_it
=True)
263 def _create_bare(self
, data
):
264 """Create a OSD container that has no associated device.
266 :param data: contain attributes to create a bare OSD.
267 : `uuid`: will be set automatically if the OSD starts up
268 : `svc_id`: the ID is only used if a valid uuid is given.
272 svc_id
= int(data
['svc_id'])
273 except (KeyError, ValueError) as e
:
274 raise DashboardException(e
, component
='osd', http_status_code
=400)
276 result
= CephService
.send_command(
277 'mon', 'osd create', id=svc_id
, uuid
=uuid
)
284 @raise_if_no_orchestrator
285 @handle_orchestrator_error('osd')
286 def _create_with_drive_groups(self
, drive_groups
):
287 """Create OSDs with DriveGroups."""
288 orch
= OrchClient
.instance()
290 dg_specs
= [DriveGroupSpec
.from_json(dg
) for dg
in drive_groups
]
291 orch
.osds
.create(dg_specs
)
292 except (ValueError, TypeError, DriveGroupValidationError
) as e
:
293 raise DashboardException(e
, component
='osd')
296 @osd_task('create', {'tracking_id': '{tracking_id}'})
297 def create(self
, method
, data
, tracking_id
): # pylint: disable=W0622
299 return self
._create
_bare
(data
)
300 if method
== 'drive_groups':
301 return self
._create
_with
_drive
_groups
(data
)
302 raise DashboardException(
303 component
='osd', http_status_code
=400, msg
='Unknown method: {}'.format(method
))
305 @RESTController.Resource('POST')
307 def purge(self
, svc_id
):
309 Note: osd must be marked `down` before removal.
311 CephService
.send_command('mon', 'osd purge-actual', id=int(svc_id
),
312 yes_i_really_mean_it
=True)
314 @RESTController.Resource('POST')
316 def destroy(self
, svc_id
):
318 Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
319 removes cephx keys, config-key data and lockbox keys, rendering data
320 permanently unreadable.
322 The osd must be marked down before being destroyed.
324 CephService
.send_command(
325 'mon', 'osd destroy-actual', id=int(svc_id
), yes_i_really_mean_it
=True)
327 @Endpoint('GET', query_params
=['ids'])
329 def safe_to_destroy(self
, ids
):
334 ids
= json
.loads(ids
)
335 if isinstance(ids
, list):
336 ids
= list(map(str, ids
))
341 result
= CephService
.send_command(
342 'mon', 'osd safe-to-destroy', ids
=ids
, target
=('mgr', ''))
343 result
['is_safe_to_destroy'] = set(result
['safe_to_destroy']) == set(map(int, ids
))
346 except SendCommandError
as e
:
349 'is_safe_to_destroy': False,
352 @Endpoint('GET', query_params
=['svc_ids'])
354 @raise_if_no_orchestrator
355 @handle_orchestrator_error('osd')
356 def safe_to_delete(self
, svc_ids
):
360 check
= self
._check
_delete
(svc_ids
)
362 'is_safe_to_delete': check
.get('safe', False),
363 'message': check
.get('message', '')
366 @RESTController.Resource('GET')
367 def devices(self
, svc_id
):
369 return CephService
.send_command('mon', 'device ls-by-daemon', who
='osd.{}'.format(svc_id
))
372 @ApiController('/osd/flags', Scope
.OSD
)
373 class OsdFlagsController(RESTController
):
376 enabled_flags
= mgr
.get('osd_map')['flags_set']
377 if 'pauserd' in enabled_flags
and 'pausewr' in enabled_flags
:
378 # 'pause' is set by calling `ceph osd set pause` and unset by
379 # calling `set osd unset pause`, but `ceph osd dump | jq '.flags'`
380 # will contain 'pauserd,pausewr' if pause is set.
381 # Let's pretend to the API that 'pause' is in fact a proper flag.
382 enabled_flags
= list(
383 set(enabled_flags
) - {'pauserd', 'pausewr'} |
{'pause'})
384 return sorted(enabled_flags
)
387 def _update_flags(action
, flags
, ids
=None):
390 ids
= list(map(str, ids
))
391 CephService
.send_command('mon', 'osd ' + action
, who
=ids
,
392 flags
=','.join(flags
))
395 CephService
.send_command('mon', 'osd ' + action
, '', key
=flag
)
398 return self
._osd
_flags
()
400 def bulk_set(self
, flags
):
402 The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
403 `purged_snapshots` cannot even be set. It is therefore required to at
404 least include those four flags for a successful operation.
406 assert isinstance(flags
, list)
408 enabled_flags
= set(self
._osd
_flags
())
410 added
= data
- enabled_flags
411 removed
= enabled_flags
- data
413 self
._update
_flags
('set', added
)
414 self
._update
_flags
('unset', removed
)
416 logger
.info('Changed OSD flags: added=%s removed=%s', added
, removed
)
418 return sorted(enabled_flags
- removed | added
)
420 @Endpoint('PUT', 'individual')
422 def set_individual(self
, flags
, ids
):
424 Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
427 assert isinstance(flags
, dict)
428 assert isinstance(ids
, list)
429 assert all(isinstance(id, int) for id in ids
)
431 # These are to only flags that can be applied to an OSD individually.
432 all_flags
= {'noin', 'noout', 'nodown', 'noup'}
435 for flag
, activated
in flags
.items():
436 if flag
in all_flags
:
437 if activated
is not None:
443 self
._update
_flags
('set-group', added
, ids
)
444 self
._update
_flags
('unset-group', removed
, ids
)
446 logger
.error('Changed individual OSD flags: added=%s removed=%s for ids=%s',
449 return {'added': sorted(added
),
450 'removed': sorted(removed
),
453 @Endpoint('GET', 'individual')
455 def get_individual(self
):
456 osd_map
= mgr
.get('osd_map')['osds']
462 'flags': osd
['state']