]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
8e32a233b4f786c3f9c23b25136d3e8cd3241c82
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import time
3 import errno
4 import logging
5 from threading import Lock
6 try:
7 # py2
8 from threading import _Timer as Timer
9 except ImportError:
10 #py3
11 from threading import Timer
12
13 import cephfs
14 import orchestrator
15
16 from .subvolspec import SubvolumeSpec
17 from .subvolume import SubVolume
18 from .exception import VolumeException
19 from .purge_queue import ThreadPoolPurgeQueueMixin
20
21 log = logging.getLogger(__name__)
22
23 class ConnectionPool(object):
24 class Connection(object):
25 def __init__(self, mgr, fs_name):
26 self.fs = None
27 self.mgr = mgr
28 self.fs_name = fs_name
29 self.ops_in_progress = 0
30 self.last_used = time.time()
31 self.fs_id = self.get_fs_id()
32
33 def get_fs_id(self):
34 fs_map = self.mgr.get('fs_map')
35 for fs in fs_map['filesystems']:
36 if fs['mdsmap']['fs_name'] == self.fs_name:
37 return fs['id']
38 raise VolumeException(
39 -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))
40
41 def get_fs_handle(self):
42 self.last_used = time.time()
43 self.ops_in_progress += 1
44 return self.fs
45
46 def put_fs_handle(self):
47 assert self.ops_in_progress > 0
48 self.ops_in_progress -= 1
49
50 def del_fs_handle(self):
51 if self.is_connection_valid():
52 self.disconnect()
53 else:
54 self.abort()
55
56 def is_connection_valid(self):
57 fs_id = None
58 try:
59 fs_id = self.get_fs_id()
60 except:
61 # the filesystem does not exist now -- connection is not valid.
62 pass
63 return self.fs_id == fs_id
64
65 def is_connection_idle(self, timeout):
66 return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
67
68 def connect(self):
69 assert self.ops_in_progress == 0
70 log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
71 self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
72 log.debug("Setting user ID and group ID of CephFS mount as root...")
73 self.fs.conf_set("client_mount_uid", "0")
74 self.fs.conf_set("client_mount_gid", "0")
75 log.debug("CephFS initializing...")
76 self.fs.init()
77 log.debug("CephFS mounting...")
78 self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
79 log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
80
81 def disconnect(self):
82 assert self.ops_in_progress == 0
83 log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
84 self.fs.shutdown()
85 self.fs = None
86
87 def abort(self):
88 assert self.ops_in_progress == 0
89 log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
90 self.fs.abort_conn()
91 self.fs = None
92
93 class RTimer(Timer):
94 """
95 recurring timer variant of Timer
96 """
97 def run(self):
98 while not self.finished.is_set():
99 self.finished.wait(self.interval)
100 self.function(*self.args, **self.kwargs)
101 self.finished.set()
102
103 # TODO: make this configurable
104 TIMER_TASK_RUN_INTERVAL = 30.0 # seconds
105 CONNECTION_IDLE_INTERVAL = 60.0 # seconds
106
107 def __init__(self, mgr):
108 self.mgr = mgr
109 self.connections = {}
110 self.lock = Lock()
111 self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
112 self.cleanup_connections)
113 self.timer_task.start()
114
115 def cleanup_connections(self):
116 with self.lock:
117 log.info("scanning for idle connections..")
118 idle_fs = [fs_name for fs_name,conn in self.connections.iteritems()
119 if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
120 for fs_name in idle_fs:
121 log.info("cleaning up connection for '{}'".format(fs_name))
122 self._del_fs_handle(fs_name)
123
124 def get_fs_handle(self, fs_name):
125 with self.lock:
126 conn = None
127 try:
128 conn = self.connections.get(fs_name, None)
129 if conn:
130 if conn.is_connection_valid():
131 return conn.get_fs_handle()
132 else:
133 # filesystem id changed beneath us (or the filesystem does not exist).
134 # this is possible if the filesystem got removed (and recreated with
135 # same name) via "ceph fs rm/new" mon command.
136 log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
137 self._del_fs_handle(fs_name)
138 conn = ConnectionPool.Connection(self.mgr, fs_name)
139 conn.connect()
140 except cephfs.Error as e:
141 # try to provide a better error string if possible
142 if e.args[0] == errno.ENOENT:
143 raise VolumeException(
144 -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
145 raise VolumeException(-e.args[0], e.args[1])
146 self.connections[fs_name] = conn
147 return conn.get_fs_handle()
148
149 def put_fs_handle(self, fs_name):
150 with self.lock:
151 conn = self.connections.get(fs_name, None)
152 if conn:
153 conn.put_fs_handle()
154
155 def _del_fs_handle(self, fs_name):
156 conn = self.connections.pop(fs_name, None)
157 if conn:
158 conn.del_fs_handle()
159 def del_fs_handle(self, fs_name):
160 with self.lock:
161 self._del_fs_handle(fs_name)
162
163 class VolumeClient(object):
164 def __init__(self, mgr):
165 self.mgr = mgr
166 self.connection_pool = ConnectionPool(self.mgr)
167 # TODO: make thread pool size configurable
168 self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
169 # on startup, queue purge job for available volumes to kickstart
170 # purge for leftover subvolume entries in trash. note that, if the
171 # trash directory does not exist or if there are no purge entries
172 # available for a volume, the volume is removed from the purge
173 # job list.
174 fs_map = self.mgr.get('fs_map')
175 for fs in fs_map['filesystems']:
176 self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])
177
178 def gen_pool_names(self, volname):
179 """
180 return metadata and data pool name (from a filesystem/volume name) as a tuple
181 """
182 return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
183
184 def get_fs(self, fs_name):
185 fs_map = self.mgr.get('fs_map')
186 for fs in fs_map['filesystems']:
187 if fs['mdsmap']['fs_name'] == fs_name:
188 return fs
189 return None
190
191 def get_mds_names(self, fs_name):
192 fs = self.get_fs(fs_name)
193 if fs is None:
194 return []
195 return [mds['name'] for mds in fs['mdsmap']['info'].values()]
196
197 def volume_exists(self, volname):
198 return self.get_fs(volname) is not None
199
200 def volume_exception_to_retval(self, ve):
201 """
202 return a tuple representation from a volume exception
203 """
204 return ve.to_tuple()
205
206 def create_pool(self, pool_name, pg_num):
207 # create the given pool
208 command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
209 r, outb, outs = self.mgr.mon_command(command)
210 if r != 0:
211 return r, outb, outs
212
213 return r, outb, outs
214
215 def remove_pool(self, pool_name):
216 command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
217 'yes_i_really_really_mean_it': True}
218 return self.mgr.mon_command(command)
219
220 def create_filesystem(self, fs_name, metadata_pool, data_pool):
221 command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
222 'data': data_pool}
223 return self.mgr.mon_command(command)
224
225 def remove_filesystem(self, fs_name):
226 command = {'prefix': 'fs fail', 'fs_name': fs_name}
227 r, outb, outs = self.mgr.mon_command(command)
228 if r != 0:
229 return r, outb, outs
230 command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
231 return self.mgr.mon_command(command)
232
233 def create_mds(self, fs_name):
234 spec = orchestrator.StatelessServiceSpec()
235 spec.name = fs_name
236 try:
237 completion = self.mgr.add_stateless_service("mds", spec)
238 self.mgr._orchestrator_wait([completion])
239 orchestrator.raise_if_exception(completion)
240 except (ImportError, orchestrator.OrchestratorError):
241 return 0, "", "Volume created successfully (no MDS daemons created)"
242 except Exception as e:
243 # Don't let detailed orchestrator exceptions (python backtraces)
244 # bubble out to the user
245 log.exception("Failed to create MDS daemons")
246 return -errno.EINVAL, "", str(e)
247 return 0, "", ""
248
249 ### volume operations -- create, rm, ls
250
251 def create_volume(self, volname, size=None):
252 """
253 create volume (pool, filesystem and mds)
254 """
255 metadata_pool, data_pool = self.gen_pool_names(volname)
256 # create pools
257 r, outs, outb = self.create_pool(metadata_pool, 16)
258 if r != 0:
259 return r, outb, outs
260 r, outb, outs = self.create_pool(data_pool, 8)
261 if r != 0:
262 return r, outb, outs
263 # create filesystem
264 r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
265 if r != 0:
266 log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
267 return r, outb, outs
268 # create mds
269 return self.create_mds(volname)
270
271 def delete_volume(self, volname):
272 """
273 delete the given module (tear down mds, remove filesystem)
274 """
275 self.purge_queue.cancel_purge_job(volname)
276 self.connection_pool.del_fs_handle(volname)
277 # Tear down MDS daemons
278 try:
279 completion = self.mgr.remove_stateless_service("mds", volname)
280 self.mgr._orchestrator_wait([completion])
281 orchestrator.raise_if_exception(completion)
282 except (ImportError, orchestrator.OrchestratorError):
283 log.warning("OrchestratorError, not tearing down MDS daemons")
284 except Exception as e:
285 # Don't let detailed orchestrator exceptions (python backtraces)
286 # bubble out to the user
287 log.exception("Failed to tear down MDS daemons")
288 return -errno.EINVAL, "", str(e)
289
290 # In case orchestrator didn't tear down MDS daemons cleanly, or
291 # there was no orchestrator, we force the daemons down.
292 if self.volume_exists(volname):
293 r, outb, outs = self.remove_filesystem(volname)
294 if r != 0:
295 return r, outb, outs
296 else:
297 log.warning("Filesystem already gone for volume '{0}'".format(volname))
298 metadata_pool, data_pool = self.gen_pool_names(volname)
299 r, outb, outs = self.remove_pool(metadata_pool)
300 if r != 0:
301 return r, outb, outs
302 return self.remove_pool(data_pool)
303
304 def list_volumes(self):
305 result = []
306 fs_map = self.mgr.get("fs_map")
307 for f in fs_map['filesystems']:
308 result.append({'name': f['mdsmap']['fs_name']})
309 return 0, json.dumps(result, indent=2), ""
310
311 def group_exists(self, sv, spec):
312 # default group need not be explicitly created (as it gets created
313 # at the time of subvolume, snapshot and other create operations).
314 return spec.is_default_group() or sv.get_group_path(spec)
315
316 @staticmethod
317 def octal_str_to_decimal_int(mode):
318 try:
319 return int(mode, 8)
320 except ValueError:
321 raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
322
323 def connection_pool_wrap(func):
324 """
325 decorator that wraps subvolume calls by fetching filesystem handle
326 from the connection pool when fs_handle argument is empty, otherwise
327 just invoke func with the passed in filesystem handle. Also handles
328 call made to non-existent volumes (only when fs_handle is empty).
329 """
330 def conn_wrapper(self, fs_handle, **kwargs):
331 fs_h = fs_handle
332 fs_name = kwargs['vol_name']
333 # note that force arg is available for remove type commands
334 force = kwargs.get('force', False)
335
336 # fetch the connection from the pool
337 if not fs_handle:
338 try:
339 fs_h = self.connection_pool.get_fs_handle(fs_name)
340 except VolumeException as ve:
341 if not force:
342 return self.volume_exception_to_retval(ve)
343 return 0, "", ""
344
345 # invoke the actual routine w/ fs handle
346 result = func(self, fs_h, **kwargs)
347
348 # hand over the connection back to the pool
349 if fs_h:
350 self.connection_pool.put_fs_handle(fs_name)
351 return result
352 return conn_wrapper
353
354 ### subvolume operations
355
356 @connection_pool_wrap
357 def create_subvolume(self, fs_handle, **kwargs):
358 ret = 0, "", ""
359 volname = kwargs['vol_name']
360 subvolname = kwargs['sub_name']
361 groupname = kwargs['group_name']
362 size = kwargs['size']
363 pool = kwargs['pool_layout']
364 mode = kwargs['mode']
365
366 try:
367 with SubVolume(self.mgr, fs_handle) as sv:
368 spec = SubvolumeSpec(subvolname, groupname)
369 if not self.group_exists(sv, spec):
370 raise VolumeException(
371 -errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
372 "`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
373 sv.create_subvolume(spec, size, pool=pool, mode=self.octal_str_to_decimal_int(mode))
374 except VolumeException as ve:
375 ret = self.volume_exception_to_retval(ve)
376 return ret
377
378 @connection_pool_wrap
379 def remove_subvolume(self, fs_handle, **kwargs):
380 ret = 0, "", ""
381 volname = kwargs['vol_name']
382 subvolname = kwargs['sub_name']
383 groupname = kwargs['group_name']
384 force = kwargs['force']
385 try:
386 with SubVolume(self.mgr, fs_handle) as sv:
387 spec = SubvolumeSpec(subvolname, groupname)
388 if self.group_exists(sv, spec):
389 sv.remove_subvolume(spec, force)
390 self.purge_queue.queue_purge_job(volname)
391 elif not force:
392 raise VolumeException(
393 -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
394 "subvolume '{1}'".format(groupname, subvolname))
395 except VolumeException as ve:
396 ret = self.volume_exception_to_retval(ve)
397 return ret
398
399 @connection_pool_wrap
400 def subvolume_getpath(self, fs_handle, **kwargs):
401 ret = None
402 volname = kwargs['vol_name']
403 subvolname = kwargs['sub_name']
404 groupname = kwargs['group_name']
405 try:
406 with SubVolume(self.mgr, fs_handle) as sv:
407 spec = SubvolumeSpec(subvolname, groupname)
408 if not self.group_exists(sv, spec):
409 raise VolumeException(
410 -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
411 path = sv.get_subvolume_path(spec)
412 if not path:
413 raise VolumeException(
414 -errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
415 ret = 0, path, ""
416 except VolumeException as ve:
417 ret = self.volume_exception_to_retval(ve)
418 return ret
419
420 ### subvolume snapshot
421
422 @connection_pool_wrap
423 def create_subvolume_snapshot(self, fs_handle, **kwargs):
424 ret = 0, "", ""
425 volname = kwargs['vol_name']
426 subvolname = kwargs['sub_name']
427 snapname = kwargs['snap_name']
428 groupname = kwargs['group_name']
429
430 try:
431 with SubVolume(self.mgr, fs_handle) as sv:
432 spec = SubvolumeSpec(subvolname, groupname)
433 if not self.group_exists(sv, spec):
434 raise VolumeException(
435 -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
436 "snapshot '{1}'".format(groupname, snapname))
437 if not sv.get_subvolume_path(spec):
438 raise VolumeException(
439 -errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot " \
440 "'{1}'".format(subvolname, snapname))
441 sv.create_subvolume_snapshot(spec, snapname)
442 except VolumeException as ve:
443 ret = self.volume_exception_to_retval(ve)
444 return ret
445
446 @connection_pool_wrap
447 def remove_subvolume_snapshot(self, fs_handle, **kwargs):
448 ret = 0, "", ""
449 volname = kwargs['vol_name']
450 subvolname = kwargs['sub_name']
451 snapname = kwargs['snap_name']
452 groupname = kwargs['group_name']
453 force = kwargs['force']
454 try:
455 with SubVolume(self.mgr, fs_handle) as sv:
456 spec = SubvolumeSpec(subvolname, groupname)
457 if self.group_exists(sv, spec):
458 if sv.get_subvolume_path(spec):
459 sv.remove_subvolume_snapshot(spec, snapname, force)
460 elif not force:
461 raise VolumeException(
462 -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
463 "subvolume snapshot '{1}'".format(subvolname, snapname))
464 elif not force:
465 raise VolumeException(
466 -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
467 "remove subvolume snapshot '{1}'".format(groupname, snapname))
468 except VolumeException as ve:
469 ret = self.volume_exception_to_retval(ve)
470 return ret
471
472 ### group operations
473
474 @connection_pool_wrap
475 def create_subvolume_group(self, fs_handle, **kwargs):
476 ret = 0, "", ""
477 volname = kwargs['vol_name']
478 groupname = kwargs['group_name']
479 pool = kwargs['pool_layout']
480 mode = kwargs['mode']
481
482 try:
483 # TODO: validate that subvol size fits in volume size
484 with SubVolume(self.mgr, fs_handle) as sv:
485 spec = SubvolumeSpec("", groupname)
486 sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
487 except VolumeException as ve:
488 ret = self.volume_exception_to_retval(ve)
489 return ret
490
491 @connection_pool_wrap
492 def remove_subvolume_group(self, fs_handle, **kwargs):
493 ret = 0, "", ""
494 volname = kwargs['vol_name']
495 groupname = kwargs['group_name']
496 force = kwargs['force']
497 try:
498 with SubVolume(self.mgr, fs_handle) as sv:
499 # TODO: check whether there are no subvolumes in the group
500 spec = SubvolumeSpec("", groupname)
501 sv.remove_group(spec, force)
502 except VolumeException as ve:
503 ret = self.volume_exception_to_retval(ve)
504 return ret
505
506 @connection_pool_wrap
507 def getpath_subvolume_group(self, fs_handle, **kwargs):
508 groupname = kwargs['group_name']
509 try:
510 with SubVolume(self.mgr, fs_handle) as sv:
511 spec = SubvolumeSpec("", groupname)
512 path = sv.get_group_path(spec)
513 if path is None:
514 raise VolumeException(
515 -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
516 return 0, path, ""
517 except VolumeException as ve:
518 return self.volume_exception_to_retval(ve)
519
520 ### group snapshot
521
522 @connection_pool_wrap
523 def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
524 ret = 0, "", ""
525 volname = kwargs['vol_name']
526 groupname = kwargs['group_name']
527 snapname = kwargs['snap_name']
528 try:
529 with SubVolume(self.mgr, fs_handle) as sv:
530 spec = SubvolumeSpec("", groupname)
531 if not self.group_exists(sv, spec):
532 raise VolumeException(
533 -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
534 "snapshot '{1}'".format(groupname, snapname))
535 sv.create_group_snapshot(spec, snapname)
536 except VolumeException as ve:
537 ret = self.volume_exception_to_retval(ve)
538 return ret
539
540 @connection_pool_wrap
541 def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
542 ret = 0, "", ""
543 volname = kwargs['vol_name']
544 groupname = kwargs['group_name']
545 snapname = kwargs['snap_name']
546 force = kwargs['force']
547 try:
548 with SubVolume(self.mgr, fs_handle) as sv:
549 spec = SubvolumeSpec("", groupname)
550 if self.group_exists(sv, spec):
551 sv.remove_group_snapshot(spec, snapname, force)
552 elif not force:
553 raise VolumeException(
554 -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
555 "remove it".format(groupname))
556 except VolumeException as ve:
557 ret = self.volume_exception_to_retval(ve)
558 return ret
559
560 @connection_pool_wrap
561 def get_subvolume_trash_entry(self, fs_handle, **kwargs):
562 ret = None
563 volname = kwargs['vol_name']
564 exclude = kwargs.get('exclude_entries', [])
565
566 try:
567 with SubVolume(self.mgr, fs_handle) as sv:
568 spec = SubvolumeSpec("", "")
569 path = sv.get_trash_entry(spec, exclude)
570 ret = 0, path, ""
571 except VolumeException as ve:
572 ret = self.volume_exception_to_retval(ve)
573 return ret
574
575 @connection_pool_wrap
576 def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
577 ret = 0, "", ""
578 volname = kwargs['vol_name']
579 purge_dir = kwargs['purge_dir']
580 should_cancel = kwargs.get('should_cancel', lambda: False)
581
582 try:
583 with SubVolume(self.mgr, fs_handle) as sv:
584 spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
585 sv.purge_subvolume(spec, should_cancel)
586 except VolumeException as ve:
587 ret = self.volume_exception_to_retval(ve)
588 return ret