4 from threading
import Event
8 from .fs_util
import listdir
10 from .operations
.volume
import ConnectionPool
, open_volume
, create_volume
, \
11 delete_volume
, list_volumes
12 from .operations
.group
import open_group
, create_group
, remove_group
13 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
16 from .vol_spec
import VolSpec
17 from .exception
import VolumeException
18 from .async_cloner
import Cloner
19 from .purge_queue
import ThreadPoolPurgeQueueMixin
21 log
= logging
.getLogger(__name__
)
23 def octal_str_to_decimal_int(mode
):
27 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
29 def name_to_json(names
):
31 convert the list of names to json
34 for i
in range(len(names
)):
35 namedict
.append({'name': names
[i
].decode('utf-8')})
36 return json
.dumps(namedict
, indent
=4, sort_keys
=True)
38 class VolumeClient(object):
39 def __init__(self
, mgr
):
41 self
.stopping
= Event()
42 # volume specification
43 self
.volspec
= VolSpec(mgr
.rados
.conf_get('client_snapdir'))
44 self
.connection_pool
= ConnectionPool(self
.mgr
)
45 # TODO: make thread pool size configurable
46 self
.cloner
= Cloner(self
, 4)
47 self
.purge_queue
= ThreadPoolPurgeQueueMixin(self
, 4)
48 # on startup, queue purge job for available volumes to kickstart
49 # purge for leftover subvolume entries in trash. note that, if the
50 # trash directory does not exist or if there are no purge entries
51 # available for a volume, the volume is removed from the purge
53 fs_map
= self
.mgr
.get('fs_map')
54 for fs
in fs_map
['filesystems']:
55 self
.cloner
.queue_job(fs
['mdsmap']['fs_name'])
56 self
.purge_queue
.queue_job(fs
['mdsmap']['fs_name'])
58 def is_stopping(self
):
59 return self
.stopping
.is_set()
62 log
.info("shutting down")
63 # first, note that we're shutting down
65 # second, ask purge threads to quit
66 self
.purge_queue
.cancel_all_jobs()
67 # third, delete all libcephfs handles from connection pool
68 self
.connection_pool
.del_all_handles()
70 def cluster_log(self
, msg
, lvl
=None):
72 log to cluster log with default log level as WARN.
75 lvl
= self
.mgr
.CLUSTER_LOG_PRIO_WARN
76 self
.mgr
.cluster_log("cluster", lvl
, msg
)
78 def volume_exception_to_retval(self
, ve
):
80 return a tuple representation from a volume exception
84 ### volume operations -- create, rm, ls
86 def create_fs_volume(self
, volname
, placement
):
87 if self
.is_stopping():
88 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
89 return create_volume(self
.mgr
, volname
, placement
)
91 def delete_fs_volume(self
, volname
, confirm
):
92 if self
.is_stopping():
93 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
95 if confirm
!= "--yes-i-really-mean-it":
96 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
97 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
98 "that is what you want, re-issue the command followed by " \
99 "--yes-i-really-mean-it.".format(volname
)
101 self
.purge_queue
.cancel_jobs(volname
)
102 self
.connection_pool
.del_fs_handle(volname
, wait
=True)
103 return delete_volume(self
.mgr
, volname
)
105 def list_fs_volumes(self
):
106 if self
.stopping
.is_set():
107 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
108 volumes
= list_volumes(self
.mgr
)
109 return 0, json
.dumps(volumes
, indent
=4, sort_keys
=True), ""
111 ### subvolume operations
113 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
114 size
= kwargs
['size']
115 pool
= kwargs
['pool_layout']
118 mode
= kwargs
['mode']
119 isolate_nspace
= kwargs
['namespace_isolated']
121 oct_mode
= octal_str_to_decimal_int(mode
)
124 fs_handle
, self
.volspec
, group
, subvolname
, size
, isolate_nspace
, pool
, oct_mode
, uid
, gid
)
125 except VolumeException
as ve
:
126 # kick the purge threads for async removal -- note that this
127 # assumes that the subvolume is moved to trashcan for cleanup on error.
128 self
.purge_queue
.queue_job(volname
)
131 def create_subvolume(self
, **kwargs
):
133 volname
= kwargs
['vol_name']
134 subvolname
= kwargs
['sub_name']
135 groupname
= kwargs
['group_name']
136 size
= kwargs
['size']
137 pool
= kwargs
['pool_layout']
140 isolate_nspace
= kwargs
['namespace_isolated']
143 with
open_volume(self
, volname
) as fs_handle
:
144 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
146 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
147 # idempotent creation -- valid. Attributes set is supported.
148 uid
= uid
if uid
else subvolume
.uid
149 gid
= gid
if gid
else subvolume
.gid
150 subvolume
.set_attrs(subvolume
.path
, size
, isolate_nspace
, pool
, uid
, gid
)
151 except VolumeException
as ve
:
152 if ve
.errno
== -errno
.ENOENT
:
153 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
156 except VolumeException
as ve
:
157 # volume/group does not exist or subvolume creation failed
158 ret
= self
.volume_exception_to_retval(ve
)
161 def remove_subvolume(self
, **kwargs
):
163 volname
= kwargs
['vol_name']
164 subvolname
= kwargs
['sub_name']
165 groupname
= kwargs
['group_name']
166 force
= kwargs
['force']
169 with
open_volume(self
, volname
) as fs_handle
:
170 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
171 remove_subvol(fs_handle
, self
.volspec
, group
, subvolname
, force
)
172 # kick the purge threads for async removal -- note that this
173 # assumes that the subvolume is moved to trash can.
174 # TODO: make purge queue as singleton so that trash can kicks
175 # the purge threads on dump.
176 self
.purge_queue
.queue_job(volname
)
177 except VolumeException
as ve
:
178 if ve
.errno
== -errno
.EAGAIN
:
179 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
180 ret
= self
.volume_exception_to_retval(ve
)
181 elif not (ve
.errno
== -errno
.ENOENT
and force
):
182 ret
= self
.volume_exception_to_retval(ve
)
185 def resize_subvolume(self
, **kwargs
):
187 volname
= kwargs
['vol_name']
188 subvolname
= kwargs
['sub_name']
189 newsize
= kwargs
['new_size']
190 noshrink
= kwargs
['no_shrink']
191 groupname
= kwargs
['group_name']
194 with
open_volume(self
, volname
) as fs_handle
:
195 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
196 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
197 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
199 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
200 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
201 indent
=4, sort_keys
=True), ""
202 except VolumeException
as ve
:
203 ret
= self
.volume_exception_to_retval(ve
)
206 def subvolume_getpath(self
, **kwargs
):
208 volname
= kwargs
['vol_name']
209 subvolname
= kwargs
['sub_name']
210 groupname
= kwargs
['group_name']
213 with
open_volume(self
, volname
) as fs_handle
:
214 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
215 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
216 subvolpath
= subvolume
.path
217 ret
= 0, subvolpath
.decode("utf-8"), ""
218 except VolumeException
as ve
:
219 ret
= self
.volume_exception_to_retval(ve
)
222 def subvolume_info(self
, **kwargs
):
224 volname
= kwargs
['vol_name']
225 subvolname
= kwargs
['sub_name']
226 groupname
= kwargs
['group_name']
229 with
open_volume(self
, volname
) as fs_handle
:
230 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
231 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
233 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
234 for mon
in mon_map_mons
:
235 ip_port
= mon
['addr'].split("/")[0]
236 mon_addr_lst
.append(ip_port
)
238 subvol_info_dict
= subvolume
.info()
239 subvol_info_dict
["mon_addrs"] = mon_addr_lst
240 ret
= 0, json
.dumps(subvol_info_dict
, indent
=4, sort_keys
=True), ""
241 except VolumeException
as ve
:
242 ret
= self
.volume_exception_to_retval(ve
)
245 def list_subvolumes(self
, **kwargs
):
247 volname
= kwargs
['vol_name']
248 groupname
= kwargs
['group_name']
251 with
open_volume(self
, volname
) as fs_handle
:
252 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
253 subvolumes
= group
.list_subvolumes()
254 ret
= 0, name_to_json(subvolumes
), ""
255 except VolumeException
as ve
:
256 ret
= self
.volume_exception_to_retval(ve
)
259 ### subvolume snapshot
261 def create_subvolume_snapshot(self
, **kwargs
):
263 volname
= kwargs
['vol_name']
264 subvolname
= kwargs
['sub_name']
265 snapname
= kwargs
['snap_name']
266 groupname
= kwargs
['group_name']
269 with
open_volume(self
, volname
) as fs_handle
:
270 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
271 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
272 subvolume
.create_snapshot(snapname
)
273 except VolumeException
as ve
:
274 ret
= self
.volume_exception_to_retval(ve
)
277 def remove_subvolume_snapshot(self
, **kwargs
):
279 volname
= kwargs
['vol_name']
280 subvolname
= kwargs
['sub_name']
281 snapname
= kwargs
['snap_name']
282 groupname
= kwargs
['group_name']
283 force
= kwargs
['force']
286 with
open_volume(self
, volname
) as fs_handle
:
287 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
288 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
289 subvolume
.remove_snapshot(snapname
)
290 except VolumeException
as ve
:
291 if not (ve
.errno
== -errno
.ENOENT
and force
):
292 ret
= self
.volume_exception_to_retval(ve
)
295 def subvolume_snapshot_info(self
, **kwargs
):
297 volname
= kwargs
['vol_name']
298 subvolname
= kwargs
['sub_name']
299 snapname
= kwargs
['snap_name']
300 groupname
= kwargs
['group_name']
303 with
open_volume(self
, volname
) as fs_handle
:
304 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
305 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
306 snap_info_dict
= subvolume
.snapshot_info(snapname
)
307 ret
= 0, json
.dumps(snap_info_dict
, indent
=4, sort_keys
=True), ""
308 except VolumeException
as ve
:
309 ret
= self
.volume_exception_to_retval(ve
)
312 def list_subvolume_snapshots(self
, **kwargs
):
314 volname
= kwargs
['vol_name']
315 subvolname
= kwargs
['sub_name']
316 groupname
= kwargs
['group_name']
319 with
open_volume(self
, volname
) as fs_handle
:
320 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
321 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
322 snapshots
= subvolume
.list_snapshots()
323 ret
= 0, name_to_json(snapshots
), ""
324 except VolumeException
as ve
:
325 ret
= self
.volume_exception_to_retval(ve
)
328 def protect_subvolume_snapshot(self
, **kwargs
):
330 volname
= kwargs
['vol_name']
331 subvolname
= kwargs
['sub_name']
332 snapname
= kwargs
['snap_name']
333 groupname
= kwargs
['group_name']
336 with
open_volume(self
, volname
) as fs_handle
:
337 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
338 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
339 subvolume
.protect_snapshot(snapname
)
340 except VolumeException
as ve
:
341 ret
= self
.volume_exception_to_retval(ve
)
344 def unprotect_subvolume_snapshot(self
, **kwargs
):
346 volname
= kwargs
['vol_name']
347 subvolname
= kwargs
['sub_name']
348 snapname
= kwargs
['snap_name']
349 groupname
= kwargs
['group_name']
352 with
open_volume(self
, volname
) as fs_handle
:
353 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
354 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
355 subvolume
.unprotect_snapshot(snapname
)
356 except VolumeException
as ve
:
357 ret
= self
.volume_exception_to_retval(ve
)
360 def _prepare_clone_subvolume(self
, fs_handle
, volname
, subvolume
, snapname
, target_group
, target_subvolname
, target_pool
):
361 create_clone(fs_handle
, self
.volspec
, target_group
, target_subvolname
, target_pool
, volname
, subvolume
, snapname
)
362 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False) as target_subvolume
:
364 subvolume
.attach_snapshot(snapname
, target_subvolume
)
365 self
.cloner
.queue_job(volname
)
366 except VolumeException
as ve
:
368 target_subvolume
.remove()
369 self
.purge_queue
.queue_job(volname
)
370 except Exception as e
:
371 log
.warning("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname
, e
))
374 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, subvolume
, **kwargs
):
375 snapname
= kwargs
['snap_name']
376 target_pool
= kwargs
['pool_layout']
377 target_subvolname
= kwargs
['target_sub_name']
378 target_groupname
= kwargs
['target_group_name']
380 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
381 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
382 if not subvolume
.is_snapshot_protected(snapname
):
383 raise VolumeException(-errno
.EINVAL
, "snapshot '{0}' is not protected".format(snapname
))
385 # TODO: when the target group is same as source, reuse group object.
386 with
open_group(fs_handle
, self
.volspec
, target_groupname
) as target_group
:
388 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False):
389 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
390 except VolumeException
as ve
:
391 if ve
.errno
== -errno
.ENOENT
:
392 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, subvolume
, snapname
,
393 target_group
, target_subvolname
, target_pool
)
397 def clone_subvolume_snapshot(self
, **kwargs
):
399 volname
= kwargs
['vol_name']
400 subvolname
= kwargs
['sub_name']
401 groupname
= kwargs
['group_name']
404 with
open_volume(self
, volname
) as fs_handle
:
405 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
406 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
407 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, subvolume
, **kwargs
)
408 except VolumeException
as ve
:
409 ret
= self
.volume_exception_to_retval(ve
)
412 def clone_status(self
, **kwargs
):
414 volname
= kwargs
['vol_name']
415 clonename
= kwargs
['clone_name']
416 groupname
= kwargs
['group_name']
419 with
open_volume(self
, volname
) as fs_handle
:
420 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
421 with
open_subvol(fs_handle
, self
.volspec
, group
, clonename
,
422 need_complete
=False, expected_types
=["clone"]) as subvolume
:
423 ret
= 0, json
.dumps({'status' : subvolume
.status
}, indent
=2), ""
424 except VolumeException
as ve
:
425 ret
= self
.volume_exception_to_retval(ve
)
428 def clone_cancel(self
, **kwargs
):
430 volname
= kwargs
['vol_name']
431 clonename
= kwargs
['clone_name']
432 groupname
= kwargs
['group_name']
435 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
436 except VolumeException
as ve
:
437 ret
= self
.volume_exception_to_retval(ve
)
442 def create_subvolume_group(self
, **kwargs
):
444 volname
= kwargs
['vol_name']
445 groupname
= kwargs
['group_name']
446 pool
= kwargs
['pool_layout']
449 mode
= kwargs
['mode']
452 with
open_volume(self
, volname
) as fs_handle
:
454 with
open_group(fs_handle
, self
.volspec
, groupname
):
455 # idempotent creation -- valid.
457 except VolumeException
as ve
:
458 if ve
.errno
== -errno
.ENOENT
:
459 oct_mode
= octal_str_to_decimal_int(mode
)
460 create_group(fs_handle
, self
.volspec
, groupname
, pool
, oct_mode
, uid
, gid
)
463 except VolumeException
as ve
:
464 # volume does not exist or subvolume group creation failed
465 ret
= self
.volume_exception_to_retval(ve
)
468 def remove_subvolume_group(self
, **kwargs
):
470 volname
= kwargs
['vol_name']
471 groupname
= kwargs
['group_name']
472 force
= kwargs
['force']
475 with
open_volume(self
, volname
) as fs_handle
:
476 remove_group(fs_handle
, self
.volspec
, groupname
)
477 except VolumeException
as ve
:
478 if not (ve
.errno
== -errno
.ENOENT
and force
):
479 ret
= self
.volume_exception_to_retval(ve
)
482 def getpath_subvolume_group(self
, **kwargs
):
483 volname
= kwargs
['vol_name']
484 groupname
= kwargs
['group_name']
487 with
open_volume(self
, volname
) as fs_handle
:
488 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
489 return 0, group
.path
.decode('utf-8'), ""
490 except VolumeException
as ve
:
491 return self
.volume_exception_to_retval(ve
)
493 def list_subvolume_groups(self
, **kwargs
):
494 volname
= kwargs
['vol_name']
497 with
open_volume(self
, volname
) as fs_handle
:
498 groups
= listdir(fs_handle
, self
.volspec
.base_dir
)
499 ret
= 0, name_to_json(groups
), ""
500 except VolumeException
as ve
:
501 if not ve
.errno
== -errno
.ENOENT
:
502 ret
= self
.volume_exception_to_retval(ve
)
507 def create_subvolume_group_snapshot(self
, **kwargs
):
509 volname
= kwargs
['vol_name']
510 groupname
= kwargs
['group_name']
511 snapname
= kwargs
['snap_name']
514 with
open_volume(self
, volname
) as fs_handle
:
515 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
516 group
.create_snapshot(snapname
)
517 except VolumeException
as ve
:
518 ret
= self
.volume_exception_to_retval(ve
)
521 def remove_subvolume_group_snapshot(self
, **kwargs
):
523 volname
= kwargs
['vol_name']
524 groupname
= kwargs
['group_name']
525 snapname
= kwargs
['snap_name']
526 force
= kwargs
['force']
529 with
open_volume(self
, volname
) as fs_handle
:
530 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
531 group
.remove_snapshot(snapname
)
532 except VolumeException
as ve
:
533 if not (ve
.errno
== -errno
.ENOENT
and force
):
534 ret
= self
.volume_exception_to_retval(ve
)
537 def list_subvolume_group_snapshots(self
, **kwargs
):
539 volname
= kwargs
['vol_name']
540 groupname
= kwargs
['group_name']
543 with
open_volume(self
, volname
) as fs_handle
:
544 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
545 snapshots
= group
.list_snapshots()
546 ret
= 0, name_to_json(snapshots
), ""
547 except VolumeException
as ve
:
548 ret
= self
.volume_exception_to_retval(ve
)