# -*- coding: utf-8 -*-
-from __future__ import absolute_import
+
import json
import logging
import time
+from typing import Any, Dict, List, Optional, Union
-from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError
+from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError # type: ignore
from mgr_util import get_most_recent_rate
-from . import ApiController, RESTController, Endpoint, Task, allow_empty_body
-from . import CreatePermission, ReadPermission, UpdatePermission, DeletePermission
-from .orchestrator import raise_if_no_orchestrator
from .. import mgr
from ..exceptions import DashboardException
from ..security import Scope
from ..services.ceph_service import CephService, SendCommandError
-from ..services.exception import handle_send_command_error, handle_orchestrator_error
-from ..services.orchestrator import OrchClient
+from ..services.exception import handle_orchestrator_error, handle_send_command_error
+from ..services.orchestrator import OrchClient, OrchFeature
from ..tools import str_to_bool
-try:
- from typing import Dict, List, Any, Union # noqa: F401 pylint: disable=unused-import
-except ImportError: # pragma: no cover
- pass # For typing only
-
+from . import APIDoc, APIRouter, CreatePermission, DeletePermission, Endpoint, \
+ EndpointDoc, ReadPermission, RESTController, Task, UpdatePermission, \
+ allow_empty_body
+from ._version import APIVersion
+from .orchestrator import raise_if_no_orchestrator
logger = logging.getLogger('controllers.osd')
+# Response schemas consumed by the @EndpointDoc decorators below.
+# Each entry maps a response field name to a (type, description) pair.
+SAFE_TO_DESTROY_SCHEMA = {
+    "safe_to_destroy": ([str], "Is OSD safe to destroy?"),
+    "active": ([int], ""),
+    "missing_stats": ([str], ""),
+    "stored_pgs": ([str], "Stored placement groups in the OSD"),
+    "is_safe_to_destroy": (bool, "Is OSD safe to destroy?")
+}
+
+EXPORT_FLAGS_SCHEMA = {
+    "list_of_flags": ([str], "")
+}
+
+EXPORT_INDIV_FLAGS_SCHEMA = {
+    "added": ([str], "List of added flags"),
+    "removed": ([str], "List of removed flags"),
+    "ids": ([int], "List of updated OSDs")
+}
+
+EXPORT_INDIV_FLAGS_GET_SCHEMA = {
+    "osd": (int, "OSD ID"),
+    "flags": ([str], "List of active flags")
+}
+
def osd_task(name, metadata, wait_for=2.0):
return Task("osd/{}".format(name), metadata, wait_for)
-@ApiController('/osd', Scope.OSD)
+@APIRouter('/osd', Scope.OSD)
+@APIDoc('OSD management API', 'OSD')
class Osd(RESTController):
def list(self):
osds = self.get_osd_map()
if osd_id >= 0 and osd_id in osds:
osds[osd_id]['host'] = host
- # Extending by osd histogram data
+ removing_osd_ids = self.get_removing_osds()
+
+ # Extending by osd histogram and orchestrator data
for osd_id, osd in osds.items():
osd['stats'] = {}
osd['stats_history'] = {}
# Gauge stats
for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)
-
+ osd['operational_status'] = self._get_operational_status(osd_id, removing_osd_ids)
return list(osds.values())
+    @RESTController.Collection('GET', version=APIVersion.EXPERIMENTAL)
+    @ReadPermission
+    def settings(self):
+        """Return the cluster's `nearfull_ratio` and `full_ratio` taken
+        from the monitor's `osd dump` output."""
+        result = CephService.send_command('mon', 'osd dump')
+        return {
+            'nearfull_ratio': result['nearfull_ratio'],
+            'full_ratio': result['full_ratio']
+        }
+
+    def _get_operational_status(self, osd_id: int, removing_osd_ids: Optional[List[int]]) -> str:
+        """Classify an OSD for the UI: 'unmanaged' when no orchestrator
+        removal info is available (removing_osd_ids is None), 'deleting'
+        when the id is queued for removal, 'working' otherwise."""
+        if removing_osd_ids is None:
+            return 'unmanaged'
+        if osd_id in removing_osd_ids:
+            return 'deleting'
+        return 'working'
+
+    @staticmethod
+    def get_removing_osds() -> Optional[List[int]]:
+        """Return the ids of OSDs currently queued for removal, or None
+        when the orchestrator (or its remove-status feature) is unavailable."""
+        orch = OrchClient.instance()
+        if orch.available(features=[OrchFeature.OSD_GET_REMOVE_STATUS]):
+            return [osd.osd_id for osd in orch.osds.removing_status()]
+        return None
+
@staticmethod
def get_osd_map(svc_id=None):
# type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
def _get_smart_data(osd_id):
# type: (str) -> dict
"""Returns S.M.A.R.T data for the given OSD ID."""
+ logger.debug('[SMART] retrieving data from OSD with ID %s', osd_id)
return CephService.get_smart_data_by_daemon('osd', osd_id)
@RESTController.Resource('GET')
"""
Returns collected data about an OSD.
- :return: Returns the requested data. The `histogram` key may contain a
- string with an error that occurred if the OSD is down.
+ :return: Returns the requested data.
"""
- try:
- histogram = CephService.send_command(
- 'osd', srv_spec=svc_id, prefix='perf histogram dump')
- except SendCommandError as e: # pragma: no cover - the handling is too obvious
- if 'osd down' in str(e): # pragma: no cover - no complexity there
- histogram = str(e)
- else: # pragma: no cover - no complexity there
- raise
-
return {
'osd_map': self.get_osd_map(svc_id),
'osd_metadata': mgr.get_metadata('osd', svc_id),
- 'histogram': histogram,
+ 'operational_status': self._get_operational_status(int(svc_id),
+ self.get_removing_osds())
}
@RESTController.Resource('GET')
}
@DeletePermission
- @raise_if_no_orchestrator
+ @raise_if_no_orchestrator([OrchFeature.OSD_DELETE, OrchFeature.OSD_GET_REMOVE_STATUS])
@handle_orchestrator_error('osd')
@osd_task('delete', {'svc_id': '{svc_id}'})
def delete(self, svc_id, preserve_id=None, force=None): # pragma: no cover
replace = False
- check = False
+ check: Union[Dict[str, Any], bool] = False
try:
if preserve_id is not None:
replace = str_to_bool(preserve_id)
except ValueError:
raise DashboardException(
component='osd', http_status_code=400, msg='Invalid parameter(s)')
-
orch = OrchClient.instance()
if check:
logger.info('Check for removing osd.%s...', svc_id)
while True:
removal_osds = orch.osds.removing_status()
logger.info('Current removing OSDs %s', removal_osds)
- pending = [osd for osd in removal_osds if osd.osd_id == svc_id]
+ pending = [osd for osd in removal_osds if osd.osd_id == int(svc_id)]
if not pending:
break
logger.info('Wait until osd.%s is removed...', svc_id)
api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
CephService.send_command("mon", api_scrub, who=svc_id)
- @RESTController.Resource('POST')
- @allow_empty_body
- def mark_out(self, svc_id):
- CephService.send_command('mon', 'osd out', ids=[svc_id])
-
- @RESTController.Resource('POST')
- @allow_empty_body
- def mark_in(self, svc_id):
- CephService.send_command('mon', 'osd in', ids=[svc_id])
-
- @RESTController.Resource('POST')
- @allow_empty_body
- def mark_down(self, svc_id):
- CephService.send_command('mon', 'osd down', ids=[svc_id])
+    @RESTController.Resource('PUT')
+    @EndpointDoc("Mark OSD flags (out, in, down, lost, ...)",
+                 parameters={'svc_id': (str, 'SVC ID')})
+    def mark(self, svc_id, action):
+        """
+        Mark an OSD `out`, `in`, `down` or `lost`.
+
+        Note: osd must be marked `down` before marking lost.
+        """
+        valid_actions = ['out', 'in', 'down', 'lost']
+        # Normalize once so validation, command prefix and dispatch all agree
+        # on case (previously an action like 'LOST' passed the lowered
+        # validation but hit the non-lost branch with a bad mon command).
+        action = action.lower()
+        if action in valid_actions:
+            args = {'srv_type': 'mon', 'prefix': 'osd ' + action}
+            if action == 'lost':
+                # `osd lost` takes a single id and requires explicit confirmation.
+                args['id'] = int(svc_id)
+                args['yes_i_really_mean_it'] = True
+            else:
+                args['ids'] = [svc_id]
+
+            CephService.send_command(**args)
+        else:
+            logger.error("Invalid OSD mark action: %s attempted on SVC_ID: %s", action, svc_id)
@RESTController.Resource('POST')
@allow_empty_body
id=int(svc_id),
weight=float(weight))
- @RESTController.Resource('POST')
- @allow_empty_body
- def mark_lost(self, svc_id):
- """
- Note: osd must be marked `down` before marking lost.
- """
- CephService.send_command(
- 'mon',
- 'osd lost',
- id=int(svc_id),
- yes_i_really_mean_it=True)
-
def _create_bare(self, data):
"""Create a OSD container that has no associated device.
'uuid': uuid,
}
- @raise_if_no_orchestrator
+ @raise_if_no_orchestrator([OrchFeature.OSD_CREATE])
@handle_orchestrator_error('osd')
def _create_with_drive_groups(self, drive_groups):
"""Create OSDs with DriveGroups."""
@CreatePermission
@osd_task('create', {'tracking_id': '{tracking_id}'})
- def create(self, method, data, tracking_id): # pylint: disable=W0622
+ def create(self, method, data, tracking_id): # pylint: disable=unused-argument
if method == 'bare':
return self._create_bare(data)
if method == 'drive_groups':
@Endpoint('GET', query_params=['ids'])
@ReadPermission
+ @EndpointDoc("Check If OSD is Safe to Destroy",
+ parameters={
+ 'ids': (str, 'OSD Service Identifier'),
+ },
+ responses={200: SAFE_TO_DESTROY_SCHEMA})
def safe_to_destroy(self, ids):
"""
:type ids: int|[int]
@Endpoint('GET', query_params=['svc_ids'])
@ReadPermission
- @raise_if_no_orchestrator
+ @raise_if_no_orchestrator()
@handle_orchestrator_error('osd')
def safe_to_delete(self, svc_ids):
"""
return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
-@ApiController('/osd/flags', Scope.OSD)
+@APIRouter('/osd/flags', Scope.OSD)
+@APIDoc(group='OSD')
class OsdFlagsController(RESTController):
@staticmethod
def _osd_flags():
for flag in flags:
CephService.send_command('mon', 'osd ' + action, '', key=flag)
+ @EndpointDoc("Display OSD Flags",
+ responses={200: EXPORT_FLAGS_SCHEMA})
def list(self):
return self._osd_flags()
+ @EndpointDoc('Sets OSD flags for the entire cluster.',
+ parameters={
+ 'flags': ([str], 'List of flags to set. The flags `recovery_deletes`, '
+ '`sortbitwise` and `pglog_hardlimit` cannot be unset. '
+ 'Additionally `purged_snapshots` cannot even be set.')
+ },
+ responses={200: EXPORT_FLAGS_SCHEMA})
def bulk_set(self, flags):
"""
The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
@Endpoint('PUT', 'individual')
@UpdatePermission
+ @EndpointDoc('Sets OSD flags for a subset of individual OSDs.',
+ parameters={
+ 'flags': ({'noout': (bool, 'Sets/unsets `noout`', True, None),
+ 'noin': (bool, 'Sets/unsets `noin`', True, None),
+ 'noup': (bool, 'Sets/unsets `noup`', True, None),
+ 'nodown': (bool, 'Sets/unsets `nodown`', True, None)},
+ 'Directory of flags to set or unset. The flags '
+ '`noin`, `noout`, `noup` and `nodown` are going to '
+ 'be considered only.'),
+ 'ids': ([int], 'List of OSD ids the flags should be applied '
+ 'to.')
+ },
+ responses={200: EXPORT_INDIV_FLAGS_SCHEMA})
def set_individual(self, flags, ids):
"""
Updates flags (`noout`, `noin`, `nodown`, `noup`) for an individual
@Endpoint('GET', 'individual')
@ReadPermission
+ @EndpointDoc('Displays individual OSD flags',
+ responses={200: EXPORT_INDIV_FLAGS_GET_SCHEMA})
def get_individual(self):
osd_map = mgr.get('osd_map')['osds']
resp = []