from __future__ import absolute_import
from functools import partial
+import logging
import cherrypy
import cephfs
from . import ApiController, RESTController, UiApiController, BaseController, \
Endpoint, Task, ReadPermission, ControllerDoc, EndpointDoc
-from .. import logger
from ..security import Scope
from ..services.cephfs import CephFS
from ..services.cephx import CephX
from ..services.rgw_client import RgwClient
+logger = logging.getLogger('controllers.ganesha')
+
+
# documentation helpers
EXPORT_SCHEMA = {
'export_id': (int, 'Export ID'),
# pylint: disable=not-callable
-def NfsTask(name, metadata, wait_for):
+def NfsTask(name, metadata, wait_for): # noqa: N802
def composed_decorator(func):
return Task("nfs/{}".format(name), metadata, wait_for,
partial(serialize_dashboard_exception,
try:
Ganesha.get_ganesha_clusters()
except NFSException as e:
- status['message'] = str(e)
+ status['message'] = str(e) # type: ignore
status['available'] = False
return status
ganesha_conf = GaneshaConf.instance(cluster_id)
if not ganesha_conf.has_export(export_id):
- raise cherrypy.HTTPError(404)
+ raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
if fsal['name'] not in Ganesha.fsals_available():
raise NFSException("Cannot make modifications to this export. "
ganesha_conf = GaneshaConf.instance(cluster_id)
if not ganesha_conf.has_export(export_id):
- raise cherrypy.HTTPError(404)
-
+ raise cherrypy.HTTPError(404) # pragma: no cover - the handling is too obvious
export = ganesha_conf.remove_export(export_id)
if reload_daemons:
ganesha_conf.reload_daemons(export.daemons)
'status': status_dict[cluster_id][daemon_id]['status'],
'desc': status_dict[cluster_id][daemon_id]['desc']
}
- for daemon_id in status_dict[cluster_id]
for cluster_id in status_dict
+ for daemon_id in status_dict[cluster_id]
]
result = []
return Ganesha.fsals_available()
@Endpoint('GET', '/lsdir')
- def lsdir(self, root_dir=None, depth=1):
+ def lsdir(self, root_dir=None, depth=1): # pragma: no cover
if root_dir is None:
root_dir = "/"
depth = int(depth)
if depth > 5:
- logger.warning("[NFS] Limiting depth to maximum value of 5: "
+ logger.warning("Limiting depth to maximum value of 5: "
"input depth=%s", depth)
depth = 5
- root_dir = '{}/'.format(root_dir) \
- if not root_dir.endswith('/') else root_dir
-
+ root_dir = '{}{}'.format(root_dir.rstrip('/'), '/')
try:
cfs = CephFS()
- paths = cfs.get_dir_list(root_dir, depth)
- paths = [p[:-1] for p in paths if p != root_dir]
- return {'paths': paths}
+ root_dir = root_dir.encode()
+ paths = cfs.ls_dir(root_dir, depth)
+ # Convert (bytes => string) and prettify paths (strip slashes).
+ paths = [p.decode().rstrip('/') for p in paths if p != root_dir]
except (cephfs.ObjectNotFound, cephfs.PermissionError):
- return {'paths': []}
+ paths = []
+ return {'paths': paths}
@Endpoint('GET', '/cephfs/filesystems')
def filesystems(self):