4 from threading
import Event
8 from .fs_util
import listdir
10 from .operations
.volume
import ConnectionPool
, open_volume
, create_volume
, \
11 delete_volume
, list_volumes
, get_pool_names
12 from .operations
.group
import open_group
, create_group
, remove_group
, open_group_unique
13 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
16 from .vol_spec
import VolSpec
17 from .exception
import VolumeException
, ClusterError
, ClusterTimeout
, EvictionError
18 from .async_cloner
import Cloner
19 from .purge_queue
import ThreadPoolPurgeQueueMixin
20 from .operations
.template
import SubvolumeOpType
# module-level logger, named after this module (__name__)
log = logging.getLogger(__name__)

# access levels as a tuple of short strings -- presumably read-only ('r') and
# read-write ('rw') for subvolume authorization; not referenced in the code
# visible here, TODO confirm against callers.
ALLOWED_ACCESS_LEVELS = ('r', 'rw')
27 def octal_str_to_decimal_int(mode
):
31 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
def name_to_json(names):
    """
    convert the list of names to json

    :param names: iterable of byte strings; every item is decoded with
                  ``.decode('utf-8')``
    :return: JSON text of a list holding one ``{"name": <decoded name>}``
             object per input item, in input order, dumped with a 4-space
             indent and sorted keys
    """
    # walk the names directly instead of indexing by position; besides being
    # shorter this accepts any iterable, not only an indexable sequence.
    namedict = [{'name': name.decode('utf-8')} for name in names]
    return json.dumps(namedict, indent=4, sort_keys=True)
42 class VolumeClient(object):
def __init__(self, mgr):
    """
    set up the shutdown event, the volume specification, the connection
    pool, the cloner and the purge queue, then queue a clone job and a
    purge job for every filesystem listed in the manager's 'fs_map'.
    """
    # must come first: the attributes below are built from self.mgr
    self.mgr = mgr
    self.stopping = Event()
    # volume specification
    snapdir = mgr.rados.conf_get('client_snapdir')
    self.volspec = VolSpec(snapdir)
    self.connection_pool = ConnectionPool(self.mgr)
    self.cloner = Cloner(self, self.mgr.max_concurrent_clones)
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
    # kickstart purging at startup: queue a purge job per available volume
    # so that leftover subvolume entries in trash get cleaned up. a volume
    # with no trash directory or no purge entries is removed from purging
    # again.
    for fs in self.mgr.get('fs_map')['filesystems']:
        fs_name = fs['mdsmap']['fs_name']
        self.cloner.queue_job(fs_name)
        self.purge_queue.queue_job(fs_name)
def is_stopping(self):
    """
    tell whether shutdown has been flagged, i.e. whether the `stopping`
    event is currently set.
    """
    stop_event = self.stopping
    return stop_event.is_set()
65 log
.info("shutting down")
66 # first, note that we're shutting down
68 # second, ask purge threads to quit
69 self
.purge_queue
.cancel_all_jobs()
70 # third, delete all libcephfs handles from connection pool
71 self
.connection_pool
.del_all_handles()
73 def cluster_log(self
, msg
, lvl
=None):
75 log to cluster log with default log level as WARN.
78 lvl
= self
.mgr
.CLUSTER_LOG_PRIO_WARN
79 self
.mgr
.cluster_log("cluster", lvl
, msg
)
81 def volume_exception_to_retval(self
, ve
):
83 return a tuple representation from a volume exception
87 ### volume operations -- create, rm, ls
def create_fs_volume(self, volname, placement):
    """
    create the volume `volname` with the given placement.

    :return: whatever create_volume() returns, or
             (-ESHUTDOWN, "", "shutdown in progress") when the client is
             stopping (create_volume() is not called in that case)
    """
    if not self.is_stopping():
        return create_volume(self.mgr, volname, placement)
    return -errno.ESHUTDOWN, "", "shutdown in progress"
94 def delete_fs_volume(self
, volname
, confirm
):
95 if self
.is_stopping():
96 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
98 if confirm
!= "--yes-i-really-mean-it":
99 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
100 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
101 "that is what you want, re-issue the command followed by " \
102 "--yes-i-really-mean-it.".format(volname
)
104 ret
, out
, err
= self
.mgr
.check_mon_command({
105 'prefix': 'config get',
106 'key': 'mon_allow_pool_delete',
110 mon_allow_pool_delete
= json
.loads(out
)
111 if not mon_allow_pool_delete
:
112 return -errno
.EPERM
, "", "pool deletion is disabled; you must first " \
113 "set the mon_allow_pool_delete config option to true before volumes " \
116 metadata_pool
, data_pools
= get_pool_names(self
.mgr
, volname
)
117 if not metadata_pool
:
118 return -errno
.ENOENT
, "", "volume {0} doesn't exist".format(volname
)
119 self
.purge_queue
.cancel_jobs(volname
)
120 self
.connection_pool
.del_fs_handle(volname
, wait
=True)
121 return delete_volume(self
.mgr
, volname
, metadata_pool
, data_pools
)
def list_fs_volumes(self):
    """
    list the volumes reported by list_volumes() for this manager.

    :return: (0, <json dump of the volume list>, "") on success, or
             (-ESHUTDOWN, "", "shutdown in progress") when the client is
             stopping
    """
    # ask is_stopping() like the other volume operations do, rather than
    # reaching into the event object directly.
    if self.is_stopping():
        return -errno.ESHUTDOWN, "", "shutdown in progress"
    volumes = list_volumes(self.mgr)
    return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
129 ### subvolume operations
131 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
132 size
= kwargs
['size']
133 pool
= kwargs
['pool_layout']
136 mode
= kwargs
['mode']
137 isolate_nspace
= kwargs
['namespace_isolated']
139 oct_mode
= octal_str_to_decimal_int(mode
)
142 self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, size
, isolate_nspace
, pool
, oct_mode
, uid
, gid
)
143 except VolumeException
as ve
:
144 # kick the purge threads for async removal -- note that this
145 # assumes that the subvolume is moved to trashcan for cleanup on error.
146 self
.purge_queue
.queue_job(volname
)
149 def create_subvolume(self
, **kwargs
):
151 volname
= kwargs
['vol_name']
152 subvolname
= kwargs
['sub_name']
153 groupname
= kwargs
['group_name']
154 size
= kwargs
['size']
155 pool
= kwargs
['pool_layout']
158 isolate_nspace
= kwargs
['namespace_isolated']
161 with
open_volume(self
, volname
) as fs_handle
:
162 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
164 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.CREATE
) as subvolume
:
165 # idempotent creation -- valid. Attributes set is supported.
167 'uid': uid
if uid
else subvolume
.uid
,
168 'gid': gid
if gid
else subvolume
.gid
,
170 'pool_namespace': subvolume
.namespace
if isolate_nspace
else None,
173 subvolume
.set_attrs(subvolume
.path
, attrs
)
174 except VolumeException
as ve
:
175 if ve
.errno
== -errno
.ENOENT
:
176 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
179 except VolumeException
as ve
:
180 # volume/group does not exist or subvolume creation failed
181 ret
= self
.volume_exception_to_retval(ve
)
184 def remove_subvolume(self
, **kwargs
):
186 volname
= kwargs
['vol_name']
187 subvolname
= kwargs
['sub_name']
188 groupname
= kwargs
['group_name']
189 force
= kwargs
['force']
190 retainsnaps
= kwargs
['retain_snapshots']
193 with
open_volume(self
, volname
) as fs_handle
:
194 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
195 remove_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, force
, retainsnaps
)
196 # kick the purge threads for async removal -- note that this
197 # assumes that the subvolume is moved to trash can.
198 # TODO: make purge queue as singleton so that trash can kicks
199 # the purge threads on dump.
200 self
.purge_queue
.queue_job(volname
)
201 except VolumeException
as ve
:
202 if ve
.errno
== -errno
.EAGAIN
:
203 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
204 ret
= self
.volume_exception_to_retval(ve
)
205 elif not (ve
.errno
== -errno
.ENOENT
and force
):
206 ret
= self
.volume_exception_to_retval(ve
)
209 def authorize_subvolume(self
, **kwargs
):
211 volname
= kwargs
['vol_name']
212 subvolname
= kwargs
['sub_name']
213 authid
= kwargs
['auth_id']
214 groupname
= kwargs
['group_name']
215 accesslevel
= kwargs
['access_level']
216 tenant_id
= kwargs
['tenant_id']
217 allow_existing_id
= kwargs
['allow_existing_id']
220 with
open_volume(self
, volname
) as fs_handle
:
221 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
222 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.ALLOW_ACCESS
) as subvolume
:
223 key
= subvolume
.authorize(authid
, accesslevel
, tenant_id
, allow_existing_id
)
225 except VolumeException
as ve
:
226 ret
= self
.volume_exception_to_retval(ve
)
229 def deauthorize_subvolume(self
, **kwargs
):
231 volname
= kwargs
['vol_name']
232 subvolname
= kwargs
['sub_name']
233 authid
= kwargs
['auth_id']
234 groupname
= kwargs
['group_name']
237 with
open_volume(self
, volname
) as fs_handle
:
238 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
239 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.DENY_ACCESS
) as subvolume
:
240 subvolume
.deauthorize(authid
)
241 except VolumeException
as ve
:
242 ret
= self
.volume_exception_to_retval(ve
)
def authorized_list(self, **kwargs):
    """
    report what subvolume.authorized_list() returns for a subvolume.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, <json dump of the list>, "") on success, else the tuple
             representation of the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.AUTH_LIST) as subvolume:
            auths = subvolume.authorized_list()
            return 0, json.dumps(auths, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
261 def evict(self
, **kwargs
):
263 volname
= kwargs
['vol_name']
264 subvolname
= kwargs
['sub_name']
265 authid
= kwargs
['auth_id']
266 groupname
= kwargs
['group_name']
269 with
open_volume(self
, volname
) as fs_handle
:
270 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
271 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.EVICT
) as subvolume
:
272 key
= subvolume
.evict(volname
, authid
)
274 except (VolumeException
, ClusterTimeout
, ClusterError
, EvictionError
) as e
:
275 if isinstance(e
, VolumeException
):
276 ret
= self
.volume_exception_to_retval(e
)
277 elif isinstance(e
, ClusterTimeout
):
278 ret
= -errno
.ETIMEDOUT
, "", "Timedout trying to talk to ceph cluster"
279 elif isinstance(e
, ClusterError
):
280 ret
= e
._result
_code
, "", e
._result
_str
281 elif isinstance(e
, EvictionError
):
282 ret
= -errno
.EINVAL
, "", str(e
)
285 def resize_subvolume(self
, **kwargs
):
287 volname
= kwargs
['vol_name']
288 subvolname
= kwargs
['sub_name']
289 newsize
= kwargs
['new_size']
290 noshrink
= kwargs
['no_shrink']
291 groupname
= kwargs
['group_name']
294 with
open_volume(self
, volname
) as fs_handle
:
295 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
296 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.RESIZE
) as subvolume
:
297 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
299 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
300 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
301 indent
=4, sort_keys
=True), ""
302 except VolumeException
as ve
:
303 ret
= self
.volume_exception_to_retval(ve
)
def subvolume_pin(self, **kwargs):
    """
    apply a pin to a subvolume by calling subvolume.pin(pin_type, pin_setting).

    reads 'vol_name', 'sub_name', 'pin_type', 'pin_setting' and
    'group_name' from kwargs.

    :return: (0, "{}", "") on success, else the tuple representation of
             the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    pin_type = kwargs['pin_type']
    pin_setting = kwargs['pin_setting']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.PIN) as subvolume:
            subvolume.pin(pin_type, pin_setting)
            # success carries an empty json object as output
            return 0, json.dumps({}), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
def subvolume_getpath(self, **kwargs):
    """
    fetch the path of a subvolume.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, <subvolume.path decoded as utf-8>, "") on success, else
             the tuple representation of the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.GETPATH) as subvolume:
            return 0, subvolume.path.decode("utf-8"), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
def subvolume_info(self, **kwargs):
    """
    dump subvolume.info() for a subvolume, extended with a "mon_addrs"
    entry built from the manager's 'mon_map'.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, <json dump of the info dict>, "") on success, else the
             tuple representation of the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.INFO) as subvolume:
            # keep only what precedes the first "/" of every mon 'addr'
            # (presumably the ip:port part -- confirm against the mon map format)
            mon_addr_lst = [mon['addr'].split("/")[0]
                            for mon in self.mgr.get('mon_map')['mons']]

            subvol_info_dict = subvolume.info()
            subvol_info_dict["mon_addrs"] = mon_addr_lst
            return 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
def list_subvolumes(self, **kwargs):
    """
    list the subvolumes of a group, as returned by group.list_subvolumes().

    reads 'vol_name' and 'group_name' from kwargs.

    :return: (0, <name_to_json() of the subvolume names>, "") on success,
             else the tuple representation of the VolumeException that was
             raised
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            return 0, name_to_json(group.list_subvolumes()), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
377 ### subvolume snapshot
379 def create_subvolume_snapshot(self
, **kwargs
):
381 volname
= kwargs
['vol_name']
382 subvolname
= kwargs
['sub_name']
383 snapname
= kwargs
['snap_name']
384 groupname
= kwargs
['group_name']
387 with
open_volume(self
, volname
) as fs_handle
:
388 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
389 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_CREATE
) as subvolume
:
390 subvolume
.create_snapshot(snapname
)
391 except VolumeException
as ve
:
392 ret
= self
.volume_exception_to_retval(ve
)
395 def remove_subvolume_snapshot(self
, **kwargs
):
397 volname
= kwargs
['vol_name']
398 subvolname
= kwargs
['sub_name']
399 snapname
= kwargs
['snap_name']
400 groupname
= kwargs
['group_name']
401 force
= kwargs
['force']
404 with
open_volume(self
, volname
) as fs_handle
:
405 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
406 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_REMOVE
) as subvolume
:
407 subvolume
.remove_snapshot(snapname
)
408 except VolumeException
as ve
:
409 # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
410 # we should tickle the purge jobs to purge the same
411 if ve
.errno
== -errno
.ESTALE
:
412 self
.purge_queue
.queue_job(volname
)
413 elif not (ve
.errno
== -errno
.ENOENT
and force
):
414 ret
= self
.volume_exception_to_retval(ve
)
def subvolume_snapshot_info(self, **kwargs):
    """
    dump subvolume.snapshot_info(snapname) for a subvolume snapshot.

    reads 'vol_name', 'sub_name', 'snap_name' and 'group_name' from kwargs.

    :return: (0, <json dump of the snapshot info>, "") on success, else
             the tuple representation of the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.SNAP_INFO) as subvolume:
            snap_info_dict = subvolume.snapshot_info(snapname)
            return 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
def list_subvolume_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume, as returned by
    subvolume.list_snapshots().

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, <name_to_json() of the snapshot names>, "") on success,
             else the tuple representation of the VolumeException that was
             raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.SNAP_LIST) as subvolume:
            return 0, name_to_json(subvolume.list_snapshots()), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
def protect_subvolume_snapshot(self, **kwargs):
    """
    deprecated 'snapshot protect' call.

    opens the volume, group and subvolume (SNAP_PROTECT op type) and only
    logs a deprecation warning; nothing is changed on the subvolume.
    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, "", <deprecation warning>) when everything could be
             opened, else the tuple representation of the VolumeException
             that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.SNAP_PROTECT):
            log.warning("snapshot protect call is deprecated and will be removed in a future release")
    except VolumeException as ve:
        ret = self.volume_exception_to_retval(ve)
    return ret
def unprotect_subvolume_snapshot(self, **kwargs):
    """
    deprecated 'snapshot unprotect' call.

    opens the volume, group and subvolume (SNAP_UNPROTECT op type) and
    only logs a deprecation warning; nothing is changed on the subvolume.
    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, "", <deprecation warning>) when everything could be
             opened, else the tuple representation of the VolumeException
             that was raised
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                            SubvolumeOpType.SNAP_UNPROTECT):
            log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
    except VolumeException as ve:
        ret = self.volume_exception_to_retval(ve)
    return ret
480 def _prepare_clone_subvolume(self
, fs_handle
, volname
, s_subvolume
, s_snapname
, t_group
, t_subvolname
, **kwargs
):
481 t_pool
= kwargs
['pool_layout']
482 s_subvolname
= kwargs
['sub_name']
483 s_groupname
= kwargs
['group_name']
484 t_groupname
= kwargs
['target_group_name']
486 create_clone(self
.mgr
, fs_handle
, self
.volspec
, t_group
, t_subvolname
, t_pool
, volname
, s_subvolume
, s_snapname
)
487 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, t_group
, t_subvolname
, SubvolumeOpType
.CLONE_INTERNAL
) as t_subvolume
:
489 if t_groupname
== s_groupname
and t_subvolname
== s_subvolname
:
490 t_subvolume
.attach_snapshot(s_snapname
, t_subvolume
)
492 s_subvolume
.attach_snapshot(s_snapname
, t_subvolume
)
493 self
.cloner
.queue_job(volname
)
494 except VolumeException
as ve
:
497 self
.purge_queue
.queue_job(volname
)
498 except Exception as e
:
499 log
.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname
, e
))
502 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, s_group
, s_subvolume
, **kwargs
):
503 s_snapname
= kwargs
['snap_name']
504 target_subvolname
= kwargs
['target_sub_name']
505 target_groupname
= kwargs
['target_group_name']
506 s_groupname
= kwargs
['group_name']
508 if not s_snapname
.encode('utf-8') in s_subvolume
.list_snapshots():
509 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(s_snapname
))
511 with
open_group_unique(fs_handle
, self
.volspec
, target_groupname
, s_group
, s_groupname
) as target_group
:
513 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, target_group
, target_subvolname
, SubvolumeOpType
.CLONE_CREATE
):
514 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
515 except VolumeException
as ve
:
516 if ve
.errno
== -errno
.ENOENT
:
517 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, s_subvolume
, s_snapname
,
518 target_group
, target_subvolname
, **kwargs
)
522 def clone_subvolume_snapshot(self
, **kwargs
):
524 volname
= kwargs
['vol_name']
525 s_subvolname
= kwargs
['sub_name']
526 s_groupname
= kwargs
['group_name']
529 with
open_volume(self
, volname
) as fs_handle
:
530 with
open_group(fs_handle
, self
.volspec
, s_groupname
) as s_group
:
531 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, s_group
, s_subvolname
, SubvolumeOpType
.CLONE_SOURCE
) as s_subvolume
:
532 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, s_group
, s_subvolume
, **kwargs
)
533 except VolumeException
as ve
:
534 ret
= self
.volume_exception_to_retval(ve
)
def clone_status(self, **kwargs):
    """
    report the `status` attribute of a clone, opened as a subvolume with
    the CLONE_STATUS op type.

    reads 'vol_name', 'clone_name' and 'group_name' from kwargs.

    :return: (0, <json dump of {'status': ...}>, "") on success, else the
             tuple representation of the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    clonename = kwargs['clone_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, clonename,
                            SubvolumeOpType.CLONE_STATUS) as subvolume:
            status_doc = {'status': subvolume.status}
            return 0, json.dumps(status_doc, indent=2), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
552 def clone_cancel(self
, **kwargs
):
554 volname
= kwargs
['vol_name']
555 clonename
= kwargs
['clone_name']
556 groupname
= kwargs
['group_name']
559 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
560 except VolumeException
as ve
:
561 ret
= self
.volume_exception_to_retval(ve
)
566 def create_subvolume_group(self
, **kwargs
):
568 volname
= kwargs
['vol_name']
569 groupname
= kwargs
['group_name']
570 pool
= kwargs
['pool_layout']
573 mode
= kwargs
['mode']
576 with
open_volume(self
, volname
) as fs_handle
:
578 with
open_group(fs_handle
, self
.volspec
, groupname
):
579 # idempotent creation -- valid.
581 except VolumeException
as ve
:
582 if ve
.errno
== -errno
.ENOENT
:
583 oct_mode
= octal_str_to_decimal_int(mode
)
584 create_group(fs_handle
, self
.volspec
, groupname
, pool
, oct_mode
, uid
, gid
)
587 except VolumeException
as ve
:
588 # volume does not exist or subvolume group creation failed
589 ret
= self
.volume_exception_to_retval(ve
)
592 def remove_subvolume_group(self
, **kwargs
):
594 volname
= kwargs
['vol_name']
595 groupname
= kwargs
['group_name']
596 force
= kwargs
['force']
599 with
open_volume(self
, volname
) as fs_handle
:
600 remove_group(fs_handle
, self
.volspec
, groupname
)
601 except VolumeException
as ve
:
602 if not (ve
.errno
== -errno
.ENOENT
and force
):
603 ret
= self
.volume_exception_to_retval(ve
)
def getpath_subvolume_group(self, **kwargs):
    """
    fetch the path of a subvolume group.

    reads 'vol_name' and 'group_name' from kwargs.

    :return: (0, <group.path decoded as utf-8>, "") on success, else the
             tuple representation of the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group_path = group.path.decode('utf-8')
            return 0, group_path, ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
617 def list_subvolume_groups(self
, **kwargs
):
618 volname
= kwargs
['vol_name']
621 with
open_volume(self
, volname
) as fs_handle
:
622 groups
= listdir(fs_handle
, self
.volspec
.base_dir
)
623 ret
= 0, name_to_json(groups
), ""
624 except VolumeException
as ve
:
625 if not ve
.errno
== -errno
.ENOENT
:
626 ret
= self
.volume_exception_to_retval(ve
)
def pin_subvolume_group(self, **kwargs):
    """
    apply a pin to a subvolume group by calling
    group.pin(pin_type, pin_setting).

    reads 'vol_name', 'group_name', 'pin_type' and 'pin_setting' from
    kwargs.

    :return: (0, "{}", "") on success, else the tuple representation of
             the VolumeException that was raised
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    pin_type = kwargs['pin_type']
    pin_setting = kwargs['pin_setting']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group.pin(pin_type, pin_setting)
            # success carries an empty json object as output
            return 0, json.dumps({}), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
def create_subvolume_group_snapshot(self, **kwargs):
    """
    subvolume group snapshot creation -- not supported.

    the volume and group are still opened, so a VolumeException from
    opening them is reported in place of the "not supported" result.
    reads 'vol_name' and 'group_name' from kwargs; 'snap_name' is not read
    since no snapshot is created.

    :return: (-ENOSYS, "", "subvolume group snapshots are not supported"),
             or the tuple representation of the VolumeException raised
             while opening the volume/group
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported"

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname):
            # subvolumes are marked with the vxattr ceph.dir.subvolume, which
            # denies snapshots at the subvolume group
            # (see: https://tracker.ceph.com/issues/46074), hence
            # group.create_snapshot() is deliberately not called here.
            pass
    except VolumeException as ve:
        ret = self.volume_exception_to_retval(ve)
    return ret
664 def remove_subvolume_group_snapshot(self
, **kwargs
):
666 volname
= kwargs
['vol_name']
667 groupname
= kwargs
['group_name']
668 snapname
= kwargs
['snap_name']
669 force
= kwargs
['force']
672 with
open_volume(self
, volname
) as fs_handle
:
673 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
674 group
.remove_snapshot(snapname
)
675 except VolumeException
as ve
:
676 if not (ve
.errno
== -errno
.ENOENT
and force
):
677 ret
= self
.volume_exception_to_retval(ve
)
def list_subvolume_group_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume group, as returned by
    group.list_snapshots().

    reads 'vol_name' and 'group_name' from kwargs.

    :return: (0, <name_to_json() of the snapshot names>, "") on success,
             else the tuple representation of the VolumeException that was
             raised
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            return 0, name_to_json(group.list_snapshots()), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)