# -*- coding: utf-8 -*-
-from __future__ import absolute_import
+
import json
import logging
import time
+from typing import Any, Dict, List, Optional, Union
-from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError
+from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError # type: ignore
from mgr_util import get_most_recent_rate
-from . import ApiController, RESTController, Endpoint, Task
-from . import CreatePermission, ReadPermission, UpdatePermission, DeletePermission
-from .orchestrator import raise_if_no_orchestrator
from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
-from ..services.exception import handle_send_command_error, handle_orchestrator_error
-from ..services.orchestrator import OrchClient
+from ..services.exception import handle_orchestrator_error, handle_send_command_error
+from ..services.orchestrator import OrchClient, OrchFeature
from ..tools import str_to_bool
-try:
- from typing import Dict, List, Any, Union # noqa: F401 pylint: disable=unused-import
-except ImportError: # pragma: no cover
- pass # For typing only
-
+from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
+ EndpointDoc, ReadPermission, RESTController, Task, UpdatePermission, \
+ allow_empty_body
+from ._version import APIVersion
+from .orchestrator import raise_if_no_orchestrator
logger = logging.getLogger('controllers.osd')
+# OpenAPI response schemas consumed by the @EndpointDoc decorators below.
+SAFE_TO_DESTROY_SCHEMA = {
+    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
+    "active": ([int], ""),
+    "missing_stats": ([str], ""),
+    "stored_pgs": ([str], "Stored Pool groups in Osd"),
+    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
+}
+
+EXPORT_FLAGS_SCHEMA = {
+    "list_of_flags": ([str], "")
+}
+
+EXPORT_INDIV_FLAGS_SCHEMA = {
+    "added": ([str], "List of added flags"),
+    "removed": ([str], "List of removed flags"),
+    "ids": ([int], "List of updated OSDs")
+}
+
+EXPORT_INDIV_FLAGS_GET_SCHEMA = {
+    "osd": (int, "OSD ID"),
+    "flags": ([str], "List of active flags")
+}
+
def osd_task(name, metadata, wait_for=2.0):
+    """Return a Task decorator namespaced under ``osd/<name>``."""
    return Task("osd/{}".format(name), metadata, wait_for)
-@ApiController('/osd', Scope.OSD)
+@APIRouter('/osd', Scope.OSD)
+@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
def list(self):
osds = self.get_osd_map()
if osd_id >= 0 and osd_id in osds:
osds[osd_id]['host'] = host
- # Extending by osd histogram data
+ removing_osd_ids = self.get_removing_osds()
+
+ # Extending by osd histogram and orchestrator data
for osd_id, osd in osds.items():
osd['stats'] = {}
osd['stats_history'] = {}
# Gauge stats
for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)
-
+ osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
return list(osds.values())
+    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
+    @ReadPermission
+    def settings(self):
+        """Return the cluster-wide nearfull/full ratios from 'osd dump'."""
+        result = CephService.send_command('mon', 'osd dump')
+        return {
+            'nearfull_ratio': result['nearfull_ratio'],
+            'full_ratio': result['full_ratio']
+        }
+
+    def _get_operational_status(self, osd_id: int, removing_osd_ids: Optional[List[int]]):
+        """Classify an OSD as 'unmanaged', 'deleting' or 'working'.
+
+        ``removing_osd_ids is None`` means no orchestrator removal info
+        is available, hence 'unmanaged'.
+        """
+        if removing_osd_ids is None:
+            return 'unmanaged'
+        if osd_id in removing_osd_ids:
+            return 'deleting'
+        return 'working'
+
+    @staticmethod
+    def get_removing_osds() -> Optional[List[int]]:
+        """Return the IDs of OSDs queued for removal, or None when the
+        orchestrator does not support removal-status queries."""
+        orch = OrchClient.instance()
+        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
+            return [osd.osd_id for osd in orch.osds.removing_status()]
+        return None
+
@staticmethod
def get_osd_map(svc_id=None):
# type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T data for the given OSD ID."""
+        # NOTE(review): osd_id arrives as a string (see type comment above).
+        logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
        return CephService.get_smart_data_by_daemon('osd', osd_id)
@RESTController.Resource('GET')
"""
Returns collected data about an OSD.
- :return: Returns the requested data. The `histogram` key may contain a
- string with an error that occurred if the OSD is down.
+ :return: Returns the requested data.
+ """
+ return {
+ 'osd_map': self.get_osd_map(svc_id),
+ 'osd_metadata': mgr.get_metadata('osd', svc_id),
+ 'operational_status': self._get_operational_status(int(svc_id),
+ self.get_removing_osds())
+ }
+
+    @RESTController.Resource('GET')
+    @handle_send_command_error('osd')
+    def histogram(self, svc_id):
+        # type: (int) -> Dict[str, Any]
+        """
+        :return: Returns the histogram data.
+        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
-            if 'osd down' in str(e):  # pragma: no cover - no complexity there
-                histogram = str(e)
-            else:  # pragma: no cover - no complexity there
-                raise
+            # A failing 'perf histogram dump' (e.g. OSD down) now surfaces
+            # as HTTP 400 instead of being smuggled into the payload.
+            raise DashboardException(
+                component='osd', http_status_code=400, msg=str(e))

-        return {
-            'osd_map': self.get_osd_map(svc_id),
-            'osd_metadata': mgr.get_metadata('osd', svc_id),
-            'histogram': histogram,
-        }
+        return histogram
def set(self, svc_id, device_class): # pragma: no cover
old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
}
@DeletePermission
- @raise_if_no_orchestrator
+ @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
@handle_orchestrator_error('osd')
@osd_task('delete', {'svc_id': '{svc_id}'})
def delete(self, svc_id, preserve_id=None, force=None): # pragma: no cover
replace = False
- check = False
+ check: Union[Dict[str, Any], bool] = False
try:
if preserve_id is not None:
replace = str_to_bool(preserve_id)
except ValueError:
raise DashboardException(
component='osd', http_status_code=400, msg='Invalid parameter(s)')
-
orch = OrchClient.instance()
if check:
logger.info('Check for removing osd.%s...', svc_id)
while True:
removal_osds = orch.osds.removing_status()
logger.info('Current removing OSDs %s', removal_osds)
- pending = [osd for osd in removal_osds if osd.osd_id == svc_id]
+ pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
if not pending:
break
logger.info('Wait until osd.%s is removed...', svc_id)
    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
+    @allow_empty_body
    def scrub(self, svc_id, deep=False):
+        """Initiate a scrub (or deep-scrub when ``deep``) on the OSD."""
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)
-    @RESTController.Resource('POST')
-    def mark_out(self, svc_id):
-        CephService.send_command('mon', 'osd out', ids=[svc_id])
-
-    @RESTController.Resource('POST')
-    def mark_in(self, svc_id):
-        CephService.send_command('mon', 'osd in', ids=[svc_id])
-
-    @RESTController.Resource('POST')
-    def mark_down(self, svc_id):
-        CephService.send_command('mon', 'osd down', ids=[svc_id])
+    @RESTController.Resource('PUT')
+    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
+                 parameters={'svc_id': (str, 'SVC ID')})
+    def mark(self, svc_id, action):
+        """
+        Mark an OSD ``out``, ``in``, ``down`` or ``lost``.
+
+        Note: osd must be marked `down` before marking lost.
+        :raises DashboardException: if ``action`` is not a supported mark
+            action (previously this was only logged and a 200 returned).
+        """
+        valid_actions = ['out', 'in', 'down', 'lost']
+        if action.lower() not in valid_actions:
+            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)
+            raise DashboardException(
+                component='osd', http_status_code=400,
+                msg='Invalid OSD mark action: {}'.format(action))
+
+        args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
+        if action == 'lost':
+            # 'osd lost' takes a single integer id plus a safety flag,
+            # unlike the other actions which take a list of ids.
+            args['id'] = int(svc_id)
+            args['yes_i_really_mean_it'] = True
+        else:
+            args['ids'] = [svc_id]
+
+        CephService.send_command(**args)
@RESTController.Resource('POST')
+ @allow_empty_body
def reweight(self, svc_id, weight):
"""
Reweights the OSD temporarily.
id=int(svc_id),
weight=float(weight))
- @RESTController.Resource('POST')
- def mark_lost(self, svc_id):
- """
- Note: osd must be marked `down` before marking lost.
- """
- CephService.send_command(
- 'mon',
- 'osd lost',
- id=int(svc_id),
- yes_i_really_mean_it=True)
-
def _create_bare(self, data):
"""Create a OSD container that has no associated device.
'uuid': uuid,
}
- @raise_if_no_orchestrator
+ @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
@handle_orchestrator_error('osd')
def _create_with_drive_groups(self, drive_groups):
"""Create OSDs with DriveGroups."""
@CreatePermission
@osd_task('create', {'tracking_id': '{tracking_id}'})
- def create(self, method, data, tracking_id): # pylint: disable=W0622
+ def create(self, method, data, tracking_id): # pylint: disable=unused-argument
if method == 'bare':
return self._create_bare(data)
if method == 'drive_groups':
component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))
@RESTController.Resource('POST')
+ @allow_empty_body
def purge(self, svc_id):
"""
Note: osd must be marked `down` before removal.
yes_i_really_mean_it=True)
@RESTController.Resource('POST')
+ @allow_empty_body
def destroy(self, svc_id):
"""
Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
@Endpoint('GET', query_params=['ids'])
@ReadPermission
+ @EndpointDoc("Check If OSD is Safe to Destroy",
+ parameters={
+ 'ids': (str, 'OSD Service Identifier'),
+ },
+ responses={200: SAFE_TO_DESTROY_SCHEMA})
def safe_to_destroy(self, ids):
"""
:type ids: int|[int]
@Endpoint('GET', query_params=['svc_ids'])
@ReadPermission
- @raise_if_no_orchestrator
+ @raise_if_no_orchestrator()
@handle_orchestrator_error('osd')
def safe_to_delete(self, svc_ids):
"""
return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
-@ApiController('/osd/flags', Scope.OSD)
+@APIRouter('/osd/flags', Scope.OSD)
+@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
@staticmethod
def _osd_flags():
set(enabled_flags) - {'pauserd', 'pausewr'} | {'pause'})
return sorted(enabled_flags)
+    @staticmethod
+    def _update_flags(action, flags, ids=None):
+        """Issue 'osd <action>' mon commands for the given flags.
+
+        :param action: mon subcommand, e.g. 'set', 'unset', 'set-group'
+            or 'unset-group' (see the call sites in this class).
+        :param flags: iterable of flag names; if empty, nothing is sent
+            for the ids-based variant.
+        :param ids: optional list of OSD ids for the per-OSD (group)
+            variant; ids are stringified for the mon command.
+        """
+        if ids:
+            if flags:
+                ids = list(map(str, ids))
+                CephService.send_command('mon', 'osd ' + action, who=ids,
+                                         flags=','.join(flags))
+        else:
+            for flag in flags:
+                CephService.send_command('mon', 'osd ' + action, '', key=flag)
+
+    @EndpointDoc("Display OSD Flags",
+                 responses={200: EXPORT_FLAGS_SCHEMA})
    def list(self):
+        """Return the sorted list of cluster-wide OSD flags."""
        return self._osd_flags()
+ @EndpointDoc('Sets OSD flags for the entire cluster.',
+ parameters={
+ 'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
+ '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
+ 'Additionally `purged_snapshots` cannot even be set.')
+ },
+ responses={200: EXPORT_FLAGS_SCHEMA})
def bulk_set(self, flags):
"""
The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
data = set(flags)
added = data - enabled_flags
removed = enabled_flags - data
- for flag in added:
- CephService.send_command('mon', 'osd set', '', key=flag)
- for flag in removed:
- CephService.send_command('mon', 'osd unset', '', key=flag)
+
+ self._update_flags('set', added)
+ self._update_flags('unset', removed)
+
logger.info('Changed OSD flags: added=%s removed=%s', added, removed)
return sorted(enabled_flags - removed | added)
+
+    @Endpoint('PUT', 'individual')
+    @UpdatePermission
+    @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
+                 parameters={
+                     'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
+                                'noin': (bool, 'Sets/unsets `noin`', True, None),
+                                'noup': (bool, 'Sets/unsets `noup`', True, None),
+                                'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
+                               'Directory of flags to set or unset. The flags '
+                               '`noin`, `noout`, `noup` and `nodown` are going to '
+                               'be considered only.'),
+                     'ids': ([int], 'List of OSD ids the flags should be applied '
+                                    'to.')
+                 },
+                 responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
+    def set_individual(self, flags, ids):
+        """
+        Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
+        subset of OSDs.
+
+        :return: dict with the sorted 'added' and 'removed' flag names and
+            the unmodified 'ids' list.
+        """
+        assert isinstance(flags, dict)
+        assert isinstance(ids, list)
+        assert all(isinstance(id, int) for id in ids)
+
+        # These are the only flags that can be applied to an OSD individually.
+        all_flags = {'noin', 'noout', 'nodown', 'noup'}
+        added = set()
+        removed = set()
+        for flag, activated in flags.items():
+            # Flags outside `all_flags` and None values are ignored;
+            # True means set the flag, False means unset it.
+            if flag in all_flags and activated is not None:
+                if activated:
+                    added.add(flag)
+                else:
+                    removed.add(flag)
+
+        self._update_flags('set-group', added, ids)
+        self._update_flags('unset-group', removed, ids)
+
+        # Success path: log at info level (was erroneously logger.error;
+        # bulk_set logs the equivalent event at info).
+        logger.info('Changed individual OSD flags: added=%s removed=%s for ids=%s',
+                    added, removed, ids)
+
+        return {'added': sorted(added),
+                'removed': sorted(removed),
+                'ids': ids}
+
+    @Endpoint('GET', 'individual')
+    @ReadPermission
+    @EndpointDoc('Displays individual OSD flags',
+                 responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
+    def get_individual(self):
+        """Return per-OSD flag state from the cluster OSD map."""
+        osd_map = mgr.get('osd_map')['osds']
+        resp = []
+
+        for osd in osd_map:
+            resp.append({
+                'osd': osd['osd'],
+                # 'state' carries the flags active on this OSD
+                # (per EXPORT_INDIV_FLAGS_GET_SCHEMA: "List of active flags").
+                'flags': osd['state']
+            })
+        return resp