1 # -*- coding: utf-8 -*-
2 # pylint: disable=too-many-lines
3 from __future__
import absolute_import
8 from typing
import Any
, Dict
, List
, Optional
, cast
10 from ceph
.deployment
.service_spec
import NFSServiceSpec
11 from orchestrator
import DaemonDescription
, OrchestratorError
, ServiceDescription
14 from ..exceptions
import DashboardException
15 from ..settings
import Settings
16 from .cephfs
import CephFS
17 from .cephx
import CephX
18 from .orchestrator
import OrchClient
19 from .rgw_client
import NoCredentialsException
, NoRgwDaemonsException
, RequestException
, RgwClient
21 logger
= logging
.getLogger('ganesha')
class NFSException(DashboardException):
    """Dashboard exception tagged with the 'nfs' component."""

    def __init__(self, msg):
        # Route every NFS-Ganesha error through the dashboard's common
        # exception type so the UI can attribute it to the NFS component.
        super().__init__(component="nfs", msg=msg)
29 class Ganesha(object):
31 def _get_clusters_locations(cls
):
32 # pylint: disable=too-many-branches
33 # Get Orchestrator clusters
34 orch_result
= cls
._get
_orch
_clusters
_locations
()
36 # Get user-defined clusters
37 location_list_str
= Settings
.GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE
38 if not orch_result
and not location_list_str
:
39 raise NFSException("NFS-Ganesha cluster is not detected. "
40 "Please set the GANESHA_RADOS_POOL_NAMESPACE "
41 "setting or deploy an NFS-Ganesha cluster with the Orchestrator.")
42 result
= {} # type: ignore
43 location_list
= [loc
.strip() for loc
in location_list_str
.split(
44 ",")] if location_list_str
else []
45 for location
in location_list
:
47 raise NFSException("Invalid Ganesha cluster RADOS "
48 "[cluster_id:]pool/namespace setting: {}"
50 if location
.count(':') < 1:
52 if location
.count('/') > 1:
53 raise NFSException("Invalid Ganesha RADOS pool/namespace "
54 "setting: {}".format(location
))
55 # in this case accept pool/namespace only
57 if location
.count('/') == 0:
58 pool
, namespace
= location
, None
60 pool
, namespace
= location
.split('/', 1)
62 cluster
= location
[:location
.find(':')]
63 pool_nm
= location
[location
.find(':')+1:]
64 if pool_nm
.count('/') == 0:
65 pool
, namespace
= pool_nm
, None
67 pool
, namespace
= pool_nm
.split('/', 1)
69 # Check pool/namespace collision.
70 for clusters
in [orch_result
, result
]:
71 for cluster_name
, cluster_data
in clusters
.items():
72 if cluster_data
['pool'] == pool
and cluster_data
['namespace'] == namespace
:
74 f
'Pool `{pool}` and namespace `{namespace}` are already in use by '
75 f
"""NFS-Ganesha cluster called `{cluster_name}`{" that is deployed by "
76 "the Orchestrator" if cluster_data['type'] == ClusterType.ORCHESTRATOR
78 'Please update GANESHA_RADOS_POOL_NAMESPACE setting.'
81 if cluster
in orch_result
:
82 # cephadm might have set same cluster settings, ask the user to remove it.
84 'Detected a conflicting NFS-Ganesha cluster name `{0}`. There exists an '
85 'NFS-Ganesha cluster called `{0}` that is deployed by the Orchestrator. '
86 'Please remove or rename the cluster from the GANESHA_RADOS_POOL_NAMESPACE '
87 'setting.'.format(cluster
))
90 raise NFSException("Duplicate Ganesha cluster definition in "
91 "the setting: {}".format(location_list_str
))
94 'namespace': namespace
,
95 'type': ClusterType
.USER
,
98 return {**orch_result
, **result
}
101 def _get_orch_clusters_locations(cls
):
102 orch_result
= {} # type: ignore
103 services
= cls
._get
_orch
_nfs
_services
()
104 for service
in services
:
105 spec
= cast(NFSServiceSpec
, service
.spec
)
107 orch_result
[spec
.service_id
] = {
109 'namespace': spec
.namespace
,
110 'type': ClusterType
.ORCHESTRATOR
,
111 'daemon_conf': spec
.rados_config_name()
113 except AttributeError as ex
:
114 logger
.warning('Error when getting NFS service from the Orchestrator. %s', str(ex
))
119 def get_ganesha_clusters(cls
):
120 return list(cls
._get
_clusters
_locations
())
123 def _get_orch_nfs_services() -> List
[ServiceDescription
]:
125 return OrchClient
.instance().services
.list('nfs')
126 except (RuntimeError, OrchestratorError
, ImportError):
130 def parse_rados_url(cls
, rados_url
):
131 if not rados_url
.startswith("rados://"):
132 raise NFSException("Invalid NFS Ganesha RADOS configuration URL: {}"
134 rados_url
= rados_url
[8:]
135 url_comps
= rados_url
.split("/")
136 if len(url_comps
) < 2 or len(url_comps
) > 3:
137 raise NFSException("Invalid NFS Ganesha RADOS configuration URL: "
138 "rados://{}".format(rados_url
))
139 if len(url_comps
) == 2:
140 return url_comps
[0], None, url_comps
[1]
144 def make_rados_url(cls
, pool
, namespace
, obj
):
146 return "rados://{}/{}/{}".format(pool
, namespace
, obj
)
147 return "rados://{}/{}".format(pool
, obj
)
150 def get_cluster(cls
, cluster_id
):
151 locations
= cls
._get
_clusters
_locations
()
152 if cluster_id
not in locations
:
153 raise NFSException("Cluster not found: cluster_id={}"
155 return locations
[cluster_id
]
158 def fsals_available(cls
):
160 if CephFS
.list_filesystems():
161 result
.append("CEPH")
163 if RgwClient
.admin_instance().is_service_online() and \
164 RgwClient
.admin_instance().is_system_user():
166 except (DashboardException
, NoCredentialsException
, RequestException
,
167 NoRgwDaemonsException
):
172 class GaneshaConfParser(object):
173 def __init__(self
, raw_config
):
176 self
.clean_config(raw_config
)
178 def clean_config(self
, raw_config
):
179 for line
in raw_config
.split("\n"):
180 cardinal_idx
= line
.find('#')
181 if cardinal_idx
== -1:
185 self
.text
+= line
[:cardinal_idx
]
186 if line
.startswith("%"):
189 def remove_all_whitespaces(self
):
193 for i
, cha
in enumerate(self
.text
):
195 if cha
!= '"' and self
.text
[i
-1] != '\\':
200 elif i
== (len(self
.text
)-1):
201 if cha
!= '"' and self
.text
[i
-1] != '\\':
204 elif not in_section
and (i
== 0 or self
.text
[i
-1] == '\n') and cha
== '%':
207 elif in_string
or cha
not in [' ', '\n', '\t']:
209 elif cha
== '"' and self
.text
[i
-1] != '\\':
210 in_string
= not in_string
214 return self
.text
[self
.pos
:]
216 def parse_block_name(self
):
217 idx
= self
.stream().find('{')
219 raise Exception("Cannot find block name")
220 block_name
= self
.stream()[:idx
]
224 def parse_block_or_section(self
):
225 if self
.stream().startswith("%url"):
227 self
.pos
+= self
.stream().find('rados://')
228 idx
= self
.stream().find('\n')
230 value
= self
.stream()
231 self
.pos
+= len(self
.stream())
233 value
= self
.stream()[:idx
]
235 block_dict
= {'block_name': '%url', 'value': value
}
238 block_name
= self
.parse_block_name().upper()
239 block_dict
= {'block_name': block_name
}
240 self
.parse_block_body(block_dict
)
241 if self
.stream()[0] != '}':
242 raise Exception("No closing bracket '}' found at the end of block")
246 def parse_parameter_value(self
, raw_value
):
247 colon_idx
= raw_value
.find(',')
251 return int(raw_value
)
253 if raw_value
== "true":
255 if raw_value
== "false":
257 if raw_value
.find('"') == 0:
258 return raw_value
[1:-1]
261 return [self
.parse_parameter_value(v
.strip())
262 for v
in raw_value
.split(',')]
264 def parse_stanza(self
, block_dict
):
265 equal_idx
= self
.stream().find('=')
266 semicolon_idx
= self
.stream().find(';')
268 raise Exception("Malformed stanza: no equal symbol found.")
269 parameter_name
= self
.stream()[:equal_idx
].lower()
270 parameter_value
= self
.stream()[equal_idx
+1:semicolon_idx
]
271 block_dict
[parameter_name
] = self
.parse_parameter_value(
273 self
.pos
+= semicolon_idx
+1
275 def parse_block_body(self
, block_dict
):
278 semicolon_idx
= self
.stream().find(';')
279 lbracket_idx
= self
.stream().find('{')
280 rbracket_idx
= self
.stream().find('}')
282 if rbracket_idx
== 0:
286 if (semicolon_idx
!= -1 and lbracket_idx
!= -1
287 and semicolon_idx
< lbracket_idx
) \
288 or (semicolon_idx
!= -1 and lbracket_idx
== -1):
289 self
.parse_stanza(block_dict
)
290 elif (semicolon_idx
!= -1 and lbracket_idx
!= -1
291 and semicolon_idx
> lbracket_idx
) or (
292 semicolon_idx
== -1 and lbracket_idx
!= -1):
293 if '_blocks_' not in block_dict
:
294 block_dict
['_blocks_'] = []
295 block_dict
['_blocks_'].append(self
.parse_block_or_section())
297 raise Exception("Malformed stanza: no semicolon found.")
299 if last_pos
== self
.pos
:
300 raise Exception("Infinite loop while parsing block content")
304 self
.remove_all_whitespaces()
307 block_dict
= self
.parse_block_or_section()
308 blocks
.append(block_dict
)
312 def _indentation(depth
, size
=4):
314 for _
in range(0, depth
*size
):
319 def write_block_body(block
, depth
=0):
320 def format_val(key
, val
):
321 if isinstance(val
, list):
322 return ', '.join([format_val(key
, v
) for v
in val
])
323 if isinstance(val
, bool):
324 return str(val
).lower()
325 if isinstance(val
, int) or (block
['block_name'] == 'CLIENT'
326 and key
== 'clients'):
327 return '{}'.format(val
)
328 return '"{}"'.format(val
)
331 for key
, val
in block
.items():
332 if key
== 'block_name':
334 if key
== '_blocks_':
336 conf_str
+= GaneshaConfParser
.write_block(blo
, depth
)
338 conf_str
+= GaneshaConfParser
._indentation
(depth
)
339 conf_str
+= '{} = {};\n'.format(key
, format_val(key
, val
))
343 def write_block(block
, depth
):
344 if block
['block_name'] == "%url":
345 return '%url "{}"\n\n'.format(block
['value'])
348 conf_str
+= GaneshaConfParser
._indentation
(depth
)
349 conf_str
+= format(block
['block_name'])
351 conf_str
+= GaneshaConfParser
.write_block_body(block
, depth
+1)
352 conf_str
+= GaneshaConfParser
._indentation
(depth
)
357 def write_conf(blocks
):
358 if not isinstance(blocks
, list):
362 conf_str
+= GaneshaConfParser
.write_block(block
, 0)
367 def __init__(self
, name
):
371 def validate_path(cls
, _
):
372 raise NotImplementedError()
375 raise NotImplementedError()
378 raise NotImplementedError()
380 def create_path(self
, path
):
381 raise NotImplementedError()
384 def from_fsal_block(fsal_block
):
385 if fsal_block
['name'] == "CEPH":
386 return CephFSFSal
.from_fsal_block(fsal_block
)
387 if fsal_block
['name'] == 'RGW':
388 return RGWFSal
.from_fsal_block(fsal_block
)
391 def to_fsal_block(self
):
392 raise NotImplementedError()
395 def from_dict(fsal_dict
):
396 if fsal_dict
['name'] == "CEPH":
397 return CephFSFSal
.from_dict(fsal_dict
)
398 if fsal_dict
['name'] == 'RGW':
399 return RGWFSal
.from_dict(fsal_dict
)
403 raise NotImplementedError()
407 def __init__(self
, name
, rgw_user_id
, access_key
, secret_key
):
408 super(RGWFSal
, self
).__init
__(name
)
409 self
.rgw_user_id
= rgw_user_id
410 self
.access_key
= access_key
411 self
.secret_key
= secret_key
414 def validate_path(cls
, path
):
415 return path
== "/" or re
.match(r
'^[^/><|&()#?]+$', path
)
418 if not self
.rgw_user_id
:
419 raise NFSException('RGW user must be specified')
421 if not RgwClient
.admin_instance().user_exists(self
.rgw_user_id
):
422 raise NFSException("RGW user '{}' does not exist"
423 .format(self
.rgw_user_id
))
426 keys
= RgwClient
.admin_instance().get_user_keys(self
.rgw_user_id
)
427 self
.access_key
= keys
['access_key']
428 self
.secret_key
= keys
['secret_key']
430 def create_path(self
, path
):
431 if path
== '/': # nothing to do
433 rgw
= RgwClient
.instance(self
.rgw_user_id
)
435 exists
= rgw
.bucket_exists(path
, self
.rgw_user_id
)
436 logger
.debug('Checking existence of RGW bucket "%s" for user "%s": %s',
437 path
, self
.rgw_user_id
, exists
)
438 except RequestException
as exp
:
439 if exp
.status_code
== 403:
440 raise NFSException('Cannot create bucket "{}" as it already '
441 'exists, and belongs to other user.'
445 logger
.info('Creating new RGW bucket "%s" for user "%s"', path
,
447 rgw
.create_bucket(path
)
450 def from_fsal_block(cls
, fsal_block
):
451 return cls(fsal_block
['name'],
452 fsal_block
['user_id'],
453 fsal_block
['access_key_id'],
454 fsal_block
['secret_access_key'])
456 def to_fsal_block(self
):
458 'block_name': 'FSAL',
460 'user_id': self
.rgw_user_id
,
461 'access_key_id': self
.access_key
,
462 'secret_access_key': self
.secret_key
466 def from_dict(cls
, fsal_dict
):
467 return cls(fsal_dict
['name'], fsal_dict
['rgw_user_id'], None, None)
472 'rgw_user_id': self
.rgw_user_id
476 class CephFSFSal(FSal
):
477 def __init__(self
, name
, user_id
=None, fs_name
=None, sec_label_xattr
=None,
479 super(CephFSFSal
, self
).__init
__(name
)
480 self
.fs_name
= fs_name
481 self
.user_id
= user_id
482 self
.sec_label_xattr
= sec_label_xattr
483 self
.cephx_key
= cephx_key
486 def validate_path(cls
, path
):
487 return re
.match(r
'^/[^><|&()?]*$', path
)
490 if self
.user_id
and self
.user_id
not in CephX
.list_clients():
491 raise NFSException("cephx user '{}' does not exist"
492 .format(self
.user_id
))
496 self
.cephx_key
= CephX
.get_client_key(self
.user_id
)
498 def create_path(self
, path
):
499 cfs
= CephFS(self
.fs_name
)
505 def from_fsal_block(cls
, fsal_block
):
506 return cls(fsal_block
['name'],
507 fsal_block
.get('user_id', None),
508 fsal_block
.get('filesystem', None),
509 fsal_block
.get('sec_label_xattr', None),
510 fsal_block
.get('secret_access_key', None))
512 def to_fsal_block(self
):
514 'block_name': 'FSAL',
518 result
['user_id'] = self
.user_id
520 result
['filesystem'] = self
.fs_name
521 if self
.sec_label_xattr
:
522 result
['sec_label_xattr'] = self
.sec_label_xattr
524 result
['secret_access_key'] = self
.cephx_key
528 def from_dict(cls
, fsal_dict
):
529 return cls(fsal_dict
['name'], fsal_dict
['user_id'],
530 fsal_dict
['fs_name'], fsal_dict
['sec_label_xattr'], None)
535 'user_id': self
.user_id
,
536 'fs_name': self
.fs_name
,
537 'sec_label_xattr': self
.sec_label_xattr
541 class Client(object):
542 def __init__(self
, addresses
, access_type
=None, squash
=None):
543 self
.addresses
= addresses
544 self
.access_type
= access_type
545 self
.squash
= GaneshaConf
.format_squash(squash
)
548 def from_client_block(cls
, client_block
):
549 addresses
= client_block
['clients']
550 if not isinstance(addresses
, list):
551 addresses
= [addresses
]
552 return cls(addresses
,
553 client_block
.get('access_type', None),
554 client_block
.get('squash', None))
556 def to_client_block(self
):
558 'block_name': 'CLIENT',
559 'clients': self
.addresses
,
562 result
['access_type'] = self
.access_type
564 result
['squash'] = self
.squash
568 def from_dict(cls
, client_dict
):
569 return cls(client_dict
['addresses'], client_dict
['access_type'],
570 client_dict
['squash'])
574 'addresses': self
.addresses
,
575 'access_type': self
.access_type
,
576 'squash': self
.squash
580 class Export(object):
581 # pylint: disable=R0902
582 def __init__(self
, export_id
, path
, fsal
, cluster_id
, daemons
, pseudo
=None,
583 tag
=None, access_type
=None, squash
=None,
584 attr_expiration_time
=None, security_label
=False,
585 protocols
=None, transports
=None, clients
=None):
586 self
.export_id
= export_id
587 self
.path
= GaneshaConf
.format_path(path
)
589 self
.cluster_id
= cluster_id
590 self
.daemons
= set(daemons
)
591 self
.pseudo
= GaneshaConf
.format_path(pseudo
)
593 self
.access_type
= access_type
594 self
.squash
= GaneshaConf
.format_squash(squash
)
595 if attr_expiration_time
is None:
596 self
.attr_expiration_time
= 0
598 self
.attr_expiration_time
= attr_expiration_time
599 self
.security_label
= security_label
600 self
.protocols
= {GaneshaConf
.format_protocol(p
) for p
in protocols
}
601 self
.transports
= set(transports
)
602 self
.clients
= clients
605 # pylint: disable=R0912
606 if not self
.fsal
.validate_path(self
.path
):
607 raise NFSException("Export path ({}) is invalid.".format(self
.path
))
609 if not self
.protocols
:
611 "No NFS protocol version specified for the export.")
613 if not self
.transports
:
615 "No network transport type specified for the export.")
617 for t
in self
.transports
:
618 match
= re
.match(r
'^TCP$|^UDP$', t
)
621 "'{}' is an invalid network transport type identifier"
626 if 4 in self
.protocols
:
629 "Pseudo path is required when NFSv4 protocol is used")
630 match
= re
.match(r
'^/[^><|&()]*$', self
.pseudo
)
633 "Export pseudo path ({}) is invalid".format(self
.pseudo
))
636 match
= re
.match(r
'^[^/><|:&()]+$', self
.tag
)
639 "Export tag ({}) is invalid".format(self
.tag
))
641 if self
.fsal
.name
== 'RGW' and 4 not in self
.protocols
and not self
.tag
:
643 "Tag is mandatory for RGW export when using only NFSv3")
646 def from_export_block(cls
, export_block
, cluster_id
, defaults
):
647 logger
.debug("parsing export block: %s", export_block
)
649 fsal_block
= [b
for b
in export_block
['_blocks_']
650 if b
['block_name'] == "FSAL"]
652 protocols
= export_block
.get('protocols', defaults
['protocols'])
653 if not isinstance(protocols
, list):
654 protocols
= [protocols
]
656 transports
= export_block
.get('transports', defaults
['transports'])
657 if not isinstance(transports
, list):
658 transports
= [transports
]
660 client_blocks
= [b
for b
in export_block
['_blocks_']
661 if b
['block_name'] == "CLIENT"]
663 return cls(export_block
['export_id'],
664 export_block
['path'],
665 FSal
.from_fsal_block(fsal_block
[0]),
668 export_block
.get('pseudo', None),
669 export_block
.get('tag', None),
670 export_block
.get('access_type', defaults
['access_type']),
671 export_block
.get('squash', defaults
['squash']),
672 export_block
.get('attr_expiration_time', None),
673 export_block
.get('security_label', False),
676 [Client
.from_client_block(client
)
677 for client
in client_blocks
])
679 def to_export_block(self
, defaults
):
680 # pylint: disable=too-many-branches
682 'block_name': 'EXPORT',
683 'export_id': self
.export_id
,
687 result
['pseudo'] = self
.pseudo
689 result
['tag'] = self
.tag
690 if 'access_type' not in defaults \
691 or self
.access_type
!= defaults
['access_type']:
692 result
['access_type'] = self
.access_type
693 if 'squash' not in defaults
or self
.squash
!= defaults
['squash']:
694 result
['squash'] = self
.squash
695 if self
.fsal
.name
== 'CEPH':
696 result
['attr_expiration_time'] = self
.attr_expiration_time
697 result
['security_label'] = self
.security_label
698 if 'protocols' not in defaults
:
699 result
['protocols'] = list(self
.protocols
)
701 def_proto
= defaults
['protocols']
702 if not isinstance(def_proto
, list):
703 def_proto
= set([def_proto
])
704 if self
.protocols
!= def_proto
:
705 result
['protocols'] = list(self
.protocols
)
706 if 'transports' not in defaults
:
707 result
['transports'] = list(self
.transports
)
709 def_transp
= defaults
['transports']
710 if not isinstance(def_transp
, list):
711 def_transp
= set([def_transp
])
712 if self
.transports
!= def_transp
:
713 result
['transports'] = list(self
.transports
)
715 result
['_blocks_'] = [self
.fsal
.to_fsal_block()]
716 result
['_blocks_'].extend([client
.to_client_block()
717 for client
in self
.clients
])
721 def from_dict(cls
, export_id
, ex_dict
, old_export
=None):
722 return cls(export_id
,
724 FSal
.from_dict(ex_dict
['fsal']),
725 ex_dict
['cluster_id'],
729 ex_dict
['access_type'],
731 old_export
.attr_expiration_time
if old_export
else None,
732 ex_dict
['security_label'],
733 ex_dict
['protocols'],
734 ex_dict
['transports'],
735 [Client
.from_dict(client
) for client
in ex_dict
['clients']])
739 'export_id': self
.export_id
,
741 'fsal': self
.fsal
.to_dict(),
742 'cluster_id': self
.cluster_id
,
743 'daemons': sorted(list(self
.daemons
)),
744 'pseudo': self
.pseudo
,
746 'access_type': self
.access_type
,
747 'squash': self
.squash
,
748 'security_label': self
.security_label
,
749 'protocols': sorted(list(self
.protocols
)),
750 'transports': sorted(list(self
.transports
)),
751 'clients': [client
.to_dict() for client
in self
.clients
]
755 class ClusterType(object):
757 # Ganesha clusters deployed by the Orchestrator.
758 ORCHESTRATOR
= 'orchestrator'
760 # Ganesha clusters deployed manually by the user. Specified by using the
761 # GANESHA_CLUSTERS_RADOS_POOL_NAMESPACE setting.
765 class GaneshaConf(object):
766 # pylint: disable=R0902
768 def __init__(self
, cluster_id
, rados_pool
, rados_namespace
, daemon_confs
=None):
769 self
.cluster_id
= cluster_id
770 self
.rados_pool
= rados_pool
771 self
.rados_namespace
= rados_namespace
772 self
.daemon_confs
= daemon_confs
if daemon_confs
is not None else []
773 self
.export_conf_blocks
= [] # type: ignore
774 self
.daemons_conf_blocks
= {} # type: ignore
778 self
._read
_raw
_config
()
781 def_block
= [b
for b
in self
.export_conf_blocks
782 if b
['block_name'] == "EXPORT_DEFAULTS"]
783 self
.export_defaults
= def_block
[0] if def_block
else {}
784 self
._defaults
= self
.ganesha_defaults(self
.export_defaults
)
786 for export_block
in [block
for block
in self
.export_conf_blocks
787 if block
['block_name'] == "EXPORT"]:
788 export
= Export
.from_export_block(export_block
, cluster_id
,
790 self
.exports
[export
.export_id
] = export
792 # link daemons to exports
793 self
._link
_daemons
_to
_exports
()
795 def _link_daemons_to_exports(self
):
796 raise NotImplementedError()
799 def instance(cls
, cluster_id
):
800 cluster
= Ganesha
.get_cluster(cluster_id
)
801 if cluster
['type'] == ClusterType
.ORCHESTRATOR
:
802 return GaneshaConfOrchestrator(cluster_id
, cluster
['pool'], cluster
['namespace'],
803 [cluster
['daemon_conf']])
804 if cluster
['type'] == ClusterType
.USER
:
805 return GaneshaConfUser(cluster_id
, cluster
['pool'], cluster
['namespace'])
806 raise NFSException('Unknown cluster type `{}` for cluster `{}`'.format(
807 cluster
['type'], cluster_id
))
809 def _read_raw_config(self
):
811 def _read_rados_obj(_obj
):
812 size
, _
= _obj
.stat()
813 return _obj
.read(size
).decode("utf-8")
815 with mgr
.rados
.open_ioctx(self
.rados_pool
) as ioctx
:
816 if self
.rados_namespace
:
817 ioctx
.set_namespace(self
.rados_namespace
)
818 objs
= ioctx
.list_objects()
820 if obj
.key
.startswith("export-"):
821 raw_config
= _read_rados_obj(obj
)
822 logger
.debug("read export configuration from rados "
823 "object %s/%s/%s:\n%s", self
.rados_pool
,
824 self
.rados_namespace
, obj
.key
, raw_config
)
825 self
.export_conf_blocks
.extend(
826 GaneshaConfParser(raw_config
).parse())
827 elif not self
.daemon_confs
and obj
.key
.startswith("conf-"):
828 # Read all `conf-xxx` for daemon configs.
829 raw_config
= _read_rados_obj(obj
)
830 logger
.debug("read daemon configuration from rados "
831 "object %s/%s/%s:\n%s", self
.rados_pool
,
832 self
.rados_namespace
, obj
.key
, raw_config
)
833 idx
= obj
.key
.find('-')
834 self
.daemons_conf_blocks
[obj
.key
[idx
+1:]] = \
835 GaneshaConfParser(raw_config
).parse()
837 if self
.daemon_confs
:
838 # When daemon configs are provided.
839 for conf
in self
.daemon_confs
:
840 size
, _
= ioctx
.stat(conf
)
841 raw_config
= ioctx
.read(conf
, size
).decode("utf-8")
842 logger
.debug("read daemon configuration from rados "
843 "object %s/%s/%s:\n%s", self
.rados_pool
,
844 self
.rados_namespace
, conf
, raw_config
)
845 self
.daemons_conf_blocks
[conf
] = \
846 GaneshaConfParser(raw_config
).parse()
848 def _write_raw_config(self
, conf_block
, obj
):
849 raw_config
= GaneshaConfParser
.write_conf(conf_block
)
850 with mgr
.rados
.open_ioctx(self
.rados_pool
) as ioctx
:
851 if self
.rados_namespace
:
852 ioctx
.set_namespace(self
.rados_namespace
)
853 ioctx
.write_full(obj
, raw_config
.encode('utf-8'))
855 "write configuration into rados object %s/%s/%s:\n%s",
856 self
.rados_pool
, self
.rados_namespace
, obj
, raw_config
)
859 def ganesha_defaults(cls
, export_defaults
):
862 https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
865 'access_type': export_defaults
.get('access_type', 'NONE'),
866 'protocols': export_defaults
.get('protocols', [3, 4]),
867 'transports': export_defaults
.get('transports', ['TCP', 'UDP']),
868 'squash': export_defaults
.get('squash', 'root_squash')
872 def format_squash(cls
, squash
):
875 if squash
.lower() in ["no_root_squash", "noidsquash", "none"]:
876 return "no_root_squash"
877 if squash
.lower() in ["rootid", "root_id_squash", "rootidsquash"]:
878 return "root_id_squash"
879 if squash
.lower() in ["root", "root_squash", "rootsquash"]:
881 if squash
.lower() in ["all", "all_squash", "allsquash",
882 "all_anonymous", "allanonymous"]:
884 logger
.error("could not parse squash value: %s", squash
)
885 raise NFSException("'{}' is an invalid squash option".format(squash
))
888 def format_protocol(cls
, protocol
):
889 if str(protocol
) in ["NFSV3", "3", "V3", "NFS3"]:
891 if str(protocol
) in ["NFSV4", "4", "V4", "NFS4"]:
893 logger
.error("could not parse protocol value: %s", protocol
)
894 raise NFSException("'{}' is an invalid NFS protocol version identifier"
898 def format_path(cls
, path
):
901 if len(path
) > 1 and path
[-1] == '/':
905 def validate(self
, export
: Export
):
908 if 4 in export
.protocols
: # NFSv4 protocol
911 for ex
in self
.list_exports():
912 if export
.tag
and ex
.tag
== export
.tag
:
914 "Another export exists with the same tag: {}"
917 if export
.pseudo
and ex
.pseudo
== export
.pseudo
:
919 "Another export exists with the same pseudo path: {}"
920 .format(export
.pseudo
))
925 if export
.pseudo
[:export
.pseudo
.rfind('/')+1].startswith(ex
.pseudo
):
926 if export
.pseudo
[len(ex
.pseudo
)] == '/':
927 if len(ex
.pseudo
) > len_prefix
:
928 len_prefix
= len(ex
.pseudo
)
932 # validate pseudo path
933 idx
= len(parent_export
.pseudo
) # type: ignore
934 idx
= idx
+ 1 if idx
> 1 else idx
935 real_path
= "{}/{}".format(
936 parent_export
.path
# type: ignore
937 if len(parent_export
.path
) > 1 else "", # type: ignore
939 if export
.fsal
.name
== 'CEPH':
941 if export
.path
!= real_path
and not cfs
.dir_exists(real_path
):
943 "Pseudo path ({}) invalid, path {} does not exist."
944 .format(export
.pseudo
, real_path
))
946 def _gen_export_id(self
):
947 exports
= sorted(self
.exports
)
956 def _persist_daemon_configuration(self
):
957 raise NotImplementedError()
959 def _save_export(self
, export
):
960 self
.validate(export
)
961 export
.fsal
.create_path(export
.path
)
962 export
.fsal
.fill_keys()
963 self
.exports
[export
.export_id
] = export
964 conf_block
= export
.to_export_block(self
.export_defaults
)
965 self
._write
_raw
_config
(conf_block
, "export-{}".format(export
.export_id
))
966 self
._persist
_daemon
_configuration
()
968 def _delete_export(self
, export_id
):
969 self
._persist
_daemon
_configuration
()
970 with mgr
.rados
.open_ioctx(self
.rados_pool
) as ioctx
:
971 if self
.rados_namespace
:
972 ioctx
.set_namespace(self
.rados_namespace
)
973 ioctx
.remove_object("export-{}".format(export_id
))
975 def list_exports(self
):
976 return [ex
for _
, ex
in self
.exports
.items()]
978 def create_export(self
, ex_dict
):
979 ex_id
= self
._gen
_export
_id
()
980 export
= Export
.from_dict(ex_id
, ex_dict
)
981 self
._save
_export
(export
)
984 def has_export(self
, export_id
):
985 return export_id
in self
.exports
987 def update_export(self
, ex_dict
):
988 if ex_dict
['export_id'] not in self
.exports
:
990 old_export
= self
.exports
[ex_dict
['export_id']]
991 del self
.exports
[ex_dict
['export_id']]
992 export
= Export
.from_dict(ex_dict
['export_id'], ex_dict
, old_export
)
993 self
._save
_export
(export
)
994 self
.exports
[export
.export_id
] = export
997 def remove_export(self
, export_id
):
998 if export_id
not in self
.exports
:
1000 export
= self
.exports
[export_id
]
1001 del self
.exports
[export_id
]
1002 self
._delete
_export
(export_id
)
1005 def get_export(self
, export_id
):
1006 if export_id
in self
.exports
:
1007 return self
.exports
[export_id
]
1010 def list_daemons(self
) -> List
[Dict
[str, Any
]]:
1011 raise NotImplementedError()
1013 def list_daemon_confs(self
):
1014 return self
.daemons_conf_blocks
.keys()
1016 def reload_daemons(self
, daemons
):
1017 with mgr
.rados
.open_ioctx(self
.rados_pool
) as ioctx
:
1018 if self
.rados_namespace
:
1019 ioctx
.set_namespace(self
.rados_namespace
)
1020 for daemon_id
in daemons
:
1021 ioctx
.notify("conf-{}".format(daemon_id
))
1024 class GaneshaConfOrchestrator(GaneshaConf
):
1026 def _get_orch_nfs_instances(cls
,
1027 service_name
: Optional
[str] = None) -> List
[DaemonDescription
]:
1029 return OrchClient
.instance().services
.\
1030 list_daemons(service_name
=service_name
, daemon_type
="nfs")
1031 except (RuntimeError, OrchestratorError
, ImportError):
1034 def _link_daemons_to_exports(self
):
1035 instances
= self
._get
_orch
_nfs
_instances
('nfs.{}'.format(self
.cluster_id
))
1036 daemon_ids
= {instance
.daemon_id
for instance
in instances
}
1037 for _
, daemon_blocks
in self
.daemons_conf_blocks
.items():
1038 for block
in daemon_blocks
:
1039 if block
['block_name'] == "%url":
1040 rados_url
= block
['value']
1041 _
, _
, obj
= Ganesha
.parse_rados_url(rados_url
)
1042 if obj
.startswith("export-"):
1043 export_id
= int(obj
[obj
.find('-')+1:])
1044 self
.exports
[export_id
].daemons
.update(daemon_ids
)
1046 def validate(self
, export
: Export
):
1047 daemons_list
= {d
['daemon_id'] for d
in self
.list_daemons()}
1048 if export
.daemons
and set(export
.daemons
) != daemons_list
:
1049 raise NFSException('Export should be linked to all daemons.')
1050 super().validate(export
)
1052 def _persist_daemon_configuration(self
):
1053 daemon_map
= {} # type: ignore
1054 for daemon_id
in self
.list_daemon_confs():
1055 daemon_map
[daemon_id
] = []
1057 for daemon_id
in self
.list_daemon_confs():
1058 for _
, ex
in self
.exports
.items():
1060 daemon_map
[daemon_id
].append({
1061 'block_name': "%url",
1062 'value': Ganesha
.make_rados_url(
1063 self
.rados_pool
, self
.rados_namespace
,
1064 "export-{}".format(ex
.export_id
))
1066 for daemon_id
, conf_blocks
in daemon_map
.items():
1067 self
._write
_raw
_config
(conf_blocks
, daemon_id
)
1069 def list_daemons(self
) -> List
[Dict
[str, Any
]]:
1070 instances
= self
._get
_orch
_nfs
_instances
('nfs.{}'.format(self
.cluster_id
))
1072 'cluster_id': self
.cluster_id
,
1073 'daemon_id': instance
.daemon_id
,
1074 'cluster_type': ClusterType
.ORCHESTRATOR
,
1075 'status': instance
.status
,
1076 'status_desc': instance
.status_desc
1077 } for instance
in instances
]
1079 def reload_daemons(self
, daemons
):
1080 with mgr
.rados
.open_ioctx(self
.rados_pool
) as ioctx
:
1081 if self
.rados_namespace
:
1082 ioctx
.set_namespace(self
.rados_namespace
)
1083 for daemon_id
in self
.list_daemon_confs():
1084 ioctx
.notify(daemon_id
)
1087 class GaneshaConfUser(GaneshaConf
):
1089 def _link_daemons_to_exports(self
):
1090 for daemon_id
, daemon_blocks
in self
.daemons_conf_blocks
.items():
1091 for block
in daemon_blocks
:
1092 if block
['block_name'] == "%url":
1093 rados_url
= block
['value']
1094 _
, _
, obj
= Ganesha
.parse_rados_url(rados_url
)
1095 if obj
.startswith("export-"):
1096 export_id
= int(obj
[obj
.find('-')+1:])
1097 self
.exports
[export_id
].daemons
.add(daemon_id
)
1099 def validate(self
, export
: Export
):
1100 daemons_list
= [d
['daemon_id'] for d
in self
.list_daemons()]
1101 for daemon_id
in export
.daemons
:
1102 if daemon_id
not in daemons_list
:
1103 raise NFSException("Daemon '{}' does not exist".format(daemon_id
))
1104 super().validate(export
)
1106 def _persist_daemon_configuration(self
):
1107 daemon_map
= {} # type: ignore
1108 for daemon_id
in self
.list_daemon_confs():
1109 daemon_map
[daemon_id
] = []
1111 for _
, ex
in self
.exports
.items():
1112 for daemon
in ex
.daemons
:
1113 daemon_map
[daemon
].append({
1114 'block_name': "%url",
1115 'value': Ganesha
.make_rados_url(
1116 self
.rados_pool
, self
.rados_namespace
,
1117 "export-{}".format(ex
.export_id
))
1119 for daemon_id
, conf_blocks
in daemon_map
.items():
1120 self
._write
_raw
_config
(conf_blocks
, "conf-{}".format(daemon_id
))
1122 def list_daemons(self
) -> List
[Dict
[str, Any
]]:
1124 'cluster_id': self
.cluster_id
,
1125 'cluster_type': ClusterType
.USER
,
1126 'daemon_id': daemon_id
,
1128 'status_desc': 'running'
1129 } for daemon_id
in self
.list_daemon_confs()]