List,
Any,
Dict,
- Tuple,
Optional,
TYPE_CHECKING,
TypeVar,
Set,
cast)
from os.path import normpath
+import cephfs
from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
+from object_format import ErrorResponse
from orchestrator import NoOrchestrator
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
RGWFSAL,
RawBlock,
format_block)
-from .exception import NFSException, NFSInvalidOperation, FSNotFound
+from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
from .utils import (
CONF_PREFIX,
EXPORT_PREFIX,
+ NonFatalError,
USER_CONF_PREFIX,
export_obj_name,
conf_obj_name,
available_clusters,
check_fs,
- restart_nfs_service)
+ restart_nfs_service, cephfs_path_is_dir)
if TYPE_CHECKING:
from nfs.module import Module
return clusters
-def export_cluster_checker(func: FuncT) -> FuncT:
- def cluster_check(
- export: 'ExportMgr',
- *args: Any,
- **kwargs: Any
- ) -> Tuple[int, str, str]:
- """
- This method checks if cluster exists
- """
- clusters = known_cluster_ids(export.mgr)
- cluster_id: str = kwargs['cluster_id']
- log.debug("checking for %r in known nfs clusters: %r",
- cluster_id, clusters)
- if cluster_id not in clusters:
- return -errno.ENOENT, "", "Cluster does not exist"
- return func(export, *args, **kwargs)
- return cast(FuncT, cluster_check)
-
-
-def exception_handler(
- exception_obj: Exception,
- log_msg: str = ""
-) -> Tuple[int, str, str]:
- if log_msg:
- log.exception(log_msg)
- return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
-
-
def _check_rados_notify(ioctx: Any, obj: str) -> None:
try:
ioctx.notify(obj)
return ns
+class AppliedExportResults:
+ """Gathers the results of multiple changed exports.
+ Returned by apply_export.
+ """
+
+ def __init__(self) -> None:
+ self.changes: List[Dict[str, str]] = []
+ self.has_error = False
+
+ def append(self, value: Dict[str, str]) -> None:
+ if value.get("state", "") == "error":
+ self.has_error = True
+ self.changes.append(value)
+
+ def to_simplified(self) -> List[Dict[str, str]]:
+ return self.changes
+
+ def mgr_return_value(self) -> int:
+ return -errno.EIO if self.has_error else 0
+
+
class ExportMgr:
def __init__(
self,
cluster_id: str,
pseudo_path: Optional[str],
export_obj: Optional[Export] = None
- ) -> Tuple[int, str, str]:
+ ) -> None:
try:
if export_obj:
export: Optional[Export] = export_obj
if not self.exports[cluster_id]:
del self.exports[cluster_id]
log.debug("Deleted all exports for cluster %s", cluster_id)
- return 0, "Successfully deleted export", ""
- return 0, "", "Export does not exist"
+ return None
+ raise NonFatalError("Export does not exist")
except Exception as e:
- return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}")
+ log.exception(f"Failed to delete {pseudo_path} export for {cluster_id}")
+ raise ErrorResponse.wrap(e)
def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]:
try:
if need_nfs_service_restart:
restart_nfs_service(self.mgr, export.cluster_id)
- @export_cluster_checker
- def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
+ def _validate_cluster_id(self, cluster_id: str) -> None:
+ """Raise an exception if cluster_id is not valid."""
+ clusters = known_cluster_ids(self.mgr)
+ log.debug("checking for %r in known nfs clusters: %r",
+ cluster_id, clusters)
+ if cluster_id not in clusters:
+ raise ErrorResponse(f"Cluster {cluster_id!r} does not exist",
+ return_value=-errno.ENOENT)
+
+ def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Dict[str, Any]:
+ self._validate_cluster_id(kwargs['cluster_id'])
# if addr(s) are provided, construct client list and adjust outer block
clients = []
if addr:
return self.create_rgw_export(**kwargs)
raise NotImplementedError()
except Exception as e:
- return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
+ log.exception(
+ f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
+ raise ErrorResponse.wrap(e)
- @export_cluster_checker
def delete_export(self,
cluster_id: str,
- pseudo_path: str) -> Tuple[int, str, str]:
+ pseudo_path: str) -> None:
+ self._validate_cluster_id(cluster_id)
return self._delete_export(cluster_id, pseudo_path)
def delete_all_exports(self, cluster_id: str) -> None:
log.info("No exports to delete")
return
for export in export_list:
- ret, out, err = self._delete_export(cluster_id=cluster_id, pseudo_path=None,
- export_obj=export)
- if ret != 0:
- raise NFSException(f"Failed to delete exports: {err} and {ret}")
+ try:
+ self._delete_export(cluster_id=cluster_id, pseudo_path=None,
+ export_obj=export)
+ except Exception as e:
+ raise NFSException(f"Failed to delete export {export.export_id}: {e}")
log.info("All exports successfully deleted for cluster id: %s", cluster_id)
def list_all_exports(self) -> List[Dict[str, Any]]:
r.extend([e.to_dict() for e in ls])
return r
- @export_cluster_checker
def list_exports(self,
cluster_id: str,
- detailed: bool = False) -> Tuple[int, str, str]:
+ detailed: bool = False) -> List[Any]:
+ self._validate_cluster_id(cluster_id)
try:
if detailed:
result_d = [export.to_dict() for export in self.exports[cluster_id]]
- return 0, json.dumps(result_d, indent=2), ''
+ return result_d
else:
result_ps = [export.pseudo for export in self.exports[cluster_id]]
- return 0, json.dumps(result_ps, indent=2), ''
+ return result_ps
except KeyError:
log.warning("No exports to list for %s", cluster_id)
- return 0, '', ''
+ return []
except Exception as e:
- return exception_handler(e, f"Failed to list exports for {cluster_id}")
+ log.exception(f"Failed to list exports for {cluster_id}")
+ raise ErrorResponse.wrap(e)
def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
export = self._fetch_export(cluster_id, pseudo_path)
log.warning(f"No {pseudo_path} export to show for {cluster_id}")
return None
- @export_cluster_checker
def get_export(
self,
cluster_id: str,
pseudo_path: str,
- ) -> Tuple[int, str, str]:
+ ) -> Dict[str, Any]:
+ self._validate_cluster_id(cluster_id)
try:
export_dict = self._get_export_dict(cluster_id, pseudo_path)
- if export_dict:
- return 0, json.dumps(export_dict, indent=2), ''
- log.warning("No %s export to show for %s", pseudo_path, cluster_id)
- return 0, '', ''
+ log.info(f"Fetched {export_dict!r} for {cluster_id!r}, {pseudo_path!r}")
+ return export_dict if export_dict else {}
except Exception as e:
- return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
+ log.exception(f"Failed to get {pseudo_path} export for {cluster_id}")
+ raise ErrorResponse.wrap(e)
def get_export_by_id(
self,
export = self._fetch_export(cluster_id, pseudo_path)
return export.to_dict() if export else None
- def apply_export(self, cluster_id: str, export_config: str) -> Tuple[int, str, str]:
+ # This method is used by the dashboard module (../dashboard/controllers/nfs.py)
+ # Do not change interface without updating the Dashboard code
+ def apply_export(self, cluster_id: str, export_config: str) -> AppliedExportResults:
+ try:
+ exports = self._read_export_config(cluster_id, export_config)
+ except Exception as e:
+ log.exception(f'Failed to update export: {e}')
+ raise ErrorResponse.wrap(e)
+
+ aeresults = AppliedExportResults()
+ for export in exports:
+ aeresults.append(self._change_export(cluster_id, export))
+ return aeresults
+
+ def _read_export_config(self, cluster_id: str, export_config: str) -> List[Dict]:
+ if not export_config:
+ raise NFSInvalidOperation("Empty Config!!")
try:
- if not export_config:
- raise NFSInvalidOperation("Empty Config!!")
+ j = json.loads(export_config)
+ except ValueError:
+ # okay, not JSON. is it an EXPORT block?
try:
- j = json.loads(export_config)
- except ValueError:
- # okay, not JSON. is it an EXPORT block?
- try:
- blocks = GaneshaConfParser(export_config).parse()
- exports = [
- Export.from_export_block(block, cluster_id)
- for block in blocks
- ]
- j = [export.to_dict() for export in exports]
- except Exception as ex:
- raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")
-
- # check export type
- if isinstance(j, list):
- ret, out, err = (0, '', '')
- for export in j:
- try:
- r, o, e = self._apply_export(cluster_id, export)
- except Exception as ex:
- r, o, e = exception_handler(ex, f'Failed to apply export: {ex}')
- if r:
- ret = r
- if o:
- out += o + '\n'
- if e:
- err += e + '\n'
- return ret, out, err
- else:
- r, o, e = self._apply_export(cluster_id, j)
- return r, o, e
+ blocks = GaneshaConfParser(export_config).parse()
+ exports = [
+ Export.from_export_block(block, cluster_id)
+ for block in blocks
+ ]
+ j = [export.to_dict() for export in exports]
+ except Exception as ex:
+ raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")
+ # check export type - always return a list
+ if isinstance(j, list):
+ return j # j is already a list object
+ return [j] # return a single object list, with j as the only item
+
+ def _change_export(self, cluster_id: str, export: Dict) -> Dict[str, str]:
+ try:
+ return self._apply_export(cluster_id, export)
except NotImplementedError:
- return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
- except Exception as e:
- return exception_handler(e, f'Failed to update export: {e}')
+ # in theory, the NotImplementedError here may be raised by a hook back to
+ # an orchestration module. If the orchestration module supports it the NFS
+ # servers may be restarted. If not supported the expectation is that an
+ # (unfortunately generic) NotImplementedError will be raised. We then
+ # indicate to the user that manual intervention may be needed now that the
+ # configuration changes have been applied.
+ return {
+ "pseudo": export['pseudo'],
+ "state": "warning",
+ "msg": "changes applied (Manual restart of NFS Pods required)",
+ }
+ except Exception as ex:
+ msg = f'Failed to apply export: {ex}'
+ log.exception(msg)
+ return {"state": "error", "msg": msg}
def _update_user_id(
self,
squash: str,
access_type: str,
clients: list = [],
- sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ sectype: Optional[List[str]] = None) -> Dict[str, Any]:
+
+ try:
+ cephfs_path_is_dir(self.mgr, fs_name, path)
+ except NotADirectoryError:
+ raise NFSException(f"path {path} is not a dir", -errno.ENOTDIR)
+ except cephfs.ObjectNotFound:
+ raise NFSObjectNotFound(f"path {path} does not exist")
+ except cephfs.Error as e:
+ raise NFSException(e.args[1], -e.args[0])
+
pseudo_path = normalize_path(pseudo_path)
if not self._fetch_export(cluster_id, pseudo_path):
"cluster": cluster_id,
"mode": export.access_type,
}
- return (0, json.dumps(result, indent=4), '')
- return 0, "", "Export already exists"
+ return result
+ raise NonFatalError("Export already exists")
def create_rgw_export(self,
cluster_id: str,
bucket: Optional[str] = None,
user_id: Optional[str] = None,
clients: list = [],
- sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
+ sectype: Optional[List[str]] = None) -> Dict[str, Any]:
pseudo_path = normalize_path(pseudo_path)
if not bucket and not user_id:
- return -errno.EINVAL, "", "Must specify either bucket or user_id"
+ raise ErrorResponse("Must specify either bucket or user_id")
if not self._fetch_export(cluster_id, pseudo_path):
export = self.create_export_from_dict(
"mode": export.access_type,
"squash": export.squash,
}
- return (0, json.dumps(result, indent=4), '')
- return 0, "", "Export already exists"
+ return result
+ raise NonFatalError("Export already exists")
def _apply_export(
self,
cluster_id: str,
new_export_dict: Dict,
- ) -> Tuple[int, str, str]:
+ ) -> Dict[str, str]:
for k in ['path', 'pseudo']:
if k not in new_export_dict:
raise NFSInvalidOperation(f'Export missing required field {k}')
if not old_export:
self._create_export_user(new_export)
self._save_export(cluster_id, new_export)
- return 0, f'Added export {new_export.pseudo}', ''
+ return {"pseudo": new_export.pseudo, "state": "added"}
need_nfs_service_restart = True
if old_export.fsal.name != new_export.fsal.name:
self._update_export(cluster_id, new_export, need_nfs_service_restart)
- return 0, f"Updated export {new_export.pseudo}", ""
+ return {"pseudo": new_export.pseudo, "state": "updated"}
def _rados(self, cluster_id: str) -> NFSRados:
"""Return a new NFSRados object for the given cluster id."""