git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/osd.py
Imported from the 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / dashboard / controllers / osd.py
1 # -*- coding: utf-8 -*-
2 from __future__ import absolute_import
3 import json
4 import logging
5 import time
6
7 from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError
8 from mgr_util import get_most_recent_rate
9
10 from . import ApiController, RESTController, Endpoint, Task
11 from . import CreatePermission, ReadPermission, UpdatePermission, DeletePermission
12 from .orchestrator import raise_if_no_orchestrator
13 from .. import mgr
14 from ..exceptions import DashboardException
15 from ..security import Scope
16 from ..services.ceph_service import CephService, SendCommandError
17 from ..services.exception import handle_send_command_error, handle_orchestrator_error
18 from ..services.orchestrator import OrchClient
19 from ..tools import str_to_bool
20 try:
21 from typing import Dict, List, Any, Union # noqa: F401 pylint: disable=unused-import
22 except ImportError:
23 pass # For typing only
24
25
26 logger = logging.getLogger('controllers.osd')
27
28
def osd_task(name, metadata, wait_for=2.0):
    """Create a dashboard ``Task`` decorator namespaced under ``osd/``."""
    task_name = "osd/{}".format(name)
    return Task(task_name, metadata, wait_for)
31
32
@ApiController('/osd', Scope.OSD)
class Osd(RESTController):
    def list(self):
        """
        Return all OSDs known to the cluster, each entry extended with
        OSD stats, its CRUSH tree node, its host node and recent
        performance-counter rates/gauges.
        """
        osds = self.get_osd_map()

        # Extending by osd stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extending by osd node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extending by osd parent node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                # Negative ids in 'children' are buckets, not OSDs.
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        # Extending by osd histogram data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue
            # Rate counters: keep the full history and the latest rate.
            for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
                prop = stat.split('.')[1]
                rates = CephService.get_rates('osd', osd_spec, stat)
                osd['stats'][prop] = get_most_recent_rate(rates)
                osd['stats_history'][prop] = rates
            # Gauge stats
            for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)

        return list(osds.values())

    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        """
        Return the OSD map keyed by OSD id, or a single OSD's map entry
        when ``svc_id`` is given. Each entry gets an ``id`` alias of the
        ``osd`` field for the frontend.

        :raises KeyError: if ``svc_id`` is given but not in the OSD map.
        """
        def add_id(osd):
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }
        return resp if svc_id is None else resp[int(svc_id)]

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T data for the given OSD ID."""
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        """Return S.M.A.R.T. data for the given OSD."""
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data. The `histogram` key may contain a
                 string with an error that occurred if the OSD is down.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:
            # A down OSD cannot serve the histogram; report the error
            # string instead of failing the whole request.
            if 'osd down' in str(e):
                histogram = str(e)
            else:
                raise

        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'histogram': histogram,
        }

    def set(self, svc_id, device_class):
        """
        Update the CRUSH device class of an OSD. The old class must be
        removed before a new one can be set; an empty ``device_class``
        therefore just clears the class.
        """
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary contains the following attributes:
            `safe`: bool, indicate if it's safe to remove OSDs.
            `message`: str, help message if it's not safe to remove OSDs.
        """
        _ = osd_ids  # currently unused; the check is cluster-wide
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }

    @DeletePermission
    @raise_if_no_orchestrator
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, force=None):
        """
        Remove an OSD via the orchestrator.

        Unless ``force`` is set, cluster health is checked first and the
        removal is refused when it looks unsafe. The task then polls the
        orchestrator once per minute until the OSD has left the removal
        queue.
        """
        orch = OrchClient.instance()
        if not force:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])
        logger.info('Start removing osd.%s...', svc_id)
        orch.osds.remove([svc_id])
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            # Bug fix: `svc_id` is a path parameter (str) while the
            # orchestrator reports `osd_id` as an int, so the previous
            # `osd.osd_id == svc_id` never matched and the wait loop
            # exited immediately. Compare both sides as strings.
            pending = [osd for osd in removal_osds if str(osd.osd_id) == str(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(60)

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    def scrub(self, svc_id, deep=False):
        """Trigger a (deep) scrub on the given OSD."""
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)

    @RESTController.Resource('POST')
    def mark_out(self, svc_id):
        """Mark the OSD `out` (data will be re-balanced away from it)."""
        CephService.send_command('mon', 'osd out', ids=[svc_id])

    @RESTController.Resource('POST')
    def mark_in(self, svc_id):
        """Mark the OSD `in` again."""
        CephService.send_command('mon', 'osd in', ids=[svc_id])

    @RESTController.Resource('POST')
    def mark_down(self, svc_id):
        """Mark the OSD `down`."""
        CephService.send_command('mon', 'osd down', ids=[svc_id])

    @RESTController.Resource('POST')
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that 'ceph osd reweight' is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this 'ceph osd reweight' is a temporary solution. You should
        only use it to keep your cluster running while you're ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))

    @RESTController.Resource('POST')
    def mark_lost(self, svc_id):
        """
        Note: osd must be marked `down` before marking lost.
        """
        CephService.send_command(
            'mon',
            'osd lost',
            id=int(svc_id),
            yes_i_really_mean_it=True)

    def _create_bare(self, data):
        """Create a OSD container that has no associated device.

        :param data: contain attributes to create a bare OSD.
        :    `uuid`: will be set automatically if the OSD starts up
        :    `svc_id`: the ID is only used if a valid uuid is given.
        :raises DashboardException: (HTTP 400) if `uuid`/`svc_id` are
            missing or `svc_id` is not an integer.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }

    @raise_if_no_orchestrator
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=W0622
        """
        Dispatch OSD creation to the requested backend.

        :param method: either 'bare' or 'drive_groups'.
        :raises DashboardException: (HTTP 400) for an unknown method.
        """
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))

    @RESTController.Resource('POST')
    def purge(self, svc_id):
        """
        Note: osd must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    def destroy(self, svc_id):
        """
        Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering data
        permanently unreadable.

        The osd must be marked down before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    def safe_to_destroy(self, ids):
        """
        Ask the monitors whether the given OSD(s) can be destroyed
        without data loss.

        :type ids: int|[int]
        """
        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            # Only safe when *every* requested OSD is in the safe set.
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result

        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        Check whether the given OSD(s) can be removed via the
        orchestrator (cluster-health based check).

        :type svc_ids: int|[int]
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> dict
        """Return the physical devices backing the given OSD daemon."""
        return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
335
@ApiController('/osd/flags', Scope.OSD)
class OsdFlagsController(RESTController):
    @staticmethod
    def _osd_flags():
        """Return the currently enabled OSD flags as a sorted list."""
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `set osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)

    def list(self):
        """Return the currently enabled OSD flags."""
        return self._osd_flags()

    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to at
        least include those four flags for a successful operation.
        """
        assert isinstance(flags, list)

        current = set(self._osd_flags())
        requested = set(flags)
        to_add = requested - current
        to_remove = current - requested

        # Apply the delta: set flags that are newly requested, unset the
        # ones no longer present in the request.
        for flag in to_add:
            CephService.send_command('mon', 'osd set', '', key=flag)
        for flag in to_remove:
            CephService.send_command('mon', 'osd unset', '', key=flag)
        logger.info('Changed OSD flags: added=%s removed=%s', to_add, to_remove)

        return sorted(current - to_remove | to_add)