import os
import stat
import time
import errno
import logging
from contextlib import contextmanager
from typing import Optional

import cephfs
from mgr_util import lock_timeout_log

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.versions.op_sm import SubvolumeOpSm
from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index
from .operations.template import SubvolumeOpType

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None

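# the context managers below resolve a (group, subvolume) pair within a volume
# and yield the opened subvolume; open_at_volume opens the volume itself, while
# open_at_group reuses an already-open filesystem handle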
@contextmanager
def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
    with open_volume(fs_client, volname) as fs_handle:
        with open_group(fs_handle, volspec, groupname) as group:
            with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
    with open_group(fs_handle, volspec, groupname) as group:
        with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
            yield subvolume

@contextmanager
def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
    # the clone subvolume as the source subvolume
    if s_groupname == c_groupname and s_subvolname == c_subvolname:
        yield c_subvolume
    else:
        with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume:
            yield s_subvolume


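# yields a (clone subvolume, source subvolume, source snapshot name) triple; when
# a retained subvolume is being recreated from one of its own snapshots, the clone
# and the source are the same object so that metadata is not overwritten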
@contextmanager
def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
    with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
        if groupname == s_groupname and subvolname == s_subvolname:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_subvolume, clone_subvolume, s_snapname)
        else:
            with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                yield (clone_subvolume, source_subvolume, s_snapname)

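# small accessors for the clone's persisted state-machine state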
def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        return subvolume.state

def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

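# map a copy error to the next state: EINTR denotes a user-initiated cancel and
# moves the clone to canceled; any other error moves it to failed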
def get_next_state_on_error(errnum):
    if errnum == -errno.EINTR:
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_CANCELLED)
    else:
        # jump to failed state, on all other errors
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_FAILED)
    return next_state

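# state handler for SubvolumeStates.STATE_PENDING: either advance towards
# in-progress, or, if a cancel was requested, record the failure and transition
# to canceled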
def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_CANCELLED)
            update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname,
                                        VolumeException(-errno.EINTR, "user interrupted clone operation"))
        else:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_SUCCESS)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

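# mirror ownership, timestamps and mode from the source statx result onto the
# copied target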
def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
        fs_handle.lchmod(target_path, source_statx["mode"])
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise e

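# note: the nested cptree helper checks should_cancel() on every directory
# entry, and copy_file() takes a cancel_check callback, so a cancel request
# interrupts the copy promptly rather than after the full tree is walked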
def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                              cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME,
                                           cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "user interrupted clone operation")

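# carry the ceph.quota.max_bytes / ceph.quota.max_files xattrs over from the
# snapshot source to the clone root, if they are set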
def set_quota_on_clone(fs_handle, clone_volumes_pair):
    src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2])
    dst_path = clone_volumes_pair[0].path
    quota = None  # type: Optional[int]
    try:
        quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8'))
    except cephfs.NoData:
        pass

    if quota is not None:
        try:
            fs_handle.setxattr(dst_path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
        except cephfs.InvalidValue:
            raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    quota_files = None  # type: Optional[int]
    try:
        quota_files = int(fs_handle.getxattr(src_path, 'ceph.quota.max_files').decode('utf-8'))
    except cephfs.NoData:
        pass

    if quota_files is not None:
        try:
            fs_handle.setxattr(dst_path, 'ceph.quota.max_files', str(quota_files).encode('utf-8'), 0)
        except cephfs.InvalidValue:
            raise VolumeException(-errno.EINVAL, "invalid file count specified: '{0}'".format(quota_files))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

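# top-level copy step: snapshot data is copied into the clone, then the quota
# attributes are applied to the clone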
def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(fs_client, volname) as fs_handle:
        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
            src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
            dst_path = clone_volumes[0].path
            bulk_copy(fs_handle, src_path, dst_path, should_cancel)
            set_quota_on_clone(fs_handle, clone_volumes)

def update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve):
    with open_volume_lockless(fs_client, volname) as fs_handle:
        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
            if ve.errno == -errno.EINTR:
                clone_volumes[0].add_clone_failure(-ve.errno, "user interrupted clone operation")
            else:
                clone_volumes[0].add_clone_failure(-ve.errno, ve.error_str)

def log_clone_failure(volname, groupname, subvolname, ve):
    if ve.errno == -errno.EINTR:
        log.info("Clone cancelled: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    elif ve.errno == -errno.EDQUOT:
        log.error("Clone failed: ({0}, {1}, {2}, reason -> Disk quota exceeded)".format(volname, groupname, subvolname))
    else:
        log.error("Clone failed: ({0}, {1}, {2}, reason -> {3})".format(volname, groupname, subvolname, ve))

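# state handler for SubvolumeStates.STATE_INPROGRESS: run the copy and map its
# outcome (success, cancel, error) to the next state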
def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as ve:
        update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve)
        log_clone_failure(volname, groupname, subvolname, ve)
        next_state = get_next_state_on_error(ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

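# state handler for failed/canceled clones: detach the source snapshot but keep
# the clone's metadata section around for later inspection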
def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(fs_client, volname) as fs_handle:
            # detach source but leave the clone section intact for later inspection
            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

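# state handler for SubvolumeStates.STATE_COMPLETE: drop the source linkage now
# that the clone holds a full copy of the snapshot data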
def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(fs_client, volname) as fs_handle:
            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                clone_volumes[0].remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

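# drive the clone state machine: look up the current state, dispatch to its
# handler, persist each transition, and loop until a handler reports completion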
def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        if current_state == SubvolumeStates.STATE_PENDING:
            time.sleep(snapshot_clone_delay)
            log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

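# entry point used by the async job machinery: resolve the clone path to a
# (group, subvolume) pair and run the state machine on it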
def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    This relies on a simple state machine (which mimics states from the SubvolumeOpSm
    class) as the driver. File types supported are directories, symbolic links and
    regular files.
    """
    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
        self.vc = volume_client
        self.snapshot_clone_delay = snapshot_clone_delay
        # map each clone state to its handler; failed and canceled clones share a
        # handler since both only need the source snapshot detached
        self.state_table = {
            SubvolumeStates.STATE_PENDING    : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE   : handle_clone_complete,
            SubvolumeStates.STATE_FAILED     : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED   : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def reconfigure_max_concurrent_clones(self, tp_size):
        return super(Cloner, self).reconfigure_max_async_threads(tp_size)

    def reconfigure_snapshot_clone_delay(self, timeout):
        self.snapshot_clone_delay = timeout

    def is_clone_cancelable(self, clone_state):
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume,
                                  clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            clone_subvolume.add_clone_failure(errno.EINTR, "user interrupted clone operation")
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        Override base class `cancel_job`; interpret @job as a (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.fs_client, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        clone_job = (track_idx, clone_subvolume.base_path)
                        jobs = [j[0] for j in self.jobs[volname]]
                        with lock_timeout_log(self.lock):
                            if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state) and clone_job not in jobs:
                                log.debug("Cancelling pending job {0}".format(clone_job))
                                # clone has not started yet -- cancel right away.
                                self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                                return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with lock_timeout_log(self.lock):
                with open_volume_lockless(self.fs_client, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
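
# Rough usage sketch (illustrative only; the actual wiring lives in the volumes
# plugin, and the constructor arguments and volume/clone names below are
# assumptions, not values taken from this file):
#
#   cloner = Cloner(volume_client, tp_size=4, snapshot_clone_delay=0)
#   cloner.reconfigure_max_concurrent_clones(8)        # resize the worker pool
#   cloner.cancel_job("vol_a", ("clone1", "group_a"))  # job is a (clone, group) tuple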