]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/rest/app/views/v2.py
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / pybind / mgr / rest / app / views / v2.py
CommitLineData
7c673cae
FG
1from collections import defaultdict
2from distutils.version import StrictVersion
3
4from django.http import Http404
5import rest_framework
6from rest_framework.exceptions import ParseError
7from rest_framework.response import Response
8
9from rest_framework import status
10
11from rest.app.serializers.v2 import PoolSerializer, CrushRuleSetSerializer, \
12 CrushRuleSerializer, ServerSerializer, RequestSerializer, OsdSerializer, \
13 ConfigSettingSerializer, MonSerializer, OsdConfigSerializer
14from rest.app.views.rpc_view import RPCViewSet, DataObject
15from rest.app.types import CRUSH_RULE, POOL, OSD, USER_REQUEST_COMPLETE, \
16 USER_REQUEST_SUBMITTED, OSD_IMPLEMENTED_COMMANDS, OSD_MAP, \
17 SYNC_OBJECT_TYPES, OsdMap, Config, MonMap, MonStatus, SYNC_OBJECT_STR_TYPE
18
19
20from rest.logger import logger
21log = logger()
22
23
class RequestViewSet(RPCViewSet):
    """
Calamari server requests, tracking long-running operations on the Calamari server. Some
API resources return a ``202 ACCEPTED`` response with a request ID, which you can use with
this resource to learn about progress and completion of an operation. This resource is
paginated.

May optionally filter by state by passing a ``?state=<state>`` GET parameter, where
state is one of 'complete', 'submitted'.

The returned records are ordered by the 'requested_at' attribute, in descending order (i.e.
the first page of results contains the most recent requests).

To cancel a request while it is running, send an empty POST to ``request/<request id>/cancel``.
    """
    serializer_class = RequestSerializer

    def _serialized_response(self, raw_request):
        # Wrap the raw RPC dict so the serializer can access it via attributes.
        return Response(self.serializer_class(DataObject(raw_request)).data)

    def cancel(self, request, request_id):
        # Ask the backend to cancel, then return the request's current state.
        return self._serialized_response(self.client.cancel_request(request_id))

    def retrieve(self, request, **kwargs):
        # Look up a single request by its ID (from the URL kwargs).
        return self._serialized_response(self.client.get_request(kwargs['request_id']))

    def list(self, request, **kwargs):
        # Optional ?state= filter; reject anything outside the known states.
        state = request.GET.get('state', None)
        allowed_states = [USER_REQUEST_COMPLETE, USER_REQUEST_SUBMITTED]
        if state is not None and state not in allowed_states:
            raise ParseError("State must be one of %s" % ", ".join(allowed_states))

        requests = self.client.list_requests({
            'state': state,
            'fsid': kwargs.get('fsid', None),
        })
        # Pagination only functions on DRF 2.x.
        if StrictVersion(rest_framework.__version__) < StrictVersion("3.0.0"):
            return Response(self._paginate(request, requests))
        # FIXME reinstate pagination, broke in DRF 2.x -> 3.x
        return Response(requests)
63
64
class CrushRuleViewSet(RPCViewSet):
    """
A CRUSH ruleset is a collection of CRUSH rules which are applied
together to a pool.
    """
    serializer_class = CrushRuleSerializer

    def list(self, request):
        rules = self.client.list(CRUSH_RULE, {})
        rule_to_osds = self.client.get_sync_object(OsdMap, ['osds_by_rule_id'])

        # Annotate each rule with the number of OSDs it currently maps to.
        for entry in rules:
            entry['osd_count'] = len(rule_to_osds[entry['rule_id']])

        wrapped = [DataObject(entry) for entry in rules]
        return Response(CrushRuleSerializer(wrapped, many=True).data)
78
79
class CrushRuleSetViewSet(RPCViewSet):
    """
A CRUSH rule is used by Ceph to decide where to locate placement groups on OSDs.
    """
    serializer_class = CrushRuleSetSerializer

    def list(self, request):
        rules = self.client.list(CRUSH_RULE, {})
        rule_to_osds = self.client.get_sync_object(OsdMap, ['osds_by_rule_id'])

        # Group the rules by their ruleset ID, annotating each rule with
        # the count of OSDs it maps to along the way.
        grouped = defaultdict(list)
        for entry in rules:
            entry['osd_count'] = len(rule_to_osds[entry['rule_id']])
            grouped[entry['ruleset']].append(entry)

        rulesets = [
            DataObject({
                'id': ruleset_id,
                'rules': [DataObject(entry) for entry in members]
            })
            for ruleset_id, members in grouped.items()
        ]

        return Response(CrushRuleSetSerializer(rulesets, many=True).data)
100
101
class PoolDataObject(DataObject):
    """
    Slightly dressed up version of the raw pool from osd dump
    """

    # Bit positions within the pool's 'flags' field.
    FLAG_HASHPSPOOL = 1
    FLAG_FULL = 2

    @property
    def hashpspool(self):
        # True when the HASHPSPOOL bit is set in the pool flags.
        return (self.flags & self.FLAG_HASHPSPOOL) != 0

    @property
    def full(self):
        # True when the FULL bit is set in the pool flags.
        return (self.flags & self.FLAG_FULL) != 0
117
118
class RequestReturner(object):
    """
    Helper for ViewSets that sometimes need to return a request handle
    """
    def _return_request(self, request):
        # A falsy handle means no work was scheduled: report 'not modified'.
        if not request:
            return Response(status=status.HTTP_304_NOT_MODIFIED)
        # Otherwise hand back the request handle with 202 so the caller can
        # poll the request resource for completion.
        return Response(request, status=status.HTTP_202_ACCEPTED)
128
129
class NullableDataObject(DataObject):
    """
    A DataObject which synthesizes Nones for any attributes it doesn't have
    """
    def __getattr__(self, item):
        # Preserve normal AttributeError semantics for underscore-prefixed
        # names so dunder protocol probes (copy, pickle, ...) behave sanely.
        if item.startswith('_'):
            raise AttributeError
        return self.__dict__.get(item, None)
139
140
class ConfigViewSet(RPCViewSet):
    """
Configuration settings from a Ceph Cluster.
    """
    serializer_class = ConfigSettingSerializer

    def list(self, request):
        # Flatten the config dict into key/value objects for serialization.
        ceph_config = self.client.get_sync_object(Config).data
        settings = [
            DataObject({'key': key, 'value': value})
            for key, value in ceph_config.items()
        ]
        return Response(self.serializer_class(settings, many=True).data)

    def retrieve(self, request, key):
        ceph_config = self.client.get_sync_object(Config).data
        if key not in ceph_config:
            raise Http404("Key '%s' not found" % key)
        setting = DataObject({'key': key, 'value': ceph_config[key]})
        return Response(self.serializer_class(setting).data)
160
161
162def _config_to_bool(config_val):
163 return {'true': True, 'false': False}[config_val.lower()]
164
165
class PoolViewSet(RPCViewSet, RequestReturner):
    """
Manage Ceph storage pools.

To get the default values which will be used for any fields omitted from a POST, do
a GET with the ?defaults argument. The returned pool object will contain all attributes,
but those without static defaults will be set to null.

    """
    serializer_class = PoolSerializer

    def _defaults(self):
        # Build the set of default attributes a newly created pool would get,
        # derived from the cluster configuration and existing CRUSH rules.
        # Issue overlapped RPCs first
        ceph_config = self.client.get_sync_object(Config)
        rules = self.client.list(CRUSH_RULE, {})

        if not ceph_config:
            return Response("Cluster configuration unavailable", status=status.HTTP_503_SERVICE_UNAVAILABLE)

        if not rules:
            return Response("No CRUSH rules exist, pool creation is impossible",
                            status=status.HTTP_503_SERVICE_UNAVAILABLE)

        # Ceph does not reliably inform us of a default ruleset that exists, so we check
        # what it tells us against the rulesets we know about.
        ruleset_ids = sorted(set(r['ruleset'] for r in rules))
        if int(ceph_config['osd_pool_default_crush_rule']) in ruleset_ids:
            # This is the ceph<0.80 setting
            default_ruleset = ceph_config['osd_pool_default_crush_rule']
        elif int(ceph_config.get('osd_pool_default_crush_replicated_ruleset', -1)) in ruleset_ids:
            # This is the ceph>=0.80 setting
            default_ruleset = ceph_config['osd_pool_default_crush_replicated_ruleset']
        else:
            # Ceph may have an invalid default set which
            # would cause undefined behaviour in pool creation (#8373)
            # In this case, pick lowest numbered ruleset as default
            default_ruleset = ruleset_ids[0]

        defaults = NullableDataObject({
            'size': int(ceph_config['osd_pool_default_size']),
            'crush_ruleset': int(default_ruleset),
            'min_size': int(ceph_config['osd_pool_default_min_size']),
            'hashpspool': _config_to_bool(ceph_config['osd_pool_default_flag_hashpspool']),
            # Crash replay interval is zero by default when you create a pool, but when ceph creates
            # its own data pool it applies 'osd_default_data_pool_replay_window'. If we add UI for adding
            # pools to a filesystem, we should check that those data pools have this set.
            'crash_replay_interval': 0,
            'quota_max_objects': 0,
            'quota_max_bytes': 0
        })

        return Response(PoolSerializer(defaults).data)

    def list(self, request):
        # ?defaults returns the pool-creation defaults instead of the list.
        if 'defaults' in request.GET:
            return self._defaults()

        pools = [PoolDataObject(p) for p in self.client.list(POOL, {})]
        return Response(PoolSerializer(pools, many=True).data)

    def retrieve(self, request, pool_id):
        pool = PoolDataObject(self.client.get(POOL, int(pool_id)))
        return Response(PoolSerializer(pool).data)

    def create(self, request):
        serializer = self.serializer_class(data=request.DATA)
        if serializer.is_valid(request.method):
            # Semantic validation may short-circuit with a 400/409 Response.
            response = self._validate_semantics(None, serializer.get_data())
            if response is not None:
                return response

            create_response = self.client.create(POOL, serializer.get_data())

            # TODO: handle case where the creation is rejected for some reason (should
            # be passed an errors dict for a clean failure, or a zerorpc exception
            # for a dirty failure)
            assert 'request_id' in create_response
            return Response(create_response, status=status.HTTP_202_ACCEPTED)
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def update(self, request, pool_id):
        serializer = self.serializer_class(data=request.DATA)
        if serializer.is_valid(request.method):
            response = self._validate_semantics(pool_id, serializer.get_data())
            if response is not None:
                return response

            return self._return_request(self.client.update(POOL, int(pool_id), serializer.get_data()))
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def destroy(self, request, pool_id):
        # NOTE(review): the 'status' kwarg here is forwarded to the RPC
        # client, not to Response -- confirm client.delete() actually
        # accepts/uses it; it looks like it may have been meant for Response.
        delete_response = self.client.delete(POOL, int(pool_id), status=status.HTTP_202_ACCEPTED)
        return Response(delete_response, status=status.HTTP_202_ACCEPTED)

    def _validate_semantics(self, pool_id, data):
        """Run all semantic checks on ``data``, accumulating per-field errors.

        Returns a 409 Response for name conflicts, a 400 Response for other
        validation failures, or None when the data is acceptable.
        """
        errors = defaultdict(list)
        self._check_name_unique(data, errors)
        self._check_crush_ruleset(data, errors)
        self._check_pgp_less_than_pg_num(data, errors)
        self._check_pg_nums_dont_decrease(pool_id, data, errors)
        # NOTE(review): this helper returns a 503 Response when cluster
        # config is unavailable, but that return value is discarded here, so
        # the bounds check is silently skipped in that case -- verify intent.
        self._check_pg_num_inside_config_bounds(data, errors)

        if errors:
            if 'name' in errors:
                return Response(errors, status=status.HTTP_409_CONFLICT)
            else:
                return Response(errors, status=status.HTTP_400_BAD_REQUEST)

    def _check_pg_nums_dont_decrease(self, pool_id, data, errors):
        # Ceph forbids shrinking pg_num/pgp_num on an existing pool; only
        # applicable on update (pool_id is None during create).
        if pool_id is not None:
            detail = self.client.get(POOL, int(pool_id))
            for field in ['pg_num', 'pgp_num']:
                expanded_field = 'pg_placement_num' if field == 'pgp_num' else 'pg_num'
                if field in data and data[field] < detail[expanded_field]:
                    errors[field].append('must be >= than current {field}'.format(field=field))

    def _check_crush_ruleset(self, data, errors):
        # The requested ruleset must exist among the known CRUSH rules.
        if 'crush_ruleset' in data:
            rules = self.client.list(CRUSH_RULE, {})
            rulesets = set(r['ruleset'] for r in rules)
            if data['crush_ruleset'] not in rulesets:
                errors['crush_ruleset'].append("CRUSH ruleset {0} not found".format(data['crush_ruleset']))

    def _check_pg_num_inside_config_bounds(self, data, errors):
        # pg_num must not exceed the mon-enforced per-pool maximum.
        ceph_config = self.client.get_sync_object(Config).data
        if not ceph_config:
            return Response("Cluster configuration unavailable", status=status.HTTP_503_SERVICE_UNAVAILABLE)
        if 'pg_num' in data and data['pg_num'] > int(ceph_config['mon_max_pool_pg_num']):
            errors['pg_num'].append('requested pg_num must be <= than current limit of {max}'.format(max=ceph_config['mon_max_pool_pg_num']))

    def _check_pgp_less_than_pg_num(self, data, errors):
        # pgp_num may never exceed pg_num.
        if 'pgp_num' in data and 'pg_num' in data and data['pg_num'] < data['pgp_num']:
            errors['pgp_num'].append('must be >= to pg_num')

    def _check_name_unique(self, data, errors):
        # Pool names must be unique cluster-wide.
        if 'name' in data and data['name'] in [x.pool_name for x in [PoolDataObject(p) for p in self.client.list(POOL, {})]]:
            errors['name'].append('Pool with name {name} already exists'.format(name=data['name']))
305
306
class OsdViewSet(RPCViewSet, RequestReturner):
    """
Manage Ceph OSDs.

Apply ceph commands to an OSD by doing a POST with no data to
api/v2/cluster/<fsid>/osd/<osd_id>/command/<command>
where <command> is one of ("scrub", "deep-scrub", "repair")

e.g. Initiate a scrub on OSD 0 by POSTing {} to api/v2/cluster/<fsid>/osd/0/command/scrub

Filtering is available on this resource:

::

    # Pass a ``pool`` URL parameter set to a pool ID to filter by pool, like this:
    /api/v2/cluster/<fsid>/osd?pool=1

    # Pass a series of ``id__in[]`` parameters to specify a list of OSD IDs
    # that you wish to receive.
    /api/v2/cluster/<fsid>/osd?id__in[]=2&id__in[]=3

    """
    serializer_class = OsdSerializer

    def list(self, request):
        return self._list(request)

    # Builds the filtered, annotated OSD listing for list().
    def _list(self, request):
        # Get data needed for filtering
        list_filter = {}

        # ?pool=<id> restricts the listing to OSDs serving that pool.
        if 'pool' in request.GET:
            try:
                pool_id = int(request.GET['pool'])
            except ValueError:
                return Response("Pool ID must be an integer", status=status.HTTP_400_BAD_REQUEST)
            list_filter['pool'] = pool_id

        # ?id__in[]=a&id__in[]=b restricts the listing to those OSD IDs.
        if 'id__in[]' in request.GET:
            try:
                ids = request.GET.getlist("id__in[]")
                list_filter['id__in'] = [int(i) for i in ids]
            except ValueError:
                return Response("Invalid OSD ID in list", status=status.HTTP_400_BAD_REQUEST)

        # Get data
        osds = self.client.list(OSD, list_filter)
        osd_to_pools = self.client.get_sync_object(OsdMap, ['osd_pools'])
        crush_nodes = self.client.get_sync_object(OsdMap, ['osd_tree_node_by_id'])
        osd_metadata = self.client.get_sync_object(OsdMap, ['osd_metadata'])

        # Map OSD id -> hostname using the daemon metadata (keys arrive as
        # strings, hence the int() normalization).
        osd_id_to_hostname = dict(
            [(int(osd_id), osd_meta["hostname"]) for osd_id, osd_meta in
             osd_metadata.items()])

        # Get data depending on OSD list
        osd_commands = self.client.get_valid_commands(OSD, [x['osd'] for x in osds])

        # Build OSD data objects
        for o in osds:
            # An OSD being in the OSD map does not guarantee its presence in the CRUSH
            # map, as "osd crush rm" and "osd rm" are separate operations.
            try:
                o.update({'reweight': float(crush_nodes[o['osd']]['reweight'])})
            except KeyError:
                log.warning("No CRUSH data available for OSD {0}".format(o['osd']))
                o.update({'reweight': 0.0})

            o['server'] = osd_id_to_hostname.get(o['osd'], None)

        # Annotate each OSD with the pools it participates in.
        for o in osds:
            o['pools'] = osd_to_pools[o['osd']]

        # Merge per-OSD command-validity info into each record.
        for o in osds:
            o.update(osd_commands[o['osd']])

        return Response(self.serializer_class([DataObject(o) for o in osds], many=True).data)

    def retrieve(self, request, osd_id):
        # Pull per-OSD slices out of the OSD map sync object.
        osd = self.client.get_sync_object(OsdMap, ['osds_by_id', int(osd_id)])
        crush_node = self.client.get_sync_object(OsdMap, ['osd_tree_node_by_id', int(osd_id)])
        osd['reweight'] = float(crush_node['reweight'])

        osd_metadata = self.client.get_sync_object(OsdMap, ['osd_metadata'])

        # Same id -> hostname mapping as in _list().
        osd_id_to_hostname = dict(
            [(int(oid), osd_meta["hostname"]) for oid, osd_meta in
             osd_metadata.items()])

        osd['server'] = osd_id_to_hostname.get(osd['osd'], None)

        pools = self.client.get_sync_object(OsdMap, ['osd_pools', int(osd_id)])
        osd['pools'] = pools

        # Merge in command-validity info (e.g. whether scrub is allowed now).
        osd_commands = self.client.get_valid_commands(OSD, [int(osd_id)])
        osd.update(osd_commands[int(osd_id)])

        return Response(self.serializer_class(DataObject(osd)).data)

    def update(self, request, osd_id):
        serializer = self.serializer_class(data=request.DATA)
        if serializer.is_valid(request.method):
            # May return a request handle (202) or nothing to do (304).
            return self._return_request(self.client.update(OSD, int(osd_id),
                                                           serializer.get_data()))
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def apply(self, request, osd_id, command):
        # Reject (403) commands that are not currently valid for this OSD.
        if command in self.client.get_valid_commands(OSD, [int(osd_id)]).get(int(osd_id)).get('valid_commands'):
            return Response(self.client.apply(OSD, int(osd_id), command), status=202)
        else:
            return Response('{0} not valid on {1}'.format(command, osd_id), status=403)

    def get_implemented_commands(self, request):
        return Response(OSD_IMPLEMENTED_COMMANDS)

    def get_valid_commands(self, request, osd_id=None):
        # With no osd_id, report valid commands for every OSD in the map.
        osds = []
        if osd_id is None:
            osds = self.client.get_sync_object(OsdMap, ['osds_by_id']).keys()
        else:
            osds.append(int(osd_id))

        return Response(self.client.get_valid_commands(OSD, osds))

    def validate_command(self, request, osd_id, command):
        valid_commands = self.client.get_valid_commands(OSD, [int(osd_id)]).get(int(osd_id)).get('valid_commands')

        return Response({'valid': command in valid_commands})
436
437
class OsdConfigViewSet(RPCViewSet, RequestReturner):
    """
Manage flags in the OsdMap
    """
    serializer_class = OsdConfigSerializer

    def osd_config(self, request):
        # Expose only the 'flags' slice of the OSD map sync object.
        flags = self.client.get_sync_object(OsdMap, ['flags'])
        return Response(flags)

    def update(self, request):
        serializer = self.serializer_class(data=request.DATA)
        if not serializer.is_valid(request.method):
            return Response(serializer.errors, status=403)

        # Apply the flag changes; returns 202 with a request handle, or 304.
        return self._return_request(
            self.client.update(OSD_MAP, None, serializer.get_data()))
457
458
class SyncObject(RPCViewSet):
    """
These objects are the raw data received by the Calamari server from the Ceph cluster,
such as the cluster maps
    """

    def retrieve(self, request, sync_type):
        # Unknown sync-object names are a 404, not an exception.
        if sync_type not in SYNC_OBJECT_STR_TYPE:
            return Response("Unknown type '{0}'".format(sync_type), status=404)
        sync_type_cls = SYNC_OBJECT_STR_TYPE[sync_type]
        return Response(self.client.get_sync_object(sync_type_cls).data)

    def describe(self, request):
        # Advertise the names of every sync object type we can serve.
        return Response([member.str for member in SYNC_OBJECT_TYPES])
474
475
class ServerViewSet(RPCViewSet):
    """
Servers that we've learned about via the daemon metadata reported by
Ceph OSDs, MDSs, mons.
    """
    serializer_class = ServerSerializer

    def retrieve(self, request, fqdn):
        # Servers are addressed by FQDN rather than by a numeric ID.
        server = DataObject(self.client.server_get(fqdn))
        return Response(self.serializer_class(server).data)

    def list(self, request):
        wrapped = [DataObject(s) for s in self.client.server_list()]
        return Response(self.serializer_class(wrapped, many=True).data)
494
495
class MonViewSet(RPCViewSet):
    """
Ceph monitor services.

Note that the ID used to retrieve a specific mon using this API resource is
the monitor *name* as opposed to the monitor *rank*.

The quorum status reported here is based on the last mon status reported by
the Ceph cluster, and also the status of each mon daemon queried by Calamari.

For debugging mons which are failing to join the cluster, it may be
useful to show users data from the /status sub-url, which returns the
"mon_status" output from the daemon.

    """
    serializer_class = MonSerializer

    def _get_mons(self):
        # Return mon map entries annotated with quorum membership, leadership
        # and the hostname of the server each mon runs on.
        monmap_mons = self.client.get_sync_object(MonMap).data['mons']
        mon_status = self.client.get_sync_object(MonStatus).data
        quorum = mon_status['quorum']

        for mon in monmap_mons:
            mon['in_quorum'] = mon['rank'] in quorum
            mon['server'] = self.client.get_metadata("mon", mon['name'])['hostname']
            # The leader is the first (lowest-ranked) mon in the quorum list.
            # Guard against an empty quorum (e.g. while mons are still
            # forming one) instead of raising IndexError.
            mon['leader'] = bool(quorum) and mon['rank'] == quorum[0]

        return monmap_mons

    def retrieve(self, request, mon_id):
        # mon_id is the monitor *name*; 404 when no mon matches.
        matches = [m for m in self._get_mons() if m['name'] == mon_id]
        if not matches:
            raise Http404("Mon '%s' not found" % mon_id)

        return Response(self.serializer_class(DataObject(matches[0])).data)

    def list(self, request):
        mons = self._get_mons()
        return Response(
            self.serializer_class([DataObject(m) for m in mons],
                                  many=True).data)
538