# ceph/src/pybind/mgr/nfs/export.py
import errno
import json
import logging
from typing import (
    List,
    Any,
    Dict,
    Optional,
    TYPE_CHECKING,
    TypeVar,
    Callable,
    Set,
    cast)
from os.path import normpath
import cephfs

from rados import TimedOut, ObjectNotFound, Rados, LIBRADOS_ALL_NSPACES

from object_format import ErrorResponse
from orchestrator import NoOrchestrator
from mgr_module import NFS_POOL_NAME as POOL_NAME, NFS_GANESHA_SUPPORTED_FSALS

from .ganesha_conf import (
    CephFSFSAL,
    Export,
    GaneshaConfParser,
    RGWFSAL,
    RawBlock,
    format_block)
from .exception import NFSException, NFSInvalidOperation, FSNotFound, NFSObjectNotFound
from .utils import (
    CONF_PREFIX,
    EXPORT_PREFIX,
    NonFatalError,
    USER_CONF_PREFIX,
    export_obj_name,
    conf_obj_name,
    available_clusters,
    check_fs,
    restart_nfs_service, cephfs_path_is_dir)

if TYPE_CHECKING:
    from nfs.module import Module

FuncT = TypeVar('FuncT', bound=Callable)

log = logging.getLogger(__name__)


def known_cluster_ids(mgr: 'Module') -> Set[str]:
    """Return the set of known cluster IDs."""
    try:
        clusters = set(available_clusters(mgr))
    except NoOrchestrator:
        clusters = nfs_rados_configs(mgr.rados)
    return clusters


def _check_rados_notify(ioctx: Any, obj: str) -> None:
    try:
        ioctx.notify(obj)
    except TimedOut:
        log.exception("Ganesha timed out")


def normalize_path(path: str) -> str:
    if path:
        path = normpath(path.strip())
        if path[:2] == "//":
            path = path[1:]
    return path
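# A rough illustration of normalize_path (values assumed, not taken from a
# test suite). posixpath.normpath() preserves a leading double slash, which
# is why the extra check above strips one of them:
#
#   normalize_path("//volumes//grp/")  returns  "/volumes/grp"
#   normalize_path(" /a/b/./c ")       returns  "/a/b/c"
#   normalize_path("")                 returns  ""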


class NFSRados:
    def __init__(self, rados: 'Rados', namespace: str) -> None:
        self.rados = rados
        self.pool = POOL_NAME
        self.namespace = namespace

    def _make_rados_url(self, obj: str) -> str:
        return "rados://{}/{}/{}".format(self.pool, self.namespace, obj)

    def _create_url_block(self, obj_name: str) -> RawBlock:
        return RawBlock('%url', values={'value': self._make_rados_url(obj_name)})
    def write_obj(self, conf_block: str, obj: str, config_obj: str = '') -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            ioctx.write_full(obj, conf_block.encode('utf-8'))
            if not config_obj:
                # Return after creating empty common config object
                return
            log.debug("write configuration into rados object %s/%s/%s",
                      self.pool, self.namespace, obj)

            # Add created obj url to common config obj
            ioctx.append(config_obj, format_block(
                self._create_url_block(obj)).encode('utf-8'))
            _check_rados_notify(ioctx, config_obj)
            log.debug("Added %s url to %s", obj, config_obj)

    def read_obj(self, obj: str) -> Optional[str]:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            try:
                return ioctx.read(obj, 1048576).decode()
            except ObjectNotFound:
                return None

    def update_obj(self, conf_block: str, obj: str, config_obj: str,
                   should_notify: Optional[bool] = True) -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            ioctx.write_full(obj, conf_block.encode('utf-8'))
            log.debug("write configuration into rados object %s/%s/%s",
                      self.pool, self.namespace, obj)
            if should_notify:
                _check_rados_notify(ioctx, config_obj)
            log.debug("Updated export %s in %s", obj, config_obj)

    def remove_obj(self, obj: str, config_obj: str) -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            export_urls = ioctx.read(config_obj)
            url = '%url "{}"\n\n'.format(self._make_rados_url(obj))
            export_urls = export_urls.replace(url.encode('utf-8'), b'')
            ioctx.remove_object(obj)
            ioctx.write_full(config_obj, export_urls)
            _check_rados_notify(ioctx, config_obj)
            log.debug("Object deleted: %s", url)

    def remove_all_obj(self) -> None:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            for obj in ioctx.list_objects():
                obj.remove()

    def check_user_config(self) -> bool:
        with self.rados.open_ioctx(self.pool) as ioctx:
            ioctx.set_namespace(self.namespace)
            for obj in ioctx.list_objects():
                if obj.key.startswith(USER_CONF_PREFIX):
                    return True
        return False
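    # Note on object naming: objects in a cluster's namespace follow the
    # prefixes imported from .utils (exact prefix strings are assumptions
    # here, shown for illustration): per-export objects such as "export-<id>",
    # the common config object read by the ganesha daemons, and user-supplied
    # configs starting with USER_CONF_PREFIX, which is what
    # check_user_config() scans for.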


def nfs_rados_configs(rados: 'Rados', nfs_pool: str = POOL_NAME) -> Set[str]:
    """Return a set of all the namespaces in the nfs_pool where nfs
    configuration objects are found. The namespaces also correspond
    to the cluster ids.
    """
    ns: Set[str] = set()
    prefixes = (EXPORT_PREFIX, CONF_PREFIX, USER_CONF_PREFIX)
    with rados.open_ioctx(nfs_pool) as ioctx:
        ioctx.set_namespace(LIBRADOS_ALL_NSPACES)
        for obj in ioctx.list_objects():
            if obj.key.startswith(prefixes):
                ns.add(obj.nspace)
    return ns
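# For example (illustrative values): if the ".nfs" pool contains "export-1"
# and "conf-nfs.foo" in namespace "foo", plus "export-2" in namespace "bar",
# nfs_rados_configs() returns {"foo", "bar"} -- one entry per NFS cluster.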


class AppliedExportResults:
    """Gathers the results of multiple changed exports.
    Returned by apply_export.
    """

    def __init__(self) -> None:
        self.changes: List[Dict[str, str]] = []
        self.has_error = False

    def append(self, value: Dict[str, str]) -> None:
        if value.get("state", "") == "error":
            self.has_error = True
        self.changes.append(value)

    def to_simplified(self) -> List[Dict[str, str]]:
        return self.changes

    def mgr_return_value(self) -> int:
        return -errno.EIO if self.has_error else 0
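    # Sketch of the simplified result shape (field values assumed): each entry
    # is a dict such as {"pseudo": "/cephfs", "state": "added"} or, on
    # failure, {"state": "error", "msg": "..."}; a single "error" entry flips
    # the mgr return value to -EIO while per-export results are still
    # reported to the caller.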


class ExportMgr:
    def __init__(
            self,
            mgr: 'Module',
            export_ls: Optional[Dict[str, List[Export]]] = None
    ) -> None:
        self.mgr = mgr
        self.rados_pool = POOL_NAME
        self._exports: Optional[Dict[str, List[Export]]] = export_ls

    @property
    def exports(self) -> Dict[str, List[Export]]:
        if self._exports is None:
            self._exports = {}
            log.info("Begin export parsing")
            for cluster_id in known_cluster_ids(self.mgr):
                self.export_conf_objs = []  # type: List[Export]
                self._read_raw_config(cluster_id)
                self._exports[cluster_id] = self.export_conf_objs
            log.info("Exports parsed successfully %s", self.exports.items())
        return self._exports

    def _fetch_export(
            self,
            cluster_id: str,
            pseudo_path: str
    ) -> Optional[Export]:
        try:
            for ex in self.exports[cluster_id]:
                if ex.pseudo == pseudo_path:
                    return ex
            return None
        except KeyError:
            log.info('no exports for cluster %s', cluster_id)
            return None

    def _fetch_export_id(
            self,
            cluster_id: str,
            export_id: int
    ) -> Optional[Export]:
        try:
            for ex in self.exports[cluster_id]:
                if ex.export_id == export_id:
                    return ex
            return None
        except KeyError:
            log.info('no exports for cluster %s', cluster_id)
            return None

    def _delete_export_user(self, export: Export) -> None:
        if isinstance(export.fsal, CephFSFSAL):
            assert export.fsal.user_id
            self.mgr.check_mon_command({
                'prefix': 'auth rm',
                'entity': 'client.{}'.format(export.fsal.user_id),
            })
            log.info("Deleted export user %s", export.fsal.user_id)
        elif isinstance(export.fsal, RGWFSAL):
            # do nothing; we're using the bucket owner creds.
            pass

    def _create_export_user(self, export: Export) -> None:
        if isinstance(export.fsal, CephFSFSAL):
            fsal = cast(CephFSFSAL, export.fsal)
            assert fsal.fs_name
            fsal.user_id = f"nfs.{export.cluster_id}.{export.export_id}"
            fsal.cephx_key = self._create_user_key(
                export.cluster_id, fsal.user_id, export.path, fsal.fs_name
            )
            log.debug("Successfully created user %s for cephfs path %s", fsal.user_id, export.path)

        elif isinstance(export.fsal, RGWFSAL):
            rgwfsal = cast(RGWFSAL, export.fsal)
            if not rgwfsal.user_id:
                assert export.path
                ret, out, err = self.mgr.tool_exec(
                    ['radosgw-admin', 'bucket', 'stats', '--bucket', export.path]
                )
                if ret:
                    raise NFSException(f'Failed to fetch owner for bucket {export.path}')
                j = json.loads(out)
                owner = j.get('owner', '')
                rgwfsal.user_id = owner
            assert rgwfsal.user_id
            ret, out, err = self.mgr.tool_exec([
                'radosgw-admin', 'user', 'info', '--uid', rgwfsal.user_id
            ])
            if ret:
                raise NFSException(
                    f'Failed to fetch key for bucket {export.path} owner {rgwfsal.user_id}'
                )
            j = json.loads(out)

            # FIXME: make this more tolerant of unexpected output?
            rgwfsal.access_key_id = j['keys'][0]['access_key']
            rgwfsal.secret_access_key = j['keys'][0]['secret_key']
            log.debug("Successfully fetched user %s for RGW path %s", rgwfsal.user_id, export.path)

    def _gen_export_id(self, cluster_id: str) -> int:
        exports = sorted([ex.export_id for ex in self.exports[cluster_id]])
        nid = 1
        for e_id in exports:
            if e_id == nid:
                nid += 1
            else:
                break
        return nid
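    # Illustration of the ID allocation above: export IDs are scanned in
    # sorted order and the first gap is reused, e.g. (values assumed)
    #
    #   existing export_ids [1, 2, 4]  ->  _gen_export_id() returns 3
    #   existing export_ids []         ->  returns 1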

    def _read_raw_config(self, rados_namespace: str) -> None:
        with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
            ioctx.set_namespace(rados_namespace)
            for obj in ioctx.list_objects():
                if obj.key.startswith(EXPORT_PREFIX):
                    size, _ = obj.stat()
                    raw_config = obj.read(size)
                    raw_config = raw_config.decode("utf-8")
                    log.debug("read export configuration from rados "
                              "object %s/%s/%s", self.rados_pool,
                              rados_namespace, obj.key)
                    self.export_conf_objs.append(Export.from_export_block(
                        GaneshaConfParser(raw_config).parse()[0], rados_namespace))

    def _save_export(self, cluster_id: str, export: Export) -> None:
        self.exports[cluster_id].append(export)
        self._rados(cluster_id).write_obj(
            format_block(export.to_export_block()),
            export_obj_name(export.export_id),
            conf_obj_name(export.cluster_id)
        )

    def _delete_export(
            self,
            cluster_id: str,
            pseudo_path: Optional[str],
            export_obj: Optional[Export] = None
    ) -> None:
        try:
            if export_obj:
                export: Optional[Export] = export_obj
            else:
                assert pseudo_path
                export = self._fetch_export(cluster_id, pseudo_path)

            if export:
                if pseudo_path:
                    self._rados(cluster_id).remove_obj(
                        export_obj_name(export.export_id), conf_obj_name(cluster_id))
                self.exports[cluster_id].remove(export)
                self._delete_export_user(export)
                if not self.exports[cluster_id]:
                    del self.exports[cluster_id]
                    log.debug("Deleted all exports for cluster %s", cluster_id)
                return None
            raise NonFatalError("Export does not exist")
        except Exception as e:
            log.exception(f"Failed to delete {pseudo_path} export for {cluster_id}")
            raise ErrorResponse.wrap(e)

    def _fetch_export_obj(self, cluster_id: str, ex_id: int) -> Optional[Export]:
        try:
            with self.mgr.rados.open_ioctx(self.rados_pool) as ioctx:
                ioctx.set_namespace(cluster_id)
                export = Export.from_export_block(
                    GaneshaConfParser(
                        ioctx.read(export_obj_name(ex_id)).decode("utf-8")
                    ).parse()[0],
                    cluster_id
                )
                return export
        except ObjectNotFound:
            log.exception("Export ID: %s not found", ex_id)
            return None

    def _update_export(self, cluster_id: str, export: Export,
                       need_nfs_service_restart: bool) -> None:
        self.exports[cluster_id].append(export)
        self._rados(cluster_id).update_obj(
            format_block(export.to_export_block()),
            export_obj_name(export.export_id), conf_obj_name(export.cluster_id),
            should_notify=not need_nfs_service_restart)
        if need_nfs_service_restart:
            restart_nfs_service(self.mgr, export.cluster_id)

    def _validate_cluster_id(self, cluster_id: str) -> None:
        """Raise an exception if cluster_id is not valid."""
        clusters = known_cluster_ids(self.mgr)
        log.debug("checking for %r in known nfs clusters: %r",
                  cluster_id, clusters)
        if cluster_id not in clusters:
            raise ErrorResponse(f"Cluster {cluster_id!r} does not exist",
                                return_value=-errno.ENOENT)

    def create_export(self, addr: Optional[List[str]] = None, **kwargs: Any) -> Dict[str, Any]:
        self._validate_cluster_id(kwargs['cluster_id'])
        # if addr(s) are provided, construct client list and adjust outer block
        clients = []
        if addr:
            clients = [{
                'addresses': addr,
                'access_type': 'ro' if kwargs['read_only'] else 'rw',
                'squash': kwargs['squash'],
            }]
            kwargs['squash'] = 'none'
        kwargs['clients'] = clients

        if clients:
            kwargs['access_type'] = "none"
        elif kwargs['read_only']:
            kwargs['access_type'] = "RO"
        else:
            kwargs['access_type'] = "RW"

        if kwargs['cluster_id'] not in self.exports:
            self.exports[kwargs['cluster_id']] = []

        try:
            fsal_type = kwargs.pop('fsal_type')
            if fsal_type == 'cephfs':
                return self.create_cephfs_export(**kwargs)
            if fsal_type == 'rgw':
                return self.create_rgw_export(**kwargs)
            raise NotImplementedError()
        except Exception as e:
            log.exception(
                f"Failed to create {kwargs['pseudo_path']} export for {kwargs['cluster_id']}")
            raise ErrorResponse.wrap(e)
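    # A minimal sketch of how the addr handling above rewrites kwargs
    # (values assumed): create_export(addr=["10.0.0.0/8"], read_only=True,
    # squash="none", ...) yields
    #
    #   kwargs["clients"] = [{"addresses": ["10.0.0.0/8"],
    #                         "access_type": "ro", "squash": "none"}]
    #   kwargs["access_type"] = "none"   # outer block; clients carry the ACL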

    def delete_export(self,
                      cluster_id: str,
                      pseudo_path: str) -> None:
        self._validate_cluster_id(cluster_id)
        return self._delete_export(cluster_id, pseudo_path)

    def delete_all_exports(self, cluster_id: str) -> None:
        try:
            export_list = list(self.exports[cluster_id])
        except KeyError:
            log.info("No exports to delete")
            return
        for export in export_list:
            try:
                self._delete_export(cluster_id=cluster_id, pseudo_path=None,
                                    export_obj=export)
            except Exception as e:
                raise NFSException(f"Failed to delete export {export.export_id}: {e}")
        log.info("All exports successfully deleted for cluster id: %s", cluster_id)

    def list_all_exports(self) -> List[Dict[str, Any]]:
        r = []
        for cluster_id, ls in self.exports.items():
            r.extend([e.to_dict() for e in ls])
        return r

    def list_exports(self,
                     cluster_id: str,
                     detailed: bool = False) -> List[Any]:
        self._validate_cluster_id(cluster_id)
        try:
            if detailed:
                result_d = [export.to_dict() for export in self.exports[cluster_id]]
                return result_d
            else:
                result_ps = [export.pseudo for export in self.exports[cluster_id]]
                return result_ps

        except KeyError:
            log.warning("No exports to list for %s", cluster_id)
            return []
        except Exception as e:
            log.exception(f"Failed to list exports for {cluster_id}")
            raise ErrorResponse.wrap(e)

    def _get_export_dict(self, cluster_id: str, pseudo_path: str) -> Optional[Dict[str, Any]]:
        export = self._fetch_export(cluster_id, pseudo_path)
        if export:
            return export.to_dict()
        log.warning(f"No {pseudo_path} export to show for {cluster_id}")
        return None

    def get_export(
            self,
            cluster_id: str,
            pseudo_path: str,
    ) -> Dict[str, Any]:
        self._validate_cluster_id(cluster_id)
        try:
            export_dict = self._get_export_dict(cluster_id, pseudo_path)
            log.info(f"Fetched {export_dict!r} for {cluster_id!r}, {pseudo_path!r}")
            return export_dict if export_dict else {}
        except Exception as e:
            log.exception(f"Failed to get {pseudo_path} export for {cluster_id}")
            raise ErrorResponse.wrap(e)

    def get_export_by_id(
            self,
            cluster_id: str,
            export_id: int
    ) -> Optional[Dict[str, Any]]:
        export = self._fetch_export_id(cluster_id, export_id)
        return export.to_dict() if export else None

    def get_export_by_pseudo(
            self,
            cluster_id: str,
            pseudo_path: str
    ) -> Optional[Dict[str, Any]]:
        export = self._fetch_export(cluster_id, pseudo_path)
        return export.to_dict() if export else None

    # This method is used by the dashboard module (../dashboard/controllers/nfs.py)
    # Do not change interface without updating the Dashboard code
    def apply_export(self, cluster_id: str, export_config: str) -> AppliedExportResults:
        try:
            exports = self._read_export_config(cluster_id, export_config)
        except Exception as e:
            log.exception(f'Failed to update export: {e}')
            raise ErrorResponse.wrap(e)

        aeresults = AppliedExportResults()
        for export in exports:
            aeresults.append(self._change_export(cluster_id, export))
        return aeresults
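    # Typical entry point (a hedged sketch; the exact CLI shape may vary by
    # release): `ceph nfs export apply <cluster_id> -i <config_file>` routes
    # here with the file contents, which may be a single JSON object, a JSON
    # list, or one or more ganesha EXPORT blocks.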

    def _read_export_config(self, cluster_id: str, export_config: str) -> List[Dict]:
        if not export_config:
            raise NFSInvalidOperation("Empty config")
        try:
            j = json.loads(export_config)
        except ValueError:
            # okay, not JSON. is it an EXPORT block?
            try:
                blocks = GaneshaConfParser(export_config).parse()
                exports = [
                    Export.from_export_block(block, cluster_id)
                    for block in blocks
                ]
                j = [export.to_dict() for export in exports]
            except Exception as ex:
                raise NFSInvalidOperation(f"Input must be JSON or a ganesha EXPORT block: {ex}")
        # check export type - always return a list
        if isinstance(j, list):
            return j  # j is already a list object
        return [j]  # return a single object list, with j as the only item
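    # The two accepted input forms, side by side (illustrative content only):
    #
    #   JSON:          {"pseudo": "/cephfs", "path": "/", "fsal": {...}}
    #   EXPORT block:  EXPORT {
    #                      Export_Id = 1;
    #                      Path = "/";
    #                      Pseudo = "/cephfs";
    #                      FSAL { Name = "CEPH"; }
    #                  }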

    def _change_export(self, cluster_id: str, export: Dict) -> Dict[str, str]:
        try:
            return self._apply_export(cluster_id, export)
        except NotImplementedError:
            # In theory, the NotImplementedError here is raised by a hook back to
            # an orchestration module. If the orchestration module supports it, the
            # NFS servers may be restarted; if it does not, the (unfortunately
            # generic) NotImplementedError is raised instead. We then indicate to
            # the user that manual intervention may be needed now that the
            # configuration changes have been applied.
            return {
                "pseudo": export['pseudo'],
                "state": "warning",
                "msg": "changes applied (manual restart of NFS pods required)",
            }
        except Exception as ex:
            msg = f'Failed to apply export: {ex}'
            log.exception(msg)
            return {"state": "error", "msg": msg}

    def _update_user_id(
            self,
            cluster_id: str,
            path: str,
            fs_name: str,
            user_id: str
    ) -> None:
        osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
            self.rados_pool, cluster_id, fs_name)
        # NFS-Ganesha can dynamically enforce an export's access type changes, but Ceph server
        # daemons can't dynamically enforce changes in Ceph user caps of the Ceph clients. To
        # allow dynamic updates of CephFS NFS exports, always set the FSAL Ceph user's MDS caps
        # with path-restricted read-write access. Rely on the ganesha servers to enforce the
        # export access type requested for the NFS clients.
        self.mgr.check_mon_command({
            'prefix': 'auth caps',
            'entity': f'client.{user_id}',
            'caps': ['mon', 'allow r', 'osd', osd_cap, 'mds', 'allow rw path={}'.format(path)],
        })

        log.info("Updated export user %s", user_id)
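    # The resulting cephx caps look roughly like this (pool/namespace values
    # assumed for illustration, with the default ".nfs" pool):
    #
    #   mon 'allow r'
    #   osd 'allow rw pool=.nfs namespace=<cluster_id>, allow rw tag cephfs data=<fs_name>'
    #   mds 'allow rw path=<path>'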

    def _create_user_key(
            self,
            cluster_id: str,
            entity: str,
            path: str,
            fs_name: str,
    ) -> str:
        osd_cap = 'allow rw pool={} namespace={}, allow rw tag cephfs data={}'.format(
            self.rados_pool, cluster_id, fs_name)
        nfs_caps = [
            'mon', 'allow r',
            'osd', osd_cap,
            'mds', 'allow rw path={}'.format(path)
        ]

        ret, out, err = self.mgr.mon_command({
            'prefix': 'auth get-or-create',
            'entity': 'client.{}'.format(entity),
            'caps': nfs_caps,
            'format': 'json',
        })
        if ret == -errno.EINVAL and 'does not match' in err:
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth caps',
                'entity': 'client.{}'.format(entity),
                'caps': nfs_caps,
                'format': 'json',
            })
            if err:
                raise NFSException(f'Failed to update caps for {entity}: {err}')
            ret, out, err = self.mgr.mon_command({
                'prefix': 'auth get',
                'entity': 'client.{}'.format(entity),
                'format': 'json',
            })
            if err:
                raise NFSException(f'Failed to fetch caps for {entity}: {err}')

        json_res = json.loads(out)
        log.info("Created export user %s", json_res[0]['entity'])
        return json_res[0]['key']

    def create_export_from_dict(self,
                                cluster_id: str,
                                ex_id: int,
                                ex_dict: Dict[str, Any]) -> Export:
        pseudo_path = ex_dict.get("pseudo")
        if not pseudo_path:
            raise NFSInvalidOperation("export must specify pseudo path")

        path = ex_dict.get("path")
        if path is None:
            raise NFSInvalidOperation("export must specify path")
        path = normalize_path(path)

        fsal = ex_dict.get("fsal", {})
        fsal_type = fsal.get("name")
        if fsal_type == NFS_GANESHA_SUPPORTED_FSALS[1]:
            if '/' in path and path != '/':
                raise NFSInvalidOperation('"/" is not allowed in path with bucket name')
        elif fsal_type == NFS_GANESHA_SUPPORTED_FSALS[0]:
            fs_name = fsal.get("fs_name")
            if not fs_name:
                raise NFSInvalidOperation("export FSAL must specify fs_name")
            if not check_fs(self.mgr, fs_name):
                raise FSNotFound(fs_name)

            user_id = f"nfs.{cluster_id}.{ex_id}"
            if "user_id" in fsal and fsal["user_id"] != user_id:
                raise NFSInvalidOperation(f"export FSAL user_id must be '{user_id}'")
        else:
            raise NFSInvalidOperation(
                f"NFS Ganesha supported FSALs are {NFS_GANESHA_SUPPORTED_FSALS}. "
                "The export must specify one of them.")

        ex_dict["fsal"] = fsal
        ex_dict["cluster_id"] = cluster_id
        export = Export.from_dict(ex_id, ex_dict)
        export.validate(self.mgr)
        log.debug("Successfully created %s export-%s from dict for cluster %s",
                  fsal_type, ex_id, cluster_id)
        return export

    def create_cephfs_export(self,
                             fs_name: str,
                             cluster_id: str,
                             pseudo_path: str,
                             read_only: bool,
                             path: str,
                             squash: str,
                             access_type: str,
                             clients: list = [],
                             sectype: Optional[List[str]] = None) -> Dict[str, Any]:

        try:
            cephfs_path_is_dir(self.mgr, fs_name, path)
        except NotADirectoryError:
            raise NFSException(f"path {path} is not a directory", -errno.ENOTDIR)
        except cephfs.ObjectNotFound:
            raise NFSObjectNotFound(f"path {path} does not exist")
        except cephfs.Error as e:
            raise NFSException(e.args[1], -e.args[0])

        pseudo_path = normalize_path(pseudo_path)

        if not self._fetch_export(cluster_id, pseudo_path):
            export = self.create_export_from_dict(
                cluster_id,
                self._gen_export_id(cluster_id),
                {
                    "pseudo": pseudo_path,
                    "path": path,
                    "access_type": access_type,
                    "squash": squash,
                    "fsal": {
                        "name": NFS_GANESHA_SUPPORTED_FSALS[0],
                        "fs_name": fs_name,
                    },
                    "clients": clients,
                    "sectype": sectype,
                }
            )
            log.debug("creating cephfs export %s", export)
            self._create_export_user(export)
            self._save_export(cluster_id, export)
            result = {
                "bind": export.pseudo,
                "fs": fs_name,
                "path": export.path,
                "cluster": cluster_id,
                "mode": export.access_type,
            }
            return result
        raise NonFatalError("Export already exists")

    def create_rgw_export(self,
                          cluster_id: str,
                          pseudo_path: str,
                          access_type: str,
                          read_only: bool,
                          squash: str,
                          bucket: Optional[str] = None,
                          user_id: Optional[str] = None,
                          clients: list = [],
                          sectype: Optional[List[str]] = None) -> Dict[str, Any]:
        pseudo_path = normalize_path(pseudo_path)

        if not bucket and not user_id:
            raise ErrorResponse("Must specify either bucket or user_id")

        if not self._fetch_export(cluster_id, pseudo_path):
            export = self.create_export_from_dict(
                cluster_id,
                self._gen_export_id(cluster_id),
                {
                    "pseudo": pseudo_path,
                    "path": bucket or '/',
                    "access_type": access_type,
                    "squash": squash,
                    "fsal": {
                        "name": NFS_GANESHA_SUPPORTED_FSALS[1],
                        "user_id": user_id,
                    },
                    "clients": clients,
                    "sectype": sectype,
                }
            )
            log.debug("creating rgw export %s", export)
            self._create_export_user(export)
            self._save_export(cluster_id, export)
            result = {
                "bind": export.pseudo,
                "path": export.path,
                "cluster": cluster_id,
                "mode": export.access_type,
                "squash": export.squash,
            }
            return result
        raise NonFatalError("Export already exists")

    def _apply_export(
            self,
            cluster_id: str,
            new_export_dict: Dict,
    ) -> Dict[str, str]:
        for k in ['path', 'pseudo']:
            if k not in new_export_dict:
                raise NFSInvalidOperation(f'Export missing required field {k}')
        if cluster_id not in self.exports:
            self.exports[cluster_id] = []

        new_export_dict['path'] = normalize_path(new_export_dict['path'])
        new_export_dict['pseudo'] = normalize_path(new_export_dict['pseudo'])

        old_export = self._fetch_export(cluster_id, new_export_dict['pseudo'])
        if old_export:
            # Check if export id matches
            if new_export_dict.get('export_id'):
                if old_export.export_id != new_export_dict.get('export_id'):
                    raise NFSInvalidOperation('Export ID changed; cannot update export')
            else:
                new_export_dict['export_id'] = old_export.export_id
        elif new_export_dict.get('export_id'):
            old_export = self._fetch_export_obj(cluster_id, new_export_dict['export_id'])
            if old_export:
                # re-fetch via old pseudo
                old_export = self._fetch_export(cluster_id, old_export.pseudo)
                assert old_export
                log.debug("export %s pseudo %s -> %s",
                          old_export.export_id, old_export.pseudo, new_export_dict['pseudo'])

        new_export = self.create_export_from_dict(
            cluster_id,
            new_export_dict.get('export_id', self._gen_export_id(cluster_id)),
            new_export_dict
        )

        if not old_export:
            self._create_export_user(new_export)
            self._save_export(cluster_id, new_export)
            return {"pseudo": new_export.pseudo, "state": "added"}

        need_nfs_service_restart = True
        if old_export.fsal.name != new_export.fsal.name:
            raise NFSInvalidOperation('FSAL change not allowed')
        if old_export.pseudo != new_export.pseudo:
            log.debug('export %s pseudo %s -> %s',
                      new_export.export_id, old_export.pseudo, new_export.pseudo)

        if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[0]:
            old_fsal = cast(CephFSFSAL, old_export.fsal)
            new_fsal = cast(CephFSFSAL, new_export.fsal)
            if old_fsal.user_id != new_fsal.user_id:
                self._delete_export_user(old_export)
                self._create_export_user(new_export)
            elif (
                old_export.path != new_export.path
                or old_fsal.fs_name != new_fsal.fs_name
            ):
                self._update_user_id(
                    cluster_id,
                    new_export.path,
                    cast(str, new_fsal.fs_name),
                    cast(str, new_fsal.user_id)
                )
                new_fsal.cephx_key = old_fsal.cephx_key
            else:
                expected_mds_caps = 'allow rw path={}'.format(new_export.path)
                entity = new_fsal.user_id
                ret, out, err = self.mgr.mon_command({
                    'prefix': 'auth get',
                    'entity': 'client.{}'.format(entity),
                    'format': 'json',
                })
                if ret:
                    raise NFSException(f'Failed to fetch caps for {entity}: {err}')
                actual_mds_caps = json.loads(out)[0]['caps'].get('mds')
                if actual_mds_caps != expected_mds_caps:
                    self._update_user_id(
                        cluster_id,
                        new_export.path,
                        cast(str, new_fsal.fs_name),
                        cast(str, new_fsal.user_id)
                    )
                elif old_export.pseudo == new_export.pseudo:
                    need_nfs_service_restart = False
                new_fsal.cephx_key = old_fsal.cephx_key

        if old_export.fsal.name == NFS_GANESHA_SUPPORTED_FSALS[1]:
            old_rgw_fsal = cast(RGWFSAL, old_export.fsal)
            new_rgw_fsal = cast(RGWFSAL, new_export.fsal)
            if old_rgw_fsal.user_id != new_rgw_fsal.user_id:
                self._delete_export_user(old_export)
                self._create_export_user(new_export)
            elif old_rgw_fsal.access_key_id != new_rgw_fsal.access_key_id:
                raise NFSInvalidOperation('access_key_id change is not allowed')
            elif old_rgw_fsal.secret_access_key != new_rgw_fsal.secret_access_key:
                raise NFSInvalidOperation('secret_access_key change is not allowed')

        self.exports[cluster_id].remove(old_export)

        self._update_export(cluster_id, new_export, need_nfs_service_restart)

        return {"pseudo": new_export.pseudo, "state": "updated"}

    def _rados(self, cluster_id: str) -> NFSRados:
        """Return a new NFSRados object for the given cluster id."""
        return NFSRados(self.mgr.rados, cluster_id)