]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/nfs/module.py
403f89f3a97367061a5d9cbc1624773b84a9cec5
[ceph.git] / ceph / src / pybind / mgr / nfs / module.py
1 import logging
2 import threading
3 from typing import Tuple, Optional, List, Dict, Any
4
5 from mgr_module import MgrModule, CLICommand, Option, CLICheckNonemptyFileInput
6 import orchestrator
7
8 from .export import ExportMgr
9 from .cluster import NFSCluster
10 from .utils import available_clusters
11
12 log = logging.getLogger(__name__)  # logger for this module, named after the module itself
13
14
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
    """Manager module providing the ``nfs export ...`` and ``nfs cluster ...`` commands.

    Every command handler is a thin adapter: export commands are forwarded
    to ``self.export_mgr`` (an ExportMgr) and cluster commands to
    ``self.nfs`` (an NFSCluster).

    NOTE(review): handler signatures and docstrings are presumably what
    CLICommand uses to build the command arguments and help text, so they
    are kept exactly as they were -- confirm against mgr_module before
    rewording or re-annotating any of them.
    """

    MODULE_OPTIONS: List[Option] = []

    def __init__(self, *args: str, **kwargs: Any) -> None:
        """Run the base-class constructors, then create the two helper objects."""
        # Both attributes are assigned before the base-class constructor runs.
        self.inited = False
        self.lock = threading.Lock()
        super().__init__(*args, **kwargs)
        with self.lock:
            self.export_mgr = ExportMgr(self)
            self.nfs = NFSCluster(self)
            # Only set once both helpers above have been constructed.
            self.inited = True

    # ------------------------------------------------------------------
    # 'nfs export ...' command handlers
    # ------------------------------------------------------------------

    @CLICommand('nfs export create cephfs', perm='rw')
    def _cmd_nfs_export_create_cephfs(
            self,
            cluster_id: str,
            pseudo_path: str,
            fsname: str,
            path: Optional[str] = '/',
            readonly: Optional[bool] = False,
            client_addr: Optional[List[str]] = None,
            squash: str = 'none',
    ) -> Tuple[int, str, str]:
        """Create a CephFS export"""
        # Translate the CLI argument names into the keyword names that
        # create_export() is called with.
        export_args: Dict[str, Any] = {
            'fsal_type': 'cephfs',
            'fs_name': fsname,
            'cluster_id': cluster_id,
            'pseudo_path': pseudo_path,
            'read_only': readonly,
            'path': path,
            'squash': squash,
            'addr': client_addr,
        }
        return self.export_mgr.create_export(**export_args)

    @CLICommand('nfs export create rgw', perm='rw')
    def _cmd_nfs_export_create_rgw(
            self,
            cluster_id: str,
            pseudo_path: str,
            bucket: Optional[str] = None,
            user_id: Optional[str] = None,
            readonly: Optional[bool] = False,
            client_addr: Optional[List[str]] = None,
            squash: str = 'none',
    ) -> Tuple[int, str, str]:
        """Create an RGW export"""
        # Same adapter as the CephFS variant, with bucket/user_id in place
        # of fs_name/path.
        export_args: Dict[str, Any] = {
            'fsal_type': 'rgw',
            'bucket': bucket,
            'user_id': user_id,
            'cluster_id': cluster_id,
            'pseudo_path': pseudo_path,
            'read_only': readonly,
            'squash': squash,
            'addr': client_addr,
        }
        return self.export_mgr.create_export(**export_args)

    @CLICommand('nfs export rm', perm='rw')
    def _cmd_nfs_export_rm(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
        """Remove a cephfs export"""
        exports = self.export_mgr
        return exports.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)

    @CLICommand('nfs export delete', perm='rw')
    def _cmd_nfs_export_delete(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
        """Delete a cephfs export (DEPRECATED)"""
        # Deprecated spelling; issues the same call as 'nfs export rm'.
        exports = self.export_mgr
        return exports.delete_export(cluster_id=cluster_id, pseudo_path=pseudo_path)

    @CLICommand('nfs export ls', perm='r')
    def _cmd_nfs_export_ls(self, cluster_id: str, detailed: bool = False) -> Tuple[int, str, str]:
        """List exports of a NFS cluster"""
        exports = self.export_mgr
        return exports.list_exports(cluster_id=cluster_id, detailed=detailed)

    @CLICommand('nfs export info', perm='r')
    def _cmd_nfs_export_info(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
        """Fetch a export of a NFS cluster given the pseudo path/binding"""
        exports = self.export_mgr
        return exports.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)

    @CLICommand('nfs export get', perm='r')
    def _cmd_nfs_export_get(self, cluster_id: str, pseudo_path: str) -> Tuple[int, str, str]:
        """Fetch a export of a NFS cluster given the pseudo path/binding (DEPRECATED)"""
        # Deprecated spelling; issues the same call as 'nfs export info'.
        exports = self.export_mgr
        return exports.get_export(cluster_id=cluster_id, pseudo_path=pseudo_path)

    @CLICommand('nfs export apply', perm='rw')
    @CLICheckNonemptyFileInput(desc='Export JSON or Ganesha EXPORT specification')
    def _cmd_nfs_export_apply(self, cluster_id: str, inbuf: str) -> Tuple[int, str, str]:
        """Create or update an export by `-i <json_or_ganesha_export_file>`"""
        # inbuf is handed over unchanged as the export configuration.
        exports = self.export_mgr
        return exports.apply_export(cluster_id, export_config=inbuf)

    # ------------------------------------------------------------------
    # 'nfs cluster ...' command handlers
    # ------------------------------------------------------------------

    @CLICommand('nfs cluster create', perm='rw')
    def _cmd_nfs_cluster_create(self,
                                cluster_id: str,
                                placement: Optional[str] = None,
                                ingress: Optional[bool] = None,
                                virtual_ip: Optional[str] = None,
                                port: Optional[int] = None) -> Tuple[int, str, str]:
        """Create an NFS Cluster"""
        return self.nfs.create_nfs_cluster(
            cluster_id=cluster_id,
            placement=placement,
            virtual_ip=virtual_ip,
            ingress=ingress,
            port=port,
        )

    @CLICommand('nfs cluster rm', perm='rw')
    def _cmd_nfs_cluster_rm(self, cluster_id: str) -> Tuple[int, str, str]:
        """Removes an NFS Cluster"""
        clusters = self.nfs
        return clusters.delete_nfs_cluster(cluster_id=cluster_id)

    @CLICommand('nfs cluster delete', perm='rw')
    def _cmd_nfs_cluster_delete(self, cluster_id: str) -> Tuple[int, str, str]:
        """Removes an NFS Cluster (DEPRECATED)"""
        # Deprecated spelling; issues the same call as 'nfs cluster rm'.
        clusters = self.nfs
        return clusters.delete_nfs_cluster(cluster_id=cluster_id)

    @CLICommand('nfs cluster ls', perm='r')
    def _cmd_nfs_cluster_ls(self) -> Tuple[int, str, str]:
        """List NFS Clusters"""
        clusters = self.nfs
        return clusters.list_nfs_cluster()

    @CLICommand('nfs cluster info', perm='r')
    def _cmd_nfs_cluster_info(self, cluster_id: Optional[str] = None) -> Tuple[int, str, str]:
        """Displays NFS Cluster info"""
        # cluster_id is optional and is passed through as-is, None included.
        clusters = self.nfs
        return clusters.show_nfs_cluster_info(cluster_id=cluster_id)

    @CLICommand('nfs cluster config get', perm='r')
    def _cmd_nfs_cluster_config_get(self, cluster_id: str) -> Tuple[int, str, str]:
        """Fetch NFS-Ganesha config"""
        clusters = self.nfs
        return clusters.get_nfs_cluster_config(cluster_id=cluster_id)

    @CLICommand('nfs cluster config set', perm='rw')
    @CLICheckNonemptyFileInput(desc='NFS-Ganesha Configuration')
    def _cmd_nfs_cluster_config_set(self, cluster_id: str, inbuf: str) -> Tuple[int, str, str]:
        """Set NFS-Ganesha config by `-i <config_file>`"""
        # inbuf is handed over unchanged as the new configuration.
        clusters = self.nfs
        return clusters.set_nfs_cluster_config(cluster_id=cluster_id, nfs_config=inbuf)

    @CLICommand('nfs cluster config reset', perm='rw')
    def _cmd_nfs_cluster_config_reset(self, cluster_id: str) -> Tuple[int, str, str]:
        """Reset NFS-Ganesha Config to default"""
        clusters = self.nfs
        return clusters.reset_nfs_cluster_config(cluster_id=cluster_id)

    # ------------------------------------------------------------------
    # Undecorated methods (no CLICommand registration)
    # ------------------------------------------------------------------

    def fetch_nfs_export_obj(self) -> ExportMgr:
        """Return the ExportMgr instance owned by this module."""
        return self.export_mgr

    def export_ls(self) -> List[Dict[Any, Any]]:
        """Return whatever the export manager's list_all_exports() reports."""
        exports = self.export_mgr
        return exports.list_all_exports()

    def export_get(self, cluster_id: str, export_id: int) -> Optional[Dict[str, Any]]:
        """Look up one export of ``cluster_id`` by its numeric ``export_id``."""
        exports = self.export_mgr
        return exports.get_export_by_id(cluster_id, export_id)

    def export_rm(self, cluster_id: str, pseudo: str) -> None:
        """Delete the export at pseudo path ``pseudo`` in ``cluster_id``.

        The value returned by the export manager is discarded; this method
        always returns None.
        """
        exports = self.export_mgr
        exports.delete_export(cluster_id=cluster_id, pseudo_path=pseudo)

    def cluster_ls(self) -> List[str]:
        """Return the result of ``available_clusters`` for this module."""
        return available_clusters(self)