]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/nfs/export.py
6f129e55c528204944e54772ba5673f163b9a593
[ceph.git] / ceph / src / pybind / mgr / nfs / export.py
1 import errno
2 import json
3 import logging
4 from typing import (
5 List,
6 Any,
7 Dict,
8 Tuple,
9 Optional,
10 TYPE_CHECKING,
11 TypeVar,
12 Callable,
13 Set,
14 cast)
15 from os.path import normpath
16
17 from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES
18
19 from orchestrator import NoOrchestrator
20 from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS
21
22 from .ganesha_conf import (
23 CephFSFSAL,
24 Export,
25 GaneshaConfParser,
26 RGWFSAL,
27 RawBlock,
28 format_block)
29 from .exception import NFSException, NFSInvalidOperation, FSNotFound
30 from .utils import (
31 CONF_PREFIX,
32 EXPORT_PREFIX,
33 USER_CONF_PREFIX,
34 export_obj_name,
35 conf_obj_name,
36 available_clusters,
37 check_fs,
38 restart_nfs_service)
39
if TYPE_CHECKING:
    # Only needed by static type checkers; never imported at runtime.
    from nfs.module import Module

# Type variable used by decorators in this module to hand back the same
# callable type they were given.
FuncT = TypeVar('FuncT', bound=Callable)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
46
47
def known_cluster_ids(mgr: 'Module') -> Set[str]:
    """Return the set of known cluster IDs.

    The IDs come from ``available_clusters(mgr)``. If that call raises
    ``NoOrchestrator``, they are taken from ``nfs_rados_configs`` instead,
    i.e. from the namespaces that hold NFS objects in the NFS pool.
    """
    try:
        return set(available_clusters(mgr))
    except NoOrchestrator:
        return nfs_rados_configs(mgr.rados)
55
56
def export_cluster_checker(func: FuncT) -> FuncT:
    """Decorator that rejects calls naming an unknown NFS cluster.

    The wrapped callable is expected to receive a ``cluster_id`` argument,
    either by keyword or positionally. When that ID is not among
    ``known_cluster_ids(export.mgr)`` the wrapper returns
    ``(-errno.ENOENT, "", "Cluster does not exist")`` without calling
    ``func``; otherwise the call is forwarded unchanged.

    :raises KeyError: if the call does not supply ``cluster_id`` at all.
    """
    # Function-scope imports so the module's import section is left alone.
    import functools
    import inspect

    signature = inspect.signature(func)

    @functools.wraps(func)
    def cluster_check(
            export: 'ExportMgr',
            *args: Any,
            **kwargs: Any
    ) -> Tuple[int, str, str]:
        clusters = known_cluster_ids(export.mgr)
        if 'cluster_id' in kwargs:
            cluster_id: str = kwargs['cluster_id']
        else:
            # cluster_id was not passed by keyword: resolve it from the
            # positional arguments using the wrapped function's signature.
            bound = signature.bind_partial(export, *args, **kwargs)
            cluster_id = bound.arguments['cluster_id']
        log.debug("checking for %r in known nfs clusters: %r",
                  cluster_id, clusters)
        if cluster_id not in clusters:
            return -errno.ENOENT, "", "Cluster does not exist"
        return func(export, *args, **kwargs)
    return cast(FuncT, cluster_check)
74
75
def exception_handler(
        exception_obj: Exception,
        log_msg: str = ""
) -> Tuple[int, str, str]:
    """Convert an exception into a ``(retval, stdout, stderr)`` tuple.

    :param exception_obj: the exception being reported
    :param log_msg: if non-empty, logged via ``log.exception`` (with the
        active traceback)
    :return: ``(code, "", str(exception_obj))`` where ``code`` is the
        exception's integer ``errno`` attribute, or ``-1`` when the
        attribute is absent or not an integer
    """
    if log_msg:
        log.exception(log_msg)
    retval = getattr(exception_obj, 'errno', None)
    if not isinstance(retval, int):
        # e.g. OSError("msg") has errno=None, which is not a usable
        # return code.
        retval = -1
    return retval, "", str(exception_obj)
83
84
def _check_rados_notify(ioctx: Any, obj: str) -> None:
    """Send a notify on ``obj`` through ``ioctx``.

    A ``TimedOut`` error is logged (with traceback) and swallowed; any other
    exception propagates to the caller.
    """
    try:
        ioctx.notify(obj)
    except TimedOut:
        log.exception("Ganesha timed out")
90
91
def normalize_path(path: str) -> str:
    """Return ``path`` stripped of surrounding whitespace and normalized.

    Falsy input (e.g. ``""``) is returned unchanged. Otherwise the path is
    passed through ``os.path.normpath``; if the result still begins with a
    double slash, one of the two slashes is dropped.
    """
    if not path:
        return path
    cleaned = normpath(path.strip())
    return cleaned[1:] if cleaned.startswith("//") else cleaned
98
99
class NFSRados:
    """Read and write NFS configuration objects in RADOS.

    Every operation opens an I/O context on the NFS pool (``POOL_NAME``)
    and switches it to the namespace given at construction time.
    """

    def __init__(self, rados: 'Rados', namespace: str) -> None:
        self.rados = rados
        self.pool = POOL_NAME
        self.namespace = namespace

    def _make_rados_url(self, obj: str) -> str:
        """Return the ``rados://pool/namespace/obj`` URL for ``obj``."""
        return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)

    def _create_url_block(self, obj_name: str) -> RawBlock:
        """Return a ``%url`` block pointing at ``obj_name``."""
        return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})

    def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
        """Write ``conf_block`` to ``obj``; optionally register it.

        When ``config_obj`` is given, a ``%url`` block referencing ``obj``
        is appended to it and a notify is sent on ``config_obj``.
        """
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            ioctx.write_full(obj, conf_block.encode('utf-8'))
            if not config_obj:
                # Return after creating empty common config object
                return
            log.debug("write configuration into rados object %s/%s/%s",
                      self.pool, self.namespace, obj)

            # Add created obj url to common config obj
            url_block = format_block(self._create_url_block(obj))
            ioctx.append(config_obj, url_block.encode('utf-8'))
            _check_rados_notify(ioctx, config_obj)
            log.debug("Added %s url to %s", obj, config_obj)

    def read_obj(self, obj: str) -> Optional[str]:
        """Return up to 1048576 bytes of ``obj`` decoded as text.

        Returns ``None`` when the object does not exist.
        """
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            try:
                return ioctx.read(obj, 1048576).decode()
            except ObjectNotFound:
                return None

    def update_obj(self, conf_block: str, obj: str, config_obj: str,
                   should_notify: Optional[bool] = True) -> None:
        """Overwrite ``obj`` with ``conf_block``.

        A notify is sent on ``config_obj`` unless ``should_notify`` is falsy.
        """
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            ioctx.write_full(obj, conf_block.encode('utf-8'))
            log.debug("write configuration into rados object %s/%s/%s",
                      self.pool, self.namespace, obj)
            if should_notify:
                _check_rados_notify(ioctx, config_obj)
            log.debug("Update export %s in %s", obj, config_obj)

    def remove_obj(self, obj: str, config_obj: str) -> None:
        """Delete ``obj`` and drop its ``%url`` line from ``config_obj``."""
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            # Read the whole index object. Relying on the default read
            # length would truncate a large index, and the truncated
            # content would then be written back below.
            size, _ = ioctx.stat(config_obj)
            export_urls = ioctx.read(config_obj, size) if size else b''
            url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
            export_urls = export_urls.replace(url.encode('utf-8'), b'')
            ioctx.remove_object(obj)
            ioctx.write_full(config_obj, export_urls)
            _check_rados_notify(ioctx, config_obj)
            log.debug("Object deleted: %s", url)

    def remove_all_obj(self) -> None:
        """Remove every object found in this namespace."""
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            for obj in ioctx.list_objects():
                obj.remove()

    def check_user_config(self) -> bool:
        """Return True if any object key starts with ``USER_CONF_PREFIX``."""
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            return any(obj.key.startswith(USER_CONF_PREFIX)
                       for obj in ioctx.list_objects())
171
172
def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
    """Return the namespaces of ``nfs_pool`` that hold nfs config objects.

    Objects are listed across all namespaces; a namespace is included when
    at least one of its object keys starts with the export, conf or user
    conf prefix. The namespaces also correspond to the cluster ids.
    """
    prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
    with rados.open_ioctx(nfs_pool) as ioctx:
        ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
        return {
            obj.nspace
            for obj in ioctx.list_objects()
            if obj.key.startswith(prefixes)
        }
186
187
188 class ExportMgr:
def __init__(
        self,
        mgr: 'Module',
        export_ls: Optional[Dict[str, List[Export]]] = None
) -> None:
    """Create an export manager bound to ``mgr``.

    :param mgr: the nfs mgr module instance
    :param export_ls: optional pre-built mapping of cluster id to exports;
        when omitted the mapping is loaded on first access of ``exports``
    """
    self.mgr = mgr
    self.rados_pool = POOL_NAME
    # None means "not loaded yet".
    self._exports: Optional[Dict[str, List[Export]]] = export_ls
197
@property
def exports(self) -> Dict[str, List[Export]]:
    """Return the mapping of cluster id to its list of exports.

    The mapping is built on first access by reading the export objects of
    every known cluster, and is cached afterwards. The cache is only
    published once every cluster has been parsed, so a failure part-way
    through leaves it unset and the next access retries instead of
    silently returning a partial mapping.
    """
    if self._exports is None:
        log.info("Begin export parsing")
        parsed: Dict[str, List[Export]] = {}
        for cluster_id in known_cluster_ids(self.mgr):
            # _read_raw_config() appends into self.export_conf_objs.
            self.export_conf_objs = []  # type: List[Export]
            self._read_raw_config(cluster_id)
            parsed[cluster_id] = self.export_conf_objs
        self._exports = parsed
        log.info("Exports parsed successfully %s", parsed.items())
    return self._exports
209
210 def _fetch_export(
211 self,
212 cluster_id: str,
213 pseudo_path: str
214 ) -> Optional[Export]:
215 try:
216 for ex in self.exports[cluster_id]:
217 if ex.pseudo == pseudo_path:
218 return ex
219 return None
220 except KeyError:
221 log.info('no exports for cluster %s', cluster_id)
222 return None
223
224 def _fetch_export_id(
225 self,
226 cluster_id: str,
227 export_id: int
228 ) -> Optional[Export]:
229 try:
230 for ex in self.exports[cluster_id]:
231 if ex.export_id == export_id:
232 return ex
233 return None
234 except KeyError:
235 log.info(f'no exports for cluster {cluster_id}')
236 return None
237
def _delete_export_user(self, export: Export) -> None:
    """Remove the auth entity backing a CephFS export.

    For a CephFS FSAL the ``client.<user_id>`` entity is removed with
    ``auth rm``. Nothing is done for other FSAL types.
    """
    fsal = export.fsal
    if not isinstance(fsal, CephFSFSAL):
        # RGW: do nothing; we're using the bucket owner creds.
        return
    assert fsal.user_id
    self.mgr.check_mon_command({
        'prefix': 'auth rm',
        'entity': 'client.{}'.format(fsal.user_id),
    })
    log.info("Deleted export user %s", fsal.user_id)
249
def _create_export_user(self, export: Export) -> None:
    """Fill in the credentials on ``export.fsal``.

    CephFS: sets ``user_id`` to ``nfs.<cluster_id>.<export_id>`` and stores
    the key returned by ``_create_user_key`` in ``cephx_key``.

    RGW: if no ``user_id`` is set, the bucket owner reported by
    ``radosgw-admin bucket stats`` is used; the first key pair reported by
    ``radosgw-admin user info`` is then stored on the FSAL.

    :raises NFSException: if a radosgw-admin call fails, the bucket has no
        owner, or the user has no usable key pair
    """
    if isinstance(export.fsal, CephFSFSAL):
        fsal = cast(CephFSFSAL, export.fsal)
        assert fsal.fs_name
        fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
        fsal.cephx_key = self._create_user_key(
            export.cluster_id, fsal.user_id, export.path, fsal.fs_name
        )
        log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path)

    elif isinstance(export.fsal, RGWFSAL):
        rgwfsal = cast(RGWFSAL, export.fsal)
        if not rgwfsal.user_id:
            assert export.path
            ret, out, err = self.mgr.tool_exec(
                ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
            )
            if ret:
                raise NFSException(f'Failed to fetch owner for bucket {export.path}')
            owner = json.loads(out).get('owner', '')
            if not owner:
                # Report a usable error instead of a bare AssertionError.
                raise NFSException(f'Failed to fetch owner for bucket {export.path}')
            rgwfsal.user_id = owner
        assert rgwfsal.user_id
        ret, out, err = self.mgr.tool_exec([
            'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
        ])
        if ret:
            raise NFSException(
                f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
            )
        j = json.loads(out)

        try:
            first_key = j['keys'][0]
            access_key = first_key['access_key']
            secret_key = first_key['secret_key']
        except (KeyError, IndexError, TypeError) as e:
            raise NFSException(
                f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
            ) from e
        # Assign only once both values are known so the FSAL is never left
        # half-updated.
        rgwfsal.access_key_id = access_key
        rgwfsal.secret_access_key = secret_key
        log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)
286
287 def _gen_export_id(self, cluster_id: str) -> int:
288 exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
289 nid = 1
290 for e_id in exports:
291 if e_id == nid:
292 nid += 1
293 else:
294 break
295 return nid
296
def _read_raw_config(self, rados_namespace: str) -> None:
    """Parse every export object of ``rados_namespace``.

    Objects whose key starts with ``EXPORT_PREFIX`` are read in full,
    parsed, and the first parsed block is turned into an ``Export`` that is
    appended to ``self.export_conf_objs``.
    """
    with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
        ioctx.set_namespace(rados_namespace)
        for obj in ioctx.list_objects():
            if not obj.key.startswith(EXPORT_PREFIX):
                continue
            size, _ = obj.stat()
            raw_config = obj.read(size).decode("utf-8")
            log.debug("read export configuration from rados "
                      "object %s/%s/%s", self.rados_pool,
                      rados_namespace, obj.key)
            first_block = GaneshaConfParser(raw_config).parse()[0]
            self.export_conf_objs.append(
                Export.from_export_block(first_block, rados_namespace))
310
def _save_export(self, cluster_id: str, export: Export) -> None:
    """Persist a new export and add it to the in-memory cache.

    The export block is written to its own RADOS object and registered in
    the cluster's common config object. The cache is updated only after the
    write succeeded, so a failed write does not leave behind a cached
    export that was never stored.

    :raises KeyError: if the cluster has no entry in the export cache
    """
    # Resolve the cache list first so a missing cluster fails before
    # anything is written.
    cluster_exports = self.exports[cluster_id]
    self._rados(cluster_id).write_obj(
        format_block(export.to_export_block()),
        export_obj_name(export.export_id),
        conf_obj_name(export.cluster_id)
    )
    cluster_exports.append(export)
318
319 def _delete_export(
320 self,
321 cluster_id: str,
322 pseudo_path: Optional[str],
323 export_obj: Optional[Export] = None
324 ) -> Tuple[int, str, str]:
325 try:
326 if export_obj:
327 export: Optional[Export] = export_obj
328 else:
329 assert pseudo_path
330 export = self._fetch_export(cluster_id, pseudo_path)
331
332 if export:
333 if pseudo_path:
334 self._rados(cluster_id).remove_obj(
335 export_obj_name(export.export_id), conf_obj_name(cluster_id))
336 self.exports[cluster_id].remove(export)
337 self._delete_export_user(export)
338 if not self.exports[cluster_id]:
339 del self.exports[cluster_id]
340 log.debug("Deleted all exports for cluster %s", cluster_id)
341 return 0, "Successfully deleted export", ""
342 return 0, "", "Export does not exist"
343 except Exception as e:
344 return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}")
345
def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]:
    """Load export ``ex_id`` of ``cluster_id`` directly from RADOS.

    The object is stat'ed first so that it is read in full, the same way
    ``_read_raw_config`` reads export objects, instead of relying on the
    default read length.

    :return: the parsed export, or ``None`` if the object does not exist
    """
    try:
        with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            ioctx.set_namespace(cluster_id)
            obj_name = export_obj_name(ex_id)
            size, _ = ioctx.stat(obj_name)
            raw_config = ioctx.read(obj_name, size).decode("utf-8")
            export = Export.from_export_block(
                GaneshaConfParser(raw_config).parse()[0],
                cluster_id
            )
            return export
    except ObjectNotFound:
        log.exception("Export ID: %s not found", ex_id)
        return None
360
def _update_export(self, cluster_id: str, export: Export,
                   need_nfs_service_restart: bool) -> None:
    """Cache ``export`` and overwrite its RADOS object.

    When a service restart is needed the RADOS notify is skipped and the
    cluster's NFS service is restarted instead.
    """
    self.exports[cluster_id].append(export)
    store = self._rados(cluster_id)
    store.update_obj(
        format_block(export.to_export_block()),
        export_obj_name(export.export_id),
        conf_obj_name(export.cluster_id),
        should_notify=not need_nfs_service_restart)
    if not need_nfs_service_restart:
        return
    restart_nfs_service(self.mgr, export.cluster_id)
370
@export_cluster_checker
def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
    """Create a CephFS or RGW export.

    ``kwargs`` must contain ``cluster_id``, ``read_only``, ``squash`` and
    ``fsal_type`` (``'cephfs'`` or ``'rgw'``); the remaining keywords are
    forwarded to ``create_cephfs_export`` / ``create_rgw_export``.

    When ``addr`` is given, a single client entry carrying the requested
    access type and squash is built, and the outer block is set to access
    type ``none`` and squash ``none``.

    :return: ``(retval, stdout, stderr)``; failures during creation are
        converted through ``exception_handler``
    """
    # if addr(s) are provided, construct client list and adjust outer block
    clients = []
    if addr:
        clients = [{
            'addresses': addr,
            'access_type': 'ro' if kwargs['read_only'] else 'rw',
            'squash': kwargs['squash'],
        }]
        kwargs['squash'] = 'none'
    kwargs['clients'] = clients

    if clients:
        kwargs['access_type'] = "none"
    elif kwargs['read_only']:
        kwargs['access_type'] = "RO"
    else:
        kwargs['access_type'] = "RW"

    if kwargs['cluster_id'] not in self.exports:
        self.exports[kwargs['cluster_id']] = []

    try:
        fsal_type = kwargs.pop('fsal_type')
        if fsal_type == 'cephfs':
            return self.create_cephfs_export(**kwargs)
        if fsal_type == 'rgw':
            return self.create_rgw_export(**kwargs)
        # Give the caller a non-empty error string for unknown FSALs.
        raise NotImplementedError(f"Unsupported FSAL type {fsal_type!r}")
    except Exception as e:
        # .get() so that building the log message cannot itself raise.
        return exception_handler(
            e,
            f"Failed to create {kwargs.get('pseudo_path')} export for {kwargs.get('cluster_id')}")
403
@export_cluster_checker
def delete_export(self,
                  cluster_id: str,
                  pseudo_path: str) -> Tuple[int, str, str]:
    """Delete the export of ``cluster_id`` identified by ``pseudo_path``.

    Returns the ``(retval, stdout, stderr)`` tuple of ``_delete_export``.
    """
    outcome = self._delete_export(cluster_id, pseudo_path)
    return outcome
409
def delete_all_exports(self, cluster_id: str) -> None:
    """Delete every cached export of ``cluster_id``.

    Does nothing (apart from logging) when the cluster has no cache entry.

    :raises NFSException: if deleting any export returns a non-zero code
    """
    try:
        # Snapshot: _delete_export() mutates the cached list while we loop.
        pending = list(self.exports[cluster_id])
    except KeyError:
        log.info("No exports to delete")
        return
    for export in pending:
        ret, out, err = self._delete_export(cluster_id=cluster_id, pseudo_path=None,
                                            export_obj=export)
        if ret != 0:
            raise NFSException(f"Failed to delete exports: {err} and {ret}")
    log.info("All exports successfully deleted for cluster id: %s", cluster_id)
422
def list_all_exports(self) -> List[Dict[str, Any]]:
    """Return the dict form of every cached export, across all clusters."""
    return [
        export.to_dict()
        for cluster_exports in self.exports.values()
        for export in cluster_exports
    ]
428
@export_cluster_checker
def list_exports(self,
                 cluster_id: str,
                 detailed: bool = False) -> Tuple[int, str, str]:
    """Return the exports of ``cluster_id`` as JSON in the stdout slot.

    With ``detailed`` the full export dicts are listed, otherwise only the
    pseudo paths. A cluster without a cache entry yields ``(0, '', '')``.
    """
    try:
        cluster_exports = self.exports[cluster_id]
        if detailed:
            listing: List[Any] = [export.to_dict() for export in cluster_exports]
        else:
            listing = [export.pseudo for export in cluster_exports]
        return 0, json.dumps(listing, indent=2), ''
    except KeyError:
        log.warning("No exports to list for %s", cluster_id)
        return 0, '', ''
    except Exception as e:
        return exception_handler(e, f"Failed to list exports for {cluster_id}")
446
447 def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
448 export = self._fetch_export(cluster_id, pseudo_path)
449 if export:
450 return export.to_dict()
451 log.warning(f"No {pseudo_path} export to show for {cluster_id}")
452 return None
453
@export_cluster_checker
def get_export(
        self,
        cluster_id: str,
        pseudo_path: str,
) -> Tuple[int, str, str]:
    """Return the export at ``pseudo_path`` as JSON in the stdout slot.

    A missing export yields ``(0, '', '')``; exceptions are converted
    through ``exception_handler``.
    """
    try:
        export_dict = self._get_export_dict(cluster_id, pseudo_path)
        if not export_dict:
            log.warning("No %s export to show for %s", pseudo_path, cluster_id)
            return 0, '', ''
        return 0, json.dumps(export_dict, indent=2), ''
    except Exception as e:
        return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")
468
def get_export_by_id(
        self,
        cluster_id: str,
        export_id: int
) -> Optional[Dict[str, Any]]:
    """Return the dict form of export ``export_id``, or ``None`` if absent."""
    export = self._fetch_export_id(cluster_id, export_id)
    if not export:
        return None
    return export.to_dict()
476
def get_export_by_pseudo(
        self,
        cluster_id: str,
        pseudo_path: str
) -> Optional[Dict[str, Any]]:
    """Return the dict form of the export at ``pseudo_path``, or ``None``."""
    export = self._fetch_export(cluster_id, pseudo_path)
    if not export:
        return None
    return export.to_dict()
484
def apply_export(self, cluster_id: str, export_config: str) -> Tuple[int, str, str]:
    """Create or update exports described by ``export_config``.

    ``export_config`` is parsed as JSON first; if that fails it is parsed
    as ganesha EXPORT block(s). A JSON list (or several blocks) is applied
    entry by entry: outputs and errors are concatenated, one per line, and
    the last non-zero return code wins. Anything else is applied as a
    single export.

    :return: ``(retval, stdout, stderr)``
    """
    try:
        if not export_config:
            raise NFSInvalidOperation("Empty Config!!")
        try:
            j = json.loads(export_config)
        except ValueError:
            # okay, not JSON. is it an EXPORT block?
            try:
                blocks = GaneshaConfParser(export_config).parse()
                exports = [
                    Export.from_export_block(block, cluster_id)
                    for block in blocks
                ]
                j = [export.to_dict() for export in exports]
            except Exception as ex:
                raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")

        # check export type
        if not isinstance(j, list):
            r, o, e = self._apply_export(cluster_id, j)
            return r, o, e

        ret = 0
        out_lines: List[str] = []
        err_lines: List[str] = []
        for export in j:
            try:
                r, o, e = self._apply_export(cluster_id, export)
            except Exception as ex:
                r, o, e = exception_handler(ex, f'Failed to apply export: {ex}')
            if r:
                ret = r
            if o:
                out_lines.append(o + '\n')
            if e:
                err_lines.append(e + '\n')
        return ret, ''.join(out_lines), ''.join(err_lines)
    except NotImplementedError:
        return 0, " Manual Restart of NFS PODS required for successful update of exports", ""
    except Exception as e:
        return exception_handler(e, f'Failed to update export: {e}')
525
def _update_user_id(
        self,
        cluster_id: str,
        path: str,
        fs_name: str,
        user_id: str
) -> None:
    """Reset the caps of ``client.<user_id>`` for the given path and fs."""
    osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
        self.rados_pool, cluster_id, fs_name)
    # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server
    # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To
    # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with
    # path restricted read-write access. Rely on the ganesha servers to enforce the export
    # access type requested for the NFS clients.
    mds_cap = 'allow rw path={}'.format(path)
    caps = ['mon', 'allow r', 'osd', osd_cap, 'mds', mds_cap]
    self.mgr.check_mon_command({
        'prefix': 'auth caps',
        'entity': f'client.{user_id}',
        'caps': caps,
    })

    log.info("Export user updated %s", user_id)
547
def _create_user_key(
        self,
        cluster_id: str,
        entity: str,
        path: str,
        fs_name: str,
) -> str:
    """Create (or re-cap) ``client.<entity>`` and return its key.

    ``auth get-or-create`` is tried first. If it fails with EINVAL and a
    "does not match" message, the caps are rewritten with ``auth caps`` and
    the entity is re-read with ``auth get``.

    Failure is detected from the command return code, as the ``auth get``
    call in ``_apply_export`` does, rather than from the accompanying
    status text.

    :raises NFSException: if any of the mon commands fails
    """
    osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
        self.rados_pool, cluster_id, fs_name)
    nfs_caps = [
        'mon', 'allow r',
        'osd', osd_cap,
        'mds', 'allow rw path={}'.format(path)
    ]

    ret, out, err = self.mgr.mon_command({
        'prefix': 'auth get-or-create',
        'entity': 'client.{}'.format(entity),
        'caps': nfs_caps,
        'format': 'json',
    })
    if ret == -errno.EINVAL and 'does not match' in err:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth caps',
            'entity': 'client.{}'.format(entity),
            'caps': nfs_caps,
            'format': 'json',
        })
        if ret:
            raise NFSException(f'Failed to update caps for {entity}: {err}')
        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth get',
            'entity': 'client.{}'.format(entity),
            'format': 'json',
        })
        if ret:
            raise NFSException(f'Failed to fetch caps for {entity}: {err}')
    elif ret:
        # Any other failure: report it instead of trying to parse the
        # command output as JSON.
        raise NFSException(f'Failed to create user {entity}: {err}')

    json_res = json.loads(out)
    log.info("Export user created is %s", json_res[0]['entity'])
    return json_res[0]['key']
589
def create_export_from_dict(self,
                            cluster_id: str,
                            ex_id: int,
                            ex_dict: Dict[str, Any]) -> Export:
    """Validate ``ex_dict`` and build an ``Export`` with id ``ex_id``.

    ``ex_dict`` must contain ``pseudo`` and ``path`` and an ``fsal`` dict
    whose ``name`` is one of ``NFS_GANESHA_SUPPORTED_FSALS``. Note that
    ``ex_dict`` is updated in place: its ``fsal`` and ``cluster_id`` entries
    are (re)assigned before the export is built.

    :raises NFSInvalidOperation: on missing/invalid fields or an
        unsupported FSAL
    :raises FSNotFound: if the CephFS file system does not exist
    """
    pseudo_path = ex_dict.get("pseudo")
    if not pseudo_path:
        raise NFSInvalidOperation("export must specify pseudo path")

    path = ex_dict.get("path")
    if path is None:
        raise NFSInvalidOperation("export must specify path")
    path = normalize_path(path)

    # "fsal": null is treated like a missing FSAL and rejected below with a
    # clear message.
    fsal = ex_dict.get("fsal") or {}
    fsal_type = fsal.get("name")
    if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]:
        if '/' in path and path != '/':
            raise NFSInvalidOperation('"/" is not allowed in path with bucket name')
    elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]:
        fs_name = fsal.get("fs_name")
        if not fs_name:
            raise NFSInvalidOperation("export FSAL must specify fs_name")
        if not check_fs(self.mgr, fs_name):
            raise FSNotFound(fs_name)

        user_id = f"nfs.{cluster_id}.{ex_id}"
        if "user_id" in fsal and fsal["user_id"] != user_id:
            raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'")
    else:
        raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}. "
                                  "Export must specify any one of it.")

    ex_dict["fsal"] = fsal
    ex_dict["cluster_id"] = cluster_id
    export = Export.from_dict(ex_id, ex_dict)
    export.validate(self.mgr)
    log.debug("Successfully created %s export-%s from dict for cluster %s",
              fsal_type, ex_id, cluster_id)
    return export
629
def create_cephfs_export(self,
                         fs_name: str,
                         cluster_id: str,
                         pseudo_path: str,
                         read_only: bool,
                         path: str,
                         squash: str,
                         access_type: str,
                         clients: Optional[list] = None) -> Tuple[int, str, str]:
    """Create a CephFS export unless one already exists at ``pseudo_path``.

    :param clients: client blocks for the export; ``None`` means no clients
    :return: ``(0, <json summary>, '')`` on creation, or
        ``(0, "", "Export already exists")``
    """
    pseudo_path = normalize_path(pseudo_path)

    if self._fetch_export(cluster_id, pseudo_path):
        return 0, "", "Export already exists"

    export = self.create_export_from_dict(
        cluster_id,
        self._gen_export_id(cluster_id),
        {
            "pseudo": pseudo_path,
            "path": path,
            "access_type": access_type,
            "squash": squash,
            "fsal": {
                "name": NFS_GANESHA_SUPPORTED_FSALS[0],
                "fs_name": fs_name,
            },
            # A fresh list per call instead of a shared mutable default.
            "clients": [] if clients is None else clients,
        }
    )
    log.debug("creating cephfs export %s", export)
    self._create_export_user(export)
    self._save_export(cluster_id, export)
    result = {
        "bind": export.pseudo,
        "fs": fs_name,
        "path": export.path,
        "cluster": cluster_id,
        "mode": export.access_type,
    }
    return (0, json.dumps(result, indent=4), '')
669
def create_rgw_export(self,
                      cluster_id: str,
                      pseudo_path: str,
                      access_type: str,
                      read_only: bool,
                      squash: str,
                      bucket: Optional[str] = None,
                      user_id: Optional[str] = None,
                      clients: Optional[list] = None) -> Tuple[int, str, str]:
    """Create an RGW export unless one already exists at ``pseudo_path``.

    Either ``bucket`` or ``user_id`` must be given; without a bucket the
    export path is ``/``.

    :param clients: client blocks for the export; ``None`` means no clients
    :return: ``(0, <json summary>, '')`` on creation,
        ``(0, "", "Export already exists")``, or an EINVAL tuple when
        neither ``bucket`` nor ``user_id`` is given
    """
    pseudo_path = normalize_path(pseudo_path)

    if not bucket and not user_id:
        return -errno.EINVAL, "", "Must specify either bucket or user_id"

    if self._fetch_export(cluster_id, pseudo_path):
        return 0, "", "Export already exists"

    export = self.create_export_from_dict(
        cluster_id,
        self._gen_export_id(cluster_id),
        {
            "pseudo": pseudo_path,
            "path": bucket or '/',
            "access_type": access_type,
            "squash": squash,
            "fsal": {
                "name": NFS_GANESHA_SUPPORTED_FSALS[1],
                "user_id": user_id,
            },
            # A fresh list per call instead of a shared mutable default.
            "clients": [] if clients is None else clients,
        }
    )
    log.debug("creating rgw export %s", export)
    self._create_export_user(export)
    self._save_export(cluster_id, export)
    result = {
        "bind": export.pseudo,
        "path": export.path,
        "cluster": cluster_id,
        "mode": export.access_type,
        "squash": export.squash,
    }
    return (0, json.dumps(result, indent=4), '')
712
def _apply_export(
        self,
        cluster_id: str,
        new_export_dict: Dict,
) -> Tuple[int, str, str]:
    """Create a new export or update an existing one from a dict.

    The existing export is located by pseudo path first and, failing that,
    by ``export_id`` (which allows the pseudo path to change). Without an
    existing export a new one is created and saved. Otherwise the FSAL
    credentials are reconciled and the stored export is replaced; the NFS
    service is restarted unless the update only needs a notify.

    :raises NFSInvalidOperation: on missing fields, an export id change, an
        FSAL change, or a change of RGW keys
    :raises NFSException: if the CephFS user's caps cannot be fetched
    """
    for required in ('path', 'pseudo'):
        if required not in new_export_dict:
            raise NFSInvalidOperation(f'Export missing required field {required}')
    if cluster_id not in self.exports:
        self.exports[cluster_id] = []

    new_export_dict['path'] = normalize_path(new_export_dict['path'])
    new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])

    old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
    if old_export:
        # Check if export id matches
        if new_export_dict.get('export_id'):
            if old_export.export_id != new_export_dict.get('export_id'):
                raise NFSInvalidOperation('Export ID changed, Cannot update export')
        else:
            new_export_dict['export_id'] = old_export.export_id
    elif new_export_dict.get('export_id'):
        old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id'])
        if old_export:
            # re-fetch via old pseudo
            old_export = self._fetch_export(cluster_id, old_export.pseudo)
            assert old_export
            log.debug("export %s pseudo %s -> %s",
                      old_export.export_id, old_export.pseudo, new_export_dict['pseudo'])

    new_export = self.create_export_from_dict(
        cluster_id,
        new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
        new_export_dict
    )

    if not old_export:
        self._create_export_user(new_export)
        self._save_export(cluster_id, new_export)
        return 0, f'Added export {new_export.pseudo}', ''

    need_nfs_service_restart = True
    if old_export.fsal.name != new_export.fsal.name:
        raise NFSInvalidOperation('FSAL change not allowed')
    if old_export.pseudo != new_export.pseudo:
        log.debug('export %s pseudo %s -> %s',
                  new_export.export_id, old_export.pseudo, new_export.pseudo)

    if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
        old_fsal = cast(CephFSFSAL, old_export.fsal)
        new_fsal = cast(CephFSFSAL, new_export.fsal)
        path_or_fs_changed = (
            old_export.path != new_export.path
            or old_fsal.fs_name != new_fsal.fs_name
        )
        if old_fsal.user_id != new_fsal.user_id:
            self._delete_export_user(old_export)
            self._create_export_user(new_export)
        elif path_or_fs_changed:
            self._update_user_id(
                cluster_id,
                new_export.path,
                cast(str, new_fsal.fs_name),
                cast(str, new_fsal.user_id)
            )
            new_fsal.cephx_key = old_fsal.cephx_key
        else:
            # Same user, path and fs: make sure the stored MDS caps still
            # match what this export expects.
            expected_mds_caps = 'allow rw path={}'.format(new_export.path)
            entity = new_fsal.user_id
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth get',
                'entity': 'client.{}'.format(entity),
                'format': 'json',
            })
            if ret:
                raise NFSException(f'Failed to fetch caps for {entity}: {err}')
            actual_mds_caps = json.loads(out)[0]['caps'].get('mds')
            if actual_mds_caps != expected_mds_caps:
                self._update_user_id(
                    cluster_id,
                    new_export.path,
                    cast(str, new_fsal.fs_name),
                    cast(str, new_fsal.user_id)
                )
            elif old_export.pseudo == new_export.pseudo:
                need_nfs_service_restart = False
            new_fsal.cephx_key = old_fsal.cephx_key

    if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
        old_rgw_fsal = cast(RGWFSAL, old_export.fsal)
        new_rgw_fsal = cast(RGWFSAL, new_export.fsal)
        if old_rgw_fsal.user_id != new_rgw_fsal.user_id:
            self._delete_export_user(old_export)
            self._create_export_user(new_export)
        elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id:
            raise NFSInvalidOperation('access_key_id change is not allowed')
        elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
            raise NFSInvalidOperation('secret_access_key change is not allowed')

    self.exports[cluster_id].remove(old_export)

    self._update_export(cluster_id, new_export, need_nfs_service_restart)

    return 0, f"Updated export {new_export.pseudo}", ""
817
def _rados(self, cluster_id: str) -> NFSRados:
    """Return a new NFSRados object for the given cluster id.

    The cluster id is used as the RADOS namespace.
    """
    return NFSRados(rados=self.mgr.rados, namespace=cluster_id)