import json
import errno
import logging
import time
from threading import Lock
try:
    # py2
    from threading import _Timer as Timer
except ImportError:
    # py3
    from threading import Timer

import cephfs
import orchestrator

from .subvolspec import SubvolumeSpec
from .subvolume import SubVolume
from .exception import VolumeException
from .purge_queue import ThreadPoolPurgeQueueMixin

log = logging.getLogger(__name__)

class ConnectionPool(object):
    class Connection(object):
        def __init__(self, mgr, fs_name):
            self.fs = None
            self.mgr = mgr
            self.fs_name = fs_name
            self.ops_in_progress = 0
            self.last_used = time.time()
            self.fs_id = self.get_fs_id()

        def get_fs_id(self):
            fs_map = self.mgr.get('fs_map')
            for fs in fs_map['filesystems']:
                if fs['mdsmap']['fs_name'] == self.fs_name:
                    return fs['id']
            raise VolumeException(
                -errno.ENOENT, "Volume '{0}' not found".format(self.fs_name))

        def get_fs_handle(self):
            self.last_used = time.time()
            self.ops_in_progress += 1
            return self.fs

        def put_fs_handle(self):
            assert self.ops_in_progress > 0
            self.ops_in_progress -= 1

        def del_fs_handle(self):
            if self.is_connection_valid():
                self.disconnect()
            else:
                self.abort()

        def is_connection_valid(self):
            fs_id = None
            try:
                fs_id = self.get_fs_id()
            except VolumeException:
                # the filesystem does not exist now -- connection is not valid.
                pass
            return self.fs_id == fs_id

        def is_connection_idle(self, timeout):
            return (self.ops_in_progress == 0 and ((time.time() - self.last_used) >= timeout))
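
        # Reading the check above: with the default timeout of 60.0 seconds,
        # a connection with no in-flight operations whose last_used stamp is
        # at least a minute old is reported idle and becomes eligible for
        # cleanup by the pool's timer task.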

        def connect(self):
            assert self.ops_in_progress == 0
            log.debug("Connecting to cephfs '{0}'".format(self.fs_name))
            self.fs = cephfs.LibCephFS(rados_inst=self.mgr.rados)
            log.debug("Setting user ID and group ID of CephFS mount as root...")
            self.fs.conf_set("client_mount_uid", "0")
            self.fs.conf_set("client_mount_gid", "0")
            log.debug("CephFS initializing...")
            self.fs.init()
            log.debug("CephFS mounting...")
            self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
            log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))

        def disconnect(self):
            assert self.ops_in_progress == 0
            log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
            self.fs.shutdown()
            self.fs = None

        def abort(self):
            assert self.ops_in_progress == 0
            log.info("aborting connection from cephfs '{0}'".format(self.fs_name))
            self.fs.abort_conn()
            self.fs = None

    class RTimer(Timer):
        """
        recurring timer variant of Timer
        """
        def run(self):
            while not self.finished.is_set():
                self.finished.wait(self.interval)
                self.function(*self.args, **self.kwargs)
            self.finished.set()
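
    # A minimal usage sketch for RTimer (hypothetical callback `tick`):
    #
    #     t = ConnectionPool.RTimer(30.0, tick)
    #     t.start()   # runs tick() every ~30 seconds until t.cancel()
    #
    # Unlike threading.Timer, which fires once and exits, this variant keeps
    # waiting and re-invoking the callback until it is cancelled.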

    # TODO: make this configurable
    TIMER_TASK_RUN_INTERVAL = 30.0  # seconds
    CONNECTION_IDLE_INTERVAL = 60.0  # seconds
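
    # With these defaults the timer task wakes every 30 seconds and reaps
    # connections idle for at least 60 seconds, so an unused connection is
    # torn down roughly 60-90 seconds after its last operation.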

    def __init__(self, mgr):
        self.mgr = mgr
        self.connections = {}
        self.lock = Lock()
        self.timer_task = ConnectionPool.RTimer(ConnectionPool.TIMER_TASK_RUN_INTERVAL,
                                                self.cleanup_connections)
        self.timer_task.start()

    def cleanup_connections(self):
        with self.lock:
            log.info("scanning for idle connections..")
            idle_fs = [fs_name for fs_name, conn in self.connections.items()
                       if conn.is_connection_idle(ConnectionPool.CONNECTION_IDLE_INTERVAL)]
            for fs_name in idle_fs:
                log.info("cleaning up connection for '{}'".format(fs_name))
                self._del_fs_handle(fs_name)

    def get_fs_handle(self, fs_name):
        with self.lock:
            try:
                conn = self.connections.get(fs_name, None)
                if conn:
                    if conn.is_connection_valid():
                        return conn.get_fs_handle()
                    # filesystem id changed beneath us (or the filesystem does not exist).
                    # this is possible if the filesystem got removed (and recreated with
                    # same name) via "ceph fs rm/new" mon command.
                    log.warning("filesystem id changed for volume '{0}', reconnecting...".format(fs_name))
                    self._del_fs_handle(fs_name)
                conn = ConnectionPool.Connection(self.mgr, fs_name)
                conn.connect()
            except cephfs.Error as e:
                # try to provide a better error string if possible
                if e.args[0] == errno.ENOENT:
                    raise VolumeException(
                        -errno.ENOENT, "Volume '{0}' not found".format(fs_name))
                raise VolumeException(-e.args[0], e.args[1])
            self.connections[fs_name] = conn
            return conn.get_fs_handle()

    def put_fs_handle(self, fs_name):
        with self.lock:
            conn = self.connections.get(fs_name, None)
            if conn:
                conn.put_fs_handle()

    def _del_fs_handle(self, fs_name):
        conn = self.connections.pop(fs_name, None)
        if conn:
            conn.del_fs_handle()

    def del_fs_handle(self, fs_name):
        with self.lock:
            self._del_fs_handle(fs_name)
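
# A minimal usage sketch for the pool (hypothetical caller, assuming a mgr
# module handle `mgr` and an existing volume named "a"):
#
#     pool = ConnectionPool(mgr)
#     fs = pool.get_fs_handle("a")   # mounts, or reuses, a LibCephFS handle
#     try:
#         fs.stat(b"/")              # ...perform libcephfs operations...
#     finally:
#         pool.put_fs_handle("a")    # release it so it can go idle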

class VolumeClient(object):
    def __init__(self, mgr):
        self.mgr = mgr
        self.connection_pool = ConnectionPool(self.mgr)
        # TODO: make thread pool size configurable
        self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
        # on startup, queue purge job for available volumes to kickstart
        # purge for leftover subvolume entries in trash. note that, if the
        # trash directory does not exist or if there are no purge entries
        # available for a volume, the volume is removed from the purge
        # job list.
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            self.purge_queue.queue_purge_job(fs['mdsmap']['fs_name'])

    def cluster_log(self, msg, lvl=None):
        """
        log to cluster log with default log level as WARN.
        """
        if not lvl:
            lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
        self.mgr.cluster_log("cluster", lvl, msg)

    def gen_pool_names(self, volname):
        """
        return metadata and data pool name (from a filesystem/volume name) as a tuple
        """
        return "cephfs.{}.meta".format(volname), "cephfs.{}.data".format(volname)
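
    # For example, gen_pool_names("vol1") returns
    # ("cephfs.vol1.meta", "cephfs.vol1.data").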

    def get_fs(self, fs_name):
        fs_map = self.mgr.get('fs_map')
        for fs in fs_map['filesystems']:
            if fs['mdsmap']['fs_name'] == fs_name:
                return fs
        return None

    def get_mds_names(self, fs_name):
        fs = self.get_fs(fs_name)
        if fs is None:
            return []
        return [mds['name'] for mds in fs['mdsmap']['info'].values()]

    def volume_exists(self, volname):
        return self.get_fs(volname) is not None

    def volume_exception_to_retval(self, ve):
        """
        return a tuple representation from a volume exception
        """
        return ve.to_tuple()
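
    # Handlers below return (r, outb, outs) triples; a VolumeException
    # carrying -errno.ENOENT thus surfaces to the caller as, e.g.,
    # (-errno.ENOENT, "", "Volume 'vol1' not found").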

    def create_pool(self, pool_name, pg_num):
        # create the given pool
        command = {'prefix': 'osd pool create', 'pool': pool_name, 'pg_num': pg_num}
        r, outb, outs = self.mgr.mon_command(command)
        return r, outb, outs

    def remove_pool(self, pool_name):
        command = {'prefix': 'osd pool rm', 'pool': pool_name, 'pool2': pool_name,
                   'yes_i_really_really_mean_it': True}
        return self.mgr.mon_command(command)

    def create_filesystem(self, fs_name, metadata_pool, data_pool):
        command = {'prefix': 'fs new', 'fs_name': fs_name, 'metadata': metadata_pool,
                   'data': data_pool}
        return self.mgr.mon_command(command)

    def remove_filesystem(self, fs_name, confirm):
        if confirm != "--yes-i-really-mean-it":
            return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
                "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
                "that is what you want, re-issue the command followed by " \
                "--yes-i-really-mean-it.".format(fs_name)

        command = {'prefix': 'fs fail', 'fs_name': fs_name}
        r, outb, outs = self.mgr.mon_command(command)
        if r != 0:
            return r, outb, outs

        command = {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True}
        return self.mgr.mon_command(command)

    def create_mds(self, fs_name):
        spec = orchestrator.StatelessServiceSpec()
        spec.name = fs_name
        try:
            completion = self.mgr.add_stateless_service("mds", spec)
            self.mgr._orchestrator_wait([completion])
            orchestrator.raise_if_exception(completion)
        except (ImportError, orchestrator.OrchestratorError):
            return 0, "", "Volume created successfully (no MDS daemons created)"
        except Exception as e:
            # Don't let detailed orchestrator exceptions (python backtraces)
            # bubble out to the user
            log.exception("Failed to create MDS daemons")
            return -errno.EINVAL, "", str(e)
        return 0, "", ""

    ### volume operations -- create, rm, ls

    def create_volume(self, volname):
        """
        create volume (pool, filesystem and mds)
        """
        metadata_pool, data_pool = self.gen_pool_names(volname)
        # create pools
        r, outb, outs = self.create_pool(metadata_pool, 16)
        if r != 0:
            return r, outb, outs
        r, outb, outs = self.create_pool(data_pool, 8)
        if r != 0:
            return r, outb, outs
        # create filesystem
        r, outb, outs = self.create_filesystem(volname, metadata_pool, data_pool)
        if r != 0:
            log.error("Filesystem creation error: {0} {1} {2}".format(r, outb, outs))
            return r, outb, outs
        # create mds
        return self.create_mds(volname)
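
    # For a volume named "vol1" this creates the pools "cephfs.vol1.meta"
    # (16 PGs) and "cephfs.vol1.data" (8 PGs), a filesystem "vol1" backed by
    # them, and finally asks the orchestrator for MDS daemons.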

    def delete_volume(self, volname, confirm):
        """
        delete the given volume (tear down mds, remove filesystem)
        """
        self.purge_queue.cancel_purge_job(volname)
        self.connection_pool.del_fs_handle(volname)
        # Tear down MDS daemons
        try:
            completion = self.mgr.remove_stateless_service("mds", volname)
            self.mgr._orchestrator_wait([completion])
            orchestrator.raise_if_exception(completion)
        except (ImportError, orchestrator.OrchestratorError):
            log.warning("OrchestratorError, not tearing down MDS daemons")
        except Exception as e:
            # Don't let detailed orchestrator exceptions (python backtraces)
            # bubble out to the user
            log.exception("Failed to tear down MDS daemons")
            return -errno.EINVAL, "", str(e)

        # In case orchestrator didn't tear down MDS daemons cleanly, or
        # there was no orchestrator, we force the daemons down.
        if self.volume_exists(volname):
            r, outb, outs = self.remove_filesystem(volname, confirm)
            if r != 0:
                return r, outb, outs
        else:
            err = "Filesystem not found for volume '{0}'".format(volname)
            log.warning(err)
            return -errno.ENOENT, "", err
        metadata_pool, data_pool = self.gen_pool_names(volname)
        r, outb, outs = self.remove_pool(metadata_pool)
        if r != 0:
            return r, outb, outs
        return self.remove_pool(data_pool)

    def list_volumes(self):
        result = []
        fs_map = self.mgr.get("fs_map")
        for f in fs_map['filesystems']:
            result.append({'name': f['mdsmap']['fs_name']})
        return 0, json.dumps(result, indent=2), ""
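
    # Example output for two volumes "a" and "b":
    #
    #     [
    #       {
    #         "name": "a"
    #       },
    #       {
    #         "name": "b"
    #       }
    #     ]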

    def group_exists(self, sv, spec):
        # default group need not be explicitly created (as it gets created
        # at the time of subvolume, snapshot and other create operations).
        return spec.is_default_group() or sv.get_group_path(spec)

    @staticmethod
    def octal_str_to_decimal_int(mode):
        try:
            return int(mode, 8)
        except ValueError:
            raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
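
    # For example, octal_str_to_decimal_int("755") returns 493 (0o755), the
    # integer form of the mode expected by the create calls below.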

    def connection_pool_wrap(func):
        """
        decorator that wraps subvolume calls by fetching filesystem handle
        from the connection pool when fs_handle argument is empty, otherwise
        just invoke func with the passed in filesystem handle. Also handles
        call made to non-existent volumes (only when fs_handle is empty).
        """
        def conn_wrapper(self, fs_handle, **kwargs):
            fs_h = fs_handle
            fs_name = kwargs['vol_name']
            # note that force arg is available for remove type commands
            force = kwargs.get('force', False)

            # fetch the connection from the pool
            if not fs_handle:
                try:
                    fs_h = self.connection_pool.get_fs_handle(fs_name)
                except VolumeException as ve:
                    # on force removal, ignore errors for non-existent volumes
                    if not force:
                        return self.volume_exception_to_retval(ve)
                    return 0, "", ""

            # invoke the actual routine w/ fs handle
            result = func(self, fs_h, **kwargs)

            # hand over the connection back to the pool
            if not fs_handle:
                self.connection_pool.put_fs_handle(fs_name)
            return result
        return conn_wrapper
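
    # A sketch of the wrapping for a decorated call such as
    # create_subvolume(None, vol_name="a", sub_name="sv1", ...): conn_wrapper
    # fetches a pooled handle for volume "a", invokes the real routine with
    # it, and then returns the handle to the pool; passing an already-open
    # fs_handle instead bypasses the pool on both sides.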

    def nametojson(self, names):
        """
        convert the list of names to json
        """
        namedict = []
        for i in range(len(names)):
            namedict.append({'name': names[i].decode('utf-8')})
        return json.dumps(namedict, indent=2)
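
    # For example, nametojson([b'sv1', b'sv2']) produces:
    #
    #     [
    #       {
    #         "name": "sv1"
    #       },
    #       {
    #         "name": "sv2"
    #       }
    #     ]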

    ### subvolume operations

    @connection_pool_wrap
    def create_subvolume(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        size = kwargs['size']
        pool = kwargs['pool_layout']
        mode = kwargs['mode']

        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, create it with "
                        "`ceph fs subvolumegroup create` before creating subvolumes".format(groupname))
                sv.create_subvolume(spec, size, pool=pool, mode=self.octal_str_to_decimal_int(mode))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if self.group_exists(sv, spec):
                    sv.remove_subvolume(spec, force)
                    self.purge_queue.queue_purge_job(volname)
                elif not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot remove "
                        "subvolume '{1}'".format(groupname, subvolname))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def subvolume_getpath(self, fs_handle, **kwargs):
        ret = None
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
                path = sv.get_subvolume_path(spec)
                if not path:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume '{0}' not found".format(subvolname))
                ret = 0, path.decode("utf-8"), ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def list_subvolumes(self, fs_handle, **kwargs):
        ret = 0, "", ""
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(None, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
                path = sv.get_group_path(spec)
                # When default subvolume group is not yet created we just return an empty list.
                if path is None:
                    ret = 0, '[]', ""
                else:
                    subvolumes = sv.get_dir_entries(path)
                    ret = 0, self.nametojson(subvolumes), ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    ### subvolume snapshot

    @connection_pool_wrap
    def create_subvolume_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        snapname = kwargs['snap_name']
        groupname = kwargs['group_name']

        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create "
                        "snapshot '{1}'".format(groupname, snapname))
                if not sv.get_subvolume_path(spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume '{0}' not found, cannot create snapshot "
                        "'{1}'".format(subvolname, snapname))
                sv.create_subvolume_snapshot(spec, snapname)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        subvolname = kwargs['sub_name']
        snapname = kwargs['snap_name']
        groupname = kwargs['group_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if self.group_exists(sv, spec):
                    if sv.get_subvolume_path(spec):
                        sv.remove_subvolume_snapshot(spec, snapname, force)
                    elif not force:
                        raise VolumeException(
                            -errno.ENOENT, "Subvolume '{0}' not found, cannot remove "
                            "subvolume snapshot '{1}'".format(subvolname, snapname))
                elif not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' already removed, cannot "
                        "remove subvolume snapshot '{1}'".format(groupname, snapname))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def list_subvolume_snapshots(self, fs_handle, **kwargs):
        ret = 0, "", ""
        subvolname = kwargs['sub_name']
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(subvolname, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
                if sv.get_subvolume_path(spec) is None:
                    raise VolumeException(-errno.ENOENT,
                                          "Subvolume '{0}' not found".format(subvolname))

                path = spec.make_subvol_snapdir_path(self.mgr.rados.conf_get('client_snapdir'))
                snapshots = sv.get_dir_entries(path)
                ret = 0, self.nametojson(snapshots), ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def create_subvolume_group(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        pool = kwargs['pool_layout']
        mode = kwargs['mode']

        try:
            # TODO: validate that subvol size fits in volume size
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                sv.create_group(spec, pool=pool, mode=self.octal_str_to_decimal_int(mode))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume_group(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                # TODO: check whether there are no subvolumes in the group
                spec = SubvolumeSpec("", groupname)
                sv.remove_group(spec, force)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def getpath_subvolume_group(self, fs_handle, **kwargs):
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                path = sv.get_group_path(spec)
                if path is None:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))
                return 0, path.decode("utf-8"), ""
        except VolumeException as ve:
            return self.volume_exception_to_retval(ve)

    @connection_pool_wrap
    def list_subvolume_groups(self, fs_handle, **kwargs):
        ret = 0, "", ""
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                subvolumegroups = sv.get_dir_entries(SubvolumeSpec.DEFAULT_SUBVOL_PREFIX)
                ret = 0, self.nametojson(subvolumegroups), ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def create_subvolume_group_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        snapname = kwargs['snap_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot create "
                        "snapshot '{1}'".format(groupname, snapname))
                sv.create_group_snapshot(spec, snapname)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def remove_subvolume_group_snapshot(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        groupname = kwargs['group_name']
        snapname = kwargs['snap_name']
        force = kwargs['force']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", groupname)
                if self.group_exists(sv, spec):
                    sv.remove_group_snapshot(spec, snapname, force)
                elif not force:
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found, cannot "
                        "remove it".format(groupname))
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def list_subvolume_group_snapshots(self, fs_handle, **kwargs):
        ret = 0, "", ""
        groupname = kwargs['group_name']
        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(None, groupname)
                if not self.group_exists(sv, spec):
                    raise VolumeException(
                        -errno.ENOENT, "Subvolume group '{0}' not found".format(groupname))

                path = spec.make_group_snapdir_path(self.mgr.rados.conf_get('client_snapdir'))
                snapshots = sv.get_dir_entries(path)
                ret = 0, self.nametojson(snapshots), ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def get_subvolume_trash_entry(self, fs_handle, **kwargs):
        ret = None
        volname = kwargs['vol_name']
        exclude = kwargs.get('exclude_entries', [])

        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec("", "")
                path = sv.get_trash_entry(spec, exclude)
                ret = 0, path, ""
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret

    @connection_pool_wrap
    def purge_subvolume_trash_entry(self, fs_handle, **kwargs):
        ret = 0, "", ""
        volname = kwargs['vol_name']
        purge_dir = kwargs['purge_dir']
        should_cancel = kwargs.get('should_cancel', lambda: False)

        try:
            with SubVolume(self.mgr, fs_handle) as sv:
                spec = SubvolumeSpec(purge_dir.decode('utf-8'), "")
                sv.purge_subvolume(spec, should_cancel)
        except VolumeException as ve:
            ret = self.volume_exception_to_retval(ve)
        return ret