)
+def _validate_sec_type(sec_type: str) -> None:
+ valid_sec_types = ["none", "sys", "krb5", "krb5i", "krb5p"]
+ if not isinstance(sec_type, str) or sec_type not in valid_sec_types:
+ raise NFSInvalidOperation(
+ f"SecType {sec_type} invalid, valid types are {valid_sec_types}")
+
+
class RawBlock():
def __init__(self, block_name: str, blocks: List['RawBlock'] = [], values: Dict[str, Any] = {}):
if not values: # workaround mutable default argument
protocols: List[int],
transports: List[str],
fsal: FSAL,
- clients: Optional[List[Client]] = None) -> None:
+ clients: Optional[List[Client]] = None,
+ sectype: Optional[List[str]] = None) -> None:
self.export_id = export_id
self.path = path
self.fsal = fsal
self.protocols = protocols
self.transports = transports
self.clients: List[Client] = clients or []
+ self.sectype = sectype
@classmethod
def from_export_block(cls, export_block: RawBlock, cluster_id: str) -> 'Export':
elif not transports:
transports = []
+ # if this module wrote the ganesha conf the param is camelcase
+ # "SecType". but for compatiblity with manually edited ganesha confs,
+ # accept "sectype" too.
+ sectype = (export_block.values.get("SecType")
+ or export_block.values.get("sectype") or None)
return cls(export_block.values['export_id'],
export_block.values['path'],
cluster_id,
transports,
FSAL.from_fsal_block(fsal_blocks[0]),
[Client.from_client_block(client)
- for client in client_blocks])
+ for client in client_blocks],
+ sectype=sectype)
def to_export_block(self) -> RawBlock:
- result = RawBlock('EXPORT', values={
+ values = {
'export_id': self.export_id,
'path': self.path,
'pseudo': self.pseudo,
'security_label': self.security_label,
'protocols': self.protocols,
'transports': self.transports,
- })
+ }
+ if self.sectype:
+ values['SecType'] = self.sectype
+ result = RawBlock("EXPORT", values=values)
result.blocks = [
self.fsal.to_fsal_block()
] + [
ex_dict.get('protocols', [4]),
ex_dict.get('transports', ['TCP']),
FSAL.from_dict(ex_dict.get('fsal', {})),
- [Client.from_dict(client) for client in ex_dict.get('clients', [])])
+ [Client.from_dict(client) for client in ex_dict.get('clients', [])],
+ sectype=ex_dict.get("sectype"))
def to_dict(self) -> Dict[str, Any]:
- return {
+ values = {
'export_id': self.export_id,
'path': self.path,
'cluster_id': self.cluster_id,
'fsal': self.fsal.to_dict(),
'clients': [client.to_dict() for client in self.clients]
}
+ if self.sectype:
+ values['sectype'] = self.sectype
+ return values
def validate(self, mgr: 'Module') -> None:
if not isabs(self.pseudo) or self.pseudo == "/":
else:
raise NFSInvalidOperation('FSAL {self.fsal.name} not supported')
+ for st in (self.sectype or []):
+ _validate_sec_type(st)
+
def __eq__(self, other: Any) -> bool:
if not isinstance(other, Export):
return False