]>
Commit | Line | Data |
---|---|---|
1 | # -*- coding: utf-8 -*- | |
2 | from __future__ import absolute_import | |
3 | import json | |
4 | import logging | |
5 | import time | |
6 | ||
7 | from ceph.deployment.drive_group import DriveGroupSpec, DriveGroupValidationError | |
8 | from mgr_util import get_most_recent_rate | |
9 | ||
10 | from . import ApiController, RESTController, Endpoint, Task, allow_empty_body | |
11 | from . import CreatePermission, ReadPermission, UpdatePermission, DeletePermission | |
12 | from .orchestrator import raise_if_no_orchestrator | |
13 | from .. import mgr | |
14 | from ..exceptions import DashboardException | |
15 | from ..security import Scope | |
16 | from ..services.ceph_service import CephService, SendCommandError | |
17 | from ..services.exception import handle_send_command_error, handle_orchestrator_error | |
18 | from ..services.orchestrator import OrchClient | |
19 | from ..tools import str_to_bool | |
20 | try: | |
21 | from typing import Dict, List, Any, Union # noqa: F401 pylint: disable=unused-import | |
22 | except ImportError: # pragma: no cover | |
23 | pass # For typing only | |
24 | ||
25 | ||
26 | logger = logging.getLogger('controllers.osd') | |
27 | ||
28 | ||
def osd_task(name, metadata, wait_for=2.0):
    """
    Build a `Task` whose name lives in the ``osd/`` namespace.

    :param name: Suffix appended to ``osd/`` to form the task name.
    :param metadata: Metadata handed to `Task` unchanged.
    :param wait_for: Third positional argument of `Task`, defaults to 2.0.
    :return: The `Task` object created from the arguments above.
    """
    task_name = "osd/{}".format(name)
    return Task(task_name, metadata, wait_for)
31 | ||
32 | ||
@ApiController('/osd', Scope.OSD)
class Osd(RESTController):
    """REST controller exposing OSD information and management operations."""

    def list(self):
        """
        Return every OSD of the OSD map, enriched with additional data.

        Each entry is extended, when a matching record exists, with its
        `osd_stats` record, its tree node (`tree`), its parent host node
        (`host`) and performance counter values (`stats` holds the latest
        values, `stats_history` the rate series).

        :return: A list with one dictionary per OSD.
        """
        osds = self.get_osd_map()

        # Extending by osd stats information
        for stat in mgr.get('osd_stats')['osd_stats']:
            if stat['osd'] in osds:
                osds[stat['osd']]['osd_stats'] = stat

        # Extending by osd node information
        nodes = mgr.get('osd_map_tree')['nodes']
        for node in nodes:
            if node['type'] == 'osd' and node['id'] in osds:
                osds[node['id']]['tree'] = node

        # Extending by osd parent node information
        for host in [n for n in nodes if n['type'] == 'host']:
            for osd_id in host['children']:
                # Negative child IDs are skipped; only non-negative IDs can
                # refer to an entry of the OSD map.
                if osd_id >= 0 and osd_id in osds:
                    osds[osd_id]['host'] = host

        # Extending by osd histogram data
        for osd_id, osd in osds.items():
            osd['stats'] = {}
            osd['stats_history'] = {}
            osd_spec = str(osd_id)
            if 'osd' not in osd:
                continue  # pragma: no cover - simple early continue
            # Rate stats: keep the full series and the most recent value.
            for stat in ['osd.op_w', 'osd.op_in_bytes', 'osd.op_r', 'osd.op_out_bytes']:
                prop = stat.split('.')[1]
                rates = CephService.get_rates('osd', osd_spec, stat)
                osd['stats'][prop] = get_most_recent_rate(rates)
                osd['stats_history'][prop] = rates
            # Gauge stats
            for stat in ['osd.numpg', 'osd.stat_bytes', 'osd.stat_bytes_used']:
                osd['stats'][stat.split('.')[1]] = mgr.get_latest('osd', osd_spec, stat)

        return list(osds.values())

    @staticmethod
    def get_osd_map(svc_id=None):
        # type: (Union[int, None]) -> Dict[int, Union[dict, Any]]
        """
        Return the OSD map entries, each extended by an `id` key.

        :param svc_id: Optional OSD ID. It is converted with `int()`, so a
            numeric string is accepted as well.
        :return: A dictionary mapping OSD ID to OSD entry if `svc_id` is
            None, otherwise the single entry of the requested OSD.
        :raises KeyError: If `svc_id` is given but not part of the OSD map.
        :raises ValueError: If `svc_id` cannot be converted to an integer.
        """
        def add_id(osd):
            osd['id'] = osd['osd']
            return osd

        # Convert once instead of once per OSD inside the comprehension.
        wanted_id = None if svc_id is None else int(svc_id)
        resp = {
            osd['osd']: add_id(osd)
            for osd in mgr.get('osd_map')['osds']
            if wanted_id is None or osd['osd'] == wanted_id
        }
        return resp if wanted_id is None else resp[wanted_id]

    @staticmethod
    def _get_smart_data(osd_id):
        # type: (str) -> dict
        """Returns S.M.A.R.T data for the given OSD ID."""
        return CephService.get_smart_data_by_daemon('osd', osd_id)

    @RESTController.Resource('GET')
    def smart(self, svc_id):
        # type: (str) -> dict
        """
        :return: Returns the S.M.A.R.T data of the given OSD.
        """
        return self._get_smart_data(svc_id)

    @handle_send_command_error('osd')
    def get(self, svc_id):
        """
        Returns collected data about an OSD.

        :return: Returns the requested data. The `histogram` key may contain a
                 string with an error that occurred if the OSD is down.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            # A down OSD is reported inside the payload; anything else is
            # re-raised.
            if 'osd down' in str(e):  # pragma: no cover - no complexity there
                histogram = str(e)
            else:  # pragma: no cover - no complexity there
                raise

        return {
            'osd_map': self.get_osd_map(svc_id),
            'osd_metadata': mgr.get_metadata('osd', svc_id),
            'histogram': histogram,
        }

    @RESTController.Resource('GET')
    @handle_send_command_error('osd')
    def histogram(self, svc_id):
        # type: (int) -> Dict[str, Any]
        """
        :return: Returns the histogram data.
        """
        try:
            histogram = CephService.send_command(
                'osd', srv_spec=svc_id, prefix='perf histogram dump')
        except SendCommandError as e:  # pragma: no cover - the handling is too obvious
            raise DashboardException(
                component='osd', http_status_code=400, msg=str(e))

        return histogram

    def set(self, svc_id, device_class):  # pragma: no cover
        """
        Change the CRUSH device class of an OSD.

        Nothing is sent if the OSD already has the requested class. Otherwise
        the current class is removed first and, if `device_class` is not
        empty, the new one is set afterwards.

        :param svc_id: ID of the OSD.
        :param device_class: The new device class; a falsy value only
            removes the current class.
        """
        old_device_class = CephService.send_command('mon', 'osd crush get-device-class',
                                                    ids=[svc_id])
        old_device_class = old_device_class[0]['device_class']
        if old_device_class != device_class:
            CephService.send_command('mon', 'osd crush rm-device-class',
                                     ids=[svc_id])
            if device_class:
                CephService.send_command('mon', 'osd crush set-device-class', **{
                    'class': device_class,
                    'ids': [svc_id]
                })

    def _check_delete(self, osd_ids):
        # type: (List[str]) -> Dict[str, Any]
        """
        Check if it's safe to remove OSD(s).

        :param osd_ids: list of OSD IDs (currently not evaluated, the check
            is based on cluster-wide health checks only)
        :return: a dictionary contains the following attributes:
            `safe`: bool, indicate if it's safe to remove OSDs.
            `message`: str, help message if it's not safe to remove OSDs.
        """
        _ = osd_ids
        health_data = mgr.get('health')  # type: ignore
        health = json.loads(health_data['json'])
        unsafe_checks = {'OSD_FULL', 'OSD_BACKFILLFULL', 'OSD_NEARFULL'}
        # `set.intersection` accepts any iterable of keys, so this does not
        # rely on `dict.keys()` supporting `&` (it is a plain list on
        # Python 2). Sorting keeps the message deterministic.
        failed_checks = sorted(unsafe_checks.intersection(health['checks']))
        msg = 'Removing OSD(s) is not recommended because of these failed health check(s): {}.'.\
            format(', '.join(failed_checks)) if failed_checks else ''
        return {
            'safe': not failed_checks,
            'message': msg
        }

    @DeletePermission
    @raise_if_no_orchestrator
    @handle_orchestrator_error('osd')
    @osd_task('delete', {'svc_id': '{svc_id}'})
    def delete(self, svc_id, preserve_id=None, force=None):  # pragma: no cover
        """
        Remove an OSD through the orchestrator and wait until it is gone.

        :param svc_id: ID of the OSD to remove.
        :param preserve_id: Optional boolean-like value; it is passed to the
            orchestrator as the `replace` argument.
        :param force: Optional boolean-like value. The health based safety
            check is only executed if this is given and evaluates to false.
        :raises DashboardException: If a parameter cannot be parsed (HTTP
            400) or if the safety check fails.
        """
        replace = False
        check = False
        try:
            if preserve_id is not None:
                replace = str_to_bool(preserve_id)
            if force is not None:
                check = not str_to_bool(force)
        except ValueError:
            raise DashboardException(
                component='osd', http_status_code=400, msg='Invalid parameter(s)')

        orch = OrchClient.instance()
        if check:
            logger.info('Check for removing osd.%s...', svc_id)
            check_result = self._check_delete([svc_id])
            if not check_result['safe']:
                logger.error('Unable to remove osd.%s: %s', svc_id, check_result['message'])
                raise DashboardException(component='osd', msg=check_result['message'])

        logger.info('Start removing osd.%s (replace: %s)...', svc_id, replace)
        orch.osds.remove([svc_id], replace)
        # NOTE(review): `svc_id` arrives as part of the URL; the type of
        # `osd_id` reported by the orchestrator is not visible here. Both
        # sides are compared as strings so that an int/str mismatch cannot
        # end the wait loop before the OSD is actually removed.
        wanted_id = str(svc_id)
        while True:
            removal_osds = orch.osds.removing_status()
            logger.info('Current removing OSDs %s', removal_osds)
            pending = [osd for osd in removal_osds if str(osd.osd_id) == wanted_id]
            if not pending:
                break
            logger.info('Wait until osd.%s is removed...', svc_id)
            time.sleep(60)

    @RESTController.Resource('POST', query_params=['deep'])
    @UpdatePermission
    @allow_empty_body
    def scrub(self, svc_id, deep=False):
        """
        Trigger a scrub of the given OSD.

        :param svc_id: ID of the OSD.
        :param deep: Boolean-like value; a deep scrub is requested if it
            evaluates to true.
        """
        api_scrub = "osd deep-scrub" if str_to_bool(deep) else "osd scrub"
        CephService.send_command("mon", api_scrub, who=svc_id)

    @RESTController.Resource('POST')
    @allow_empty_body
    def mark_out(self, svc_id):
        """Mark the given OSD as `out`."""
        CephService.send_command('mon', 'osd out', ids=[svc_id])

    @RESTController.Resource('POST')
    @allow_empty_body
    def mark_in(self, svc_id):
        """Mark the given OSD as `in`."""
        CephService.send_command('mon', 'osd in', ids=[svc_id])

    @RESTController.Resource('POST')
    @allow_empty_body
    def mark_down(self, svc_id):
        """Mark the given OSD as `down`."""
        CephService.send_command('mon', 'osd down', ids=[svc_id])

    @RESTController.Resource('POST')
    @allow_empty_body
    def reweight(self, svc_id, weight):
        """
        Reweights the OSD temporarily.

        Note that ‘ceph osd reweight’ is not a persistent setting. When an OSD
        gets marked out, the osd weight will be set to 0. When it gets marked
        in again, the weight will be changed to 1.

        Because of this ‘ceph osd reweight’ is a temporary solution. You should
        only use it to keep your cluster running while you’re ordering more
        hardware.

        - Craig Lewis (http://lists.ceph.com/pipermail/ceph-users-ceph.com/2014-June/040967.html)
        """
        CephService.send_command(
            'mon',
            'osd reweight',
            id=int(svc_id),
            weight=float(weight))

    @RESTController.Resource('POST')
    @allow_empty_body
    def mark_lost(self, svc_id):
        """
        Note: osd must be marked `down` before marking lost.
        """
        CephService.send_command(
            'mon',
            'osd lost',
            id=int(svc_id),
            yes_i_really_mean_it=True)

    def _create_bare(self, data):
        """Create a OSD container that has no associated device.

        :param data: contain attributes to create a bare OSD.
        :    `uuid`: will be set automatically if the OSD starts up
        :    `svc_id`: the ID is only used if a valid uuid is given.
        :return: a dictionary with the command `result`, the `svc_id` and
            the `uuid`.
        :raises DashboardException: with HTTP status 400 if an attribute is
            missing or `svc_id` is not convertible to an integer.
        """
        try:
            uuid = data['uuid']
            svc_id = int(data['svc_id'])
        except (KeyError, ValueError, TypeError) as e:
            # TypeError covers e.g. `svc_id: null` or a `data` value that is
            # not subscriptable by key; both are client errors as well.
            raise DashboardException(e, component='osd', http_status_code=400)

        result = CephService.send_command(
            'mon', 'osd create', id=svc_id, uuid=uuid)
        return {
            'result': result,
            'svc_id': svc_id,
            'uuid': uuid,
        }

    @raise_if_no_orchestrator
    @handle_orchestrator_error('osd')
    def _create_with_drive_groups(self, drive_groups):
        """Create OSDs with DriveGroups.

        :param drive_groups: an iterable of items that are each converted
            with `DriveGroupSpec.from_json`.
        :raises DashboardException: if the conversion or the creation raises
            a ValueError, TypeError or DriveGroupValidationError.
        """
        orch = OrchClient.instance()
        try:
            dg_specs = [DriveGroupSpec.from_json(dg) for dg in drive_groups]
            orch.osds.create(dg_specs)
        except (ValueError, TypeError, DriveGroupValidationError) as e:
            raise DashboardException(e, component='osd')

    @CreatePermission
    @osd_task('create', {'tracking_id': '{tracking_id}'})
    def create(self, method, data, tracking_id):  # pylint: disable=W0622
        """
        Create OSD(s) using the requested method.

        :param method: Either `bare` or `drive_groups`.
        :param data: Method specific payload, forwarded unchanged.
        :param tracking_id: Not used in the body; it is referenced by the
            metadata of the surrounding task decorator.
        :raises DashboardException: with HTTP status 400 for an unknown
            method.
        """
        if method == 'bare':
            return self._create_bare(data)
        if method == 'drive_groups':
            return self._create_with_drive_groups(data)
        raise DashboardException(
            component='osd', http_status_code=400, msg='Unknown method: {}'.format(method))

    @RESTController.Resource('POST')
    @allow_empty_body
    def purge(self, svc_id):
        """
        Note: osd must be marked `down` before removal.
        """
        CephService.send_command('mon', 'osd purge-actual', id=int(svc_id),
                                 yes_i_really_mean_it=True)

    @RESTController.Resource('POST')
    @allow_empty_body
    def destroy(self, svc_id):
        """
        Mark osd as being destroyed. Keeps the ID intact (allowing reuse), but
        removes cephx keys, config-key data and lockbox keys, rendering data
        permanently unreadable.

        The osd must be marked down before being destroyed.
        """
        CephService.send_command(
            'mon', 'osd destroy-actual', id=int(svc_id), yes_i_really_mean_it=True)

    @Endpoint('GET', query_params=['ids'])
    @ReadPermission
    def safe_to_destroy(self, ids):
        """
        :type ids: int|[int]
        :return: The command result extended by `is_safe_to_destroy`, or a
            dictionary with `message` and `is_safe_to_destroy: False` if
            sending the command failed.
        """

        # `ids` is a JSON encoded single ID or list of IDs.
        ids = json.loads(ids)
        if isinstance(ids, list):
            ids = [str(osd_id) for osd_id in ids]
        else:
            ids = [str(ids)]

        try:
            result = CephService.send_command(
                'mon', 'osd safe-to-destroy', ids=ids, target=('mgr', ''))
            # Safe only if every requested OSD is reported as safe.
            result['is_safe_to_destroy'] = set(result['safe_to_destroy']) == set(map(int, ids))
            return result

        except SendCommandError as e:
            return {
                'message': str(e),
                'is_safe_to_destroy': False,
            }

    @Endpoint('GET', query_params=['svc_ids'])
    @ReadPermission
    @raise_if_no_orchestrator
    @handle_orchestrator_error('osd')
    def safe_to_delete(self, svc_ids):
        """
        :type svc_ids: int|[int]
        :return: A dictionary with `is_safe_to_delete` and `message`.
        """
        check = self._check_delete(svc_ids)
        return {
            'is_safe_to_delete': check.get('safe', False),
            'message': check.get('message', '')
        }

    @RESTController.Resource('GET')
    def devices(self, svc_id):
        # type: (str) -> dict
        """
        :return: Returns the result of `device ls-by-daemon` for the OSD.
        """
        return CephService.send_command('mon', 'device ls-by-daemon', who='osd.{}'.format(svc_id))
370 | ||
371 | ||
@ApiController('/osd/flags', Scope.OSD)
class OsdFlagsController(RESTController):
    """REST controller to read and change the flags of the OSD map."""

    @staticmethod
    def _osd_flags():
        """
        :return: The sorted list of enabled OSD flags, where the pair
            `pauserd`/`pausewr` is reported as the single flag `pause`.
        """
        enabled_flags = mgr.get('osd_map')['flags_set']
        if 'pauserd' in enabled_flags and 'pausewr' in enabled_flags:
            # 'pause' is set by calling `ceph osd set pause` and unset by
            # calling `ceph osd unset pause`, but `ceph osd dump | jq '.flags'`
            # will contain 'pauserd,pausewr' if pause is set.
            # Let's pretend to the API that 'pause' is in fact a proper flag.
            enabled_flags = list(
                (set(enabled_flags) - {'pauserd', 'pausewr'}) | {'pause'})
        return sorted(enabled_flags)

    def list(self):
        """
        :return: The sorted list of enabled OSD flags.
        """
        return self._osd_flags()

    def bulk_set(self, flags):
        """
        The `recovery_deletes`, `sortbitwise` and `pglog_hardlimit` flags cannot be unset.
        `purged_snapshots` cannot even be set. It is therefore required to at
        least include those four flags for a successful operation.

        :param flags: The complete list of flags that should be enabled
            afterwards. Flags missing from it are unset.
        :return: The sorted list of flags that are enabled afterwards.
        :raises DashboardException: with HTTP status 400 if `flags` is not
            a list.
        """
        # Explicit validation instead of `assert`, which is removed when
        # Python runs with optimizations enabled.
        if not isinstance(flags, list):
            raise DashboardException(
                component='osd', http_status_code=400,
                msg='The flags parameter must be a list')

        enabled_flags = set(self._osd_flags())
        data = set(flags)
        added = data - enabled_flags
        removed = enabled_flags - data
        # Sorted iteration keeps the order of the issued commands stable.
        for flag in sorted(added):
            CephService.send_command('mon', 'osd set', '', key=flag)
        for flag in sorted(removed):
            CephService.send_command('mon', 'osd unset', '', key=flag)
        logger.info('Changed OSD flags: added=%s removed=%s', added, removed)

        return sorted((enabled_flags - removed) | added)