1 # -*- coding: utf-8 -*-
from __future__ import absolute_import

import json
import logging
import time

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError
from mgr_util import get_most_recent_rate

from . import ApiController, RESTController, Endpoint, Task
from . import CreatePermission, ReadPermission, UpdatePermission, DeletePermission
from .orchestrator import raise_if_no_orchestrator
from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_send_command_error, handle_orchestrator_error
from ..services.orchestrator import OrchClient
from ..tools import str_to_bool

try:
    from typing import Dict, List, Any, Union  # noqa: F401 pylint: disable=unused-import
except ImportError:
    pass  # For typing only
26 logger
= logging
.getLogger('controllers.osd')
29 def osd_task(name
, metadata
, wait_for
=2.0):
30 return Task("osd/{}".format(name
), metadata
, wait_for
)
33 @ApiController('/osd', Scope
.OSD
)
34 class Osd(RESTController
):
36 osds
= self
.get_osd_map()
38 # Extending by osd stats information
39 for stat
in mgr
.get('osd_stats')['osd_stats']:
40 if stat
['osd'] in osds
:
41 osds
[stat
['osd']]['osd_stats'] = stat
43 # Extending by osd node information
44 nodes
= mgr
.get('osd_map_tree')['nodes']
46 if node
['type'] == 'osd' and node
['id'] in osds
:
47 osds
[node
['id']]['tree'] = node
49 # Extending by osd parent node information
50 for host
in [n
for n
in nodes
if n
['type'] == 'host']:
51 for osd_id
in host
['children']:
52 if osd_id
>= 0 and osd_id
in osds
:
53 osds
[osd_id
]['host'] = host
55 # Extending by osd histogram data
56 for osd_id
, osd
in osds
.items():
58 osd
['stats_history'] = {}
59 osd_spec
= str(osd_id
)
62 for stat
in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
63 prop
= stat
.split('.')[1]
64 rates
= CephService
.get_rates('osd', osd_spec
, stat
)
65 osd
['stats'][prop
] = get_most_recent_rate(rates
)
66 osd
['stats_history'][prop
] = rates
68 for stat
in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
69 osd
['stats'][stat
.split('.')[1]] = mgr
.get_latest('osd', osd_spec
, stat
)
71 return list(osds
.values())
74 def get_osd_map(svc_id
=None):
75 # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
77 osd
['id'] = osd
['osd']
81 osd
['osd']: add_id(osd
)
82 for osd
in mgr
.get('osd_map')['osds'] if svc_id
is None or osd
['osd'] == int(svc_id
)
84 return resp
if svc_id
is None else resp
[int(svc_id
)]
87 def _get_smart_data(osd_id
):
89 """Returns S.M.A.R.T data for the given OSD ID."""
90 return CephService
.get_smart_data_by_daemon('osd', osd_id
)
92 @RESTController.Resource('GET')
93 def smart(self
, svc_id
):
95 return self
._get
_smart
_data
(svc_id
)
97 @handle_send_command_error('osd')
98 def get(self
, svc_id
):
100 Returns collected data about an OSD.
102 :return: Returns the requested data. The `histogram` key may contain a
103 string with an error that occurred if the OSD is down.
106 histogram
= CephService
.send_command(
107 'osd', srv_spec
=svc_id
, prefix
='perf histogram dump')
108 except SendCommandError
as e
:
109 if 'osd down' in str(e
):
115 'osd_map': self
.get_osd_map(svc_id
),
116 'osd_metadata': mgr
.get_metadata('osd', svc_id
),
117 'histogram': histogram
,
120 def set(self
, svc_id
, device_class
):
121 old_device_class
= CephService
.send_command('mon', 'osd crush get-device-class',
123 old_device_class
= old_device_class
[0]['device_class']
124 if old_device_class
!= device_class
:
125 CephService
.send_command('mon', 'osd crush rm-device-class',
128 CephService
.send_command('mon', 'osd crush set-device-class', **{
129 'class': device_class
,
133 def _check_delete(self
, osd_ids
):
134 # type: (List[str]) -> Dict[str, Any]
136 Check if it's safe to remove OSD(s).
138 :param osd_ids: list of OSD IDs
139 :return: a dictionary contains the following attributes:
140 `safe`: bool, indicate if it's safe to remove OSDs.
141 `message`: str, help message if it's not safe to remove OSDs.
144 health_data
= mgr
.get('health') # type: ignore
145 health
= json
.loads(health_data
['json'])
146 checks
= health
['checks'].keys()
147 unsafe_checks
= set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
148 failed_checks
= checks
& unsafe_checks
149 msg
= 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
150 format(', '.join(failed_checks
)) if failed_checks
else ''
152 'safe': not bool(failed_checks
),
157 @raise_if_no_orchestrator
158 @handle_orchestrator_error('osd')
159 @osd_task('delete', {'svc_id': '{svc_id}'})
160 def delete(self
, svc_id
, force
=None):
161 orch
= OrchClient
.instance()
163 logger
.info('Check for removing osd.%s...', svc_id
)
164 check
= self
._check
_delete
([svc_id
])
165 if not check
['safe']:
166 logger
.error('Unable to remove osd.%s: %s', svc_id
, check
['message'])
167 raise DashboardException(component
='osd', msg
=check
['message'])
168 logger
.info('Start removing osd.%s...', svc_id
)
169 orch
.osds
.remove([svc_id
])
171 removal_osds
= orch
.osds
.removing_status()
172 logger
.info('Current removing OSDs %s', removal_osds
)
173 pending
= [osd
for osd
in removal_osds
if osd
.osd_id
== svc_id
]
176 logger
.info('Wait until osd.%s is removed...', svc_id
)
179 @RESTController.Resource('POST', query_params
=['deep'])
181 def scrub(self
, svc_id
, deep
=False):
182 api_scrub
= "osd deep-scrub" if str_to_bool(deep
) else "osd scrub"
183 CephService
.send_command("mon", api_scrub
, who
=svc_id
)
185 @RESTController.Resource('POST')
186 def mark_out(self
, svc_id
):
187 CephService
.send_command('mon', 'osd out', ids
=[svc_id
])
189 @RESTController.Resource('POST')
190 def mark_in(self
, svc_id
):
191 CephService
.send_command('mon', 'osd in', ids
=[svc_id
])
193 @RESTController.Resource('POST')
194 def mark_down(self
, svc_id
):
195 CephService
.send_command('mon', 'osd down', ids
=[svc_id
])
197 @RESTController.Resource('POST')
198 def reweight(self
, svc_id
, weight
):
200 Reweights the OSD temporarily.
202 Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
203 gets marked out, the osd weight will be set to 0. When it gets marked
204 in again, the weight will be changed to 1.
206 Because of this ‘ceph osd reweight’ is a temporary solution. You should
207 only use it to keep your cluster running while you’re ordering more
210 - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
212 CephService
.send_command(
216 weight
=float(weight
))
218 @RESTController.Resource('POST')
219 def mark_lost(self
, svc_id
):
221 Note: osd must be marked `down` before marking lost.
223 CephService
.send_command(
227 yes_i_really_mean_it
=True)
229 def _create_bare(self
, data
):
230 """Create a OSD container that has no associated device.
232 :param data: contain attributes to create a bare OSD.
233 : `uuid`: will be set automatically if the OSD starts up
234 : `svc_id`: the ID is only used if a valid uuid is given.
238 svc_id
= int(data
['svc_id'])
239 except (KeyError, ValueError) as e
:
240 raise DashboardException(e
, component
='osd', http_status_code
=400)
242 result
= CephService
.send_command(
243 'mon', 'osd create', id=svc_id
, uuid
=uuid
)
250 @raise_if_no_orchestrator
251 @handle_orchestrator_error('osd')
252 def _create_with_drive_groups(self
, drive_groups
):
253 """Create OSDs with DriveGroups."""
254 orch
= OrchClient
.instance()
256 dg_specs
= [DriveGroupSpec
.from_json(dg
) for dg
in drive_groups
]
257 orch
.osds
.create(dg_specs
)
258 except (ValueError, TypeError, DriveGroupValidationError
) as e
:
259 raise DashboardException(e
, component
='osd')
262 @osd_task('create', {'tracking_id': '{tracking_id}'})
263 def create(self
, method
, data
, tracking_id
): # pylint: disable=W0622
265 return self
._create
_bare
(data
)
266 if method
== 'drive_groups':
267 return self
._create
_with
_drive
_groups
(data
)
268 raise DashboardException(
269 component
='osd', http_status_code
=400, msg
='Unknown method: {}'.format(method
))
271 @RESTController.Resource('POST')
272 def purge(self
, svc_id
):
274 Note: osd must be marked `down` before removal.
276 CephService
.send_command('mon', 'osd purge-actual', id=int(svc_id
),
277 yes_i_really_mean_it
=True)
279 @RESTController.Resource('POST')
280 def destroy(self
, svc_id
):
282 Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
283 removes cephx keys, config-key data and lockbox keys, rendering data
284 permanently unreadable.
286 The osd must be marked down before being destroyed.
288 CephService
.send_command(
289 'mon', 'osd destroy-actual', id=int(svc_id
), yes_i_really_mean_it
=True)
291 @Endpoint('GET', query_params
=['ids'])
293 def safe_to_destroy(self
, ids
):
298 ids
= json
.loads(ids
)
299 if isinstance(ids
, list):
300 ids
= list(map(str, ids
))
305 result
= CephService
.send_command(
306 'mon', 'osd safe-to-destroy', ids
=ids
, target
=('mgr', ''))
307 result
['is_safe_to_destroy'] = set(result
['safe_to_destroy']) == set(map(int, ids
))
310 except SendCommandError
as e
:
313 'is_safe_to_destroy': False,
316 @Endpoint('GET', query_params
=['svc_ids'])
318 @raise_if_no_orchestrator
319 @handle_orchestrator_error('osd')
320 def safe_to_delete(self
, svc_ids
):
324 check
= self
._check
_delete
(svc_ids
)
326 'is_safe_to_delete': check
.get('safe', False),
327 'message': check
.get('message', '')
330 @RESTController.Resource('GET')
331 def devices(self
, svc_id
):
333 return CephService
.send_command('mon', 'device ls-by-daemon', who
='osd.{}'.format(svc_id
))
336 @ApiController('/osd/flags', Scope
.OSD
)
337 class OsdFlagsController(RESTController
):
340 enabled_flags
= mgr
.get('osd_map')['flags_set']
341 if 'pauserd' in enabled_flags
and 'pausewr' in enabled_flags
:
342 # 'pause' is set by calling `ceph osd set pause` and unset by
343 # calling `set osd unset pause`, but `ceph osd dump | jq '.flags'`
344 # will contain 'pauserd,pausewr' if pause is set.
345 # Let's pretend to the API that 'pause' is in fact a proper flag.
346 enabled_flags
= list(
347 set(enabled_flags
) - {'pauserd', 'pausewr'} |
{'pause'})
348 return sorted(enabled_flags
)
351 return self
._osd
_flags
()
353 def bulk_set(self
, flags
):
355 The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
356 `purged_snapshots` cannot even be set. It is therefore required to at
357 least include those four flags for a successful operation.
359 assert isinstance(flags
, list)
361 enabled_flags
= set(self
._osd
_flags
())
363 added
= data
- enabled_flags
364 removed
= enabled_flags
- data
366 CephService
.send_command('mon', 'osd set', '', key
=flag
)
368 CephService
.send_command('mon', 'osd unset', '', key
=flag
)
369 logger
.info('Changed OSD flags: added=%s removed=%s', added
, removed
)
371 return sorted(enabled_flags
- removed | added
)