3 from typing
import Tuple
, Optional
, List
, Dict
, Any
5 from mgr_module
import MgrModule
, CLICommand
, Option
, CLICheckNonemptyFileInput
8 from orchestrator
.module
import IngressType
10 from .export
import ExportMgr
, AppliedExportResults
11 from .cluster
import NFSCluster
12 from .utils
import available_clusters
# Module-level logger, named after this module via __name__.
log = logging.getLogger(
    __name__,
)
17 class Module(orchestrator
.OrchestratorClientMixin
, MgrModule
):
18 MODULE_OPTIONS
: List
[Option
] = []
20 def __init__(self
, *args
: str, **kwargs
: Any
) -> None:
22 self
.lock
= threading
.Lock()
23 super(Module
, self
).__init
__(*args
, **kwargs
)
25 self
.export_mgr
= ExportMgr(self
)
26 self
.nfs
= NFSCluster(self
)
29 @CLICommand('nfs export create cephfs', perm
='rw')
30 @object_format.Responder()
31 def _cmd_nfs_export_create_cephfs(
36 path
: Optional
[str] = '/',
37 readonly
: Optional
[bool] = False,
38 client_addr
: Optional
[List
[str]] = None,
40 sectype
: Optional
[List
[str]] = None,
42 """Create a CephFS export"""
43 return self
.export_mgr
.create_export(
46 cluster_id
=cluster_id
,
47 pseudo_path
=pseudo_path
,
55 @CLICommand('nfs export create rgw', perm
='rw')
56 @object_format.Responder()
57 def _cmd_nfs_export_create_rgw(
61 bucket
: Optional
[str] = None,
62 user_id
: Optional
[str] = None,
63 readonly
: Optional
[bool] = False,
64 client_addr
: Optional
[List
[str]] = None,
66 sectype
: Optional
[List
[str]] = None,
68 """Create an RGW export"""
69 return self
.export_mgr
.create_export(
73 cluster_id
=cluster_id
,
74 pseudo_path
=pseudo_path
,
@CLICommand('nfs export rm', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_export_rm(self, cluster_id: str, pseudo_path: str) -> None:
    """Remove a cephfs export"""
    # CLI handler for 'nfs export rm': identifies the export by cluster id
    # and pseudo path and delegates the removal to the export manager,
    # returning whatever delete_export returns.
    target = {'cluster_id': cluster_id, 'pseudo_path': pseudo_path}
    return self.export_mgr.delete_export(**target)
@CLICommand('nfs export delete', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_export_delete(self, cluster_id: str, pseudo_path: str) -> None:
    """Delete a cephfs export (DEPRECATED)"""
    # Deprecated spelling of the removal command (see docstring); it issues
    # the same delete_export call on the export manager, keyed by cluster id
    # and pseudo path.
    exports = self.export_mgr
    return exports.delete_export(
        cluster_id=cluster_id,
        pseudo_path=pseudo_path,
    )
@CLICommand('nfs export ls', perm='r')
@object_format.Responder()
def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> List[Any]:
    """List exports of a NFS cluster"""
    # CLI handler for 'nfs export ls': forwards the cluster id and the
    # `detailed` flag (default False) by keyword to the export manager and
    # returns its listing unchanged.
    listing = self.export_mgr.list_exports(
        cluster_id=cluster_id,
        detailed=detailed,
    )
    return listing
@CLICommand('nfs export info', perm='r')
@object_format.Responder()
def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
    """Fetch a export of a NFS cluster given the pseudo path/binding"""
    # CLI handler for 'nfs export info': looks up a single export through
    # the export manager, selected by cluster id and pseudo path, and
    # returns the lookup result as-is.
    exports = self.export_mgr
    found = exports.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)
    return found
@CLICommand('nfs export get', perm='r')
@object_format.Responder()
def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Dict[str, Any]:
    """Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
    # Deprecated spelling of the lookup command (see docstring); performs
    # the same get_export call on the export manager with both identifiers
    # passed by keyword.
    lookup = {'cluster_id': cluster_id, 'pseudo_path': pseudo_path}
    return self.export_mgr.get_export(**lookup)
@CLICommand('nfs export apply', perm='rw')
@CLICheckNonemptyFileInput(desc='Export JSON or Ganesha EXPORT specification')
@object_format.Responder()
def _cmd_nfs_export_apply(self, cluster_id: str, inbuf: str) -> AppliedExportResults:
    """Create or update an export by `-i <json_or_ganesha_export_file>`"""
    # CLI handler for 'nfs export apply': `inbuf` is handed to the export
    # manager as `export_config`; the cluster id is passed positionally.
    exports = self.export_mgr
    outcome = exports.apply_export(cluster_id, export_config=inbuf)
    return outcome
118 @CLICommand('nfs cluster create', perm
='rw')
119 @object_format.EmptyResponder()
120 def _cmd_nfs_cluster_create(self
,
122 placement
: Optional
[str] = None,
123 ingress
: Optional
[bool] = None,
124 virtual_ip
: Optional
[str] = None,
125 ingress_mode
: Optional
[IngressType
] = None,
126 port
: Optional
[int] = None) -> None:
127 """Create an NFS Cluster"""
128 return self
.nfs
.create_nfs_cluster(cluster_id
=cluster_id
, placement
=placement
,
129 virtual_ip
=virtual_ip
, ingress
=ingress
,
130 ingress_mode
=ingress_mode
, port
=port
)
@CLICommand('nfs cluster rm', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_rm(self, cluster_id: str) -> None:
    """Removes an NFS Cluster"""
    # CLI handler for 'nfs cluster rm': delegates removal of the named
    # cluster to the NFSCluster helper stored on self.nfs.
    clusters = self.nfs
    return clusters.delete_nfs_cluster(cluster_id=cluster_id)
@CLICommand('nfs cluster delete', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_delete(self, cluster_id: str) -> None:
    """Removes an NFS Cluster (DEPRECATED)"""
    # Deprecated spelling of the cluster removal command (see docstring);
    # issues the same delete_nfs_cluster call on self.nfs.
    request = {'cluster_id': cluster_id}
    return self.nfs.delete_nfs_cluster(**request)
@CLICommand('nfs cluster ls', perm='r')
@object_format.Responder()
def _cmd_nfs_cluster_ls(self) -> List[str]:
    """List NFS Clusters"""
    # CLI handler for 'nfs cluster ls': takes no arguments and returns the
    # result of list_nfs_cluster() on self.nfs unchanged.
    names = self.nfs.list_nfs_cluster()
    return names
@CLICommand('nfs cluster info', perm='r')
@object_format.Responder()
def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Dict[str, Any]:
    """Displays NFS Cluster info"""
    # CLI handler for 'nfs cluster info': `cluster_id` is optional and is
    # forwarded as given (None when omitted) to show_nfs_cluster_info.
    clusters = self.nfs
    details = clusters.show_nfs_cluster_info(cluster_id=cluster_id)
    return details
156 @CLICommand('nfs cluster config get', perm
='r')
157 @object_format.ErrorResponseHandler()
158 def _cmd_nfs_cluster_config_get(self
, cluster_id
: str) -> Tuple
[int, str, str]:
159 """Fetch NFS-Ganesha config"""
160 conf
= self
.nfs
.get_nfs_cluster_config(cluster_id
=cluster_id
)
@CLICommand('nfs cluster config set', perm='rw')
@CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> None:
    """Set NFS-Ganesha config by `-i <config_file>`"""
    # CLI handler for 'nfs cluster config set': `inbuf` is passed to the
    # NFSCluster helper as `nfs_config` for the given cluster.
    clusters = self.nfs
    return clusters.set_nfs_cluster_config(
        cluster_id=cluster_id,
        nfs_config=inbuf,
    )
@CLICommand('nfs cluster config reset', perm='rw')
@object_format.EmptyResponder()
def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> None:
    """Reset NFS-Ganesha Config to default"""
    # CLI handler for 'nfs cluster config reset': delegates to
    # reset_nfs_cluster_config on self.nfs for the given cluster.
    request = {'cluster_id': cluster_id}
    return self.nfs.reset_nfs_cluster_config(**request)
def fetch_nfs_export_obj(self) -> ExportMgr:
    """Return the export manager object held by this module."""
    manager = self.export_mgr
    return manager
def export_ls(self) -> List[Dict[Any, Any]]:
    """Return the result of the export manager's ``list_all_exports()``."""
    manager = self.export_mgr
    everything = manager.list_all_exports()
    return everything
def export_get(self, cluster_id: str, export_id: int) -> Optional[Dict[str, Any]]:
    """Look up one export by cluster id and numeric export id.

    Both arguments are passed positionally to the export manager's
    ``get_export_by_id``; its result is returned unchanged.
    """
    manager = self.export_mgr
    found = manager.get_export_by_id(cluster_id, export_id)
    return found
def export_rm(self, cluster_id: str, pseudo: str) -> None:
    """Delete the export of ``cluster_id`` whose pseudo path is ``pseudo``.

    The export manager's result is discarded; this method returns None.
    """
    target = {'cluster_id': cluster_id, 'pseudo_path': pseudo}
    self.export_mgr.delete_export(**target)
def cluster_ls(self) -> List[str]:
    """Return the result of ``available_clusters`` called with this module."""
    names = available_clusters(self)
    return names