import errno
import json
import logging
-from typing import List, Any, Dict, Tuple, Optional, TYPE_CHECKING, TypeVar, Callable, cast
+from typing import (
+ List,
+ Any,
+ Dict,
+ Tuple,
+ Optional,
+ TYPE_CHECKING,
+ TypeVar,
+ Callable,
+ Set,
+ cast)
from os.path import normpath
-from rados import TimedOut, ObjectNotFound
+from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
+from orchestrator import NoOrchestrator
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
from .export_utils import GaneshaConfParser, Export, RawBlock, CephFSFSAL, RGWFSAL
-from .exception import NFSException, NFSInvalidOperation, FSNotFound, \
- ClusterNotFound
-from .utils import available_clusters, check_fs, restart_nfs_service
+from .exception import NFSException, NFSInvalidOperation, FSNotFound
+from .utils import (
+ CONF_PREFIX,
+ EXPORT_PREFIX,
+ USER_CONF_PREFIX,
+ export_obj_name,
+ conf_obj_name,
+ available_clusters,
+ check_fs,
+ restart_nfs_service)
if TYPE_CHECKING:
from nfs.module import Module
log = logging.getLogger(__name__)
+def known_cluster_ids(mgr: 'Module') -> Set[str]:
+ """Return the set of known cluster IDs."""
+ try:
+ clusters = set(available_clusters(mgr))
+ except NoOrchestrator:
+ clusters = nfs_rados_configs(mgr.rados)
+ return clusters
+
+
def export_cluster_checker(func: FuncT) -> FuncT:
def cluster_check(
export: 'ExportMgr',
"""
This method checks if cluster exists
"""
- if kwargs['cluster_id'] not in available_clusters(export.mgr):
- return -errno.ENOENT, "", "Cluster does not exists"
+ clusters = known_cluster_ids(export.mgr)
+ cluster_id: str = kwargs['cluster_id']
+ log.debug("checking for %r in known nfs clusters: %r",
+ cluster_id, clusters)
+ if cluster_id not in clusters:
+ return -errno.ENOENT, "", "Cluster does not exist"
return func(export, *args, **kwargs)
return cast(FuncT, cluster_check)
return getattr(exception_obj, 'errno', -1), "", str(exception_obj)
+def _check_rados_notify(ioctx: Any, obj: str) -> None:
+ try:
+ ioctx.notify(obj)
+ except TimedOut:
+ log.exception("Ganesha timed out")
+
+
+def normalize_path(path: str) -> str:
+ if path:
+ path = normpath(path.strip())
+ if path[:2] == "//":
+ path = path[1:]
+ return path
+
+
class NFSRados:
- def __init__(self, mgr: 'Module', namespace: str) -> None:
- self.mgr = mgr
+ def __init__(self, rados: 'Rados', namespace: str) -> None:
+ self.rados = rados
self.pool = POOL_NAME
self.namespace = namespace
return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
- with self.mgr.rados.open_ioctx(self.pool) as ioctx:
+ with self.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
ioctx.write_full(obj, conf_block.encode('utf-8'))
if not config_obj:
# Add created obj url to common config obj
ioctx.append(config_obj, GaneshaConfParser.write_block(
self._create_url_block(obj)).encode('utf-8'))
- ExportMgr._check_rados_notify(ioctx, config_obj)
+ _check_rados_notify(ioctx, config_obj)
log.debug("Added %s url to %s", obj, config_obj)
def read_obj(self, obj: str) -> Optional[str]:
- with self.mgr.rados.open_ioctx(self.pool) as ioctx:
+ with self.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
try:
return ioctx.read(obj, 1048576).decode()
return None
def update_obj(self, conf_block: str, obj: str, config_obj: str) -> None:
- with self.mgr.rados.open_ioctx(self.pool) as ioctx:
+ with self.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
ioctx.write_full(obj, conf_block.encode('utf-8'))
log.debug("write configuration into rados object %s/%s/%s",
self.pool, self.namespace, obj)
- ExportMgr._check_rados_notify(ioctx, config_obj)
+ _check_rados_notify(ioctx, config_obj)
log.debug("Update export %s in %s", obj, config_obj)
def remove_obj(self, obj: str, config_obj: str) -> None:
- with self.mgr.rados.open_ioctx(self.pool) as ioctx:
+ with self.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
export_urls = ioctx.read(config_obj)
url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
export_urls = export_urls.replace(url.encode('utf-8'), b'')
ioctx.remove_object(obj)
ioctx.write_full(config_obj, export_urls)
- ExportMgr._check_rados_notify(ioctx, config_obj)
+ _check_rados_notify(ioctx, config_obj)
log.debug("Object deleted: %s", url)
def remove_all_obj(self) -> None:
- with self.mgr.rados.open_ioctx(self.pool) as ioctx:
+ with self.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
for obj in ioctx.list_objects():
obj.remove()
def check_user_config(self) -> bool:
- with self.mgr.rados.open_ioctx(self.pool) as ioctx:
+ with self.rados.open_ioctx(self.pool) as ioctx:
ioctx.set_namespace(self.namespace)
for obj in ioctx.list_objects():
- if obj.key.startswith("userconf-nfs"):
+ if obj.key.startswith(USER_CONF_PREFIX):
return True
return False
+def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
+ """Return a set of all the namespaces in the nfs_pool where nfs
+ configuration objects are found. The namespaces also correspond
+ to the cluster ids.
+ """
+ ns: Set[str] = set()
+ prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
+ with rados.open_ioctx(nfs_pool) as ioctx:
+ ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
+ for obj in ioctx.list_objects():
+ if obj.key.startswith(prefixes):
+ ns.add(obj.nspace)
+ return ns
+
+
class ExportMgr:
def __init__(
self,
self.rados_pool = POOL_NAME
self._exports: Optional[Dict[str, List[Export]]] = export_ls
- @staticmethod
- def _check_rados_notify(ioctx: Any, obj: str) -> None:
- try:
- ioctx.notify(obj)
- except TimedOut:
- log.exception("Ganesha timed out")
-
@property
def exports(self) -> Dict[str, List[Export]]:
if self._exports is None:
self._exports = {}
log.info("Begin export parsing")
- for cluster_id in available_clusters(self.mgr):
+ for cluster_id in known_cluster_ids(self.mgr):
self.export_conf_objs = [] # type: List[Export]
self._read_raw_config(cluster_id)
- self.exports[cluster_id] = self.export_conf_objs
+ self._exports[cluster_id] = self.export_conf_objs
log.info("Exports parsed successfully %s", self.exports.items())
return self._exports
with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
ioctx.set_namespace(rados_namespace)
for obj in ioctx.list_objects():
- if obj.key.startswith("export-"):
+ if obj.key.startswith(EXPORT_PREFIX):
size, _ = obj.stat()
raw_config = obj.read(size)
raw_config = raw_config.decode("utf-8")
def _save_export(self, cluster_id: str, export: Export) -> None:
self.exports[cluster_id].append(export)
- NFSRados(self.mgr, cluster_id).write_obj(
+ self._rados(cluster_id).write_obj(
GaneshaConfParser.write_block(export.to_export_block()),
- f'export-{export.export_id}',
- f'conf-nfs.{export.cluster_id}'
+ export_obj_name(export.export_id),
+ conf_obj_name(export.cluster_id)
)
def _delete_export(
if export:
if pseudo_path:
- NFSRados(self.mgr, cluster_id).remove_obj(
- f'export-{export.export_id}', f'conf-nfs.{cluster_id}')
+ self._rados(cluster_id).remove_obj(
+ export_obj_name(export.export_id), conf_obj_name(cluster_id))
self.exports[cluster_id].remove(export)
self._delete_export_user(export)
if not self.exports[cluster_id]:
ioctx.set_namespace(cluster_id)
export = Export.from_export_block(
GaneshaConfParser(
- ioctx.read(f"export-{ex_id}").decode("utf-8")
+ ioctx.read(export_obj_name(ex_id)).decode("utf-8")
).parse()[0],
cluster_id
)
def _update_export(self, cluster_id: str, export: Export) -> None:
self.exports[cluster_id].append(export)
- NFSRados(self.mgr, cluster_id).update_obj(
+ self._rados(cluster_id).update_obj(
GaneshaConfParser.write_block(export.to_export_block()),
- f'export-{export.export_id}', f'conf-nfs.{export.cluster_id}')
-
- def format_path(self, path: str) -> str:
- if path:
- path = normpath(path.strip())
- if path[:2] == "//":
- path = path[1:]
- return path
+ export_obj_name(export.export_id), conf_obj_name(export.cluster_id))
@export_cluster_checker
def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
path = ex_dict.get("path")
if path is None:
raise NFSInvalidOperation("export must specify path")
- path = self.format_path(path)
+ path = normalize_path(path)
fsal = ex_dict.get("fsal", {})
fsal_type = fsal.get("name")
squash: str,
access_type: str,
clients: list = []) -> Tuple[int, str, str]:
- pseudo_path = self.format_path(pseudo_path)
+ pseudo_path = normalize_path(pseudo_path)
if not self._fetch_export(cluster_id, pseudo_path):
export = self.create_export_from_dict(
bucket: Optional[str] = None,
user_id: Optional[str] = None,
clients: list = []) -> Tuple[int, str, str]:
- pseudo_path = self.format_path(pseudo_path)
+ pseudo_path = normalize_path(pseudo_path)
if not bucket and not user_id:
return -errno.EINVAL, "", "Must specify either bucket or user_id"
for k in ['path', 'pseudo']:
if k not in new_export_dict:
raise NFSInvalidOperation(f'Export missing required field {k}')
- if cluster_id not in available_clusters(self.mgr):
- raise ClusterNotFound()
if cluster_id not in self.exports:
self.exports[cluster_id] = []
- new_export_dict['path'] = self.format_path(new_export_dict['path'])
- new_export_dict['pseudo'] = self.format_path(new_export_dict['pseudo'])
+ new_export_dict['path'] = normalize_path(new_export_dict['path'])
+ new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])
old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
if old_export:
restart_nfs_service(self.mgr, new_export.cluster_id)
return 0, f"Updated export {new_export.pseudo}", ""
+
+ def _rados(self, cluster_id: str) -> NFSRados:
+ """Return a new NFSRados object for the given cluster id."""
+ return NFSRados(self.mgr.rados, cluster_id)