4 from threading
import Event
8 from .fs_util
import listdir
10 from .operations
.volume
import ConnectionPool
, open_volume
, create_volume
, \
11 delete_volume
, list_volumes
12 from .operations
.group
import open_group
, create_group
, remove_group
13 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
16 from .vol_spec
import VolSpec
17 from .exception
import VolumeException
18 from .async_cloner
import Cloner
19 from .purge_queue
import ThreadPoolPurgeQueueMixin
21 log
= logging
.getLogger(__name__
)
23 def octal_str_to_decimal_int(mode
):
27 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
29 def name_to_json(names
):
31 convert the list of names to json
34 for i
in range(len(names
)):
35 namedict
.append({'name': names
[i
].decode('utf-8')})
36 return json
.dumps(namedict
, indent
=4, sort_keys
=True)
38 class VolumeClient(object):
39 def __init__(self
, mgr
):
41 self
.stopping
= Event()
42 # volume specification
43 self
.volspec
= VolSpec(mgr
.rados
.conf_get('client_snapdir'))
44 self
.connection_pool
= ConnectionPool(self
.mgr
)
45 # TODO: make thread pool size configurable
46 self
.cloner
= Cloner(self
, 4)
47 self
.purge_queue
= ThreadPoolPurgeQueueMixin(self
, 4)
48 # on startup, queue purge job for available volumes to kickstart
49 # purge for leftover subvolume entries in trash. note that, if the
50 # trash directory does not exist or if there are no purge entries
51 # available for a volume, the volume is removed from the purge
53 fs_map
= self
.mgr
.get('fs_map')
54 for fs
in fs_map
['filesystems']:
55 self
.cloner
.queue_job(fs
['mdsmap']['fs_name'])
56 self
.purge_queue
.queue_job(fs
['mdsmap']['fs_name'])
58 def is_stopping(self
):
59 return self
.stopping
.is_set()
62 log
.info("shutting down")
63 # first, note that we're shutting down
65 # second, ask purge threads to quit
66 self
.purge_queue
.cancel_all_jobs()
67 # third, delete all libcephfs handles from connection pool
68 self
.connection_pool
.del_all_handles()
70 def cluster_log(self
, msg
, lvl
=None):
72 log to cluster log with default log level as WARN.
75 lvl
= self
.mgr
.CLUSTER_LOG_PRIO_WARN
76 self
.mgr
.cluster_log("cluster", lvl
, msg
)
78 def volume_exception_to_retval(self
, ve
):
80 return a tuple representation from a volume exception
84 ### volume operations -- create, rm, ls
86 def create_fs_volume(self
, volname
, placement
):
87 if self
.is_stopping():
88 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
89 return create_volume(self
.mgr
, volname
, placement
)
91 def delete_fs_volume(self
, volname
, confirm
):
92 if self
.is_stopping():
93 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
95 if confirm
!= "--yes-i-really-mean-it":
96 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
97 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
98 "that is what you want, re-issue the command followed by " \
99 "--yes-i-really-mean-it.".format(volname
)
101 self
.purge_queue
.cancel_jobs(volname
)
102 self
.connection_pool
.del_fs_handle(volname
, wait
=True)
103 return delete_volume(self
.mgr
, volname
)
105 def list_fs_volumes(self
):
106 if self
.stopping
.is_set():
107 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
108 volumes
= list_volumes(self
.mgr
)
109 return 0, json
.dumps(volumes
, indent
=4, sort_keys
=True), ""
111 ### subvolume operations
113 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
114 size
= kwargs
['size']
115 pool
= kwargs
['pool_layout']
118 mode
= kwargs
['mode']
120 oct_mode
= octal_str_to_decimal_int(mode
)
123 fs_handle
, self
.volspec
, group
, subvolname
, size
, False, pool
, oct_mode
, uid
, gid
)
124 except VolumeException
as ve
:
125 # kick the purge threads for async removal -- note that this
126 # assumes that the subvolume is moved to trashcan for cleanup on error.
127 self
.purge_queue
.queue_job(volname
)
130 def create_subvolume(self
, **kwargs
):
132 volname
= kwargs
['vol_name']
133 subvolname
= kwargs
['sub_name']
134 groupname
= kwargs
['group_name']
137 with
open_volume(self
, volname
) as fs_handle
:
138 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
140 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
):
141 # idempotent creation -- valid.
143 except VolumeException
as ve
:
144 if ve
.errno
== -errno
.ENOENT
:
145 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
148 except VolumeException
as ve
:
149 # volume/group does not exist or subvolume creation failed
150 ret
= self
.volume_exception_to_retval(ve
)
153 def remove_subvolume(self
, **kwargs
):
155 volname
= kwargs
['vol_name']
156 subvolname
= kwargs
['sub_name']
157 groupname
= kwargs
['group_name']
158 force
= kwargs
['force']
161 with
open_volume(self
, volname
) as fs_handle
:
162 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
163 remove_subvol(fs_handle
, self
.volspec
, group
, subvolname
, force
)
164 # kick the purge threads for async removal -- note that this
165 # assumes that the subvolume is moved to trash can.
166 # TODO: make purge queue as singleton so that trash can kicks
167 # the purge threads on dump.
168 self
.purge_queue
.queue_job(volname
)
169 except VolumeException
as ve
:
170 if ve
.errno
== -errno
.EAGAIN
:
171 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
172 ret
= self
.volume_exception_to_retval(ve
)
173 elif not (ve
.errno
== -errno
.ENOENT
and force
):
174 ret
= self
.volume_exception_to_retval(ve
)
177 def resize_subvolume(self
, **kwargs
):
179 volname
= kwargs
['vol_name']
180 subvolname
= kwargs
['sub_name']
181 newsize
= kwargs
['new_size']
182 noshrink
= kwargs
['no_shrink']
183 groupname
= kwargs
['group_name']
186 with
open_volume(self
, volname
) as fs_handle
:
187 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
188 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
189 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
191 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
192 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
193 indent
=4, sort_keys
=True), ""
194 except VolumeException
as ve
:
195 ret
= self
.volume_exception_to_retval(ve
)
198 def subvolume_getpath(self
, **kwargs
):
200 volname
= kwargs
['vol_name']
201 subvolname
= kwargs
['sub_name']
202 groupname
= kwargs
['group_name']
205 with
open_volume(self
, volname
) as fs_handle
:
206 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
207 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
208 subvolpath
= subvolume
.path
209 ret
= 0, subvolpath
.decode("utf-8"), ""
210 except VolumeException
as ve
:
211 ret
= self
.volume_exception_to_retval(ve
)
214 def list_subvolumes(self
, **kwargs
):
216 volname
= kwargs
['vol_name']
217 groupname
= kwargs
['group_name']
220 with
open_volume(self
, volname
) as fs_handle
:
221 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
222 subvolumes
= group
.list_subvolumes()
223 ret
= 0, name_to_json(subvolumes
), ""
224 except VolumeException
as ve
:
225 ret
= self
.volume_exception_to_retval(ve
)
228 ### subvolume snapshot
230 def create_subvolume_snapshot(self
, **kwargs
):
232 volname
= kwargs
['vol_name']
233 subvolname
= kwargs
['sub_name']
234 snapname
= kwargs
['snap_name']
235 groupname
= kwargs
['group_name']
238 with
open_volume(self
, volname
) as fs_handle
:
239 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
240 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
241 subvolume
.create_snapshot(snapname
)
242 except VolumeException
as ve
:
243 ret
= self
.volume_exception_to_retval(ve
)
246 def remove_subvolume_snapshot(self
, **kwargs
):
248 volname
= kwargs
['vol_name']
249 subvolname
= kwargs
['sub_name']
250 snapname
= kwargs
['snap_name']
251 groupname
= kwargs
['group_name']
252 force
= kwargs
['force']
255 with
open_volume(self
, volname
) as fs_handle
:
256 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
257 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
258 subvolume
.remove_snapshot(snapname
)
259 except VolumeException
as ve
:
260 if not (ve
.errno
== -errno
.ENOENT
and force
):
261 ret
= self
.volume_exception_to_retval(ve
)
264 def list_subvolume_snapshots(self
, **kwargs
):
266 volname
= kwargs
['vol_name']
267 subvolname
= kwargs
['sub_name']
268 groupname
= kwargs
['group_name']
271 with
open_volume(self
, volname
) as fs_handle
:
272 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
273 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
274 snapshots
= subvolume
.list_snapshots()
275 ret
= 0, name_to_json(snapshots
), ""
276 except VolumeException
as ve
:
277 ret
= self
.volume_exception_to_retval(ve
)
280 def protect_subvolume_snapshot(self
, **kwargs
):
282 volname
= kwargs
['vol_name']
283 subvolname
= kwargs
['sub_name']
284 snapname
= kwargs
['snap_name']
285 groupname
= kwargs
['group_name']
288 with
open_volume(self
, volname
) as fs_handle
:
289 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
290 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
291 subvolume
.protect_snapshot(snapname
)
292 except VolumeException
as ve
:
293 ret
= self
.volume_exception_to_retval(ve
)
296 def unprotect_subvolume_snapshot(self
, **kwargs
):
298 volname
= kwargs
['vol_name']
299 subvolname
= kwargs
['sub_name']
300 snapname
= kwargs
['snap_name']
301 groupname
= kwargs
['group_name']
304 with
open_volume(self
, volname
) as fs_handle
:
305 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
306 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
307 subvolume
.unprotect_snapshot(snapname
)
308 except VolumeException
as ve
:
309 ret
= self
.volume_exception_to_retval(ve
)
312 def _prepare_clone_subvolume(self
, fs_handle
, volname
, subvolume
, snapname
, target_group
, target_subvolname
, target_pool
):
313 create_clone(fs_handle
, self
.volspec
, target_group
, target_subvolname
, target_pool
, volname
, subvolume
, snapname
)
314 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False) as target_subvolume
:
316 subvolume
.attach_snapshot(snapname
, target_subvolume
)
317 self
.cloner
.queue_job(volname
)
318 except VolumeException
as ve
:
320 target_subvolume
.remove()
321 self
.purge_queue
.queue_job(volname
)
322 except Exception as e
:
323 log
.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname
, e
))
326 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, subvolume
, **kwargs
):
327 snapname
= kwargs
['snap_name']
328 target_pool
= kwargs
['pool_layout']
329 target_subvolname
= kwargs
['target_sub_name']
330 target_groupname
= kwargs
['target_group_name']
332 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
333 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
334 if not subvolume
.is_snapshot_protected(snapname
):
335 raise VolumeException(-errno
.EINVAL
, "snapshot '{0}' is not protected".format(snapname
))
337 # TODO: when the target group is same as source, reuse group object.
338 with
open_group(fs_handle
, self
.volspec
, target_groupname
) as target_group
:
340 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False):
341 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
342 except VolumeException
as ve
:
343 if ve
.errno
== -errno
.ENOENT
:
344 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, subvolume
, snapname
,
345 target_group
, target_subvolname
, target_pool
)
349 def clone_subvolume_snapshot(self
, **kwargs
):
351 volname
= kwargs
['vol_name']
352 subvolname
= kwargs
['sub_name']
353 groupname
= kwargs
['group_name']
356 with
open_volume(self
, volname
) as fs_handle
:
357 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
358 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
359 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, subvolume
, **kwargs
)
360 except VolumeException
as ve
:
361 ret
= self
.volume_exception_to_retval(ve
)
364 def clone_status(self
, **kwargs
):
366 volname
= kwargs
['vol_name']
367 clonename
= kwargs
['clone_name']
368 groupname
= kwargs
['group_name']
371 with
open_volume(self
, volname
) as fs_handle
:
372 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
373 with
open_subvol(fs_handle
, self
.volspec
, group
, clonename
,
374 need_complete
=False, expected_types
=["clone"]) as subvolume
:
375 ret
= 0, json
.dumps({'status' : subvolume
.status
}, indent
=2), ""
376 except VolumeException
as ve
:
377 ret
= self
.volume_exception_to_retval(ve
)
380 def clone_cancel(self
, **kwargs
):
382 volname
= kwargs
['vol_name']
383 clonename
= kwargs
['clone_name']
384 groupname
= kwargs
['group_name']
387 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
388 except VolumeException
as ve
:
389 ret
= self
.volume_exception_to_retval(ve
)
394 def create_subvolume_group(self
, **kwargs
):
396 volname
= kwargs
['vol_name']
397 groupname
= kwargs
['group_name']
398 pool
= kwargs
['pool_layout']
401 mode
= kwargs
['mode']
404 with
open_volume(self
, volname
) as fs_handle
:
406 with
open_group(fs_handle
, self
.volspec
, groupname
):
407 # idempotent creation -- valid.
409 except VolumeException
as ve
:
410 if ve
.errno
== -errno
.ENOENT
:
411 oct_mode
= octal_str_to_decimal_int(mode
)
412 create_group(fs_handle
, self
.volspec
, groupname
, pool
, oct_mode
, uid
, gid
)
415 except VolumeException
as ve
:
416 # volume does not exist or subvolume group creation failed
417 ret
= self
.volume_exception_to_retval(ve
)
420 def remove_subvolume_group(self
, **kwargs
):
422 volname
= kwargs
['vol_name']
423 groupname
= kwargs
['group_name']
424 force
= kwargs
['force']
427 with
open_volume(self
, volname
) as fs_handle
:
428 remove_group(fs_handle
, self
.volspec
, groupname
)
429 except VolumeException
as ve
:
430 if not (ve
.errno
== -errno
.ENOENT
and force
):
431 ret
= self
.volume_exception_to_retval(ve
)
434 def getpath_subvolume_group(self
, **kwargs
):
435 volname
= kwargs
['vol_name']
436 groupname
= kwargs
['group_name']
439 with
open_volume(self
, volname
) as fs_handle
:
440 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
441 return 0, group
.path
.decode('utf-8'), ""
442 except VolumeException
as ve
:
443 return self
.volume_exception_to_retval(ve
)
445 def list_subvolume_groups(self
, **kwargs
):
446 volname
= kwargs
['vol_name']
449 with
open_volume(self
, volname
) as fs_handle
:
450 groups
= listdir(fs_handle
, self
.volspec
.base_dir
)
451 ret
= 0, name_to_json(groups
), ""
452 except VolumeException
as ve
:
453 if not ve
.errno
== -errno
.ENOENT
:
454 ret
= self
.volume_exception_to_retval(ve
)
459 def create_subvolume_group_snapshot(self
, **kwargs
):
461 volname
= kwargs
['vol_name']
462 groupname
= kwargs
['group_name']
463 snapname
= kwargs
['snap_name']
466 with
open_volume(self
, volname
) as fs_handle
:
467 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
468 group
.create_snapshot(snapname
)
469 except VolumeException
as ve
:
470 ret
= self
.volume_exception_to_retval(ve
)
473 def remove_subvolume_group_snapshot(self
, **kwargs
):
475 volname
= kwargs
['vol_name']
476 groupname
= kwargs
['group_name']
477 snapname
= kwargs
['snap_name']
478 force
= kwargs
['force']
481 with
open_volume(self
, volname
) as fs_handle
:
482 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
483 group
.remove_snapshot(snapname
)
484 except VolumeException
as ve
:
485 if not (ve
.errno
== -errno
.ENOENT
and force
):
486 ret
= self
.volume_exception_to_retval(ve
)
489 def list_subvolume_group_snapshots(self
, **kwargs
):
491 volname
= kwargs
['vol_name']
492 groupname
= kwargs
['group_name']
495 with
open_volume(self
, volname
) as fs_handle
:
496 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
497 snapshots
= group
.list_snapshots()
498 ret
= 0, name_to_json(snapshots
), ""
499 except VolumeException
as ve
:
500 ret
= self
.volume_exception_to_retval(ve
)