3 from typing
import Tuple
, Optional
, List
, Dict
, Any
5 from mgr_module
import MgrModule
, CLICommand
, Option
, CLICheckNonemptyFileInput
8 from .export
import ExportMgr
9 from .cluster
import NFSCluster
10 from .utils
import available_clusters
12 log
= logging
.getLogger(__name__
)
15 class Module(orchestrator
.OrchestratorClientMixin
, MgrModule
):
16 MODULE_OPTIONS
: List
[Option
] = []
18 def __init__(self
, *args
: str, **kwargs
: Any
) -> None:
20 self
.lock
= threading
.Lock()
21 super(Module
, self
).__init
__(*args
, **kwargs
)
23 self
.export_mgr
= ExportMgr(self
)
24 self
.nfs
= NFSCluster(self
)
27 @CLICommand('nfs export create cephfs', perm
='rw')
28 def _cmd_nfs_export_create_cephfs(
33 path
: Optional
[str] = '/',
34 readonly
: Optional
[bool] = False,
35 client_addr
: Optional
[List
[str]] = None,
37 ) -> Tuple
[int, str, str]:
38 """Create a CephFS export"""
39 return self
.export_mgr
.create_export(fsal_type
='cephfs', fs_name
=fsname
,
40 cluster_id
=cluster_id
, pseudo_path
=pseudo_path
,
41 read_only
=readonly
, path
=path
,
42 squash
=squash
, addr
=client_addr
)
44 @CLICommand('nfs export create rgw', perm
='rw')
45 def _cmd_nfs_export_create_rgw(
49 bucket
: Optional
[str] = None,
50 user_id
: Optional
[str] = None,
51 readonly
: Optional
[bool] = False,
52 client_addr
: Optional
[List
[str]] = None,
54 ) -> Tuple
[int, str, str]:
55 """Create an RGW export"""
56 return self
.export_mgr
.create_export(fsal_type
='rgw', bucket
=bucket
,
58 cluster_id
=cluster_id
, pseudo_path
=pseudo_path
,
59 read_only
=readonly
, squash
=squash
,
62 @CLICommand('nfs export rm', perm
='rw')
63 def _cmd_nfs_export_rm(self
, cluster_id
: str, pseudo_path
: str) -> Tuple
[int, str, str]:
64 """Remove a cephfs export"""
65 return self
.export_mgr
.delete_export(cluster_id
=cluster_id
, pseudo_path
=pseudo_path
)
67 @CLICommand('nfs export delete', perm
='rw')
68 def _cmd_nfs_export_delete(self
, cluster_id
: str, pseudo_path
: str) -> Tuple
[int, str, str]:
69 """Delete a cephfs export (DEPRECATED)"""
70 return self
.export_mgr
.delete_export(cluster_id
=cluster_id
, pseudo_path
=pseudo_path
)
72 @CLICommand('nfs export ls', perm
='r')
73 def _cmd_nfs_export_ls(self
, cluster_id
: str, detailed
: bool = False) -> Tuple
[int, str, str]:
74 """List exports of a NFS cluster"""
75 return self
.export_mgr
.list_exports(cluster_id
=cluster_id
, detailed
=detailed
)
77 @CLICommand('nfs export info', perm
='r')
78 def _cmd_nfs_export_info(self
, cluster_id
: str, pseudo_path
: str) -> Tuple
[int, str, str]:
79 """Fetch a export of a NFS cluster given the pseudo path/binding"""
80 return self
.export_mgr
.get_export(cluster_id
=cluster_id
, pseudo_path
=pseudo_path
)
82 @CLICommand('nfs export get', perm
='r')
83 def _cmd_nfs_export_get(self
, cluster_id
: str, pseudo_path
: str) -> Tuple
[int, str, str]:
84 """Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
85 return self
.export_mgr
.get_export(cluster_id
=cluster_id
, pseudo_path
=pseudo_path
)
87 @CLICommand('nfs export apply', perm
='rw')
88 @CLICheckNonemptyFileInput(desc
='Export JSON or Ganesha EXPORT specification')
89 def _cmd_nfs_export_apply(self
, cluster_id
: str, inbuf
: str) -> Tuple
[int, str, str]:
90 """Create or update an export by `-i <json_or_ganesha_export_file>`"""
91 return self
.export_mgr
.apply_export(cluster_id
, export_config
=inbuf
)
93 @CLICommand('nfs cluster create', perm
='rw')
94 def _cmd_nfs_cluster_create(self
,
96 placement
: Optional
[str] = None,
97 ingress
: Optional
[bool] = None,
98 virtual_ip
: Optional
[str] = None,
99 port
: Optional
[int] = None) -> Tuple
[int, str, str]:
100 """Create an NFS Cluster"""
101 return self
.nfs
.create_nfs_cluster(cluster_id
=cluster_id
, placement
=placement
,
102 virtual_ip
=virtual_ip
, ingress
=ingress
,
105 @CLICommand('nfs cluster rm', perm
='rw')
106 def _cmd_nfs_cluster_rm(self
, cluster_id
: str) -> Tuple
[int, str, str]:
107 """Removes an NFS Cluster"""
108 return self
.nfs
.delete_nfs_cluster(cluster_id
=cluster_id
)
110 @CLICommand('nfs cluster delete', perm
='rw')
111 def _cmd_nfs_cluster_delete(self
, cluster_id
: str) -> Tuple
[int, str, str]:
112 """Removes an NFS Cluster (DEPRECATED)"""
113 return self
.nfs
.delete_nfs_cluster(cluster_id
=cluster_id
)
115 @CLICommand('nfs cluster ls', perm
='r')
116 def _cmd_nfs_cluster_ls(self
) -> Tuple
[int, str, str]:
117 """List NFS Clusters"""
118 return self
.nfs
.list_nfs_cluster()
120 @CLICommand('nfs cluster info', perm
='r')
121 def _cmd_nfs_cluster_info(self
, cluster_id
: Optional
[str] = None) -> Tuple
[int, str, str]:
122 """Displays NFS Cluster info"""
123 return self
.nfs
.show_nfs_cluster_info(cluster_id
=cluster_id
)
125 @CLICommand('nfs cluster config get', perm
='r')
126 def _cmd_nfs_cluster_config_get(self
, cluster_id
: str) -> Tuple
[int, str, str]:
127 """Fetch NFS-Ganesha config"""
128 return self
.nfs
.get_nfs_cluster_config(cluster_id
=cluster_id
)
130 @CLICommand('nfs cluster config set', perm
='rw')
131 @CLICheckNonemptyFileInput(desc
='NFS-Ganesha Configuration')
132 def _cmd_nfs_cluster_config_set(self
, cluster_id
: str, inbuf
: str) -> Tuple
[int, str, str]:
133 """Set NFS-Ganesha config by `-i <config_file>`"""
134 return self
.nfs
.set_nfs_cluster_config(cluster_id
=cluster_id
, nfs_config
=inbuf
)
136 @CLICommand('nfs cluster config reset', perm
='rw')
137 def _cmd_nfs_cluster_config_reset(self
, cluster_id
: str) -> Tuple
[int, str, str]:
138 """Reset NFS-Ganesha Config to default"""
139 return self
.nfs
.reset_nfs_cluster_config(cluster_id
=cluster_id
)
141 def fetch_nfs_export_obj(self
) -> ExportMgr
:
142 return self
.export_mgr
144 def export_ls(self
) -> List
[Dict
[Any
, Any
]]:
145 return self
.export_mgr
.list_all_exports()
147 def export_get(self
, cluster_id
: str, export_id
: int) -> Optional
[Dict
[str, Any
]]:
148 return self
.export_mgr
.get_export_by_id(cluster_id
, export_id
)
150 def export_rm(self
, cluster_id
: str, pseudo
: str) -> None:
151 self
.export_mgr
.delete_export(cluster_id
=cluster_id
, pseudo_path
=pseudo
)
153 def cluster_ls(self
) -> List
[str]:
154 return available_clusters(self
)