]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/cephfs.py
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / pybind / mgr / dashboard / controllers / cephfs.py
1 # -*- coding: utf-8 -*-
2 import logging
3 import os
4 from collections import defaultdict
5
6 import cephfs
7 import cherrypy
8
9 from .. import mgr
10 from ..exceptions import DashboardException
11 from ..security import Scope
12 from ..services.ceph_service import CephService
13 from ..services.cephfs import CephFS as CephFS_
14 from ..services.exception import handle_cephfs_error
15 from ..tools import ViewCache
16 from . import APIDoc, APIRouter, EndpointDoc, RESTController, UIRouter, allow_empty_body
17
# Response schema advertised (via EndpointDoc) for the quota GET endpoint:
# each key maps to a (type, description) pair.
GET_QUOTAS_SCHEMA = {
    'max_bytes': (int, ''),
    'max_files': (int, '')
}

# Named after this controller; the previous "controllers.rgw" name was a
# copy-paste leftover that attributed CephFS log records to the RGW controller.
logger = logging.getLogger("controllers.cephfs")
24
25
@APIRouter('/cephfs', Scope.CEPHFS)
@APIDoc("Cephfs Management API", "Cephfs")
class CephFS(RESTController):
    """
    REST controller registered under '/cephfs'.

    Exposes file system status, MDS performance counters, client sessions
    (listing and eviction), directory listing/creation/removal, quotas and
    snapshots.
    """

    def __init__(self):  # pragma: no cover
        super().__init__()

        # Stateful instances of CephFSClients, hold cached results. Key to
        # dict is FSCID
        self.cephfs_clients = {}

    def list(self):
        """
        Return the 'filesystems' entry of the manager's "fs_map".
        """
        fsmap = mgr.get("fs_map")
        return fsmap['filesystems']

    def get(self, fs_id):
        """
        Return the status of a single file system.

        :param fs_id: The filesystem identifier.
        :return: The structure built by :meth:`fs_status`.
        :raises DashboardException: If `fs_id` is not an integer.
        """
        fs_id = self.fs_id_to_int(fs_id)
        return self.fs_status(fs_id)

    @RESTController.Resource('GET')
    def clients(self, fs_id):
        """
        Return the client sessions of a file system.

        :param fs_id: The filesystem identifier.
        :return: A dict with the keys 'status' and 'data'.
        :raises DashboardException: If `fs_id` is not an integer.
        """
        fs_id = self.fs_id_to_int(fs_id)

        return self._clients(fs_id)

    @RESTController.Resource('DELETE', path='/client/{client_id}')
    def evict(self, fs_id, client_id):
        """
        Evict a client from a file system.

        :param fs_id: The filesystem identifier.
        :param client_id: The identifier of the client session to evict.
        :raises DashboardException: If one of the identifiers is not an integer.
        """
        fs_id = self.fs_id_to_int(fs_id)
        client_id = self.client_id_to_int(client_id)

        return self._evict(fs_id, client_id)

    @RESTController.Resource('GET')
    def mds_counters(self, fs_id, counters=None):
        """
        Return performance counter data for the MDS daemons of a file system.

        :param fs_id: The filesystem identifier.
        :param counters: The counters to fetch. A default selection is used
            if not set.
        :raises DashboardException: If `fs_id` is not an integer.
        """
        fs_id = self.fs_id_to_int(fs_id)
        return self._mds_counters(fs_id, counters)

    def _mds_counters(self, fs_id, counters=None):
        """
        Result format: map of daemon name to map of counter to list of datapoints
        rtype: dict[str, dict[str, list]]
        """

        if counters is None:
            # Opinionated list of interesting performance counters for the GUI
            counters = [
                "mds_server.handle_client_request",
                "mds_log.ev",
                "mds_cache.num_strays",
                "mds.exported",
                "mds.exported_inodes",
                "mds.imported",
                "mds.imported_inodes",
                "mds.inodes",
                "mds.caps",
                "mds.subtrees",
                "mds_mem.ino"
            ]

        result: dict = {}

        for mds_name in self._get_mds_names(fs_id):
            daemon_counters = result[mds_name] = {}
            for counter in counters:
                data = mgr.get_counter("mds", mds_name, counter)
                # A missing counter is reported as an empty series so that
                # every requested counter is present in the result.
                daemon_counters[counter] = data[counter] if data is not None else []

        return result

    @staticmethod
    def fs_id_to_int(fs_id):
        """
        Convert a file system identifier to an integer.

        :param fs_id: The filesystem identifier.
        :rtype: int
        :raises DashboardException: If the value cannot be parsed as integer.
        """
        try:
            return int(fs_id)
        except ValueError as err:
            raise DashboardException(code='invalid_cephfs_id',
                                     msg="Invalid cephfs ID {}".format(fs_id),
                                     component='cephfs') from err

    @staticmethod
    def client_id_to_int(client_id):
        """
        Convert a client identifier to an integer.

        :param client_id: The client identifier.
        :rtype: int
        :raises DashboardException: If the value cannot be parsed as integer.
        """
        try:
            return int(client_id)
        except ValueError as err:
            raise DashboardException(code='invalid_cephfs_client_id',
                                     msg="Invalid cephfs client ID {}".format(client_id),
                                     component='cephfs') from err

    def _get_mds_names(self, filesystem_id=None):
        """
        Collect the names of the MDS daemons listed in the "fs_map".

        :param filesystem_id: Only consider the file system with this id. If
            not set, the daemons of all file systems plus the standby
            daemons are returned.
        :rtype: list
        """
        names = []

        fsmap = mgr.get("fs_map")
        for fs in fsmap['filesystems']:
            if filesystem_id is not None and fs['id'] != filesystem_id:
                continue
            names.extend(info['name'] for info in fs['mdsmap']['info'].values())

        if filesystem_id is None:
            names.extend(info['name'] for info in fsmap['standbys'])

        return names

    def _append_mds_metadata(self, mds_versions, metadata_key):
        """
        Record the daemon `metadata_key` under its 'ceph_version' ('unknown'
        if the metadata has no such entry) in `mds_versions`. Nothing is
        recorded if no metadata is available for the daemon.

        :param mds_versions: Map of version to list of daemon names; it is
            modified in place.
        :param metadata_key: The name of the MDS daemon.
        """
        metadata = mgr.get_metadata('mds', metadata_key)
        if metadata is None:
            return
        mds_versions[metadata.get('ceph_version', 'unknown')].append(metadata_key)

    def _find_standby_replays(self, mdsmap_info, rank_table):
        """
        Append one row per daemon in state "up:standby-replay" to `rank_table`.

        :param mdsmap_info: The 'info' section of a MDS map.
        :param rank_table: The list of rank rows; it is modified in place.
        """
        for daemon_info in mdsmap_info.values():
            if daemon_info['state'] != "up:standby-replay":
                continue

            name = daemon_info['name']
            inos = mgr.get_latest("mds", name, "mds_mem.ino")
            dns = mgr.get_latest("mds", name, "mds_mem.dn")
            dirs = mgr.get_latest("mds", name, "mds_mem.dir")
            caps = mgr.get_latest("mds", name, "mds_mem.cap")

            activity = CephService.get_rate("mds", name, "mds_log.replay")

            rank_table.append(
                {
                    "rank": "{0}-s".format(daemon_info['rank']),
                    "state": "standby-replay",
                    "mds": name,
                    "activity": activity,
                    "dns": dns,
                    "inos": inos,
                    "dirs": dirs,
                    "caps": caps
                }
            )

    def get_standby_table(self, standbys, mds_versions):
        """
        Build the table of standby daemons and record their versions.

        :param standbys: The 'standbys' entry of the "fs_map".
        :param mds_versions: Map of version to list of daemon names; it is
            modified in place.
        :return: A list of dicts, each containing the 'name' of a standby.
        :rtype: list
        """
        standby_table = []
        for standby in standbys:
            self._append_mds_metadata(mds_versions, standby['name'])
            standby_table.append({
                'name': standby['name']
            })
        return standby_table

    # pylint: disable=too-many-statements,too-many-branches
    def fs_status(self, fs_id):
        """
        Build the status of a file system: its ranks (including
        standby-replay daemons), its pools, the standby daemons and the
        versions of the involved MDS daemons.

        :param fs_id: The filesystem identifier.
        :type fs_id: int
        :return: A dict with the keys 'cephfs', 'standbys' and 'versions'.
        :rtype: dict
        :raises cherrypy.HTTPError: 404 if no file system has the given id.
        """
        mds_versions: dict = defaultdict(list)

        fsmap = mgr.get("fs_map")
        filesystem = next((fs for fs in fsmap['filesystems'] if fs['id'] == fs_id), None)

        if filesystem is None:
            raise cherrypy.HTTPError(404,
                                     "CephFS id {0} not found".format(fs_id))

        rank_table = []

        mdsmap = filesystem['mdsmap']

        client_count = 0

        for rank in mdsmap["in"]:
            rank_key = "mds_{0}".format(rank)
            if rank_key not in mdsmap["up"]:
                # No daemon holds this rank.
                rank_table.append(
                    {
                        "rank": rank,
                        "state": "failed",
                        "mds": "",
                        "activity": 0.0,
                        "dns": 0,
                        "inos": 0,
                        "dirs": 0,
                        "caps": 0
                    }
                )
                continue

            gid = mdsmap['up'][rank_key]
            info = mdsmap['info']['gid_{0}'.format(gid)]
            dns = mgr.get_latest("mds", info['name'], "mds_mem.dn")
            inos = mgr.get_latest("mds", info['name'], "mds_mem.ino")
            dirs = mgr.get_latest("mds", info['name'], "mds_mem.dir")
            caps = mgr.get_latest("mds", info['name'], "mds_mem.cap")

            # In case rank 0 was down, look at another rank's
            # sessionmap to get an indication of clients.
            if rank == 0 or client_count == 0:
                client_count = mgr.get_latest("mds", info['name'],
                                              "mds_sessions.session_count")

            state = info['state'].split(":")[1]
            if "laggy_since" in info:
                state += "(laggy)"

            # Populate based on context of state, e.g. client
            # ops for an active daemon, replay progress, reconnect
            # progress
            if state == "active":
                activity = CephService.get_rate("mds",
                                                info['name'],
                                                "mds_server.handle_client_request")
            else:
                activity = 0.0  # pragma: no cover

            self._append_mds_metadata(mds_versions, info['name'])
            rank_table.append(
                {
                    "rank": rank,
                    "state": state,
                    "mds": info['name'],
                    "activity": activity,
                    "dns": dns,
                    "inos": inos,
                    "dirs": dirs,
                    "caps": caps
                }
            )

        self._find_standby_replays(mdsmap['info'], rank_table)

        df = mgr.get("df")
        pool_stats = {p['id']: p['stats'] for p in df['pools']}
        osdmap = mgr.get("osd_map")
        pools = {p['pool']: p for p in osdmap['pools']}
        metadata_pool_id = mdsmap['metadata_pool']
        data_pool_ids = mdsmap['data_pools']

        # The metadata pool is listed first, followed by the data pools.
        pools_table = []
        for pool_id in [metadata_pool_id] + data_pool_ids:
            pool_type = "metadata" if pool_id == metadata_pool_id else "data"
            stats = pool_stats[pool_id]
            pools_table.append({
                "pool": pools[pool_id]['pool_name'],
                "type": pool_type,
                "used": stats['stored'],
                "avail": stats['max_avail']
            })

        standby_table = self.get_standby_table(fsmap['standbys'], mds_versions)

        return {
            "cephfs": {
                "id": fs_id,
                "name": mdsmap['fs_name'],
                "client_count": client_count,
                "ranks": rank_table,
                "pools": pools_table
            },
            "standbys": standby_table,
            "versions": mds_versions
        }

    def _clients(self, fs_id):
        """
        Fetch the client sessions of a file system and decorate each of them
        with the fields 'type', 'version', 'hostname' and 'root'.

        :param fs_id: The filesystem identifier.
        :type fs_id: int
        :return: A dict with the keys 'status' and 'data' (the client list).
        :rtype: dict
        :raises cherrypy.HTTPError: 404 if the sessions cannot be fetched.
        """
        cephfs_clients = self.cephfs_clients.get(fs_id, None)
        if cephfs_clients is None:
            cephfs_clients = CephFSClients(mgr, fs_id)
            self.cephfs_clients[fs_id] = cephfs_clients

        try:
            status, clients = cephfs_clients.get()
        except AttributeError as err:
            raise cherrypy.HTTPError(404,
                                     "No cephfs with id {0}".format(fs_id)) from err

        if clients is None:
            raise cherrypy.HTTPError(404,
                                     "No cephfs with id {0}".format(fs_id))

        # Decorate the metadata with some fields that will be
        # independent of whether it's a kernel or userspace
        # client, so that the javascript doesn't have to grok that.
        for client in clients:
            metadata = client['client_metadata']
            if "ceph_version" in metadata:  # pragma: no cover - no complexity
                client['type'] = "userspace"
                client['version'] = metadata['ceph_version']
                client['hostname'] = metadata['hostname']
                client['root'] = metadata['root']
            elif "kernel_version" in metadata:  # pragma: no cover - no complexity
                client['type'] = "kernel"
                client['version'] = metadata['kernel_version']
                client['hostname'] = metadata['hostname']
                client['root'] = metadata['root']
            else:  # pragma: no cover - no complexity there
                client['type'] = "unknown"
                client['version'] = ""
                client['hostname'] = ""
                # Keep the record shape identical to the other client types.
                client['root'] = ""

        return {
            'status': status,
            'data': clients
        }

    def _evict(self, fs_id, client_id):
        """
        Evict the client `client_id` from the file system `fs_id`.

        :param fs_id: The filesystem identifier.
        :type fs_id: int
        :param client_id: The identifier of the client session.
        :type client_id: int
        :raises cherrypy.HTTPError: 404 if the file system has no such client.
        """
        clients = self._clients(fs_id)
        if not any(c['id'] == client_id for c in clients['data']):
            raise cherrypy.HTTPError(404,
                                     "Client {0} does not exist in cephfs {1}".format(client_id,
                                                                                      fs_id))
        filters = [f'id={client_id}']
        CephService.send_command('mds', 'client evict',
                                 srv_spec='{0}:0'.format(fs_id), filters=filters)

    @staticmethod
    def _cephfs_instance(fs_id):
        """
        :param fs_id: The filesystem identifier.
        :type fs_id: int | str
        :return: A instance of the CephFS class.
        :raises cherrypy.HTTPError: 404 if no name is known for `fs_id`.
        """
        fs_name = CephFS_.fs_name_from_id(fs_id)
        if fs_name is None:
            raise cherrypy.HTTPError(404, "CephFS id {} not found".format(fs_id))
        return CephFS_(fs_name)

    @RESTController.Resource('GET')
    def get_root_directory(self, fs_id):
        """
        The root directory that can't be fetched using ls_dir (api).
        :param fs_id: The filesystem identifier.
        :return: The root directory, or None if it is not found or access
            is not permitted.
        :rtype: dict
        """
        try:
            return self._get_root_directory(self._cephfs_instance(fs_id))
        except (cephfs.PermissionError, cephfs.ObjectNotFound):  # pragma: no cover
            return None

    def _get_root_directory(self, cfs):
        """
        The root directory that can't be fetched using ls_dir (api).
        It's used in ls_dir (ui-api) and in get_root_directory (api).
        :param cfs: CephFS service instance
        :type cfs: CephFS
        :return: The root directory
        :rtype: dict
        """
        return cfs.get_directory(os.sep.encode())

    @handle_cephfs_error()
    @RESTController.Resource('GET')
    def ls_dir(self, fs_id, path=None, depth=1):
        """
        List directories of specified path.
        :param fs_id: The filesystem identifier.
        :param path: The path where to start listing the directory content.
            Defaults to '/' if not set.
        :type path: str | bytes
        :param depth: The number of steps to go down the directory tree.
        :type depth: int | str
        :return: The names of the directories below the specified path. An
            empty list if the path is not found or access is not permitted.
        :rtype: list
        """
        path = self._set_ls_dir_path(path)
        try:
            cfs = self._cephfs_instance(fs_id)
            paths = cfs.ls_dir(path, depth)
        except (cephfs.PermissionError, cephfs.ObjectNotFound):  # pragma: no cover
            paths = []
        return paths

    def _set_ls_dir_path(self, path):
        """
        Transforms input path parameter of ls_dir methods (api and ui-api).
        :param path: The path where to start listing the directory content.
            Defaults to '/' if not set.
        :type path: str | bytes
        :return: Normalized path or root path
        :rtype: str | bytes
        """
        if path is None:
            return os.sep
        return os.path.normpath(path)

    @RESTController.Resource('POST', path='/tree')
    @allow_empty_body
    def mk_tree(self, fs_id, path):
        """
        Create a directory.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        """
        cfs = self._cephfs_instance(fs_id)
        cfs.mk_dirs(path)

    @RESTController.Resource('DELETE', path='/tree')
    def rm_tree(self, fs_id, path):
        """
        Remove a directory.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        """
        cfs = self._cephfs_instance(fs_id)
        cfs.rm_dir(path)

    @RESTController.Resource('PUT', path='/quota')
    @allow_empty_body
    def quota(self, fs_id, path, max_bytes=None, max_files=None):
        """
        Set the quotas of the specified path.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory/file.
        :param max_bytes: The byte limit.
        :param max_files: The file limit.
        """
        cfs = self._cephfs_instance(fs_id)
        return cfs.set_quotas(path, max_bytes, max_files)

    @RESTController.Resource('GET', path='/quota')
    @EndpointDoc("Get Cephfs Quotas of the specified path",
                 parameters={
                     'fs_id': (str, 'File System Identifier'),
                     'path': (str, 'File System Path'),
                 },
                 responses={200: GET_QUOTAS_SCHEMA})
    def get_quota(self, fs_id, path):
        """
        Get the quotas of the specified path.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory/file.
        :return: Returns a dictionary containing 'max_bytes'
            and 'max_files'.
        :rtype: dict
        """
        cfs = self._cephfs_instance(fs_id)
        return cfs.get_quotas(path)

    @RESTController.Resource('POST', path='/snapshot')
    @allow_empty_body
    def snapshot(self, fs_id, path, name=None):
        """
        Create a snapshot.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        :param name: The name of the snapshot. If not specified, a name using the
            current time in RFC3339 UTC format will be generated.
        :return: The name of the snapshot.
        :rtype: str
        :raises DashboardException: If a snapshot named `name` already exists
            for `path`.
        """
        cfs = self._cephfs_instance(fs_id)
        list_snaps = cfs.ls_snapshots(path)
        if any(name == snap['name'] for snap in list_snaps):
            # The two sentences are separated by a blank; they used to be
            # glued together ("...in use.Please use...").
            raise DashboardException(code='Snapshot name already in use',
                                     msg='Snapshot name {} is already in use. '
                                         'Please use another name'.format(name),
                                     component='cephfs')

        return cfs.mk_snapshot(path, name)

    @RESTController.Resource('DELETE', path='/snapshot')
    def rm_snapshot(self, fs_id, path, name):
        """
        Remove a snapshot.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        :param name: The name of the snapshot.
        """
        cfs = self._cephfs_instance(fs_id)
        cfs.rm_snapshot(path, name)
497
498
class CephFSClients:
    """
    Fetches the client sessions ('session ls') of one file system.

    The lookup is wrapped by ViewCache; callers unpack the result of
    :meth:`get` as a ``(status, clients)`` pair.
    """

    def __init__(self, module_inst, fscid):
        """
        :param module_inst: The manager module instance.
        :param fscid: The identifier of the file system to query.
        """
        self.fscid = fscid
        self._module = module_inst

    @ViewCache()
    def get(self):
        """
        Send 'session ls' to the MDS addressed by '<fscid>:0'.
        """
        target = f'{self.fscid}:0'
        return CephService.send_command('mds', 'session ls', srv_spec=target)
507
508
@UIRouter('/cephfs', Scope.CEPHFS)
@APIDoc("Dashboard UI helper function; not part of the public API", "CephFSUi")
class CephFsUi(CephFS):
    """
    UI-only variant of the CephFS controller that bundles the data needed
    by the dashboard views into single requests.
    """

    RESOURCE_ID = 'fs_id'

    @RESTController.Resource('GET')
    def tabs(self, fs_id):
        """
        Collect everything the detail and client tabs display.

        :param fs_id: The filesystem identifier.
        :return: A dict with the keys 'pools', 'ranks', 'name', 'standbys',
            'mds_counters' and 'clients'.
        :rtype: dict
        """
        fs_id = self.fs_id_to_int(fs_id)

        # Needed for detail tab
        status = self.fs_status(fs_id)
        details = status['cephfs']
        for pool in details['pools']:
            pool['size'] = pool['used'] + pool['avail']
        standby_names = [standby['name'] for standby in status['standbys']]

        # Each counter map additionally carries the name of its daemon.
        counters = self._mds_counters(fs_id)
        for daemon_name, daemon_counters in counters.items():
            daemon_counters['name'] = daemon_name

        return {
            'pools': details['pools'],
            'ranks': details['ranks'],
            'name': details['name'],
            'standbys': ', '.join(standby_names),
            'mds_counters': counters,
            # Needed for client tab
            'clients': self._clients(fs_id),
        }

    @handle_cephfs_error()
    @RESTController.Resource('GET')
    def ls_dir(self, fs_id, path=None, depth=1):
        """
        Unlike the API version, this endpoint also sends the root directory
        itself when the root directory is listed, so that the UI needs only
        one request.
        :param fs_id: The filesystem identifier.
        :type fs_id: int | str
        :param path: The path where to start listing the directory content.
            Defaults to '/' if not set.
        :type path: str | bytes
        :param depth: The number of steps to go down the directory tree.
        :type depth: int | str
        :return: The names of the directories below the specified path.
        :rtype: list
        """
        start = self._set_ls_dir_path(path)
        try:
            cfs = self._cephfs_instance(fs_id)
            entries = cfs.ls_dir(start, depth)
            if start == os.sep:
                entries = [self._get_root_directory(cfs)] + entries
        except (cephfs.PermissionError, cephfs.ObjectNotFound):  # pragma: no cover
            return []
        return entries