import json
import time
import errno
import logging

from threading import Lock
try:
    # py2
    from threading import _Timer as Timer
except ImportError:
    # py3
    from threading import Timer

import cephfs
import orchestrator

from .subvolspec import SubvolumeSpec
from .subvolume import SubVolume
from .exception import VolumeException
from .purge_queue import ThreadPoolPurgeQueueMixin
21 log
= logging
.getLogger(__name__
)
class ConnectionPool(object):
    """
    Pool of per-filesystem libcephfs connections, keyed by filesystem name.

    A background recurring timer periodically tears down connections that
    have been idle for longer than CONNECTION_IDLE_INTERVAL.

    NOTE(review): this block was reconstructed from a whitespace-mangled
    source with some statements missing; lines marked NOTE(review) are
    inferred from surrounding code -- confirm against the original file.
    """

    class Connection(object):
        """A single libcephfs mount for one named filesystem."""

        def __init__(self, mgr, fs_name):
            # NOTE(review): `self.fs = None` / `self.mgr = mgr` were not
            # visible in the source, but `self.mgr` and `self.fs` are read
            # by the methods below -- confirm.
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            # number of callers currently holding the handle
            self.ops_in_progress = 0
            self.last_used = time.time()
            # remember the fs id so a later rm/new of the same name is detected
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            """Return the current fscid for self.fs_name, or raise ENOENT."""
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    # NOTE(review): the return statement was not visible;
                    # returning the filesystem id is inferred from fs_id usage.
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            """Mark the connection used and hand out the libcephfs handle."""
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self):
            """Release one reference taken via get_fs_handle()."""
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1

        def del_fs_handle(self):
            # NOTE(review): the else-branch was not visible; disconnect
            # cleanly when still valid, otherwise abort -- confirm.
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            """True when the filesystem still exists with the same fscid."""
            fs_id = None
            # NOTE(review): try/except around the lookup inferred -- a
            # missing filesystem must yield "invalid", not an exception.
            try:
                fs_id = self.get_fs_id()
            except:
                # the filesystem does not exist now -- connection is not valid.
                pass
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            """True when unreferenced and unused for at least `timeout` seconds."""
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            # NOTE(review): the init() call was not visible in the mangled
            # source; inferred from the "CephFS initializing..." log line.
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))

        def disconnect(self):
            assert self.ops_in_progress == 0
            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
            # NOTE(review): shutdown + reset inferred -- confirm.
            self.fs.shutdown()
            self.fs = None

        def abort(self):
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            # NOTE(review): abort_conn + reset inferred -- confirm.
            self.fs.abort_conn()
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        def run(self):
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)
            # NOTE(review): final finished.set() inferred -- confirm.
            self.finished.set()

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds

    def __init__(self, mgr):
        # NOTE(review): `self.mgr = mgr` / `self.lock = Lock()` were not
        # visible but both attributes are used below -- confirm.
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        # NOTE(review): `with self.lock:` inferred -- the pool map is
        # mutated here and every other accessor holds the lock.
        with self.lock:
            log.info("scanning for idle connections..")
            # BUG FIX: the original used dict.iteritems(), which does not
            # exist on Python 3 (ceph-mgr runs py3); use items().
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        """Return a (possibly freshly connected) handle for `fs_name`."""
        with self.lock:
            conn = None
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    # filesystem id changed beneath us (or the filesystem does not exist).
                    # this is possible if the filesystem got removed (and recreated with
                    # same name) via "ceph fs rm/new" mon command.
                    log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                    self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                # NOTE(review): conn.connect() inferred -- a new Connection
                # must be mounted before its handle is handed out.
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        """Drop one reference on the named connection, if it exists."""
        with self.lock:
            conn = self.connections.get(fs_name, None)
            # NOTE(review): None-guard inferred -- confirm.
            if conn:
                conn.put_fs_handle()

    def _del_fs_handle(self, fs_name):
        # caller must hold self.lock
        conn = self.connections.pop(fs_name, None)
        # NOTE(review): None-guard inferred -- confirm.
        if conn:
            conn.del_fs_handle()

    def del_fs_handle(self, fs_name):
        with self.lock:
            self._del_fs_handle(fs_name)
class VolumeClient(object):
    """
    Entry point for mgr/volumes operations: volume (pools + filesystem +
    MDS) lifecycle, subvolume and subvolume-group CRUD, snapshots, and
    trash-purge hand-off to the purge queue.

    NOTE(review): this block was reconstructed from a whitespace-mangled
    source with some statements missing; lines marked NOTE(review) are
    inferred from surrounding code -- confirm against the original file.
    """

    def __init__(self, mgr):
        # NOTE(review): `self.mgr = mgr` was not visible but self.mgr is
        # read throughout -- confirm.
        self.mgr = mgr
        self.connection_pool = ConnectionPool(self.mgr)
        # TODO: make thread pool size configurable
        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
        # on startup, queue purge job for available volumes to kickstart
        # purge for leftover subvolume entries in trash. note that, if the
        # trash directory does not exist or if there are no purge entries
        # available for a volume, the volume is removed from the purge
        # queue.
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])

    def gen_pool_names(self, volname):
        """
        return metadata and data pool name (from a filesystem/volume name) as a tuple
        """
        return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)

    def get_fs(self, fs_name):
        """Return the fs_map entry for `fs_name`, or None when absent."""
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                # NOTE(review): returning the fs entry inferred from
                # get_mds_names()/volume_exists() usage -- confirm.
                return fs
        return None

    def get_mds_names(self, fs_name):
        """Return the MDS names serving `fs_name` ([] if it does not exist)."""
        fs = self.get_fs(fs_name)
        # NOTE(review): None-guard inferred -- confirm.
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def volume_exists(self, volname):
        return self.get_fs(volname) is not None

    def volume_exception_to_retval(self, ve):
        """
        return a tuple representation from a volume exception
        """
        # NOTE(review): body was not visible; delegating to the exception's
        # tuple conversion is inferred -- confirm.
        return ve.to_tuple()

    def create_pool(self, pool_name, pg_num):
        # create the given pool
        command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
        r, outb, outs = self.mgr.mon_command(command)
        # NOTE(review): the tail of this method was not visible in the
        # mangled source; returning the mon_command result -- confirm.
        return r, outb, outs

    def remove_pool(self, pool_name):
        command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
                   'yes_i_really_really_mean_it': True}
        return self.mgr.mon_command(command)

    def create_filesystem(self, fs_name, metadata_pool, data_pool):
        command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
                   'data': data_pool}
        return self.mgr.mon_command(command)

    def remove_filesystem(self, fs_name):
        # fail the filesystem first so 'fs rm' is accepted
        command = {'prefix': 'fs fail', 'fs_name': fs_name}
        r, outb, outs = self.mgr.mon_command(command)
        # NOTE(review): error short-circuit inferred -- confirm.
        if r != 0:
            return r, outb, outs
        command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
        return self.mgr.mon_command(command)

    def create_mds(self, fs_name):
        """Ask the orchestrator to deploy MDS daemons for `fs_name`."""
        spec = orchestrator.StatelessServiceSpec()
        # NOTE(review): spec.name assignment and try placement inferred -- confirm.
        spec.name = fs_name
        try:
            completion = self.mgr.add_stateless_service("mds", spec)
            self.mgr._orchestrator_wait([completion])
            orchestrator.raise_if_exception(completion)
        except (ImportError, orchestrator.OrchestratorError):
            return 0, "", "Volume created successfully (no MDS daemons created)"
        except Exception as e:
            # Don't let detailed orchestrator exceptions (python backtraces)
            # bubble out to the user
            log.exception("Failed to create MDS daemons")
            return -errno.EINVAL, "", str(e)
        return 0, "", ""

    ### volume operations -- create, rm, ls

    def create_volume(self, volname, size=None):
        """
        create volume  (pool, filesystem and mds)
        """
        # `size` is currently unused (pool pg counts are hard-coded below);
        # kept for interface compatibility.
        metadata_pool, data_pool = self.gen_pool_names(volname)
        # create pools
        # BUG FIX: the original unpacked `r, outs, outb` here (output and
        # status transposed), so the error path returned them swapped.
        r, outb, outs = self.create_pool(metadata_pool, 16)
        if r != 0:
            return r, outb, outs
        r, outb, outs = self.create_pool(data_pool, 8)
        if r != 0:
            return r, outb, outs
        # create filesystem
        r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
        if r != 0:
            log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
            return r, outb, outs
        # create mds
        return self.create_mds(volname)

    def delete_volume(self, volname):
        """
        delete the given module (tear down mds, remove filesystem)
        """
        self.purge_queue.cancel_purge_job(volname)
        self.connection_pool.del_fs_handle(volname)
        # Tear down MDS daemons
        try:
            completion = self.mgr.remove_stateless_service("mds", volname)
            self.mgr._orchestrator_wait([completion])
            orchestrator.raise_if_exception(completion)
        except (ImportError, orchestrator.OrchestratorError):
            log.warning("OrchestratorError, not tearing down MDS daemons")
        except Exception as e:
            # Don't let detailed orchestrator exceptions (python backtraces)
            # bubble out to the user
            log.exception("Failed to tear down MDS daemons")
            return -errno.EINVAL, "", str(e)

        # In case orchestrator didn't tear down MDS daemons cleanly, or
        # there was no orchestrator, we force the daemons down.
        if self.volume_exists(volname):
            r, outb, outs = self.remove_filesystem(volname)
            # NOTE(review): error short-circuit inferred -- confirm.
            if r != 0:
                return r, outb, outs
        else:
            log.warning("Filesystem already gone for volume '{0}'".format(volname))
        metadata_pool, data_pool = self.gen_pool_names(volname)
        r, outb, outs = self.remove_pool(metadata_pool)
        if r != 0:
            return r, outb, outs
        return self.remove_pool(data_pool)

    def list_volumes(self):
        # NOTE(review): `result = []` initializer inferred -- confirm.
        result = []
        fs_map = self.mgr.get("fs_map")
        for f in fs_map['filesystems']:
            result.append({'name': f['mdsmap']['fs_name']})
        return 0, json.dumps(result, indent=2), ""

    def group_exists(self, sv, spec):
        # default group need not be explicitly created (as it gets created
        # at the time of subvolume, snapshot and other create operations).
        return spec.is_default_group() or sv.get_group_path(spec)

    @staticmethod
    def octal_str_to_decimal_int(mode):
        """Parse an octal mode string (e.g. '755') to its integer value."""
        try:
            return int(mode, 8)
        except ValueError:
            raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))

    def connection_pool_wrap(func):
        """
        decorator that wraps subvolume calls by fetching filesystem handle
        from the connection pool when fs_handle argument is empty, otherwise
        just invoke func with the passed in filesystem handle. Also handles
        call made to non-existent volumes (only when fs_handle is empty).
        """
        def conn_wrapper(self, fs_handle, **kwargs):
            fs_h = fs_handle
            fs_name = kwargs['vol_name']
            # note that force arg is available for remove type commands
            force = kwargs.get('force', False)

            # fetch the connection from the pool
            # NOTE(review): the empty-handle guard and force handling were
            # partially missing from the mangled source -- confirm.
            if not fs_handle:
                try:
                    fs_h = self.connection_pool.get_fs_handle(fs_name)
                except VolumeException as ve:
                    if not force:
                        return self.volume_exception_to_retval(ve)
                    return 0, "", ""

            # invoke the actual routine w/ fs handle
            result = func(self, fs_h, **kwargs)

            # hand over the connection back to the pool
            if not fs_handle:
                self.connection_pool.put_fs_handle(fs_name)
            return result
        return conn_wrapper

    ### subvolume operations

    @connection_pool_wrap
    def create_subvolume(self, fs_handle, **kwargs):
        # NOTE(review): `ret` initializer / try placement / final return
        # inferred in this and the following wrapped methods -- confirm.
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        size = kwargs['size']
        pool = kwargs['pool_layout']
        mode = kwargs['mode']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, create it with " \
                        "`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
                sv.create_subvolume(spec, size, pool=pool, mode=self.octal_str_to_decimal_int(mode))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if self.group_exists(sv, spec):
                    sv.remove_subvolume(spec, force)
                    self.purge_queue.queue_purge_job(volname)
                # NOTE(review): `elif not force:` guard inferred -- confirm.
                elif not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove " \
                        "subvolume '{1}'".format(groupname, subvolname))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def subvolume_getpath(self, fs_handle, **kwargs):
        ret = None
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
                path = sv.get_subvolume_path(spec)
                if not path:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
                ret = 0, path, ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    ### subvolume snapshot

    @connection_pool_wrap
    def create_subvolume_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        snapname = kwargs['snap_name']
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
                        "snapshot '{1}'".format(groupname, snapname))
                if not sv.get_subvolume_path(spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot " \
                        "'{1}'".format(subvolname, snapname))
                sv.create_subvolume_snapshot(spec, snapname)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        snapname = kwargs['snap_name']
        groupname = kwargs['group_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if self.group_exists(sv, spec):
                    if sv.get_subvolume_path(spec):
                        sv.remove_subvolume_snapshot(spec, snapname, force)
                    # NOTE(review): `elif not force:` guards inferred -- confirm.
                    elif not force:
                        raise VolumeException(
                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove " \
                            "subvolume snapshot '{1}'".format(subvolname, snapname))
                elif not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot " \
                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    ### group operations

    @connection_pool_wrap
    def create_subvolume_group(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        pool = kwargs['pool_layout']
        mode = kwargs['mode']
        try:
            # TODO: validate that subvol size fits in volume size
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume_group(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                # TODO: check whether there are no subvolumes in the group
                spec = SubvolumeSpec("", groupname)
                sv.remove_group(spec, force)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def getpath_subvolume_group(self, fs_handle, **kwargs):
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                path = sv.get_group_path(spec)
                # NOTE(review): None-check and success return inferred -- confirm.
                if path is None:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
                return 0, path, ""
        except VolumeException as ve:
            return self.volume_exception_to_retval(ve)

    ### group snapshot

    @connection_pool_wrap
    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        snapname = kwargs['snap_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create " \
                        "snapshot '{1}'".format(groupname, snapname))
                sv.create_group_snapshot(spec, snapname)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        snapname = kwargs['snap_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                if self.group_exists(sv, spec):
                    sv.remove_group_snapshot(spec, snapname, force)
                # NOTE(review): `elif not force:` guard inferred -- confirm.
                elif not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot " \
                        "remove it".format(groupname))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    ### trash operations (driven by the purge queue)

    @connection_pool_wrap
    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
        ret = None
        volname = kwargs['vol_name']
        exclude = kwargs.get('exclude_entries', [])
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", "")
                path = sv.get_trash_entry(spec, exclude)
                # NOTE(review): success return inferred -- confirm.
                ret = 0, path, ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        purge_dir = kwargs['purge_dir']
        should_cancel = kwargs.get('should_cancel', lambda: False)
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
                sv.purge_subvolume(spec, should_cancel)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret