]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/nfs/cluster.py
d558a3a37a1f329fc3674221974db7b8606517d9
5 from typing
import cast
, Dict
, List
, Any
, Union
, Optional
, TYPE_CHECKING
7 from mgr_module
import NFS_POOL_NAME
as POOL_NAME
8 from ceph
.deployment
.service_spec
import NFSServiceSpec
, PlacementSpec
, IngressSpec
9 from object_format
import ErrorResponse
12 from orchestrator
.module
import IngressType
14 from .exception
import NFSInvalidOperation
, ClusterNotFound
16 ManualRestartRequired
,
22 from .export
import NFSRados
25 from nfs
.module
import Module
26 from mgr_module
import MgrModule
29 log
= logging
.getLogger(__name__
)
32 def resolve_ip(hostname
: str) -> str:
34 r
= socket
.getaddrinfo(hostname
, None, flags
=socket
.AI_CANONNAME
,
35 type=socket
.SOCK_STREAM
)
36 # pick first v4 IP, if present
38 if a
[0] == socket
.AF_INET
:
41 except socket
.gaierror
as e
:
42 raise NFSInvalidOperation(f
"Cannot resolve IP for host {hostname}: {e}")
45 def create_ganesha_pool(mgr
: 'MgrModule') -> None:
46 pool_list
= [p
['pool_name'] for p
in mgr
.get_osdmap().dump().get('pools', [])]
47 if POOL_NAME
not in pool_list
:
48 mgr
.check_mon_command({'prefix': 'osd pool create',
50 'yes_i_really_mean_it': True})
51 mgr
.check_mon_command({'prefix': 'osd pool application enable',
54 log
.debug("Successfully created nfs-ganesha pool %s", POOL_NAME
)
58 def __init__(self
, mgr
: 'Module') -> None:
61 def _call_orch_apply_nfs(
64 placement
: Optional
[str] = None,
65 virtual_ip
: Optional
[str] = None,
66 ingress_mode
: Optional
[IngressType
] = None,
67 port
: Optional
[int] = None,
70 port
= 2049 # default nfs port
73 # run NFS on non-standard port
75 ingress_mode
= IngressType
.default
76 ingress_mode
= ingress_mode
.canonicalize()
77 pspec
= PlacementSpec
.from_string(placement
)
78 if ingress_mode
== IngressType
.keepalive_only
:
79 # enforce count=1 for nfs over keepalive only
82 ganesha_port
= 10000 + port
# semi-arbitrary, fix me someday
83 frontend_port
: Optional
[int] = port
84 virtual_ip_for_ganesha
: Optional
[str] = None
85 keepalive_only
: bool = False
86 enable_haproxy_protocol
: bool = False
87 if ingress_mode
== IngressType
.haproxy_protocol
:
88 enable_haproxy_protocol
= True
89 elif ingress_mode
== IngressType
.keepalive_only
:
91 virtual_ip_for_ganesha
= virtual_ip
.split('/')[0]
95 spec
= NFSServiceSpec(service_type
='nfs', service_id
=cluster_id
,
97 # use non-default port so we don't conflict with ingress
99 virtual_ip
=virtual_ip_for_ganesha
,
100 enable_haproxy_protocol
=enable_haproxy_protocol
)
101 completion
= self
.mgr
.apply_nfs(spec
)
102 orchestrator
.raise_if_exception(completion
)
103 ispec
= IngressSpec(service_type
='ingress',
104 service_id
='nfs.' + cluster_id
,
105 backend_service
='nfs.' + cluster_id
,
107 frontend_port
=frontend_port
,
108 monitor_port
=7000 + port
, # semi-arbitrary, fix me someday
109 virtual_ip
=virtual_ip
,
110 keepalive_only
=keepalive_only
,
111 enable_haproxy_protocol
=enable_haproxy_protocol
)
112 completion
= self
.mgr
.apply_ingress(ispec
)
113 orchestrator
.raise_if_exception(completion
)
116 spec
= NFSServiceSpec(service_type
='nfs', service_id
=cluster_id
,
117 placement
=PlacementSpec
.from_string(placement
),
119 completion
= self
.mgr
.apply_nfs(spec
)
120 orchestrator
.raise_if_exception(completion
)
121 log
.debug("Successfully deployed nfs daemons with cluster id %s and placement %s",
122 cluster_id
, placement
)
def create_empty_rados_obj(self, cluster_id: str) -> None:
    """Write an empty config object for *cluster_id* and log its name.

    The object name comes from ``conf_obj_name(cluster_id)``; the write
    goes through the NFSRados helper returned by ``self._rados``.

    :param cluster_id: id of the NFS cluster the object belongs to.
    """
    # Compute the name once so the object written and the name logged
    # are guaranteed to be the same value.
    common_conf = conf_obj_name(cluster_id)
    self._rados(cluster_id).write_obj('', common_conf)
    log.info("Created empty object:%s", common_conf)
def delete_config_obj(self, cluster_id: str) -> None:
    """Remove all objects via the cluster's NFSRados helper, then log it.

    :param cluster_id: id of the NFS cluster whose objects are removed.
    """
    rados_store = self._rados(cluster_id)
    rados_store.remove_all_obj()
    # The name is looked up only for the log message, after the removal.
    removed_conf = conf_obj_name(cluster_id)
    log.info("Deleted %s object and all objects in %s", removed_conf, cluster_id)
134 def create_nfs_cluster(
137 placement
: Optional
[str],
138 virtual_ip
: Optional
[str],
139 ingress
: Optional
[bool] = None,
140 ingress_mode
: Optional
[IngressType
] = None,
141 port
: Optional
[int] = None,
145 # validate virtual_ip value: ip_address throws a ValueError
146 # exception in case it's not a valid ipv4 or ipv6 address
147 ip
= virtual_ip
.split('/')[0]
148 ipaddress
.ip_address(ip
)
149 if virtual_ip
and not ingress
:
150 raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled')
151 if not virtual_ip
and ingress
:
152 raise NFSInvalidOperation('ingress currently requires a virtual_ip')
153 if ingress_mode
and not ingress
:
154 raise NFSInvalidOperation('--ingress-mode must be passed along with --ingress')
155 invalid_str
= re
.search('[^A-Za-z0-9-_.]', cluster_id
)
157 raise NFSInvalidOperation(f
"cluster id {cluster_id} is invalid. "
158 f
"{invalid_str.group()} is char not permitted")
160 create_ganesha_pool(self
.mgr
)
162 self
.create_empty_rados_obj(cluster_id
)
164 if cluster_id
not in available_clusters(self
.mgr
):
165 self
._call
_orch
_apply
_nfs
(cluster_id
, placement
, virtual_ip
, ingress_mode
, port
)
167 raise NonFatalError(f
"{cluster_id} cluster already exists")
168 except Exception as e
:
169 log
.exception(f
"NFS Cluster {cluster_id} could not be created")
170 raise ErrorResponse
.wrap(e
)
172 def delete_nfs_cluster(self
, cluster_id
: str) -> None:
174 cluster_list
= available_clusters(self
.mgr
)
175 if cluster_id
in cluster_list
:
176 self
.mgr
.export_mgr
.delete_all_exports(cluster_id
)
177 completion
= self
.mgr
.remove_service('ingress.nfs.' + cluster_id
)
178 orchestrator
.raise_if_exception(completion
)
179 completion
= self
.mgr
.remove_service('nfs.' + cluster_id
)
180 orchestrator
.raise_if_exception(completion
)
181 self
.delete_config_obj(cluster_id
)
183 raise NonFatalError("Cluster does not exist")
184 except Exception as e
:
185 log
.exception(f
"Failed to delete NFS Cluster {cluster_id}")
186 raise ErrorResponse
.wrap(e
)
def list_nfs_cluster(self) -> List[str]:
    """Return the cluster ids reported by ``available_clusters(self.mgr)``.

    :raises: whatever ``ErrorResponse.wrap`` produces for any exception
        raised during the lookup; the failure is logged with a traceback
        first.
    """
    # The ``except`` clause below needs an enclosing ``try``; the header
    # was absent from the visible text and is restored here.
    try:
        return available_clusters(self.mgr)
    except Exception as e:
        log.exception("Failed to list NFS Cluster")
        raise ErrorResponse.wrap(e)
195 def _show_nfs_cluster_info(self
, cluster_id
: str) -> Dict
[str, Any
]:
196 completion
= self
.mgr
.list_daemons(daemon_type
='nfs')
197 # Here completion.result is a list DaemonDescription objects
198 clusters
= orchestrator
.raise_if_exception(completion
)
199 backends
: List
[Dict
[str, Union
[Any
]]] = []
201 for cluster
in clusters
:
202 if cluster_id
== cluster
.service_id():
203 assert cluster
.hostname
208 c
= self
.mgr
.get_hosts()
209 orchestrator
.raise_if_exception(c
)
210 hosts
= [h
for h
in c
.result
or []
211 if h
.hostname
== cluster
.hostname
]
213 ip
= resolve_ip(hosts
[0].addr
)
216 ip
= resolve_ip(cluster
.hostname
)
218 "hostname": cluster
.hostname
,
220 "port": cluster
.ports
[0] if cluster
.ports
else None
222 except orchestrator
.OrchestratorError
:
225 r
: Dict
[str, Any
] = {
229 sc
= self
.mgr
.describe_service(service_type
='ingress')
230 services
= orchestrator
.raise_if_exception(sc
)
232 spec
= cast(IngressSpec
, i
.spec
)
233 if spec
.backend_service
== f
'nfs.{cluster_id}':
234 r
['virtual_ip'] = i
.virtual_ip
.split('/')[0] if i
.virtual_ip
else None
236 r
['port'] = i
.ports
[0]
238 r
['monitor_port'] = i
.ports
[1]
239 log
.debug("Successfully fetched %s info: %s", cluster_id
, r
)
242 def show_nfs_cluster_info(self
, cluster_id
: Optional
[str] = None) -> Dict
[str, Any
]:
244 if cluster_id
and cluster_id
not in available_clusters(self
.mgr
):
245 raise ClusterNotFound()
248 cluster_ls
= [cluster_id
]
250 cluster_ls
= available_clusters(self
.mgr
)
252 for cluster_id
in cluster_ls
:
253 res
= self
._show
_nfs
_cluster
_info
(cluster_id
)
255 info_res
[cluster_id
] = res
257 except Exception as e
:
258 log
.exception("Failed to show info for cluster")
259 raise ErrorResponse
.wrap(e
)
261 def get_nfs_cluster_config(self
, cluster_id
: str) -> str:
263 if cluster_id
in available_clusters(self
.mgr
):
264 rados_obj
= self
._rados
(cluster_id
)
265 conf
= rados_obj
.read_obj(user_conf_obj_name(cluster_id
))
267 raise ClusterNotFound()
268 except Exception as e
:
269 log
.exception(f
"Fetching NFS-Ganesha Config failed for {cluster_id}")
270 raise ErrorResponse
.wrap(e
)
272 def set_nfs_cluster_config(self
, cluster_id
: str, nfs_config
: str) -> None:
274 if cluster_id
in available_clusters(self
.mgr
):
275 rados_obj
= self
._rados
(cluster_id
)
276 if rados_obj
.check_user_config():
277 raise NonFatalError("NFS-Ganesha User Config already exists")
278 rados_obj
.write_obj(nfs_config
, user_conf_obj_name(cluster_id
),
279 conf_obj_name(cluster_id
))
280 log
.debug("Successfully saved %s's user config: \n %s", cluster_id
, nfs_config
)
281 restart_nfs_service(self
.mgr
, cluster_id
)
283 raise ClusterNotFound()
284 except NotImplementedError:
285 raise ManualRestartRequired("NFS-Ganesha Config Added Successfully")
286 except Exception as e
:
287 log
.exception(f
"Setting NFS-Ganesha Config failed for {cluster_id}")
288 raise ErrorResponse
.wrap(e
)
290 def reset_nfs_cluster_config(self
, cluster_id
: str) -> None:
292 if cluster_id
in available_clusters(self
.mgr
):
293 rados_obj
= self
._rados
(cluster_id
)
294 if not rados_obj
.check_user_config():
295 raise NonFatalError("NFS-Ganesha User Config does not exist")
296 rados_obj
.remove_obj(user_conf_obj_name(cluster_id
),
297 conf_obj_name(cluster_id
))
298 restart_nfs_service(self
.mgr
, cluster_id
)
300 raise ClusterNotFound()
301 except NotImplementedError:
302 raise ManualRestartRequired("NFS-Ganesha Config Removed Successfully")
303 except Exception as e
:
304 log
.exception(f
"Resetting NFS-Ganesha Config failed for {cluster_id}")
305 raise ErrorResponse
.wrap(e
)
def _rados(self, cluster_id: str) -> NFSRados:
    """Build and return a fresh NFSRados object for *cluster_id*."""
    rados_handle = self.mgr.rados
    return NFSRados(rados_handle, cluster_id)