import time
import errno
import logging
import sys

from typing import List

from contextlib import contextmanager
from threading import Lock, Condition
from typing import no_type_check

if sys.version_info >= (3, 3):
    from threading import Timer
else:
    from threading import _Timer as Timer

import cephfs
import orchestrator

from .lock import GlobalLock
from ..exception import VolumeException
from ..fs_util import create_pool, remove_pool, create_filesystem, \
    remove_filesystem, create_mds, volume_exists

log = logging.getLogger(__name__)

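# ConnectionPool caches one libcephfs connection (mount) per volume and hands
# shared handles out to callers, reference counting in-flight operations so a
# mount is never torn down while it is still in use. A recurring timer task
# reaps connections that have been idle for CONNECTION_IDLE_INTERVAL seconds.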
class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

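        # get_fs_id() resolves the volume name to its numeric filesystem id in
        # the current FSMap; if that id later changes, the filesystem was
        # deleted and recreated and this connection is stale.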
        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

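        # get_fs_handle()/put_fs_handle() bracket each operation on the mount:
        # ops_in_progress is a simple reference count, and put_fs_handle()
        # notifies waiters once it drops back to zero.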
        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self, notify):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1
            if self.ops_in_progress == 0:
                notify()

        def del_fs_handle(self, waiter):
            if waiter:
                while self.ops_in_progress != 0:
                    waiter()
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except VolumeException:
                # the filesystem does not exist now -- connection is not valid.
                pass
            log.debug("self.fs_id={0}, fs_id={1}".format(self.fs_id, fs_id))
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

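        # connect() builds a libcephfs handle on top of the mgr's existing
        # RADOS connection, mounts the named filesystem as root (uid/gid 0)
        # and registers the client addresses with the mgr.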
        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
            self.mgr._ceph_register_client(self.fs.get_addrs())

        def disconnect(self):
            try:
                assert self.fs
                assert self.ops_in_progress == 0
                log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
                addrs = self.fs.get_addrs()
                self.fs.shutdown()
                self.mgr._ceph_unregister_client(addrs)
                self.fs = None
            except Exception as e:
                log.debug("disconnect: ({0})".format(e))
                raise

        def abort(self):
            assert self.fs
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            log.info("abort done from cephfs '{0}'".format(self.fs_name))
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        @no_type_check
        def run(self):
            try:
                while not self.finished.is_set():
                    self.finished.wait(self.interval)
                    self.function(*self.args, **self.kwargs)
                self.finished.set()
            except Exception as e:
                log.error("ConnectionPool.RTimer: %s", e)
                raise

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0   # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.cond = Condition(self.lock)
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

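    # cleanup_connections() runs in the RTimer thread every
    # TIMER_TASK_RUN_INTERVAL seconds and drops connections that have had no
    # in-flight operations for at least CONNECTION_IDLE_INTERVAL seconds.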
    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

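    # get_fs_handle() hands out a handle from the cached connection if the
    # filesystem id is unchanged; otherwise (e.g. the filesystem was removed
    # and recreated with the same name) the stale connection is dropped and a
    # fresh one is established.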
    def get_fs_handle(self, fs_name):
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    else:
                        # filesystem id changed beneath us (or the filesystem does not exist).
                        # this is possible if the filesystem got removed (and recreated with
                        # same name) via "ceph fs rm/new" mon command.
                        log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                        self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

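    # put_fs_handle() drops one reference on the connection; once the count
    # reaches zero the condition variable is notified so that
    # del_fs_handle(..., wait=True) callers blocked in cond.wait() can proceed.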
    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle(notify=lambda: self.cond.notifyAll())

    def _del_fs_handle(self, fs_name, wait=False):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle(waiter=None if not wait else lambda: self.cond.wait())

    def del_fs_handle(self, fs_name, wait=False):
        with self.lock:
            self._del_fs_handle(fs_name, wait)

    def del_all_handles(self):
        with self.lock:
            for fs_name in list(self.connections.keys()):
                log.info("waiting for pending ops for '{}'".format(fs_name))
                self._del_fs_handle(fs_name, wait=True)
                log.info("pending ops completed for '{}'".format(fs_name))
            # no new connections should have been initialized since it's
            # guarded on shutdown.
            assert len(self.connections) == 0

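# For example, gen_pool_names("vol1") returns
# ("cephfs.vol1.meta", "cephfs.vol1.data"), the pool naming convention that
# create_volume() below relies on.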
def gen_pool_names(volname):
    """
    return metadata and data pool name (from a filesystem/volume name) as a tuple
    """
    return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)

def get_pool_names(mgr, volname):
    """
    return the metadata pool name and data pool names (list) of a volume as a tuple
    """
    fs_map = mgr.get("fs_map")
    metadata_pool_id = None
    data_pool_ids = []  # type: List[int]
    for f in fs_map['filesystems']:
        if volname == f['mdsmap']['fs_name']:
            metadata_pool_id = f['mdsmap']['metadata_pool']
            data_pool_ids = f['mdsmap']['data_pools']
            break
    if metadata_pool_id is None:
        return None, None

    osdmap = mgr.get("osd_map")
    pools = dict([(p['pool'], p['pool_name']) for p in osdmap['pools']])
    metadata_pool = pools[metadata_pool_id]
    data_pools = [pools[id] for id in data_pool_ids]
    return metadata_pool, data_pools

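# create_volume() provisions the pools first, then the filesystem, then the
# MDS service; on failure it removes any pools it already created and returns
# the usual (r, outb, outs) command result tuple.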
def create_volume(mgr, volname, placement):
    """
    create volume (pool, filesystem and mds)
    """
    metadata_pool, data_pool = gen_pool_names(volname)
    # create pools
    r, outb, outs = create_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs
    r, outb, outs = create_pool(mgr, data_pool)
    if r != 0:
        # cleanup
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create filesystem
    r, outb, outs = create_filesystem(mgr, volname, metadata_pool, data_pool)
    if r != 0:
        log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
        # cleanup
        remove_pool(mgr, data_pool)
        remove_pool(mgr, metadata_pool)
        return r, outb, outs
    # create mds
    return create_mds(mgr, volname, placement)

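# delete_volume() tears down the MDS service through the orchestrator (and
# tolerates the orchestrator being unavailable), then removes the filesystem
# and finally its metadata and data pools.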
def delete_volume(mgr, volname, metadata_pool, data_pools):
    """
    delete the given volume (tear down mds, remove filesystem, remove pools)
    """
    # Tear down MDS daemons
    try:
        completion = mgr.remove_service('mds.' + volname)
        mgr._orchestrator_wait([completion])
        orchestrator.raise_if_exception(completion)
    except (ImportError, orchestrator.OrchestratorError):
        log.warning("OrchestratorError, not tearing down MDS daemons")
    except Exception as e:
        # Don't let detailed orchestrator exceptions (python backtraces)
        # bubble out to the user
        log.exception("Failed to tear down MDS daemons")
        return -errno.EINVAL, "", str(e)

    # In case the orchestrator didn't tear down the MDS daemons cleanly, or
    # there was no orchestrator, we force the daemons down.
    if volume_exists(mgr, volname):
        r, outb, outs = remove_filesystem(mgr, volname)
        if r != 0:
            return r, outb, outs
    else:
        err = "Filesystem not found for volume '{0}'".format(volname)
        log.warning(err)
        return -errno.ENOENT, "", err
    r, outb, outs = remove_pool(mgr, metadata_pool)
    if r != 0:
        return r, outb, outs

    for data_pool in data_pools:
        r, outb, outs = remove_pool(mgr, data_pool)
        if r != 0:
            return r, outb, outs
    result_str = "metadata pool: {0} data pools: {1} removed".format(metadata_pool, str(data_pools))
    return r, result_str, ""

def list_volumes(mgr):
    """
    list all filesystem volumes.

    :param mgr: mgr module instance
    :return: list of volumes, each a dict with a 'name' key
    """
    result = []
    fs_map = mgr.get("fs_map")
    for f in fs_map['filesystems']:
        result.append({'name': f['mdsmap']['fs_name']})
    return result

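# Typical usage (a sketch; "vol1" is a hypothetical volume name and "vc" is the
# volume client object callers in this module pass in, exposing is_stopping()
# and connection_pool):
#
#   with open_volume(vc, "vol1") as fs_handle:
#       fs_handle.stat(b"/")
#
# open_volume() serializes callers through the GlobalLock, while
# open_volume_lockless() below only tracks the handle via the connection pool.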
@contextmanager
def open_volume(vc, volname):
    """
    open a volume for exclusive access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    g_lock = GlobalLock()
    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        with g_lock.lock_op():
            yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)

@contextmanager
def open_volume_lockless(vc, volname):
    """
    open a volume with shared access. This API is to be used as a context manager.

    :param vc: volume client instance
    :param volname: volume name
    :return: yields a volume handle (ceph filesystem handle)
    """
    if vc.is_stopping():
        raise VolumeException(-errno.ESHUTDOWN, "shutdown in progress")

    fs_handle = vc.connection_pool.get_fs_handle(volname)
    try:
        yield fs_handle
    finally:
        vc.connection_pool.put_fs_handle(volname)