import time
import errno
import logging
import sys

from typing import List

from contextlib import contextmanager
from threading import Lock, Condition
from typing import no_type_check

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)

class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except VolumeException:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self):
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.fs
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        @no_type_check
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0

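# Illustrative sketch (not part of the original module): how a caller is
# expected to pair ConnectionPool.get_fs_handle()/put_fs_handle(). The `mgr`
# object and the volume name "a" are placeholders.
#
#   pool = ConnectionPool(mgr)
#   fs = pool.get_fs_handle("a")   # connects (or reuses) a libcephfs mount
#   try:
#       fs.stat("/volumes")        # any libcephfs operation on the handle
#   finally:
#       pool.put_fs_handle("a")    # drop the op count; idle connections are
#                                  # reaped later by the RTimer task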
def gen_pool_names(volname):
    """
    return metadata and data pool name (from a filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)

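# For illustration: gen_pool_names() only derives names, it creates nothing.
# The volume name "a" is a placeholder.
#
#   gen_pool_names("a")   # -> ("cephfs.a.meta", "cephfs.a.data")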
def get_pool_names(mgr, volname):
    """
    return the metadata pool name and the list of data pool names of a volume as a tuple
    """
    fs_map = mgr.get("fs_map")
    metadata_pool_id = None
    data_pool_ids = [] # type: List[int]
    for f in fs_map['filesystems']:
        if volname == f['mdsmap']['fs_name']:
            metadata_pool_id = f['mdsmap']['metadata_pool']
            data_pool_ids = f['mdsmap']['data_pools']
            break
    if metadata_pool_id is None:
        return None, None

    osdmap = mgr.get("osd_map")
    pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
    metadata_pool = pools[metadata_pool_id]
    data_pools = [pools[id] for id in data_pool_ids]
    return metadata_pool, data_pools

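# For illustration: get_pool_names() resolves the pool ids found in the FSMap
# against the OSDMap. For a volume "a" created with the default pools it would
# typically return:
#
#   get_pool_names(mgr, "a")       # -> ("cephfs.a.meta", ["cephfs.a.data"])
#   get_pool_names(mgr, "absent")  # -> (None, None) for an unknown volume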
def create_volume(mgr, volname, placement):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create pools
    r, outb, outs = create_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create mds
    return create_mds(mgr, volname, placement)

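# For illustration: create_volume() follows the (return code, output buffer,
# output/status string) convention of the mon command helpers it wraps;
# rc == 0 means success, and `placement` is forwarded to create_mds().
#
#   r, outb, outs = create_volume(mgr, "a", placement="")
#   if r != 0:
#       log.error("volume creation failed: {0}".format(outs))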
def delete_volume(mgr, volname, metadata_pool, data_pools):
    """
    delete the given volume (tear down mds, remove filesystem, remove pools)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_service('mds.' + volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case the orchestrator didn't tear down the MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs

    for data_pool in data_pools:
        r, outb, outs = remove_pool(mgr, data_pool)
        if r != 0:
            return r, outb, outs
    result_str = "metadata pool: {0} data pool: {1} removed".format(metadata_pool, str(data_pools))
    return r, result_str, ""

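# Illustrative teardown sequence (assumed caller behaviour, not part of this
# module): the pool names have to be resolved via get_pool_names() before the
# filesystem is removed, since they can no longer be found in the FSMap
# afterwards.
#
#   metadata_pool, data_pools = get_pool_names(mgr, "a")
#   if metadata_pool is None:
#       pass  # volume does not exist
#   else:
#       r, outb, outs = delete_volume(mgr, "a", metadata_pool, data_pools)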
def list_volumes(mgr):
    """
    list all filesystem volumes.

    :param mgr: mgr module handle
    :return: list of dicts, one per volume, each carrying a 'name' key
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result

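# For illustration: list_volumes() returns one dict per filesystem, e.g.
#
#   list_volumes(mgr)   # -> [{'name': 'a'}, {'name': 'b'}]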
@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)

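# Illustrative usage (names are placeholders): `vc` is the volume client that
# owns the ConnectionPool; open_volume() additionally wraps the yielded handle
# in the GlobalLock, which is what provides the "exclusive access" described
# in its docstring.
#
#   with open_volume(vc, "a") as fs_handle:
#       fs_handle.stat("/volumes")   # handle is only valid inside the block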
@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)
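# Illustrative usage (names are placeholders): same as open_volume(), but the
# GlobalLock is not taken, so callers that only need shared access do not
# serialize behind it.
#
#   with open_volume_lockless(vc, "a") as fs_handle:
#       fs_handle.stat("/volumes")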