]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/dashboard/controllers/cephfs.py
import ceph quincy 17.2.6
[ceph.git] / ceph / src / pybind / mgr / dashboard / controllers / cephfs.py
1 # -*- coding: utf-8 -*-
2 import logging
3 import os
4 from collections import defaultdict
5
6 import cephfs
7 import cherrypy
8
9 from .. import mgr
10 from ..exceptions import DashboardException
11 from ..security import Scope
12 from ..services.ceph_service import CephService
13 from ..services.cephfs import CephFS as CephFS_
14 from ..services.exception import handle_cephfs_error
15 from ..tools import ViewCache
16 from . import APIDoc, APIRouter, EndpointDoc, RESTController, UIRouter, allow_empty_body
17
# Response schema referenced by the endpoint documentation of the quota GET
# endpoint: each entry maps a field name to a (type, description) tuple.
GET_QUOTAS_SCHEMA = {
    'max_bytes': (int, ''),
    'max_files': (int, ''),
}

# Name the logger after this controller. It previously used "controllers.rgw"
# (a copy/paste leftover), which attributed records to the wrong controller.
logger = logging.getLogger("controllers.cephfs")
24
25
@APIRouter('/cephfs', Scope.CEPHFS)
@APIDoc("Cephfs Management API", "Cephfs")
class CephFS(RESTController):
    """
    REST controller for CephFS: file system status, MDS performance counters,
    client sessions, directories, quotas and snapshots.
    """

    def __init__(self):  # pragma: no cover
        super().__init__()

        # Stateful instances of CephFSClients, hold cached results. Key to
        # dict is FSCID
        self.cephfs_clients = {}

    def list(self):
        """
        :return: The 'filesystems' entry of the manager's 'fs_map'.
        """
        fsmap = mgr.get("fs_map")
        return fsmap['filesystems']

    def get(self, fs_id):
        """
        :param fs_id: The filesystem identifier.
        :return: The status of the file system, see `fs_status`.
        :rtype: dict
        """
        fs_id = self.fs_id_to_int(fs_id)
        return self.fs_status(fs_id)

    @RESTController.Resource('GET')
    def clients(self, fs_id):
        """
        :param fs_id: The filesystem identifier.
        :return: The clients of the file system, see `_clients`.
        :rtype: dict
        """
        fs_id = self.fs_id_to_int(fs_id)

        return self._clients(fs_id)

    @RESTController.Resource('DELETE', path='/client/{client_id}')
    def evict(self, fs_id, client_id):
        """
        Evict a client from a file system.
        :param fs_id: The filesystem identifier.
        :param client_id: The identifier of the client to evict.
        """
        fs_id = self.fs_id_to_int(fs_id)
        client_id = self.client_id_to_int(client_id)

        return self._evict(fs_id, client_id)

    @RESTController.Resource('GET')
    def mds_counters(self, fs_id, counters=None):
        """
        :param fs_id: The filesystem identifier.
        :param counters: The performance counters to fetch. A default
            selection is used if not set.
        :return: The counter data, see `_mds_counters`.
        :rtype: dict
        """
        fs_id = self.fs_id_to_int(fs_id)
        return self._mds_counters(fs_id, counters)

    def _mds_counters(self, fs_id, counters=None):
        """
        Result format: map of daemon name to map of counter to list of datapoints
        rtype: dict[str, dict[str, list]]
        """

        if counters is None:
            # Opinionated list of interesting performance counters for the GUI
            counters = [
                "mds_server.handle_client_request",
                "mds_log.ev",
                "mds_cache.num_strays",
                "mds.exported",
                "mds.exported_inodes",
                "mds.imported",
                "mds.imported_inodes",
                "mds.inodes",
                "mds.caps",
                "mds.subtrees",
                "mds_mem.ino"
            ]

        result: dict = {}
        for mds_name in self._get_mds_names(fs_id):
            mds_data = {}
            for counter in counters:
                data = mgr.get_counter("mds", mds_name, counter)
                # No data for this counter: report an empty list of datapoints.
                mds_data[counter] = data[counter] if data is not None else []
            result[mds_name] = mds_data

        return result

    @staticmethod
    def fs_id_to_int(fs_id):
        """
        :param fs_id: The filesystem identifier.
        :type fs_id: int | str
        :return: The identifier converted to an integer.
        :rtype: int
        :raises DashboardException: If the value can't be converted.
        """
        try:
            return int(fs_id)
        except (TypeError, ValueError):
            # TypeError covers non-string/non-numeric input such as None, so
            # that it is reported the same way as a malformed string.
            raise DashboardException(code='invalid_cephfs_id',
                                     msg="Invalid cephfs ID {}".format(fs_id),
                                     component='cephfs')

    @staticmethod
    def client_id_to_int(client_id):
        """
        :param client_id: The client identifier.
        :type client_id: int | str
        :return: The identifier converted to an integer.
        :rtype: int
        :raises DashboardException: If the value can't be converted.
        """
        try:
            return int(client_id)
        except (TypeError, ValueError):
            # See fs_id_to_int: treat None & co. like a malformed string.
            raise DashboardException(code='invalid_cephfs_client_id',
                                     msg="Invalid cephfs client ID {}".format(client_id),
                                     component='cephfs')

    def _get_mds_names(self, filesystem_id=None):
        """
        :param filesystem_id: Only return the daemons of this file system.
            If not set, the daemons of all file systems and the standby
            daemons are returned.
        :return: The names of the MDS daemons.
        :rtype: list
        """
        names = []

        fsmap = mgr.get("fs_map")
        for fs in fsmap['filesystems']:
            if filesystem_id is not None and fs['id'] != filesystem_id:
                continue
            names.extend(info['name'] for info in fs['mdsmap']['info'].values())

        if filesystem_id is None:
            names.extend(info['name'] for info in fsmap['standbys'])

        return names

    def _append_mds_metadata(self, mds_versions, metadata_key):
        """
        Append the daemon to the list of daemons running the same version.
        :param mds_versions: Map of Ceph version to list of daemon names.
            It is modified in place.
        :param metadata_key: The name of the MDS daemon.
        """
        metadata = mgr.get_metadata('mds', metadata_key)
        if metadata is None:
            return
        mds_versions[metadata.get('ceph_version', 'unknown')].append(metadata_key)

    # pylint: disable=too-many-statements,too-many-branches
    def fs_status(self, fs_id):
        """
        Collect the ranks, pools, standby daemons and MDS versions of a
        file system.
        :param fs_id: The filesystem identifier as used in the 'fs_map'.
        :return: A dictionary containing 'cephfs', 'standbys' and 'versions'.
        :rtype: dict
        :raises cherrypy.HTTPError: 404 if the file system does not exist.
        """
        mds_versions: dict = defaultdict(list)

        fsmap = mgr.get("fs_map")
        filesystem = None
        for fs in fsmap['filesystems']:
            if fs['id'] == fs_id:
                filesystem = fs
                break

        if filesystem is None:
            raise cherrypy.HTTPError(404,
                                     "CephFS id {0} not found".format(fs_id))

        rank_table = []

        mdsmap = filesystem['mdsmap']

        client_count = 0

        for rank in mdsmap["in"]:
            rank_key = "mds_{0}".format(rank)
            if rank_key not in mdsmap["up"]:
                # The rank is part of the file system, but no daemon holds it.
                rank_table.append(
                    {
                        "rank": rank,
                        "state": "failed",
                        "mds": "",
                        "activity": 0.0,
                        "dns": 0,
                        "inos": 0,
                        "dirs": 0,
                        "caps": 0
                    }
                )
                continue

            gid = mdsmap['up'][rank_key]
            info = mdsmap['info']['gid_{0}'.format(gid)]
            dns = mgr.get_latest("mds", info['name'], "mds_mem.dn")
            inos = mgr.get_latest("mds", info['name'], "mds_mem.ino")
            dirs = mgr.get_latest("mds", info['name'], "mds_mem.dir")
            caps = mgr.get_latest("mds", info['name'], "mds_mem.cap")

            # Rank 0 is the preferred source. In case rank 0 was down, look
            # at another rank's sessionmap to get an indication of clients.
            if rank == 0 or client_count == 0:
                client_count = mgr.get_latest("mds", info['name'],
                                              "mds_sessions.session_count")

            laggy = "laggy_since" in info

            state = info['state'].split(":")[1]
            if laggy:
                state += "(laggy)"

            # Populate based on context of state, e.g. client
            # ops for an active daemon, replay progress, reconnect
            # progress
            if state == "active":
                activity = CephService.get_rate("mds",
                                                info['name'],
                                                "mds_server.handle_client_request")
            else:
                activity = 0.0  # pragma: no cover

            self._append_mds_metadata(mds_versions, info['name'])
            rank_table.append(
                {
                    "rank": rank,
                    "state": state,
                    "mds": info['name'],
                    "activity": activity,
                    "dns": dns,
                    "inos": inos,
                    "dirs": dirs,
                    "caps": caps
                }
            )

        # Find the standby replays
        for daemon_info in mdsmap['info'].values():
            if daemon_info['state'] != "up:standby-replay":
                continue

            inos = mgr.get_latest("mds", daemon_info['name'], "mds_mem.ino")
            dns = mgr.get_latest("mds", daemon_info['name'], "mds_mem.dn")
            dirs = mgr.get_latest("mds", daemon_info['name'], "mds_mem.dir")
            caps = mgr.get_latest("mds", daemon_info['name'], "mds_mem.cap")

            activity = CephService.get_rate(
                "mds", daemon_info['name'], "mds_log.replay")

            rank_table.append(
                {
                    "rank": "{0}-s".format(daemon_info['rank']),
                    "state": "standby-replay",
                    "mds": daemon_info['name'],
                    "activity": activity,
                    "dns": dns,
                    "inos": inos,
                    "dirs": dirs,
                    "caps": caps
                }
            )

        df = mgr.get("df")
        pool_stats = {p['id']: p['stats'] for p in df['pools']}
        osdmap = mgr.get("osd_map")
        pools = {p['pool']: p for p in osdmap['pools']}
        metadata_pool_id = mdsmap['metadata_pool']
        data_pool_ids = mdsmap['data_pools']

        # The metadata pool is listed first, followed by the data pools.
        pools_table = []
        for pool_id in [metadata_pool_id] + data_pool_ids:
            pool_type = "metadata" if pool_id == metadata_pool_id else "data"
            stats = pool_stats[pool_id]
            pools_table.append({
                "pool": pools[pool_id]['pool_name'],
                "type": pool_type,
                "used": stats['stored'],
                "avail": stats['max_avail']
            })

        standby_table = []
        for standby in fsmap['standbys']:
            self._append_mds_metadata(mds_versions, standby['name'])
            standby_table.append({
                'name': standby['name']
            })

        return {
            "cephfs": {
                "id": fs_id,
                "name": mdsmap['fs_name'],
                "client_count": client_count,
                "ranks": rank_table,
                "pools": pools_table
            },
            "standbys": standby_table,
            "versions": mds_versions
        }

    def _clients(self, fs_id):
        """
        Get the clients of a file system, decorated with the fields 'type',
        'version', 'hostname' and 'root'.
        :param fs_id: The filesystem identifier.
        :return: A dictionary containing 'status' and 'data' (the clients).
        :rtype: dict
        :raises cherrypy.HTTPError: 404 if no clients can be fetched for
            the file system.
        """
        cephfs_clients = self.cephfs_clients.get(fs_id, None)
        if cephfs_clients is None:
            cephfs_clients = CephFSClients(mgr, fs_id)
            self.cephfs_clients[fs_id] = cephfs_clients

        try:
            status, clients = cephfs_clients.get()
        except AttributeError:
            raise cherrypy.HTTPError(404,
                                     "No cephfs with id {0}".format(fs_id))

        if clients is None:
            raise cherrypy.HTTPError(404,
                                     "No cephfs with id {0}".format(fs_id))

        # Decorate the metadata with some fields that will be
        # independent of whether it's a kernel or userspace
        # client, so that the javascript doesn't have to grok that.
        for client in clients:
            if "ceph_version" in client['client_metadata']:  # pragma: no cover - no complexity
                client['type'] = "userspace"
                client['version'] = client['client_metadata']['ceph_version']
                client['hostname'] = client['client_metadata']['hostname']
                client['root'] = client['client_metadata']['root']
            elif "kernel_version" in client['client_metadata']:  # pragma: no cover - no complexity
                client['type'] = "kernel"
                client['version'] = client['client_metadata']['kernel_version']
                client['hostname'] = client['client_metadata']['hostname']
                client['root'] = client['client_metadata']['root']
            else:  # pragma: no cover - no complexity there
                client['type'] = "unknown"
                client['version'] = ""
                client['hostname'] = ""
                # Set 'root' as well, so that every client carries the same
                # set of decorated fields regardless of its type.
                client['root'] = ""

        return {
            'status': status,
            'data': clients
        }

    def _evict(self, fs_id, client_id):
        """
        Evict a client after checking that it is known to the file system.
        :param fs_id: The filesystem identifier.
        :param client_id: The identifier of the client to evict.
        :raises cherrypy.HTTPError: 404 if the client does not exist.
        """
        clients = self._clients(fs_id)
        if not any(c['id'] == client_id for c in clients['data']):
            raise cherrypy.HTTPError(404,
                                     "Client {0} does not exist in cephfs {1}".format(client_id,
                                                                                      fs_id))
        CephService.send_command('mds', 'client evict',
                                 srv_spec='{0}:0'.format(fs_id), id=client_id)

    @staticmethod
    def _cephfs_instance(fs_id):
        """
        :param fs_id: The filesystem identifier.
        :type fs_id: int | str
        :return: A instance of the CephFS class.
        :raises cherrypy.HTTPError: 404 if the file system does not exist.
        """
        fs_name = CephFS_.fs_name_from_id(fs_id)
        if fs_name is None:
            raise cherrypy.HTTPError(404, "CephFS id {} not found".format(fs_id))
        return CephFS_(fs_name)

    @RESTController.Resource('GET')
    def get_root_directory(self, fs_id):
        """
        The root directory that can't be fetched using ls_dir (api).
        :param fs_id: The filesystem identifier.
        :return: The root directory
        :rtype: dict
        """
        try:
            return self._get_root_directory(self._cephfs_instance(fs_id))
        except (cephfs.PermissionError, cephfs.ObjectNotFound):  # pragma: no cover
            return None

    def _get_root_directory(self, cfs):
        """
        The root directory that can't be fetched using ls_dir (api).
        It's used in ls_dir (ui-api) and in get_root_directory (api).
        :param cfs: CephFS service instance
        :type cfs: CephFS
        :return: The root directory
        :rtype: dict
        """
        return cfs.get_directory(os.sep.encode())

    @handle_cephfs_error()
    @RESTController.Resource('GET')
    def ls_dir(self, fs_id, path=None, depth=1):
        """
        List directories of specified path.
        :param fs_id: The filesystem identifier.
        :param path: The path where to start listing the directory content.
            Defaults to '/' if not set.
        :type path: str | bytes
        :param depth: The number of steps to go down the directory tree.
        :type depth: int | str
        :return: The names of the directories below the specified path.
        :rtype: list
        """
        path = self._set_ls_dir_path(path)
        try:
            cfs = self._cephfs_instance(fs_id)
            paths = cfs.ls_dir(path, depth)
        except (cephfs.PermissionError, cephfs.ObjectNotFound):  # pragma: no cover
            paths = []
        return paths

    def _set_ls_dir_path(self, path):
        """
        Transforms input path parameter of ls_dir methods (api and ui-api).
        :param path: The path where to start listing the directory content.
            Defaults to '/' if not set.
        :type path: str | bytes
        :return: Normalized path or root path
        :rtype: str
        """
        if path is None:
            return os.sep
        return os.path.normpath(path)

    @RESTController.Resource('POST', path='/tree')
    @allow_empty_body
    def mk_tree(self, fs_id, path):
        """
        Create a directory.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        """
        cfs = self._cephfs_instance(fs_id)
        cfs.mk_dirs(path)

    @RESTController.Resource('DELETE', path='/tree')
    def rm_tree(self, fs_id, path):
        """
        Remove a directory.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        """
        cfs = self._cephfs_instance(fs_id)
        cfs.rm_dir(path)

    @RESTController.Resource('PUT', path='/quota')
    @allow_empty_body
    def quota(self, fs_id, path, max_bytes=None, max_files=None):
        """
        Set the quotas of the specified path.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory/file.
        :param max_bytes: The byte limit.
        :param max_files: The file limit.
        """
        cfs = self._cephfs_instance(fs_id)
        return cfs.set_quotas(path, max_bytes, max_files)

    @RESTController.Resource('GET', path='/quota')
    @EndpointDoc("Get Cephfs Quotas of the specified path",
                 parameters={
                     'fs_id': (str, 'File System Identifier'),
                     'path': (str, 'File System Path'),
                 },
                 responses={200: GET_QUOTAS_SCHEMA})
    def get_quota(self, fs_id, path):
        """
        Get the quotas of the specified path.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory/file.
        :return: Returns a dictionary containing 'max_bytes'
            and 'max_files'.
        :rtype: dict
        """
        cfs = self._cephfs_instance(fs_id)
        return cfs.get_quotas(path)

    @RESTController.Resource('POST', path='/snapshot')
    @allow_empty_body
    def snapshot(self, fs_id, path, name=None):
        """
        Create a snapshot.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        :param name: The name of the snapshot. If not specified, a name using the
            current time in RFC3339 UTC format will be generated.
        :return: The name of the snapshot.
        :rtype: str
        :raises DashboardException: If a snapshot with the given name
            already exists for the path.
        """
        cfs = self._cephfs_instance(fs_id)
        list_snaps = cfs.ls_snapshots(path)
        for snap in list_snaps:
            if name == snap['name']:
                # The two literals are concatenated before formatting, so
                # the first one has to end with a space.
                raise DashboardException(code='Snapshot name already in use',
                                         msg='Snapshot name {} is already in use. '
                                             'Please use another name'.format(name),
                                         component='cephfs')

        return cfs.mk_snapshot(path, name)

    @RESTController.Resource('DELETE', path='/snapshot')
    def rm_snapshot(self, fs_id, path, name):
        """
        Remove a snapshot.
        :param fs_id: The filesystem identifier.
        :param path: The path of the directory.
        :param name: The name of the snapshot.
        """
        cfs = self._cephfs_instance(fs_id)
        cfs.rm_snapshot(path, name)
493
494
class CephFSClients(object):
    """
    Holds the manager module instance and the identifier of one file system
    and fetches the sessions of that file system.
    """

    def __init__(self, module_inst, fscid):
        self.fscid = fscid
        self._module = module_inst

    @ViewCache()
    def get(self):
        """
        Send the 'session ls' command to the MDS addressed by '<fscid>:0'.
        :return: The result of `CephService.send_command`, wrapped by the
            `ViewCache` decorator.
        """
        srv_spec = '{0}:0'.format(self.fscid)
        return CephService.send_command('mds', 'session ls', srv_spec=srv_spec)
503
504
@UIRouter('/cephfs', Scope.CEPHFS)
@APIDoc("Dashboard UI helper function; not part of the public API", "CephFSUi")
class CephFsUi(CephFS):
    RESOURCE_ID = 'fs_id'

    @RESTController.Resource('GET')
    def tabs(self, fs_id):
        """
        Gather the data of the detail and the client tab in one request.
        :param fs_id: The filesystem identifier.
        :type fs_id: int | str
        :return: A dictionary containing 'pools', 'ranks', 'name',
            'standbys', 'mds_counters' and 'clients'.
        :rtype: dict
        """
        fs_id = self.fs_id_to_int(fs_id)

        # Needed for detail tab
        status = self.fs_status(fs_id)
        cephfs_info = status['cephfs']
        for pool in cephfs_info['pools']:
            pool['size'] = pool['used'] + pool['avail']
        standby_names = [standby['name'] for standby in status['standbys']]

        # Every counter map additionally carries the name of its daemon.
        counters = self._mds_counters(fs_id)
        for mds_name, mds_data in counters.items():
            mds_data['name'] = mds_name

        return {
            'pools': cephfs_info['pools'],
            'ranks': cephfs_info['ranks'],
            'name': cephfs_info['name'],
            'standbys': ', '.join(standby_names),
            'mds_counters': counters,
            # Needed for client tab
            'clients': self._clients(fs_id)
        }

    @handle_cephfs_error()
    @RESTController.Resource('GET')
    def ls_dir(self, fs_id, path=None, depth=1):
        """
        The difference to the API version is that the root directory will be sent when listing
        the root directory.
        To only do one request this endpoint was created.
        :param fs_id: The filesystem identifier.
        :type fs_id: int | str
        :param path: The path where to start listing the directory content.
            Defaults to '/' if not set.
        :type path: str | bytes
        :param depth: The number of steps to go down the directory tree.
        :type depth: int | str
        :return: The names of the directories below the specified path.
        :rtype: list
        """
        start_path = self._set_ls_dir_path(path)
        try:
            cfs = self._cephfs_instance(fs_id)
            entries = cfs.ls_dir(start_path, depth)
            if start_path != os.sep:
                return entries
            # Listing the root: put the root directory itself in front.
            return [self._get_root_directory(cfs)] + entries
        except (cephfs.PermissionError, cephfs.ObjectNotFound):  # pragma: no cover
            return []
558 return paths