# ceph/src/pybind/mgr/volumes/fs/operations/volume.py
import time
import errno
import logging
from contextlib import contextmanager
from threading import Lock, Condition

try:
    # py2
    from threading import _Timer as Timer
except ImportError:
    # py3
    from threading import Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)

class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except VolumeException:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))

        def disconnect(self):
            try:
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                self.fs.shutdown()
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0

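# ConnectionPool: illustrative usage sketch (kept as a comment, not executed).
# Assumptions for the example: "mgr" is an mgr module instance and "vol1" is
# an existing volume name.
#
#   pool = ConnectionPool(mgr)
#   fs_handle = pool.get_fs_handle("vol1")   # connect (or reuse) and bump ops_in_progress
#   try:
#       fs_handle.stat("/")                  # any libcephfs call on the mounted handle
#   finally:
#       pool.put_fs_handle("vol1")           # drop ops_in_progress; the idle timer may reap it later
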
def gen_pool_names(volname):
    """
    return metadata and data pool name (from a filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)

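# gen_pool_names example (comment only): gen_pool_names("vol1") returns the
# tuple ("cephfs.vol1.meta", "cephfs.vol1.data").
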
def create_volume(mgr, volname):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create pools
    r, outb, outs = create_pool(mgr, metadata_pool, 16)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool, 8)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create mds
    return create_mds(mgr, volname)

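# create_volume: illustrative usage sketch (kept as a comment, not executed).
# Assumption: "mgr" is an mgr module instance. The result follows the usual
# mon command convention of (return code, output buffer, output string).
#
#   r, outb, outs = create_volume(mgr, "vol1")
#   if r != 0:
#       log.error("volume creation failed: {0}".format(outs))
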
def delete_volume(mgr, volname):
    """
    delete the given volume (tear down mds, remove filesystem and its pools)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_stateless_service("mds", volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case the orchestrator didn't tear down the MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err
    metadata_pool, data_pool = gen_pool_names(volname)
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    return remove_pool(mgr, data_pool)

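# delete_volume: illustrative result sketch (kept as a comment, not executed).
# Assumption: "mgr" is an mgr module instance. A missing volume surfaces as a
# negative errno in the usual result tuple.
#
#   r, outb, outs = delete_volume(mgr, "no-such-volume")
#   # r == -errno.ENOENT, outs == "Filesystem not found for volume 'no-such-volume'"
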
def list_volumes(mgr):
    """
    list all filesystem volumes.

    :param mgr: mgr module handle used to fetch the FSMap
    :return: list of volumes, each a dict with the volume name under the 'name' key
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result

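# list_volumes example (comment only): with filesystems "a" and "b" in the
# FSMap, list_volumes(mgr) returns [{'name': 'a'}, {'name': 'b'}].
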
@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)

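# open_volume: illustrative usage sketch (kept as a comment, not executed).
# Assumptions: "vc" is the volume client instance that owns the connection
# pool and the global lock, and "vol1" is an existing volume.
#
#   with open_volume(vc, "vol1") as fs_handle:
#       # runs under the global lock; suitable for operations that mutate volume state
#       fs_handle.mkdirs(b"/volumes/example", 0o755)
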
@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
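# open_volume_lockless: illustrative usage sketch (kept as a comment, not
# executed). Same assumptions as above; no global lock is taken, so this suits
# callers that only need shared access to the volume.
#
#   with open_volume_lockless(vc, "vol1") as fs_handle:
#       st = fs_handle.stat(b"/volumes")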