1 from collections
import defaultdict
2 from distutils
.version
import StrictVersion
4 from django
.http
import Http404
6 from rest_framework
.exceptions
import ParseError
7 from rest_framework
.response
import Response
9 from rest_framework
import status
11 from rest
.app
.serializers
.v2
import PoolSerializer
, CrushRuleSetSerializer
, \
12 CrushRuleSerializer
, ServerSerializer
, RequestSerializer
, OsdSerializer
, \
13 ConfigSettingSerializer
, MonSerializer
, OsdConfigSerializer
14 from rest
.app
.views
.rpc_view
import RPCViewSet
, DataObject
15 from rest
.app
.types
import CRUSH_RULE
, POOL
, OSD
, USER_REQUEST_COMPLETE
, \
16 USER_REQUEST_SUBMITTED
, OSD_IMPLEMENTED_COMMANDS
, OSD_MAP
, \
17 SYNC_OBJECT_TYPES
, OsdMap
, Config
, MonMap
, MonStatus
, SYNC_OBJECT_STR_TYPE
20 from rest
.logger
import logger
class RequestViewSet(RPCViewSet):
    """
    Calamari server requests, tracking long-running operations on the Calamari server.  Some
    API resources return a ``202 ACCEPTED`` response with a request ID, which you can use with
    this resource to learn about progress and completion of an operation.  This resource is
    paginated.

    May optionally filter by state by passing a ``?state=<state>`` GET parameter, where
    state is one of 'complete', 'submitted'.

    The returned records are ordered by the 'requested_at' attribute, in descending order (i.e.
    the first page of results contains the most recent requests).

    To cancel a request while it is running, send an empty POST to ``request/<request id>/cancel``.
    """
    # NOTE(review): this block was reconstructed from a mangled transcription;
    # docstring delimiters and the `else:` branch in list() were restored —
    # confirm against upstream.
    serializer_class = RequestSerializer

    def cancel(self, request, request_id):
        # Ask the backend to cancel the running request, then return its
        # updated state to the caller.
        user_request = DataObject(self.client.cancel_request(request_id))
        return Response(self.serializer_class(user_request).data)

    def retrieve(self, request, **kwargs):
        request_id = kwargs['request_id']
        user_request = DataObject(self.client.get_request(request_id))
        return Response(self.serializer_class(user_request).data)

    def list(self, request, **kwargs):
        fsid = kwargs.get('fsid', None)
        filter_state = request.GET.get('state', None)
        valid_states = [USER_REQUEST_COMPLETE, USER_REQUEST_SUBMITTED]
        if filter_state is not None and filter_state not in valid_states:
            raise ParseError("State must be one of %s" % ", ".join(valid_states))

        requests = self.client.list_requests({'state': filter_state, 'fsid': fsid})
        # Pagination only worked with DRF 2.x.  NOTE(review): assumes a
        # module-level `import rest_framework` exists elsewhere in this file
        # (not visible in this chunk) — confirm.
        if StrictVersion(rest_framework.__version__) < StrictVersion("3.0.0"):
            return Response(self._paginate(request, requests))
        else:
            # FIXME reinstate pagination, broke in DRF 2.x -> 3.x
            return Response(requests)
class CrushRuleViewSet(RPCViewSet):
    """
    A CRUSH ruleset is a collection of CRUSH rules which are applied
    together to a pool.
    """
    # NOTE(review): reconstructed from a mangled transcription; the
    # `for rule in rules:` header was restored.  The docstring is kept as
    # found even though it describes rulesets rather than rules — confirm
    # against upstream.
    serializer_class = CrushRuleSerializer

    def list(self, request):
        rules = self.client.list(CRUSH_RULE, {})
        osds_by_rule_id = self.client.get_sync_object(OsdMap, ['osds_by_rule_id'])
        for rule in rules:
            # Annotate each rule with how many OSDs it maps to.
            rule['osd_count'] = len(osds_by_rule_id[rule['rule_id']])
        return Response(CrushRuleSerializer([DataObject(r) for r in rules], many=True).data)
class CrushRuleSetViewSet(RPCViewSet):
    """
    A CRUSH rule is used by Ceph to decide where to locate placement groups on OSDs.
    """
    serializer_class = CrushRuleSetSerializer

    def list(self, request):
        rules = self.client.list(CRUSH_RULE, {})
        osds_by_rule_id = self.client.get_sync_object(OsdMap, ['osds_by_rule_id'])
        rulesets_data = defaultdict(list)
        # NOTE(review): loop header and the 'id' key below were restored from
        # transcription gaps — confirm against upstream.
        for rule in rules:
            rule['osd_count'] = len(osds_by_rule_id[rule['rule_id']])
            rulesets_data[rule['ruleset']].append(rule)

        # Group rules by their ruleset id for the serializer.
        rulesets = [DataObject({
            'id': rd_id,
            'rules': [DataObject(r) for r in rd_rules]
        }) for (rd_id, rd_rules) in rulesets_data.items()]

        return Response(CrushRuleSetSerializer(rulesets, many=True).data)
class PoolDataObject(DataObject):
    """
    Slightly dressed up version of the raw pool from osd dump
    """
    # NOTE(review): the flag constants and property scaffolding were lost in
    # transcription; restored from Ceph's pg_pool_t flag bits
    # (FLAG_HASHPSPOOL = 1, FLAG_FULL = 2) — confirm against upstream.
    FLAG_HASHPSPOOL = 1
    FLAG_FULL = 2

    @property
    def hashpspool(self):
        # True when the pool mixes the pool id into PG hashing (HASHPSPOOL).
        return bool(self.flags & self.FLAG_HASHPSPOOL)

    @property
    def full(self):
        # True when the pool is flagged full.
        return bool(self.flags & self.FLAG_FULL)
class RequestReturner(object):
    """
    Helper for ViewSets that sometimes need to return a request handle
    """
    def _return_request(self, request):
        # NOTE(review): the if/else guard was restored from a transcription
        # gap — confirm against upstream.
        if request:
            # A request handle was produced: 202 so the caller can poll it.
            return Response(request, status=status.HTTP_202_ACCEPTED)
        else:
            # The operation was a no-op; nothing to track.
            return Response(status=status.HTTP_304_NOT_MODIFIED)
class NullableDataObject(DataObject):
    """
    A DataObject which synthesizes Nones for any attributes it doesn't have
    """
    def __getattr__(self, item):
        if not item.startswith('_'):
            # Missing public attributes read as None.
            return self.__dict__.get(item, None)
        else:
            # Private/dunder lookups must fail normally, otherwise protocols
            # such as copy/pickle misbehave.
            # NOTE(review): this branch was restored from a transcription gap.
            raise AttributeError(item)
class ConfigViewSet(RPCViewSet):
    """
    Configuration settings from a Ceph Cluster.
    """
    serializer_class = ConfigSettingSerializer

    def list(self, request):
        ceph_config = self.client.get_sync_object(Config).data
        settings = [DataObject({'key': k, 'value': v}) for (k, v) in ceph_config.items()]
        return Response(self.serializer_class(settings, many=True).data)

    def retrieve(self, request, key):
        ceph_config = self.client.get_sync_object(Config).data
        # NOTE(review): try/except/else structure restored from transcription
        # gaps — confirm against upstream.
        try:
            setting = DataObject({'key': key, 'value': ceph_config[key]})
        except KeyError:
            raise Http404("Key '%s' not found" % key)
        else:
            return Response(self.serializer_class(setting).data)
162 def _config_to_bool(config_val
):
163 return {'true': True, 'false': False}[config_val
.lower()]
class PoolViewSet(RPCViewSet, RequestReturner):
    """
    Manage Ceph storage pools.

    To get the default values which will be used for any fields omitted from a POST, do
    a GET with the ?defaults argument.  The returned pool object will contain all attributes,
    but those without static defaults will be set to null.
    """
    # NOTE(review): this block was reconstructed from a mangled transcription;
    # the `_defaults` def line, availability guards, `else:` branches and
    # literal closings were restored — confirm against upstream.
    serializer_class = PoolSerializer

    def _defaults(self):
        """Build a NullableDataObject of pool-creation defaults from cluster config."""
        # Issue overlapped RPCs first
        ceph_config = self.client.get_sync_object(Config)
        rules = self.client.list(CRUSH_RULE, {})
        ceph_config = ceph_config.data

        if not ceph_config:
            return Response("Cluster configuration unavailable", status=status.HTTP_503_SERVICE_UNAVAILABLE)

        if not rules:
            return Response("No CRUSH rules exist, pool creation is impossible",
                            status=status.HTTP_503_SERVICE_UNAVAILABLE)

        # Ceph does not reliably inform us of a default ruleset that exists, so we check
        # what it tells us against the rulesets we know about.
        ruleset_ids = sorted(list(set([r['ruleset'] for r in rules])))
        if int(ceph_config['osd_pool_default_crush_rule']) in ruleset_ids:
            # This is the ceph<0.80 setting
            default_ruleset = ceph_config['osd_pool_default_crush_rule']
        elif int(ceph_config.get('osd_pool_default_crush_replicated_ruleset', -1)) in ruleset_ids:
            # This is the ceph>=0.80
            default_ruleset = ceph_config['osd_pool_default_crush_replicated_ruleset']
        else:
            # Ceph may have an invalid default set which
            # would cause undefined behaviour in pool creation (#8373)
            # In this case, pick lowest numbered ruleset as default
            default_ruleset = ruleset_ids[0]

        defaults = NullableDataObject({
            'size': int(ceph_config['osd_pool_default_size']),
            'crush_ruleset': int(default_ruleset),
            'min_size': int(ceph_config['osd_pool_default_min_size']),
            'hashpspool': _config_to_bool(ceph_config['osd_pool_default_flag_hashpspool']),
            # Crash replay interval is zero by default when you create a pool, but when ceph creates
            # its own data pool it applies 'osd_default_data_pool_replay_window'. If we add UI for adding
            # pools to a filesystem, we should check that those data pools have this set.
            'crash_replay_interval': 0,
            'quota_max_objects': 0,
            # NOTE(review): trailing defaults restored from a transcription
            # gap — confirm the exact key set against upstream.
            'quota_max_bytes': 0
        })

        return Response(PoolSerializer(defaults).data)

    def list(self, request):
        if 'defaults' in request.GET:
            return self._defaults()

        pools = [PoolDataObject(p) for p in self.client.list(POOL, {})]
        return Response(PoolSerializer(pools, many=True).data)

    def retrieve(self, request, pool_id):
        pool = PoolDataObject(self.client.get(POOL, int(pool_id)))
        return Response(PoolSerializer(pool).data)

    def create(self, request):
        serializer = self.serializer_class(data=request.DATA)
        if serializer.is_valid(request.method):
            # Semantic validation may short-circuit with an error Response.
            response = self._validate_semantics(None, serializer.get_data())
            if response is not None:
                return response

            create_response = self.client.create(POOL, serializer.get_data())

            # TODO: handle case where the creation is rejected for some reason (should
            # be passed an errors dict for a clean failure, or a zerorpc exception
            # for a dirty failure)
            assert 'request_id' in create_response
            return Response(create_response, status=status.HTTP_202_ACCEPTED)
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def update(self, request, pool_id):
        serializer = self.serializer_class(data=request.DATA)
        if serializer.is_valid(request.method):
            response = self._validate_semantics(pool_id, serializer.get_data())
            if response is not None:
                return response

            return self._return_request(self.client.update(POOL, int(pool_id), serializer.get_data()))
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def destroy(self, request, pool_id):
        # NOTE(review): the `status=` kwarg is passed through to the RPC
        # delete call, not to Response — looks odd but is preserved; confirm
        # the RPC client accepts it.
        delete_response = self.client.delete(POOL, int(pool_id), status=status.HTTP_202_ACCEPTED)
        return Response(delete_response, status=status.HTTP_202_ACCEPTED)

    def _validate_semantics(self, pool_id, data):
        """Run all semantic checks; return an error Response or None when valid."""
        errors = defaultdict(list)
        self._check_name_unique(data, errors)
        self._check_crush_ruleset(data, errors)
        self._check_pgp_less_than_pg_num(data, errors)
        self._check_pg_nums_dont_decrease(pool_id, data, errors)
        self._check_pg_num_inside_config_bounds(data, errors)

        # NOTE(review): the guard structure below was restored from a
        # transcription gap — confirm against upstream.  Name collisions are a
        # conflict (409); everything else is a bad request (400).
        if errors.items():
            if 'name' in errors:
                return Response(errors, status=status.HTTP_409_CONFLICT)
            else:
                return Response(errors, status=status.HTTP_400_BAD_REQUEST)

    def _check_pg_nums_dont_decrease(self, pool_id, data, errors):
        # Ceph refuses to shrink pg_num/pgp_num; reject it up front.
        if pool_id is not None:
            detail = self.client.get(POOL, int(pool_id))
            for field in ['pg_num', 'pgp_num']:
                expanded_field = 'pg_placement_num' if field == 'pgp_num' else 'pg_num'
                if field in data and data[field] < detail[expanded_field]:
                    errors[field].append('must be >= than current {field}'.format(field=field))

    def _check_crush_ruleset(self, data, errors):
        if 'crush_ruleset' in data:
            rules = self.client.list(CRUSH_RULE, {})
            rulesets = set(r['ruleset'] for r in rules)
            if data['crush_ruleset'] not in rulesets:
                errors['crush_ruleset'].append("CRUSH ruleset {0} not found".format(data['crush_ruleset']))

    def _check_pg_num_inside_config_bounds(self, data, errors):
        ceph_config = self.client.get_sync_object(Config).data
        # NOTE(review): guard restored from a transcription gap.  The Response
        # returned here is discarded by _validate_semantics — preserved as-is;
        # confirm intent against upstream.
        if not ceph_config:
            return Response("Cluster configuration unavailable", status=status.HTTP_503_SERVICE_UNAVAILABLE)
        if 'pg_num' in data and data['pg_num'] > int(ceph_config['mon_max_pool_pg_num']):
            errors['pg_num'].append('requested pg_num must be <= than current limit of {max}'.format(max=ceph_config['mon_max_pool_pg_num']))

    def _check_pgp_less_than_pg_num(self, data, errors):
        if 'pgp_num' in data and 'pg_num' in data and data['pg_num'] < data['pgp_num']:
            errors['pgp_num'].append('must be >= to pg_num')

    def _check_name_unique(self, data, errors):
        if 'name' in data and data['name'] in [x.pool_name for x in [PoolDataObject(p) for p in self.client.list(POOL, {})]]:
            errors['name'].append('Pool with name {name} already exists'.format(name=data['name']))
class OsdViewSet(RPCViewSet, RequestReturner):
    """
    Manage Ceph OSDs.

    Apply ceph commands to an OSD by doing a POST with no data to
    api/v2/cluster/<fsid>/osd/<osd_id>/command/<command>
    where <command> is one of ("scrub", "deep-scrub", "repair")

    e.g. Initiate a scrub on OSD 0 by POSTing {} to api/v2/cluster/<fsid>/osd/0/command/scrub

    Filtering is available on this resource:

    ::

        # Pass a ``pool`` URL parameter set to a pool ID to filter by pool, like this:
        /api/v2/cluster/<fsid>/osd?pool=1

        # Pass a series of ``id__in[]`` parameters to specify a list of OSD IDs
        # that you wish to receive.
        /api/v2/cluster/<fsid>/osd?id__in[]=2&id__in[]=3
    """
    # NOTE(review): this block was reconstructed from a mangled transcription;
    # try/except pairs, loop headers and `else:` branches were restored —
    # confirm against upstream.  `log` is assumed to be bound at module level
    # (not visible in this chunk; only `from rest.logger import logger` is) —
    # TODO confirm.
    serializer_class = OsdSerializer

    def list(self, request):
        return self._list(request)

    def _list(self, request):
        # Get data needed for filtering
        list_filter = {}

        if 'pool' in request.GET:
            try:
                pool_id = int(request.GET['pool'])
            except ValueError:
                return Response("Pool ID must be an integer", status=status.HTTP_400_BAD_REQUEST)
            list_filter['pool'] = pool_id

        if 'id__in[]' in request.GET:
            try:
                ids = request.GET.getlist("id__in[]")
                list_filter['id__in'] = [int(i) for i in ids]
            except ValueError:
                return Response("Invalid OSD ID in list", status=status.HTTP_400_BAD_REQUEST)

        osds = self.client.list(OSD, list_filter)
        osd_to_pools = self.client.get_sync_object(OsdMap, ['osd_pools'])
        crush_nodes = self.client.get_sync_object(OsdMap, ['osd_tree_node_by_id'])
        osd_metadata = self.client.get_sync_object(OsdMap, ['osd_metadata'])

        osd_id_to_hostname = dict(
            [(int(osd_id), osd_meta["hostname"]) for osd_id, osd_meta in
             osd_metadata.items()])

        # Get data depending on OSD list
        osd_commands = self.client.get_valid_commands(OSD, [x['osd'] for x in osds])

        # Build OSD data objects
        for o in osds:
            # An OSD being in the OSD map does not guarantee its presence in the CRUSH
            # map, as "osd crush rm" and "osd rm" are separate operations.
            try:
                o.update({'reweight': float(crush_nodes[o['osd']]['reweight'])})
            except KeyError:
                log.warning("No CRUSH data available for OSD {0}".format(o['osd']))
                o.update({'reweight': 0.0})

            o['server'] = osd_id_to_hostname.get(o['osd'], None)

            o['pools'] = osd_to_pools[o['osd']]

            o.update(osd_commands[o['osd']])

        return Response(self.serializer_class([DataObject(o) for o in osds], many=True).data)

    def retrieve(self, request, osd_id):
        osd = self.client.get_sync_object(OsdMap, ['osds_by_id', int(osd_id)])
        crush_node = self.client.get_sync_object(OsdMap, ['osd_tree_node_by_id', int(osd_id)])
        osd['reweight'] = float(crush_node['reweight'])

        osd_metadata = self.client.get_sync_object(OsdMap, ['osd_metadata'])

        osd_id_to_hostname = dict(
            [(int(oid), osd_meta["hostname"]) for oid, osd_meta in
             osd_metadata.items()])

        osd['server'] = osd_id_to_hostname.get(osd['osd'], None)

        pools = self.client.get_sync_object(OsdMap, ['osd_pools', int(osd_id)])
        # NOTE(review): assignment restored from a transcription gap — confirm.
        osd['pools'] = pools

        osd_commands = self.client.get_valid_commands(OSD, [int(osd_id)])
        osd.update(osd_commands[int(osd_id)])

        return Response(self.serializer_class(DataObject(osd)).data)

    def update(self, request, osd_id):
        serializer = self.serializer_class(data=request.DATA)
        if serializer.is_valid(request.method):
            return self._return_request(self.client.update(OSD, int(osd_id),
                                                           serializer.get_data()))
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    def apply(self, request, osd_id, command):
        # Only run commands the backend reports as currently valid for this OSD.
        if command in self.client.get_valid_commands(OSD, [int(osd_id)]).get(int(osd_id)).get('valid_commands'):
            return Response(self.client.apply(OSD, int(osd_id), command), status=202)
        else:
            return Response('{0} not valid on {1}'.format(command, osd_id), status=403)

    def get_implemented_commands(self, request):
        return Response(OSD_IMPLEMENTED_COMMANDS)

    def get_valid_commands(self, request, osd_id=None):
        # With no osd_id, report valid commands for every OSD in the map.
        osds = []
        if osd_id is None:
            osds = self.client.get_sync_object(OsdMap, ['osds_by_id']).keys()
        else:
            osds.append(int(osd_id))

        return Response(self.client.get_valid_commands(OSD, osds))

    def validate_command(self, request, osd_id, command):
        valid_commands = self.client.get_valid_commands(OSD, [int(osd_id)]).get(int(osd_id)).get('valid_commands')

        return Response({'valid': command in valid_commands})
class OsdConfigViewSet(RPCViewSet, RequestReturner):
    """
    Manage flags in the OsdMap
    """
    serializer_class = OsdConfigSerializer

    def osd_config(self, request):
        # Report the current OSD map flags.
        flags_map = self.client.get_sync_object(OsdMap, ['flags'])
        return Response(flags_map)

    def update(self, request):
        # Validate the submitted flag changes; bail out early on bad input.
        ser = self.serializer_class(data=request.DATA)
        if not ser.is_valid(request.method):
            return Response(ser.errors, status=403)

        update_response = self.client.update(OSD_MAP, None, ser.get_data())
        return self._return_request(update_response)
class SyncObject(RPCViewSet):
    """
    These objects are the raw data received by the Calamari server from the Ceph cluster,
    such as the cluster maps
    """

    def retrieve(self, request, sync_type):
        # NOTE(review): try/except restored from a transcription gap —
        # confirm against upstream.
        try:
            sync_type_cls = SYNC_OBJECT_STR_TYPE[sync_type]
        except KeyError:
            return Response("Unknown type '{0}'".format(sync_type), status=404)
        return Response(self.client.get_sync_object(sync_type_cls).data)

    def describe(self, request):
        # List the string names of every sync object type we can serve.
        return Response([s.str for s in SYNC_OBJECT_TYPES])
class ServerViewSet(RPCViewSet):
    """
    Servers that we've learned about via the daemon metadata reported by
    Ceph OSDs, MDSs, mons.
    """
    serializer_class = ServerSerializer

    def retrieve(self, request, fqdn):
        # NOTE(review): the Response(...) wrapper was restored from a
        # transcription gap — confirm against upstream.
        return Response(
            self.serializer_class(
                DataObject(self.client.server_get(fqdn))).data)

    def list(self, request):
        servers = self.client.server_list()
        return Response(self.serializer_class(
            [DataObject(s) for s in servers],
            many=True).data)
class MonViewSet(RPCViewSet):
    """
    Ceph monitor services.

    Note that the ID used to retrieve a specific mon using this API resource is
    the monitor *name* as opposed to the monitor *rank*.

    The quorum status reported here is based on the last mon status reported by
    the Ceph cluster, and also the status of each mon daemon queried by Calamari.

    For debugging mons which are failing to join the cluster, it may be
    useful to show users data from the /status sub-url, which returns the
    "mon_status" output from the daemon.
    """
    # NOTE(review): this block was reconstructed from a mangled and truncated
    # transcription; the `_get_mons` header/return, try/except/else in
    # retrieve(), and the tail of list() (by symmetry with ServerViewSet.list)
    # were restored — confirm against upstream.
    serializer_class = MonSerializer

    def _get_mons(self):
        """Merge monmap entries with quorum/leader status and server hostname."""
        monmap_mons = self.client.get_sync_object(MonMap).data['mons']
        mon_status = self.client.get_sync_object(MonStatus).data

        for mon in monmap_mons:
            # Quorum membership is tracked by rank; the leader is the first
            # rank in the quorum list.
            mon['in_quorum'] = mon['rank'] in mon_status['quorum']
            mon['server'] = self.client.get_metadata("mon", mon['name'])['hostname']
            mon['leader'] = mon['rank'] == mon_status['quorum'][0]

        return monmap_mons

    def retrieve(self, request, mon_id):
        mons = self._get_mons()
        try:
            mon = [m for m in mons if m['name'] == mon_id][0]
        except IndexError:
            raise Http404("Mon '%s' not found" % mon_id)
        else:
            return Response(self.serializer_class(DataObject(mon)).data)

    def list(self, request):
        mons = self._get_mons()
        return Response(
            self.serializer_class([DataObject(m) for m in mons],
                                  many=True).data)