# -*- coding: utf-8 -*-

import json
import logging
import time
from typing import Any, Dict, List, Optional, Union

from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError  # type: ignore
from mgr_util import get_most_recent_rate

from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
from ..services.exception import handle_orchestrator_error, handle_send_command_error
from ..services.orchestrator import OrchClient, OrchFeature
from ..tools import str_to_bool
from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
    EndpointDoc, ReadPermission, RESTController, Task, UpdatePermission, \
    allow_empty_body
from ._version import APIVersion
from .orchestrator import raise_if_no_orchestrator

logger = logging.getLogger('controllers.osd')

SAFE_TO_DESTROY_SCHEMA = {
    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
    "active": ([int], ""),
    "missing_stats": ([str], ""),
    "stored_pgs": ([str], "Stored placement groups in OSD"),
    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
}

EXPORT_FLAGS_SCHEMA = {
    "list_of_flags": ([str], "")
}

EXPORT_INDIV_FLAGS_SCHEMA = {
    "added": ([str], "List of added flags"),
    "removed": ([str], "List of removed flags"),
    "ids": ([int], "List of updated OSDs")
}

EXPORT_INDIV_FLAGS_GET_SCHEMA = {
    "osd": (int, "OSD ID"),
    "flags": ([str], "List of active flags")
}


def osd_task(name, metadata, wait_for=2.0):
    return Task("osd/{}".format(name), metadata, wait_for)
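
# Usage sketch (grounded in the endpoints below): `osd_task` namespaces a
# dashboard Task under "osd/". For example, `@osd_task('delete',
# {'svc_id': '{svc_id}'})` on `Osd.delete` tracks the run as the task
# "osd/delete", with the metadata placeholder filled in from the method's
# `svc_id` argument.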


@APIRouter('/osd', Scope.OSD)
@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
    def list(self):
        osds = self.get_osd_map()

        # Extend each entry with OSD stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extend each entry with its OSD tree node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extend each entry with its parent (host) node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        removing_osd_ids = self.get_removing_osds()

        # Extend each entry with histogram and orchestrator data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
                prop = stat.split('.')[1]
                rates = CephService.get_rates('osd', osd_spec, stat)
                osd['stats'][prop] = get_most_recent_rate(rates)
                osd['stats_history'][prop] = rates
            # Gauge stats
            for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)
            osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
        return list(osds.values())
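
    # A sketch of one returned entry (values hypothetical):
    #   {'osd': 0, 'id': 0, 'osd_stats': {...}, 'tree': {...}, 'host': {...},
    #    'stats': {'op_w': 1.2, 'numpg': 100, ...},
    #    'stats_history': {'op_w': [[ts, value], ...], ...},
    #    'operational_status': 'working'}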

    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
    @ReadPermission
    def settings(self):
        result = CephService.send_command('mon', 'osd dump')
        return {
            'nearfull_ratio': result['nearfull_ratio'],
            'full_ratio': result['full_ratio']
        }

    def _get_operational_status(self, osd_id: int, removing_osd_ids: Optional[List[int]]):
        if removing_osd_ids is None:
            return 'unmanaged'
        if osd_id in removing_osd_ids:
            return 'deleting'
        return 'working'

    @staticmethod
    def get_removing_osds() -> Optional[List[int]]:
        orch = OrchClient.instance()
        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
            return [osd.osd_id for osd in orch.osds.removing_status()]
        return None

    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        def add_id(osd):
            osd['id'] = osd['osd']
            return osd

        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds'] if svc_id is None or osd['osd'] == int(svc_id)
        }
        return resp if svc_id is None else resp[int(svc_id)]
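
    # Behaviour sketch: `get_osd_map()` returns a dict keyed by OSD id, e.g.
    # {0: {...}, 1: {...}}, while `get_osd_map('0')` (a call with a service id)
    # returns just the single dict for osd.0.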

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T data for the given OSD ID."""
        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data.
        """
        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'operational_status': self._get_operational_status(int(svc_id),
                                                               self.get_removing_osds())
        }

    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """
        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))

        return histogram

    def set(self, svc_id, device_class):  # pragma: no cover
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })
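
    # CLI analogue (a sketch, not emitted verbatim by the dashboard): changing
    # the device class of osd.0 to 'ssd' corresponds to
    #   ceph osd crush rm-device-class osd.0
    #   ceph osd crush set-device-class ssd osd.0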

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs
        :return: a dictionary containing the following attributes:
            `safe`: bool, indicates whether it is safe to remove the OSDs.
            `message`: str, help message if it is not safe to remove the OSDs.
        """
        _ = osd_ids
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        checks = health['checks'].keys()
        unsafe_checks = set(['OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'])
        failed_checks = checks & unsafe_checks
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not bool(failed_checks),
            'message': msg
        }
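
    # Return-value sketch (hypothetical cluster state): with OSD_NEARFULL
    # firing, _check_delete(['0']) yields
    #   {'safe': False, 'message': 'Removing OSD(s) is not recommended because '
    #                              'of these failed health check(s): OSD_NEARFULL.'}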

    @DeletePermission
    @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        replace = False
        check: Union[Dict[str, Any], bool] = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')
        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check = self._check_delete([svc_id])
            if not check['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check['message'])
                raise DashboardException(component='osd', msg=check['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(60)
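
    # Note on the loop above: deletion runs inside the "osd/delete" task, so
    # the endpoint may legitimately block here, polling the orchestrator's
    # removal queue once a minute until the OSD disappears from it.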

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)

    @RESTController.Resource('PUT')
    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
                 parameters={'svc_id': (str, 'SVC ID')})
    def mark(self, svc_id, action):
        """
        Note: the OSD must be marked `down` before it can be marked `lost`.
        """
        valid_actions = ['out', 'in', 'down', 'lost']
        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
        if action.lower() in valid_actions:
            if action == 'lost':
                args['id'] = int(svc_id)
                args['yes_i_really_mean_it'] = True
            else:
                args['ids'] = [svc_id]

            CephService.send_command(**args)
        else:
            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)
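
    # Command sketch: mark('0', 'out') issues the mon command "osd out" with
    # ids=['0'], while mark('0', 'lost') issues "osd lost" with id=0 and
    # yes_i_really_mean_it=True.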

    @RESTController.Resource('POST')
    @allow_empty_body
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this ‘ceph osd reweight’ is a temporary solution. You should
        only use it to keep your cluster running while you’re ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))
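
    # CLI analogue (sketch): reweight('0', '0.5') is equivalent to
    #   ceph osd reweight 0 0.5
    # where the override weight is a float in the range [0.0, 1.0].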

    def _create_bare(self, data):
        """Create an OSD container that has no associated device.

        :param data: contains the attributes to create a bare OSD.
        : `uuid`: will be set automatically if the OSD starts up
        : `svc_id`: the ID is only used if a valid uuid is given.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError) as e:
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }
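
    # Payload sketch (hypothetical values): _create_bare({'uuid':
    # '5b7d2345-...', 'svc_id': 3}) runs the mon command "osd create" and
    # returns {'result': ..., 'svc_id': 3, 'uuid': '5b7d2345-...'}.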

    @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups."""
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=unused-argument
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))
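
    # Drive-group sketch (one hypothetical spec of the kind accepted by
    # DriveGroupSpec.from_json):
    #   {'service_type': 'osd', 'service_id': 'all-available',
    #    'host_pattern': '*', 'data_devices': {'all': True}}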

    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Note: the OSD must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark the OSD as being destroyed. Keeps the ID intact (allowing reuse),
        but removes cephx keys, config-key data and lockbox keys, rendering
        data permanently unreadable.

        The OSD must be marked `down` before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    @EndpointDoc("Check If OSD is Safe to Destroy",
                 parameters={
                     'ids': (str, 'OSD Service Identifier'),
                 },
                 responses={200: SAFE_TO_DESTROY_SCHEMA})
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        """

        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = list(map(str, ids))
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result

        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }
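
    # Semantics sketch: the mon's "osd safe-to-destroy" reply lists the IDs it
    # considers safe in 'safe_to_destroy'; `is_safe_to_destroy` is True only if
    # every requested ID appears there, e.g. ids=['0', '1'] requires
    # safe_to_destroy == [0, 1].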

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator()
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        :type svc_ids: int|[int]
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> dict
        return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))


@APIRouter('/osd/flags', Scope.OSD)
@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
    @staticmethod
    def _osd_flags():
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `ceph osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
        return sorted(enabled_flags)
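
    # Folding sketch: flags_set == ['noout', 'pauserd', 'pausewr'] is reported
    # to API consumers as ['noout', 'pause'].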

    @staticmethod
    def _update_flags(action, flags, ids=None):
        if ids:
            if flags:
                ids = list(map(str, ids))
                CephService.send_command('mon', 'osd ' + action, who=ids,
                                         flags=','.join(flags))
        else:
            for flag in flags:
                CephService.send_command('mon', 'osd ' + action, '', key=flag)
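
    # Dispatch sketch: with ids, a single grouped mon command is sent, e.g.
    # "osd set-group" with flags='noout,noup' and who=['0', '1']; without ids,
    # one cluster-wide command per flag is sent, e.g. "osd set" with key='noout'.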

    @EndpointDoc("Display OSD Flags",
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
        return self._osd_flags()

    @EndpointDoc('Sets OSD flags for the entire cluster.',
                 parameters={
                     'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
                                      '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
                                      'Additionally `purged_snapshots` cannot even be set.')
                 },
                 responses={200: EXPORT_FLAGS_SCHEMA})
    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to
        include at least those four flags for a successful operation.
        """
        assert isinstance(flags, list)

        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data

        self._update_flags('set', added)
        self._update_flags('unset', removed)

        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted(enabled_flags - removed | added)
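
    # Diff sketch (hypothetical state): with enabled_flags == {'noout',
    # 'sortbitwise'} and flags == ['sortbitwise', 'noin'], the call sets
    # added == {'noin'}, removed == {'noout'} and returns ['noin', 'sortbitwise'].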

    @Endpoint('PUT', 'individual')
    @UpdatePermission
    @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
                 parameters={
                     'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
                                'noin': (bool, 'Sets/unsets `noin`', True, None),
                                'noup': (bool, 'Sets/unsets `noup`', True, None),
                                'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
                               'Dictionary of flags to set or unset. Only the '
                               'flags `noin`, `noout`, `noup` and `nodown` are '
                               'taken into account.'),
                     'ids': ([int], 'List of OSD ids the flags should be applied '
                                    'to.')
                 },
                 responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
    def set_individual(self, flags, ids):
        """
        Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
        subset of OSDs.
        """
        assert isinstance(flags, dict)
        assert isinstance(ids, list)
        assert all(isinstance(id, int) for id in ids)

        # These are the only flags that can be applied to an OSD individually.
        all_flags = {'noin', 'noout', 'nodown', 'noup'}
        added = set()
        removed = set()
        for flag, activated in flags.items():
            if flag in all_flags:
                if activated is not None:
                    if activated:
                        added.add(flag)
                    else:
                        removed.add(flag)

        self._update_flags('set-group', added, ids)
        self._update_flags('unset-group', removed, ids)

        logger.info('Changed individual OSD flags: added=%s removed=%s for ids=%s',
                    added, removed, ids)

        return {'added': sorted(added),
                'removed': sorted(removed),
                'ids': ids}
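
    # Request sketch (hypothetical payload): flags={'noout': True, 'nodown':
    # False, 'noin': None} with ids=[0, 1] sets `noout` and unsets `nodown` on
    # osd.0 and osd.1, leaves `noin` untouched (None), and returns
    # {'added': ['noout'], 'removed': ['nodown'], 'ids': [0, 1]}.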

    @Endpoint('GET', 'individual')
    @ReadPermission
    @EndpointDoc('Displays individual OSD flags',
                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
    def get_individual(self):
        osd_map = mgr.get('osd_map')['osds']
        resp = []

        for osd in osd_map:
            resp.append({
                'osd': osd['osd'],
                'flags': osd['state']
            })
        return resp