from __future__ import absolute_import
import logging
+import os
import re
+from typing import Any, Dict, List, Optional, cast
-from orchestrator import OrchestratorError
+from ceph.deployment.service_spec import NFSServiceSpec
+from orchestrator import DaemonDescription, OrchestratorError, ServiceDescription
+
+from .. import mgr
+from ..exceptions import DashboardException
+from ..settings import Settings
from .cephfs import CephFS
from .cephx import CephX
from .orchestrator import OrchClient
from .rgw_client import RgwClient, RequestException, NoCredentialsException
-from .. import mgr
-from ..settings import Settings
-from ..exceptions import DashboardException
logger = logging.getLogger('ganesha')
class Ganesha(object):
@classmethod
def _get_clusters_locations(cls):
- result = {} # type: ignore
+ # pylint: disable=too-many-branches
+ # Get Orchestrator clusters
+ orch_result = cls._get_orch_clusters_locations()
+
+ # Get user-defined clusters
location_list_str = Settings.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
- if not location_list_str:
- raise NFSException("Ganesha config location is not configured. "
+ if not orch_result and not location_list_str:
+ raise NFSException("NFS-Ganesha cluster is not detected. "
"Please set the GANESHA_RADOS_POOL_NAMESPACE "
- "setting.")
- location_list = [l.strip() for l in location_list_str.split(",")]
+ "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")
+ result = {} # type: ignore
+ location_list = [loc.strip() for loc in location_list_str.split(
+ ",")] if location_list_str else []
for location in location_list:
cluster = None
pool = None
else:
pool, namespace = pool_nm.split('/', 1)
+ if cluster in orch_result:
+ # cephadm might have set same cluster settings, ask the user to remove it.
+ raise NFSException(
+ 'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
+ 'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
+ 'Please remove or rename the cluster from the GANESHA_RADOS_POOL_NAMESPACE '
+ 'setting.'.format(cluster))
+
if cluster in result:
raise NFSException("Duplicate Ganesha cluster definition in "
"the setting: {}".format(location_list_str))
- result[cluster] = (pool, namespace)
+ result[cluster] = {
+ 'pool': pool,
+ 'namespace': namespace,
+ 'type': ClusterType.USER,
+ 'daemon_conf': None
+ }
+ return {**orch_result, **result}
- return result
+ @classmethod
+ def _get_orch_clusters_locations(cls):
+ orch_result = {} # type: ignore
+ services = cls._get_orch_nfs_services()
+ for service in services:
+ spec = cast(NFSServiceSpec, service.spec)
+ try:
+ orch_result[spec.service_id] = {
+ 'pool': spec.pool,
+ 'namespace': spec.namespace,
+ 'type': ClusterType.ORCHESTRATOR,
+ 'daemon_conf': spec.rados_config_name()
+ }
+ except AttributeError as ex:
+ logger.warning('Error when getting NFS service from the Orchestrator. %s', str(ex))
+ continue
+ return orch_result
@classmethod
def get_ganesha_clusters(cls):
return [cluster_id for cluster_id in cls._get_clusters_locations()]
@staticmethod
- def _get_orch_nfs_instances():
+ def _get_orch_nfs_services() -> List[ServiceDescription]:
try:
- return OrchClient.instance().services.list("nfs")
+ return OrchClient.instance().services.list('nfs')
except (RuntimeError, OrchestratorError, ImportError):
return []
- @classmethod
- def get_daemons_status(cls):
- instances = cls._get_orch_nfs_instances()
- if not instances:
- return None
-
- result = {} # type: ignore
- for instance in instances:
- if instance.service is None:
- instance.service = "_default_"
- if instance.service not in result:
- result[instance.service] = {}
- result[instance.service][instance.hostname] = {
- 'status': instance.status,
- 'desc': instance.status_desc,
- }
- return result
-
@classmethod
def parse_rados_url(cls, rados_url):
if not rados_url.startswith("rados://"):
return "rados://{}/{}".format(pool, obj)
@classmethod
- def get_pool_and_namespace(cls, cluster_id):
- instances = cls._get_orch_nfs_instances()
- # we assume that every instance stores there configuration in the
- # same RADOS pool/namespace
- if instances:
- location = instances[0].rados_config_location
- pool, ns, _ = cls.parse_rados_url(location)
- return pool, ns
+ def get_cluster(cls, cluster_id):
locations = cls._get_clusters_locations()
if cluster_id not in locations:
raise NFSException("Cluster not found: cluster_id={}"
.format(cluster_id))
return locations[cluster_id]
- @classmethod
- def reload_daemons(cls, cluster_id, daemons_id):
- logger.debug("issued reload of daemons: %s", daemons_id)
- if not OrchClient.instance().available():
- logger.debug("orchestrator not available")
- return
- reload_list = []
- daemons = cls.get_daemons_status()
- if cluster_id not in daemons:
- raise NFSException("Cluster not found: cluster_id={}"
- .format(cluster_id))
- for daemon_id in daemons_id:
- if daemon_id not in daemons[cluster_id]:
- continue
- if daemons[cluster_id][daemon_id] == 1:
- reload_list.append((cluster_id, daemon_id))
- OrchClient.instance().reload_service("nfs", reload_list)
-
@classmethod
def fsals_available(cls):
result = []
def create_path(self, path):
cfs = CephFS(self.fs_name)
+ if path == os.sep:
+ return
cfs.mk_dirs(path)
@classmethod
self.transports = set(transports)
self.clients = clients
- def validate(self, daemons_list):
+ def validate(self):
# pylint: disable=R0912
- for daemon_id in self.daemons:
- if daemon_id not in daemons_list:
- raise NFSException("Daemon '{}' does not exist"
- .format(daemon_id))
-
if not self.fsal.validate_path(self.path):
raise NFSException("Export path ({}) is invalid.".format(self.path))
}
class ClusterType(object):
    """Enumerates how an NFS-Ganesha cluster was deployed."""

    # Clusters deployed and managed by the Orchestrator (e.g. cephadm).
    ORCHESTRATOR = 'orchestrator'

    # Clusters deployed manually and registered through the
    # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
    USER = 'user'
+
class GaneshaConf(object):
# pylint: disable=R0902
- def __init__(self, cluster_id, rados_pool, rados_namespace):
+ def __init__(self, cluster_id, rados_pool, rados_namespace, daemon_confs=None):
self.cluster_id = cluster_id
self.rados_pool = rados_pool
self.rados_namespace = rados_namespace
+ self.daemon_confs = daemon_confs if daemon_confs is not None else []
self.export_conf_blocks = [] # type: ignore
self.daemons_conf_blocks = {} # type: ignore
self._defaults = {}
self.exports[export.export_id] = export
# link daemons to exports
- for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
- for block in daemon_blocks:
- if block['block_name'] == "%url":
- rados_url = block['value']
- _, _, obj = Ganesha.parse_rados_url(rados_url)
- if obj.startswith("export-"):
- export_id = int(obj[obj.find('-')+1:])
- self.exports[export_id].daemons.add(daemon_id)
+ self._link_daemons_to_exports()
+
+ def _link_daemons_to_exports(self):
+ raise NotImplementedError()
@classmethod
def instance(cls, cluster_id):
- pool, ns = Ganesha.get_pool_and_namespace(cluster_id)
- return cls(cluster_id, pool, ns)
+ cluster = Ganesha.get_cluster(cluster_id)
+ if cluster['type'] == ClusterType.ORCHESTRATOR:
+ return GaneshaConfOrchestrator(cluster_id, cluster['pool'], cluster['namespace'],
+ [cluster['daemon_conf']])
+ if cluster['type'] == ClusterType.USER:
+ return GaneshaConfUser(cluster_id, cluster['pool'], cluster['namespace'])
+ raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
+ cluster['type'], cluster_id))
def _read_raw_config(self):
+
+ def _read_rados_obj(_obj):
+ size, _ = _obj.stat()
+ return _obj.read(size).decode("utf-8")
+
with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
if self.rados_namespace:
ioctx.set_namespace(self.rados_namespace)
objs = ioctx.list_objects()
for obj in objs:
if obj.key.startswith("export-"):
- size, _ = obj.stat()
- raw_config = obj.read(size)
- raw_config = raw_config.decode("utf-8")
+ raw_config = _read_rados_obj(obj)
logger.debug("read export configuration from rados "
"object %s/%s/%s:\n%s", self.rados_pool,
self.rados_namespace, obj.key, raw_config)
self.export_conf_blocks.extend(
GaneshaConfParser(raw_config).parse())
- elif obj.key.startswith("conf-"):
- size, _ = obj.stat()
- raw_config = obj.read(size)
- raw_config = raw_config.decode("utf-8")
+ elif not self.daemon_confs and obj.key.startswith("conf-"):
+ # Read all `conf-xxx` for daemon configs.
+ raw_config = _read_rados_obj(obj)
logger.debug("read daemon configuration from rados "
"object %s/%s/%s:\n%s", self.rados_pool,
self.rados_namespace, obj.key, raw_config)
self.daemons_conf_blocks[obj.key[idx+1:]] = \
GaneshaConfParser(raw_config).parse()
+ if self.daemon_confs:
+ # When daemon configs are provided.
+ for conf in self.daemon_confs:
+ size, _ = ioctx.stat(conf)
+ raw_config = ioctx.read(conf, size).decode("utf-8")
+ logger.debug("read daemon configuration from rados "
+ "object %s/%s/%s:\n%s", self.rados_pool,
+ self.rados_namespace, conf, raw_config)
+ self.daemons_conf_blocks[conf] = \
+ GaneshaConfParser(raw_config).parse()
+
def _write_raw_config(self, conf_block, obj):
raw_config = GaneshaConfParser.write_conf(conf_block)
with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
path = path[:-1]
return path
- def validate(self, export):
- export.validate(self.list_daemons())
+ def validate(self, export: Export):
+ export.validate()
if 4 in export.protocols: # NFSv4 protocol
len_prefix = 1
return nid
def _persist_daemon_configuration(self):
- daemon_map = {} # type: ignore
- for daemon_id in self.list_daemons():
- daemon_map[daemon_id] = []
-
- for _, ex in self.exports.items():
- for daemon in ex.daemons:
- daemon_map[daemon].append({
- 'block_name': "%url",
- 'value': Ganesha.make_rados_url(
- self.rados_pool, self.rados_namespace,
- "export-{}".format(ex.export_id))
- })
- for daemon_id, conf_blocks in daemon_map.items():
- self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))
+ raise NotImplementedError()
def _save_export(self, export):
self.validate(export)
return self.exports[export_id]
return None
- def list_daemons(self):
- return [daemon_id for daemon_id in self.daemons_conf_blocks]
+ def list_daemons(self) -> List[Dict[str, Any]]:
+ raise NotImplementedError()
+
+ def list_daemon_confs(self):
+ return self.daemons_conf_blocks.keys()
def reload_daemons(self, daemons):
with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
ioctx.set_namespace(self.rados_namespace)
for daemon_id in daemons:
ioctx.notify("conf-{}".format(daemon_id))
+
+
class GaneshaConfOrchestrator(GaneshaConf):
    """Ganesha configuration for clusters deployed by the Orchestrator."""

    @classmethod
    def _get_orch_nfs_instances(cls,
                                service_name: Optional[str] = None) -> List[DaemonDescription]:
        """List NFS daemon descriptions from the Orchestrator; [] if unavailable."""
        try:
            orch = OrchClient.instance()
            return orch.services.list_daemons(service_name=service_name, daemon_type="nfs")
        except (RuntimeError, OrchestratorError, ImportError):
            return []

    def _link_daemons_to_exports(self):
        """Mark every export referenced by a daemon config as served by all daemons."""
        instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
        daemon_ids = {instance.daemon_id for instance in instances}
        for daemon_blocks in self.daemons_conf_blocks.values():
            for block in daemon_blocks:
                if block['block_name'] != "%url":
                    continue
                _, _, obj = Ganesha.parse_rados_url(block['value'])
                if obj.startswith("export-"):
                    export_id = int(obj[obj.find('-') + 1:])
                    self.exports[export_id].daemons.update(daemon_ids)

    def validate(self, export: Export):
        """Orchestrator-managed exports must be linked to all daemons (or none)."""
        daemons_list = {d['daemon_id'] for d in self.list_daemons()}
        if export.daemons and set(export.daemons) != daemons_list:
            raise NFSException('Export should be linked to all daemons.')
        super().validate(export)

    def _persist_daemon_configuration(self):
        """Rewrite each daemon config object with one %url block per linked export."""
        daemon_map = {daemon_id: [] for daemon_id in self.list_daemon_confs()}  # type: ignore
        for conf_blocks in daemon_map.values():
            for ex in self.exports.values():
                if ex.daemons:
                    conf_blocks.append({
                        'block_name': "%url",
                        'value': Ganesha.make_rados_url(
                            self.rados_pool, self.rados_namespace,
                            "export-{}".format(ex.export_id))
                    })
        for daemon_id, conf_blocks in daemon_map.items():
            self._write_raw_config(conf_blocks, daemon_id)

    def list_daemons(self) -> List[Dict[str, Any]]:
        """Describe this cluster's NFS daemons as reported by the Orchestrator."""
        instances = self._get_orch_nfs_instances('nfs.{}'.format(self.cluster_id))
        return [{
            'cluster_id': self.cluster_id,
            'daemon_id': instance.daemon_id,
            'cluster_type': ClusterType.ORCHESTRATOR,
            'status': instance.status,
            'status_desc': instance.status_desc
        } for instance in instances]

    def reload_daemons(self, daemons):
        """Notify every daemon config object.

        *daemons* is deliberately ignored: all Orchestrator-managed daemons
        share the same config objects, so every one is notified.
        """
        with mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            if self.rados_namespace:
                ioctx.set_namespace(self.rados_namespace)
            for daemon_id in self.list_daemon_confs():
                ioctx.notify(daemon_id)
+
+
class GaneshaConfUser(GaneshaConf):
    """Ganesha configuration for user-defined (manually deployed) clusters."""

    def _link_daemons_to_exports(self):
        """Link each export to the specific daemons whose config references it."""
        for daemon_id, daemon_blocks in self.daemons_conf_blocks.items():
            for block in daemon_blocks:
                if block['block_name'] == "%url":
                    rados_url = block['value']
                    _, _, obj = Ganesha.parse_rados_url(rados_url)
                    if obj.startswith("export-"):
                        export_id = int(obj[obj.find('-')+1:])
                        self.exports[export_id].daemons.add(daemon_id)

    def validate(self, export: Export):
        """Ensure every daemon referenced by *export* exists, then run base checks."""
        # Use a set for O(1) membership tests, consistent with
        # GaneshaConfOrchestrator.validate (a list makes this O(n*m)).
        daemons_list = {d['daemon_id'] for d in self.list_daemons()}
        for daemon_id in export.daemons:
            if daemon_id not in daemons_list:
                raise NFSException("Daemon '{}' does not exist".format(daemon_id))
        super().validate(export)

    def _persist_daemon_configuration(self):
        """Rewrite each `conf-<daemon>` object with %url blocks for its exports."""
        daemon_map = {daemon_id: [] for daemon_id in self.list_daemon_confs()}  # type: ignore
        for ex in self.exports.values():
            for daemon in ex.daemons:
                daemon_map[daemon].append({
                    'block_name': "%url",
                    'value': Ganesha.make_rados_url(
                        self.rados_pool, self.rados_namespace,
                        "export-{}".format(ex.export_id))
                })
        for daemon_id, conf_blocks in daemon_map.items():
            self._write_raw_config(conf_blocks, "conf-{}".format(daemon_id))

    def list_daemons(self) -> List[Dict[str, Any]]:
        """User-defined daemons have no live status source; report them as running."""
        return [{
            'cluster_id': self.cluster_id,
            'cluster_type': ClusterType.USER,
            'daemon_id': daemon_id,
            'status': 1,
            'status_desc': 'running'
        } for daemon_id in self.list_daemon_confs()]