# NFS cluster management module (recovered from a git-blame rendering, commit b3b6e05e)
1 | import logging |
2 | import socket | |
3 | import json | |
4 | import re | |
5 | from typing import cast, Dict, List, Any, Union, Optional | |
6 | ||
7 | from ceph.deployment.service_spec import NFSServiceSpec, PlacementSpec, IngressSpec | |
8 | ||
9 | import orchestrator | |
10 | ||
11 | from .exception import NFSInvalidOperation, ClusterNotFound | |
12 | from .utils import POOL_NAME, available_clusters, restart_nfs_service | |
13 | from .export import NFSRados, exception_handler | |
14 | ||
15 | log = logging.getLogger(__name__) | |
16 | ||
17 | ||
def resolve_ip(hostname: str) -> str:
    """Resolve *hostname* to an IP address, preferring IPv4.

    Returns the first IPv4 address reported by getaddrinfo; if none is
    present, falls back to the first address of any family.  Raises
    NFSInvalidOperation when the host cannot be resolved at all.
    """
    try:
        addrinfo = socket.getaddrinfo(hostname, None,
                                      flags=socket.AI_CANONNAME,
                                      type=socket.SOCK_STREAM)
    except socket.gaierror as e:
        raise NFSInvalidOperation(f"Cannot resolve IP for host {hostname}: {e}")
    # Prefer the first IPv4 entry, if any; otherwise take whatever came first.
    first_v4 = next((info[4][0] for info in addrinfo
                     if info[0] == socket.AF_INET), None)
    return first_v4 if first_v4 is not None else addrinfo[0][4][0]
29 | ||
30 | ||
def cluster_setter(func):
    """Decorator: seed the NFSCluster's pool namespace and cluster id from
    the ``cluster_id`` keyword argument before invoking *func*.

    NOTE: the wrapped method must receive ``cluster_id`` as a keyword
    argument; passing it positionally would raise KeyError here.
    """
    from functools import wraps

    @wraps(func)  # preserve the wrapped function's name/doc for introspection
    def set_pool_ns_clusterid(nfs, *args, **kwargs):
        nfs._set_pool_namespace(kwargs['cluster_id'])
        nfs._set_cluster_id(kwargs['cluster_id'])
        return func(nfs, *args, **kwargs)
    return set_pool_ns_clusterid
37 | ||
38 | ||
def create_ganesha_pool(mgr, pool):
    """Ensure the RADOS pool backing NFS-Ganesha configuration exists.

    Creates *pool* and tags it with the 'nfs' application if it is not
    already present in the OSD map; otherwise does nothing.
    """
    existing = {p['pool_name'] for p in mgr.get_osdmap().dump().get('pools', [])}
    if pool in existing:
        return
    mgr.check_mon_command({'prefix': 'osd pool create', 'pool': pool})
    mgr.check_mon_command({'prefix': 'osd pool application enable',
                           'pool': pool,
                           'app': 'nfs'})
46 | ||
47 | ||
class NFSCluster:
    """Manage NFS-Ganesha clusters: creation/deletion via the orchestrator
    and per-cluster configuration objects stored in RADOS.

    All clusters share one pool (POOL_NAME); each cluster's objects live in
    a RADOS namespace equal to its cluster_id.  The public methods return
    mgr-command style ``(retval, stdout, stderr)`` tuples.
    """

    def __init__(self, mgr):
        self.pool_name = POOL_NAME
        self.pool_ns = ''
        self.mgr = mgr

    def _set_cluster_id(self, cluster_id):
        # Record the cluster currently being operated on (set per call by
        # the @cluster_setter decorator).
        self.cluster_id = cluster_id

    def _set_pool_namespace(self, cluster_id):
        # The RADOS namespace for a cluster is simply its id.
        self.pool_ns = cluster_id

    def _get_common_conf_obj_name(self):
        # Config object shared by all ganesha daemons of this cluster.
        return f'conf-nfs.{self.cluster_id}'

    def _get_user_conf_obj_name(self):
        # Config object holding user-supplied ganesha config overrides.
        return f'userconf-nfs.{self.cluster_id}'

    def _call_orch_apply_nfs(self, placement, virtual_ip=None):
        """Deploy the nfs service via the orchestrator; when *virtual_ip*
        is given, also deploy an ingress service in front of it."""
        if virtual_ip:
            # nfs + ingress
            # run NFS on non-standard port
            spec = NFSServiceSpec(service_type='nfs', service_id=self.cluster_id,
                                  pool=self.pool_name, namespace=self.pool_ns,
                                  placement=PlacementSpec.from_string(placement),
                                  # use non-default port so we don't conflict with ingress
                                  port=12049)
            completion = self.mgr.apply_nfs(spec)
            orchestrator.raise_if_exception(completion)
            ispec = IngressSpec(service_type='ingress',
                                service_id='nfs.' + self.cluster_id,
                                backend_service='nfs.' + self.cluster_id,
                                frontend_port=2049,  # default nfs port
                                monitor_port=9049,
                                virtual_ip=virtual_ip)
            completion = self.mgr.apply_ingress(ispec)
            orchestrator.raise_if_exception(completion)
        else:
            # standalone nfs
            spec = NFSServiceSpec(service_type='nfs', service_id=self.cluster_id,
                                  pool=self.pool_name, namespace=self.pool_ns,
                                  placement=PlacementSpec.from_string(placement))
            completion = self.mgr.apply_nfs(spec)
            orchestrator.raise_if_exception(completion)

    def create_empty_rados_obj(self):
        # Create the (initially empty) common config object so daemons have
        # something to watch from the moment they start.
        common_conf = self._get_common_conf_obj_name()
        # Fix: reuse the computed name instead of recomputing it.
        NFSRados(self.mgr, self.pool_ns).write_obj('', common_conf)
        log.info(f"Created empty object:{common_conf}")

    def delete_config_obj(self):
        # Remove every object in this cluster's namespace, common conf
        # included.
        NFSRados(self.mgr, self.pool_ns).remove_all_obj()
        log.info(f"Deleted {self._get_common_conf_obj_name()} object and all objects in "
                 f"{self.pool_ns}")

    @cluster_setter
    def create_nfs_cluster(self,
                           cluster_id: str,
                           placement: Optional[str],
                           virtual_ip: Optional[str],
                           ingress: Optional[bool] = None):
        """Create an NFS cluster: validate args, ensure the pool and the
        empty common config object exist, then deploy via the orchestrator.

        Idempotent: an already-existing cluster_id is reported on stderr
        with retval 0, not treated as an error.
        """
        try:
            # virtual_ip and ingress must be given together (or not at all).
            if virtual_ip and not ingress:
                raise NFSInvalidOperation('virtual_ip can only be provided with ingress enabled')
            if not virtual_ip and ingress:
                raise NFSInvalidOperation('ingress currently requires a virtual_ip')
            # cluster_id becomes part of service and object names, so
            # restrict it to a safe character set.
            invalid_str = re.search('[^A-Za-z0-9-_.]', cluster_id)
            if invalid_str:
                raise NFSInvalidOperation(f"cluster id {cluster_id} is invalid. "
                                          f"{invalid_str.group()} is char not permitted")

            create_ganesha_pool(self.mgr, self.pool_name)

            self.create_empty_rados_obj()

            if cluster_id not in available_clusters(self.mgr):
                self._call_orch_apply_nfs(placement, virtual_ip)
                return 0, "NFS Cluster Created Successfully", ""
            return 0, "", f"{cluster_id} cluster already exists"
        except Exception as e:
            return exception_handler(e, f"NFS Cluster {cluster_id} could not be created")

    @cluster_setter
    def delete_nfs_cluster(self, cluster_id):
        """Delete a cluster: drop its exports, remove the ingress and nfs
        services, then delete its config objects."""
        try:
            cluster_list = available_clusters(self.mgr)
            if cluster_id in cluster_list:
                self.mgr.export_mgr.delete_all_exports(cluster_id)
                # Ingress is removed first so no traffic is routed to
                # daemons while they are being torn down.
                completion = self.mgr.remove_service('ingress.nfs.' + self.cluster_id)
                orchestrator.raise_if_exception(completion)
                completion = self.mgr.remove_service('nfs.' + self.cluster_id)
                orchestrator.raise_if_exception(completion)
                self.delete_config_obj()
                return 0, "NFS Cluster Deleted Successfully", ""
            return 0, "", "Cluster does not exist"
        except Exception as e:
            return exception_handler(e, f"Failed to delete NFS Cluster {cluster_id}")

    def list_nfs_cluster(self):
        """Return the known cluster ids, one per line."""
        try:
            return 0, '\n'.join(available_clusters(self.mgr)), ""
        except Exception as e:
            return exception_handler(e, "Failed to list NFS Cluster")

    def _show_nfs_cluster_info(self, cluster_id: str) -> Dict[str, Any]:
        """Collect backend daemon endpoints and (if an ingress service
        fronts this cluster) its virtual IP and ports."""
        self._set_cluster_id(cluster_id)
        completion = self.mgr.list_daemons(daemon_type='nfs')
        orchestrator.raise_if_exception(completion)
        backends: List[Dict[str, Union[str, int]]] = []
        # Here completion.result is a list DaemonDescription objects
        for cluster in completion.result:
            if self.cluster_id == cluster.service_id():
                try:
                    if cluster.ip:
                        ip = cluster.ip
                    else:
                        # Daemon record has no IP; resolve via the host
                        # inventory, falling back to the bare hostname.
                        c = self.mgr.get_hosts()
                        orchestrator.raise_if_exception(c)
                        hosts = [h for h in c.result
                                 if h.hostname == cluster.hostname]
                        if hosts:
                            ip = resolve_ip(hosts[0].addr)
                        else:
                            # sigh
                            ip = resolve_ip(cluster.hostname)
                    backends.append({
                        "hostname": cluster.hostname,
                        "ip": ip,
                        "port": cluster.ports[0]
                    })
                except orchestrator.OrchestratorError:
                    # Skip daemons we cannot resolve rather than failing
                    # the whole query.
                    continue

        r: Dict[str, Any] = {
            'virtual_ip': None,
            'backend': backends,
        }
        sc = self.mgr.describe_service(service_type='ingress')
        orchestrator.raise_if_exception(sc)
        for i in sc.result:
            spec = cast(IngressSpec, i.spec)
            if spec.backend_service == f'nfs.{cluster_id}':
                # virtual_ip is stored in CIDR form; strip the prefix length.
                r['virtual_ip'] = i.virtual_ip.split('/')[0]
                if i.ports:
                    r['port'] = i.ports[0]
                    if len(i.ports) > 1:
                        r['monitor_port'] = i.ports[1]
        return r

    def show_nfs_cluster_info(self, cluster_id=None):
        """Show info for one cluster, or for all clusters when
        *cluster_id* is None.  Returns JSON on stdout."""
        try:
            info_res = {}
            if cluster_id:
                cluster_ls = [cluster_id]
            else:
                cluster_ls = available_clusters(self.mgr)

            # Fix: use a distinct loop variable instead of shadowing the
            # cluster_id parameter.
            for cid in cluster_ls:
                res = self._show_nfs_cluster_info(cid)
                if res:
                    info_res[cid] = res
            return (0, json.dumps(info_res, indent=4), '')
        except Exception as e:
            return exception_handler(e, "Failed to show info for cluster")

    @cluster_setter
    def set_nfs_cluster_config(self, cluster_id, nfs_config):
        """Store a user-supplied ganesha config object for the cluster and
        restart its daemons so the config takes effect."""
        try:
            if cluster_id in available_clusters(self.mgr):
                rados_obj = NFSRados(self.mgr, self.pool_ns)
                if rados_obj.check_user_config():
                    return 0, "", "NFS-Ganesha User Config already exists"
                rados_obj.write_obj(nfs_config, self._get_user_conf_obj_name(),
                                    self._get_common_conf_obj_name())
                restart_nfs_service(self.mgr, cluster_id)
                return 0, "NFS-Ganesha Config Set Successfully", ""
            raise ClusterNotFound()
        except NotImplementedError:
            # Orchestrator backend cannot restart daemons; the config was
            # still written, so report success with a manual-restart note.
            return 0, "NFS-Ganesha Config Added Successfully "\
                      "(Manual Restart of NFS PODS required)", ""
        except Exception as e:
            return exception_handler(e, f"Setting NFS-Ganesha Config failed for {cluster_id}")

    @cluster_setter
    def reset_nfs_cluster_config(self, cluster_id):
        """Remove the user-supplied ganesha config object and restart the
        cluster's daemons to revert to the common config."""
        try:
            if cluster_id in available_clusters(self.mgr):
                rados_obj = NFSRados(self.mgr, self.pool_ns)
                if not rados_obj.check_user_config():
                    return 0, "", "NFS-Ganesha User Config does not exist"
                rados_obj.remove_obj(self._get_user_conf_obj_name(),
                                     self._get_common_conf_obj_name())
                restart_nfs_service(self.mgr, cluster_id)
                return 0, "NFS-Ganesha Config Reset Successfully", ""
            raise ClusterNotFound()
        except NotImplementedError:
            # Same best-effort behavior as set_nfs_cluster_config.
            return 0, "NFS-Ganesha Config Removed Successfully "\
                      "(Manual Restart of NFS PODS required)", ""
        except Exception as e:
            return exception_handler(e, f"Resetting NFS-Ganesha Config failed for {cluster_id}")