# ceph/src/pybind/mgr/volumes/fs/operations/volume.py (Ceph 15.2.0 "Octopus")

import time
import errno
import logging
import sys

from contextlib import contextmanager
from threading import Lock, Condition
from typing import no_type_check

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)

class ConnectionPool(object):
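    """
    Pool of libcephfs connections, one per open volume (filesystem). A
    recurring timer task periodically drops connections that have been
    idle for longer than CONNECTION_IDLE_INTERVAL seconds.
    """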
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except Exception:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self):
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.fs
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        @no_type_check
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

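    # Note: RTimer reuses threading.Timer's attributes (interval, function,
    # args, kwargs, finished) and fires repeatedly until cancel() sets
    # `finished`. The pool uses it below to run cleanup_connections every
    # TIMER_TASK_RUN_INTERVAL seconds.
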
    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0

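# A minimal sketch of how the pool is meant to be used (hedged: "vol1" is an
# illustrative volume name, `mgr` stands for the mgr module instance that owns
# the pool, and error handling is omitted):
#
#   pool = ConnectionPool(mgr)
#   fs_handle = pool.get_fs_handle("vol1")   # connect (or reuse) and pin
#   try:
#       fs_handle.statfs(b'/')               # any libcephfs operation
#   finally:
#       pool.put_fs_handle("vol1")           # unpin; idle timer may reap it
#   pool.del_all_handles()                   # tear everything down
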
def gen_pool_names(volname):
    """
    return metadata and data pool names (from a filesystem/volume name) as a tuple
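
    e.g. gen_pool_names("vol1") -> ("cephfs.vol1.meta", "cephfs.vol1.data")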
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)

def create_volume(mgr, volname, placement):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create pools
    r, outb, outs = create_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create mds
    return create_mds(mgr, volname, placement)

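# A minimal usage sketch (hedged: "vol1" and the placement value are
# illustrative; `mgr` is the volumes mgr module instance). Like the other
# helpers here, create_volume returns an (r, outb, outs) command-result tuple:
#
#   r, outb, outs = create_volume(mgr, "vol1", placement=None)
#   if r != 0:
#       log.error("volume create failed: {0}".format(outs))
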
def delete_volume(mgr, volname):
    """
    delete the given volume (tear down mds, remove filesystem)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_service('mds.' + volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case orchestrator didn't tear down MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err
    metadata_pool, data_pool = gen_pool_names(volname)
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    return remove_pool(mgr, data_pool)

def list_volumes(mgr):
    """
    list all filesystem volumes.

    :param mgr: mgr module handle
    :return: list of volumes, e.g. [{'name': 'vol1'}, {'name': 'vol2'}]
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result

@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)

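# A minimal caller sketch (hedged: `vc` stands for the volume client object
# that provides is_stopping() and a `connection_pool`, and "vol1" is an
# illustrative volume name). The yielded handle is a mounted cephfs.LibCephFS:
#
#   with open_volume(vc, "vol1") as fs_handle:
#       fs_handle.stat(b'/')
#
# open_volume serializes volume operations via the global lock; use
# open_volume_lockless (below) for operations that may run concurrently.
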
@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
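
# The lockless variant follows the same pattern, just without taking the
# global lock (a sketch; same assumptions about `vc` as above):
#
#   with open_volume_lockless(vc, "vol1") as fs_handle:
#       fs_handle.statfs(b'/')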