# ceph/src/pybind/mgr/nfs/export.py (ceph quincy 17.2.6)
import errno
import json
import logging
from typing import (
    List,
    Any,
    Dict,
    Tuple,
    Optional,
    TYPE_CHECKING,
    TypeVar,
    Callable,
    Set,
    cast)
from os.path import normpath

from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES

from orchestrator import NoOrchestrator
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS

from .ganesha_conf import (
    CephFSFSAL,
    Export,
    GaneshaConfParser,
    RGWFSAL,
    RawBlock,
    format_block)
from .exception import NFSException, NFSInvalidOperation, FSNotFound
from .utils import (
    CONF_PREFIX,
    EXPORT_PREFIX,
    USER_CONF_PREFIX,
    export_obj_name,
    conf_obj_name,
    available_clusters,
    check_fs,
    restart_nfs_service)

if TYPE_CHECKING:
    from nfs.module import Module

FuncT = TypeVar('FuncT', bound=Callable)

log = logging.getLogger(__name__)


def known_cluster_ids(mgr: 'Module') -> Set[str]:
    """Return the set of known cluster IDs."""
    try:
        clusters = set(available_clusters(mgr))
    except NoOrchestrator:
        clusters = nfs_rados_configs(mgr.rados)
    return clusters


def export_cluster_checker(func: FuncT) -> FuncT:
    def cluster_check(
            export: 'ExportMgr',
            *args: Any,
            **kwargs: Any
    ) -> Tuple[int, str, str]:
63 """
64 This method checks if cluster exists
65 """
        clusters = known_cluster_ids(export.mgr)
        cluster_id: str = kwargs['cluster_id']
        log.debug("checking for %r in known nfs clusters: %r",
                  cluster_id, clusters)
        if cluster_id not in clusters:
            return -errno.ENOENT, "", "Cluster does not exist"
        return func(export, *args, **kwargs)
    return cast(FuncT, cluster_check)


def exception_handler(
        exception_obj: Exception,
        log_msg: str = ""
) -> Tuple[int, str, str]:
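    """Translate an exception into the (retval, stdout, stderr) tuple returned by mgr commands."""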
    if log_msg:
        log.exception(log_msg)
    return getattr(exception_obj, 'errno', -1), "", str(exception_obj)


def _check_rados_notify(ioctx: Any, obj: str) -> None:
    try:
        ioctx.notify(obj)
    except TimedOut:
        log.exception("Ganesha timed out")


def normalize_path(path: str) -> str:
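    """Normalize an export path, e.g. " //a/b/ " -> "/a/b" and "/a//b" -> "/a/b"."""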
    if path:
        path = normpath(path.strip())
        if path[:2] == "//":
            path = path[1:]
    return path


class NFSRados:
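    """Thin wrapper around a rados connection for reading and writing
    ganesha config objects in the NFS pool, scoped to one cluster's
    namespace.
    """
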
    def __init__(self, rados: 'Rados', namespace: str) -> None:
        self.rados = rados
        self.pool = POOL_NAME
        self.namespace = namespace

    def _make_rados_url(self, obj: str) -> str:
        return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)

    def _create_url_block(self, obj_name: str) -> RawBlock:
        return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
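
    # format_block() renders the block above as a single ganesha config line,
    # e.g. '%url "rados://<pool>/<namespace>/export-1"'; write_obj() appends
    # such a line to the common config object and remove_obj() strips it.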

    def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            ioctx.write_full(obj, conf_block.encode('utf-8'))
            if not config_obj:
                # Return after creating empty common config object
                return
            log.debug("write configuration into rados object %s/%s/%s",
                      self.pool, self.namespace, obj)

            # Add created obj url to common config obj
            ioctx.append(config_obj, format_block(
                self._create_url_block(obj)).encode('utf-8'))
            _check_rados_notify(ioctx, config_obj)
            log.debug("Added %s url to %s", obj, config_obj)

    def read_obj(self, obj: str) -> Optional[str]:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            try:
                return ioctx.read(obj, 1048576).decode()
            except ObjectNotFound:
                return None

    def update_obj(self, conf_block: str, obj: str, config_obj: str,
                   should_notify: Optional[bool] = True) -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            ioctx.write_full(obj, conf_block.encode('utf-8'))
            log.debug("write configuration into rados object %s/%s/%s",
                      self.pool, self.namespace, obj)
            if should_notify:
                _check_rados_notify(ioctx, config_obj)
            log.debug("Update export %s in %s", obj, config_obj)

    def remove_obj(self, obj: str, config_obj: str) -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            export_urls = ioctx.read(config_obj)
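            # NB: the '%url' line below must byte-match what write_obj()
            # appended via format_block(), or the replace() is a no-op.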
            url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
            export_urls = export_urls.replace(url.encode('utf-8'), b'')
            ioctx.remove_object(obj)
            ioctx.write_full(config_obj, export_urls)
            _check_rados_notify(ioctx, config_obj)
            log.debug("Object deleted: %s", url)

    def remove_all_obj(self) -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            for obj in ioctx.list_objects():
                obj.remove()

    def check_user_config(self) -> bool:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            for obj in ioctx.list_objects():
                if obj.key.startswith(USER_CONF_PREFIX):
                    return True
        return False


def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
    """Return a set of all the namespaces in the nfs_pool where nfs
    configuration objects are found. The namespaces also correspond
    to the cluster ids.
    """
    ns: Set[str] = set()
    prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
    with rados.open_ioctx(nfs_pool) as ioctx:
        ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
        for obj in ioctx.list_objects():
            if obj.key.startswith(prefixes):
                ns.add(obj.nspace)
    return ns


class ExportMgr:
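    """Track and manipulate NFS exports for all known clusters.

    Exports are cached in memory and persisted as per-cluster rados objects.
    """
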
    def __init__(
            self,
            mgr: 'Module',
            export_ls: Optional[Dict[str, List[Export]]] = None
    ) -> None:
        self.mgr = mgr
        self.rados_pool = POOL_NAME
        self._exports: Optional[Dict[str, List[Export]]] = export_ls

    @property
    def exports(self) -> Dict[str, List[Export]]:
        if self._exports is None:
            self._exports = {}
            log.info("Begin export parsing")
            for cluster_id in known_cluster_ids(self.mgr):
                self.export_conf_objs = []  # type: List[Export]
                self._read_raw_config(cluster_id)
                self._exports[cluster_id] = self.export_conf_objs
                log.info("Exports parsed successfully %s", self.exports.items())
        return self._exports

    def _fetch_export(
            self,
            cluster_id: str,
            pseudo_path: str
    ) -> Optional[Export]:
        try:
            for ex in self.exports[cluster_id]:
                if ex.pseudo == pseudo_path:
                    return ex
            return None
        except KeyError:
            log.info('no exports for cluster %s', cluster_id)
            return None

    def _fetch_export_id(
            self,
            cluster_id: str,
            export_id: int
    ) -> Optional[Export]:
        try:
            for ex in self.exports[cluster_id]:
                if ex.export_id == export_id:
                    return ex
            return None
        except KeyError:
            log.info(f'no exports for cluster {cluster_id}')
            return None

    def _delete_export_user(self, export: Export) -> None:
        if isinstance(export.fsal, CephFSFSAL):
            assert export.fsal.user_id
            self.mgr.check_mon_command({
                'prefix': 'auth rm',
                'entity': 'client.{}'.format(export.fsal.user_id),
            })
            log.info("Deleted export user %s", export.fsal.user_id)
        elif isinstance(export.fsal, RGWFSAL):
            # do nothing; we're using the bucket owner creds.
            pass

    def _create_export_user(self, export: Export) -> None:
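        """Set up FSAL credentials: create a cephx user for a CephFS export,
        or look up the bucket owner's (or given user's) S3 keys for RGW.
        """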
        if isinstance(export.fsal, CephFSFSAL):
            fsal = cast(CephFSFSAL, export.fsal)
            assert fsal.fs_name
            fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
            fsal.cephx_key = self._create_user_key(
                export.cluster_id, fsal.user_id, export.path, fsal.fs_name
            )
            log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path)

        elif isinstance(export.fsal, RGWFSAL):
            rgwfsal = cast(RGWFSAL, export.fsal)
            if not rgwfsal.user_id:
                assert export.path
                ret, out, err = self.mgr.tool_exec(
                    ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
                )
                if ret:
                    raise NFSException(f'Failed to fetch owner for bucket {export.path}')
                j = json.loads(out)
                owner = j.get('owner', '')
                rgwfsal.user_id = owner
            assert rgwfsal.user_id
            ret, out, err = self.mgr.tool_exec([
                'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
            ])
            if ret:
                raise NFSException(
                    f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
                )
            j = json.loads(out)
            # FIXME: make this more tolerant of unexpected output?
            rgwfsal.access_key_id = j['keys'][0]['access_key']
            rgwfsal.secret_access_key = j['keys'][0]['secret_key']
            log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)

    def _gen_export_id(self, cluster_id: str) -> int:
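        """Return the lowest unused export ID, e.g. existing IDs [1, 2, 4] -> 3."""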
        exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
        nid = 1
        for e_id in exports:
            if e_id == nid:
                nid += 1
            else:
                break
        return nid

    def _read_raw_config(self, rados_namespace: str) -> None:
        with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            ioctx.set_namespace(rados_namespace)
            for obj in ioctx.list_objects():
                if obj.key.startswith(EXPORT_PREFIX):
                    size, _ = obj.stat()
                    raw_config = obj.read(size)
                    raw_config = raw_config.decode("utf-8")
                    log.debug("read export configuration from rados "
                              "object %s/%s/%s", self.rados_pool,
                              rados_namespace, obj.key)
                    self.export_conf_objs.append(Export.from_export_block(
                        GaneshaConfParser(raw_config).parse()[0], rados_namespace))

    def _save_export(self, cluster_id: str, export: Export) -> None:
        self.exports[cluster_id].append(export)
        self._rados(cluster_id).write_obj(
            format_block(export.to_export_block()),
            export_obj_name(export.export_id),
            conf_obj_name(export.cluster_id)
        )

    def _delete_export(
            self,
            cluster_id: str,
            pseudo_path: Optional[str],
            export_obj: Optional[Export] = None
    ) -> Tuple[int, str, str]:
        try:
            if export_obj:
                export: Optional[Export] = export_obj
            else:
                assert pseudo_path
                export = self._fetch_export(cluster_id, pseudo_path)

            if export:
                if pseudo_path:
                    self._rados(cluster_id).remove_obj(
                        export_obj_name(export.export_id), conf_obj_name(cluster_id))
                self.exports[cluster_id].remove(export)
                self._delete_export_user(export)
                if not self.exports[cluster_id]:
                    del self.exports[cluster_id]
                    log.debug("Deleted all exports for cluster %s", cluster_id)
                return 0, "Successfully deleted export", ""
            return 0, "", "Export does not exist"
        except Exception as e:
            return exception_handler(e, f"Failed to delete {pseudo_path} export for {cluster_id}")

    def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]:
        try:
            with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
                ioctx.set_namespace(cluster_id)
                export = Export.from_export_block(
                    GaneshaConfParser(
                        ioctx.read(export_obj_name(ex_id)).decode("utf-8")
                    ).parse()[0],
                    cluster_id
                )
                return export
        except ObjectNotFound:
            log.exception("Export ID: %s not found", ex_id)
            return None

    def _update_export(self, cluster_id: str, export: Export,
                       need_nfs_service_restart: bool) -> None:
        self.exports[cluster_id].append(export)
        self._rados(cluster_id).update_obj(
            format_block(export.to_export_block()),
            export_obj_name(export.export_id), conf_obj_name(export.cluster_id),
            should_notify=not need_nfs_service_restart)
        if need_nfs_service_restart:
            restart_nfs_service(self.mgr, export.cluster_id)

    @export_cluster_checker
    def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Tuple[int, str, str]:
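        """Create a cephfs or rgw export. If client addresses are given, the
        requested access_type/squash are applied per-client and the export's
        outer block is locked down with access_type/squash "none".
        """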
        # if addr(s) are provided, construct client list and adjust outer block
        clients = []
        if addr:
            clients = [{
                'addresses': addr,
                'access_type': 'ro' if kwargs['read_only'] else 'rw',
                'squash': kwargs['squash'],
            }]
            kwargs['squash'] = 'none'
        kwargs['clients'] = clients

        if clients:
            kwargs['access_type'] = "none"
        elif kwargs['read_only']:
            kwargs['access_type'] = "RO"
        else:
            kwargs['access_type'] = "RW"

        if kwargs['cluster_id'] not in self.exports:
            self.exports[kwargs['cluster_id']] = []

        try:
            fsal_type = kwargs.pop('fsal_type')
            if fsal_type == 'cephfs':
                return self.create_cephfs_export(**kwargs)
            if fsal_type == 'rgw':
                return self.create_rgw_export(**kwargs)
            raise NotImplementedError()
        except Exception as e:
            return exception_handler(e, f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")

    @export_cluster_checker
    def delete_export(self,
                      cluster_id: str,
                      pseudo_path: str) -> Tuple[int, str, str]:
        return self._delete_export(cluster_id, pseudo_path)

    def delete_all_exports(self, cluster_id: str) -> None:
        try:
            export_list = list(self.exports[cluster_id])
        except KeyError:
            log.info("No exports to delete")
            return
        for export in export_list:
            ret, out, err = self._delete_export(cluster_id=cluster_id, pseudo_path=None,
                                                export_obj=export)
            if ret != 0:
                raise NFSException(f"Failed to delete exports: {err} and {ret}")
        log.info("All exports successfully deleted for cluster id: %s", cluster_id)

    def list_all_exports(self) -> List[Dict[str, Any]]:
        r = []
        for cluster_id, ls in self.exports.items():
            r.extend([e.to_dict() for e in ls])
        return r

    @export_cluster_checker
    def list_exports(self,
                     cluster_id: str,
                     detailed: bool = False) -> Tuple[int, str, str]:
        try:
            if detailed:
                result_d = [export.to_dict() for export in self.exports[cluster_id]]
                return 0, json.dumps(result_d, indent=2), ''
            else:
                result_ps = [export.pseudo for export in self.exports[cluster_id]]
                return 0, json.dumps(result_ps, indent=2), ''

        except KeyError:
            log.warning("No exports to list for %s", cluster_id)
            return 0, '', ''
        except Exception as e:
            return exception_handler(e, f"Failed to list exports for {cluster_id}")

    def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
        export = self._fetch_export(cluster_id, pseudo_path)
        if export:
            return export.to_dict()
        log.warning(f"No {pseudo_path} export to show for {cluster_id}")
        return None

    @export_cluster_checker
    def get_export(
            self,
            cluster_id: str,
            pseudo_path: str,
    ) -> Tuple[int, str, str]:
        try:
            export_dict = self._get_export_dict(cluster_id, pseudo_path)
            if export_dict:
                return 0, json.dumps(export_dict, indent=2), ''
            log.warning("No %s export to show for %s", pseudo_path, cluster_id)
            return 0, '', ''
        except Exception as e:
            return exception_handler(e, f"Failed to get {pseudo_path} export for {cluster_id}")

    def get_export_by_id(
            self,
            cluster_id: str,
            export_id: int
    ) -> Optional[Dict[str, Any]]:
        export = self._fetch_export_id(cluster_id, export_id)
        return export.to_dict() if export else None

    def get_export_by_pseudo(
            self,
            cluster_id: str,
            pseudo_path: str
    ) -> Optional[Dict[str, Any]]:
        export = self._fetch_export(cluster_id, pseudo_path)
        return export.to_dict() if export else None

    def apply_export(self, cluster_id: str, export_config: str) -> Tuple[int, str, str]:
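        """Create or update exports from a config given either as JSON (a
        single export dict or a list of them) or as raw ganesha EXPORT
        block text, e.g. (illustrative, not a complete export):

            EXPORT {
                Export_ID = 1;
                Path = "/";
                Pseudo = "/cephfs";
                ...
            }
        """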
        try:
            if not export_config:
                raise NFSInvalidOperation("Empty Config!!")
            try:
                j = json.loads(export_config)
            except ValueError:
                # okay, not JSON. is it an EXPORT block?
                try:
                    blocks = GaneshaConfParser(export_config).parse()
                    exports = [
                        Export.from_export_block(block, cluster_id)
                        for block in blocks
                    ]
                    j = [export.to_dict() for export in exports]
                except Exception as ex:
                    raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")

            # check export type
            if isinstance(j, list):
                ret, out, err = (0, '', '')
                for export in j:
                    try:
                        r, o, e = self._apply_export(cluster_id, export)
                    except Exception as ex:
                        r, o, e = exception_handler(ex, f'Failed to apply export: {ex}')
                        if r:
                            ret = r
                    if o:
                        out += o + '\n'
                    if e:
                        err += e + '\n'
                return ret, out, err
            else:
                r, o, e = self._apply_export(cluster_id, j)
                return r, o, e
        except NotImplementedError:
            return 0, "Manual restart of NFS pods required for successful update of exports", ""
        except Exception as e:
            return exception_handler(e, f'Failed to update export: {e}')

    def _update_user_id(
            self,
            cluster_id: str,
            path: str,
            fs_name: str,
            user_id: str
    ) -> None:
        osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
            self.rados_pool, cluster_id, fs_name)
        # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server
        # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To
        # allow dynamic updates of CephFS NFS exports, always set FSAL Ceph user's MDS caps with
        # path restricted read-write access. Rely on the ganesha servers to enforce the export
        # access type requested for the NFS clients.
        self.mgr.check_mon_command({
            'prefix': 'auth caps',
            'entity': f'client.{user_id}',
            'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)],
        })

        log.info("Export user updated %s", user_id)

    def _create_user_key(
            self,
            cluster_id: str,
            entity: str,
            path: str,
            fs_name: str,
    ) -> str:
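        """Create the cephx user for a CephFS export (updating its caps if it
        already exists with different ones) and return its secret key.
        """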
        osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
            self.rados_pool, cluster_id, fs_name)
        nfs_caps = [
            'mon', 'allow r',
            'osd', osd_cap,
            'mds', 'allow rw path={}'.format(path)
        ]

        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': 'client.{}'.format(entity),
            'caps': nfs_caps,
            'format': 'json',
        })
        if ret == -errno.EINVAL and 'does not match' in err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': 'client.{}'.format(entity),
                'caps': nfs_caps,
                'format': 'json',
            })
            if err:
                raise NFSException(f'Failed to update caps for {entity}: {err}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth get',
                'entity': 'client.{}'.format(entity),
                'format': 'json',
            })
            if err:
                raise NFSException(f'Failed to fetch caps for {entity}: {err}')

        json_res = json.loads(out)
        log.info("Export user created is %s", json_res[0]['entity'])
        return json_res[0]['key']

    def create_export_from_dict(self,
                                cluster_id: str,
                                ex_id: int,
                                ex_dict: Dict[str, Any]) -> Export:
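        """Validate an export dict (pseudo, path and FSAL-specific fields)
        and build an Export object from it.
        """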
        pseudo_path = ex_dict.get("pseudo")
        if not pseudo_path:
            raise NFSInvalidOperation("export must specify pseudo path")

        path = ex_dict.get("path")
        if path is None:
            raise NFSInvalidOperation("export must specify path")
        path = normalize_path(path)

        fsal = ex_dict.get("fsal", {})
        fsal_type = fsal.get("name")
        if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]:
            if '/' in path and path != '/':
                raise NFSInvalidOperation('"/" is not allowed in path with bucket name')
        elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]:
            fs_name = fsal.get("fs_name")
            if not fs_name:
                raise NFSInvalidOperation("export FSAL must specify fs_name")
            if not check_fs(self.mgr, fs_name):
                raise FSNotFound(fs_name)

            user_id = f"nfs.{cluster_id}.{ex_id}"
            if "user_id" in fsal and fsal["user_id"] != user_id:
                raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'")
        else:
            raise NFSInvalidOperation(f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}. "
                                      "Export must specify one of them.")

        ex_dict["fsal"] = fsal
        ex_dict["cluster_id"] = cluster_id
        export = Export.from_dict(ex_id, ex_dict)
        export.validate(self.mgr)
        log.debug("Successfully created %s export-%s from dict for cluster %s",
                  fsal_type, ex_id, cluster_id)
        return export

    def create_cephfs_export(self,
                             fs_name: str,
                             cluster_id: str,
                             pseudo_path: str,
                             read_only: bool,
                             path: str,
                             squash: str,
                             access_type: str,
                             clients: list = [],
                             sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
        pseudo_path = normalize_path(pseudo_path)

        if not self._fetch_export(cluster_id, pseudo_path):
            export = self.create_export_from_dict(
                cluster_id,
                self._gen_export_id(cluster_id),
                {
                    "pseudo": pseudo_path,
                    "path": path,
                    "access_type": access_type,
                    "squash": squash,
                    "fsal": {
                        "name": NFS_GANESHA_SUPPORTED_FSALS[0],
                        "fs_name": fs_name,
                    },
                    "clients": clients,
                    "sectype": sectype,
                }
            )
            log.debug("creating cephfs export %s", export)
            self._create_export_user(export)
            self._save_export(cluster_id, export)
            result = {
                "bind": export.pseudo,
                "fs": fs_name,
                "path": export.path,
                "cluster": cluster_id,
                "mode": export.access_type,
            }
            return (0, json.dumps(result, indent=4), '')
        return 0, "", "Export already exists"

    def create_rgw_export(self,
                          cluster_id: str,
                          pseudo_path: str,
                          access_type: str,
                          read_only: bool,
                          squash: str,
                          bucket: Optional[str] = None,
                          user_id: Optional[str] = None,
                          clients: list = [],
                          sectype: Optional[List[str]] = None) -> Tuple[int, str, str]:
        pseudo_path = normalize_path(pseudo_path)

        if not bucket and not user_id:
            return -errno.EINVAL, "", "Must specify either bucket or user_id"

        if not self._fetch_export(cluster_id, pseudo_path):
            export = self.create_export_from_dict(
                cluster_id,
                self._gen_export_id(cluster_id),
                {
                    "pseudo": pseudo_path,
                    "path": bucket or '/',
                    "access_type": access_type,
                    "squash": squash,
                    "fsal": {
                        "name": NFS_GANESHA_SUPPORTED_FSALS[1],
                        "user_id": user_id,
                    },
                    "clients": clients,
                    "sectype": sectype,
                }
            )
            log.debug("creating rgw export %s", export)
            self._create_export_user(export)
            self._save_export(cluster_id, export)
            result = {
                "bind": export.pseudo,
                "path": export.path,
                "cluster": cluster_id,
                "mode": export.access_type,
                "squash": export.squash,
            }
            return (0, json.dumps(result, indent=4), '')
        return 0, "", "Export already exists"

    def _apply_export(
            self,
            cluster_id: str,
            new_export_dict: Dict,
    ) -> Tuple[int, str, str]:
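        """Create the export if it does not exist, otherwise update it in
        place, recreating users and restarting the NFS service only when
        needed.
        """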
        for k in ['path', 'pseudo']:
            if k not in new_export_dict:
                raise NFSInvalidOperation(f'Export missing required field {k}')
        if cluster_id not in self.exports:
            self.exports[cluster_id] = []

        new_export_dict['path'] = normalize_path(new_export_dict['path'])
        new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])

        old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
        if old_export:
            # Check if export id matches
            if new_export_dict.get('export_id'):
                if old_export.export_id != new_export_dict.get('export_id'):
                    raise NFSInvalidOperation('Export ID changed, cannot update export')
            else:
                new_export_dict['export_id'] = old_export.export_id
        elif new_export_dict.get('export_id'):
            old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id'])
            if old_export:
                # re-fetch via old pseudo
                old_export = self._fetch_export(cluster_id, old_export.pseudo)
                assert old_export
                log.debug("export %s pseudo %s -> %s",
                          old_export.export_id, old_export.pseudo, new_export_dict['pseudo'])

        new_export = self.create_export_from_dict(
            cluster_id,
            new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
            new_export_dict
        )

        if not old_export:
            self._create_export_user(new_export)
            self._save_export(cluster_id, new_export)
            return 0, f'Added export {new_export.pseudo}', ''

        need_nfs_service_restart = True
        if old_export.fsal.name != new_export.fsal.name:
            raise NFSInvalidOperation('FSAL change not allowed')
        if old_export.pseudo != new_export.pseudo:
            log.debug('export %s pseudo %s -> %s',
                      new_export.export_id, old_export.pseudo, new_export.pseudo)

        if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
            old_fsal = cast(CephFSFSAL, old_export.fsal)
            new_fsal = cast(CephFSFSAL, new_export.fsal)
            if old_fsal.user_id != new_fsal.user_id:
                self._delete_export_user(old_export)
                self._create_export_user(new_export)
            elif (
                    old_export.path != new_export.path
                    or old_fsal.fs_name != new_fsal.fs_name
            ):
                self._update_user_id(
                    cluster_id,
                    new_export.path,
                    cast(str, new_fsal.fs_name),
                    cast(str, new_fsal.user_id)
                )
                new_fsal.cephx_key = old_fsal.cephx_key
            else:
                expected_mds_caps = 'allow rw path={}'.format(new_export.path)
                entity = new_fsal.user_id
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': 'client.{}'.format(entity),
                    'format': 'json',
                })
                if ret:
                    raise NFSException(f'Failed to fetch caps for {entity}: {err}')
                actual_mds_caps = json.loads(out)[0]['caps'].get('mds')
                if actual_mds_caps != expected_mds_caps:
                    self._update_user_id(
                        cluster_id,
                        new_export.path,
                        cast(str, new_fsal.fs_name),
                        cast(str, new_fsal.user_id)
                    )
                elif old_export.pseudo == new_export.pseudo:
                    need_nfs_service_restart = False
                new_fsal.cephx_key = old_fsal.cephx_key

        if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
            old_rgw_fsal = cast(RGWFSAL, old_export.fsal)
            new_rgw_fsal = cast(RGWFSAL, new_export.fsal)
            if old_rgw_fsal.user_id != new_rgw_fsal.user_id:
                self._delete_export_user(old_export)
                self._create_export_user(new_export)
            elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id:
                raise NFSInvalidOperation('access_key_id change is not allowed')
            elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
                raise NFSInvalidOperation('secret_access_key change is not allowed')

        self.exports[cluster_id].remove(old_export)

        self._update_export(cluster_id, new_export, need_nfs_service_restart)

        return 0, f"Updated export {new_export.pseudo}", ""

    def _rados(self, cluster_id: str) -> NFSRados:
        """Return a new NFSRados object for the given cluster id."""
        return NFSRados(self.mgr.rados, cluster_id)