]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | import os |
2 | import stat | |
3 | import time | |
4 | import errno | |
5 | import logging | |
6 | from contextlib import contextmanager | |
20effc67 | 7 | from typing import Dict, Union |
92f5a8d4 TL |
8 | |
9 | import cephfs | |
f67539c2 | 10 | from mgr_util import lock_timeout_log |
92f5a8d4 TL |
11 | |
12 | from .async_job import AsyncJobs | |
13 | from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException | |
14 | from .fs_util import copy_file | |
adb31ebb TL |
15 | from .operations.versions.op_sm import SubvolumeOpSm |
16 | from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions | |
92f5a8d4 TL |
17 | from .operations.resolver import resolve |
18 | from .operations.volume import open_volume, open_volume_lockless | |
19 | from .operations.group import open_group | |
20 | from .operations.subvolume import open_subvol | |
21 | from .operations.clone_index import open_clone_index | |
adb31ebb | 22 | from .operations.template import SubvolumeOpType |
92f5a8d4 TL |
23 | |
24 | log = logging.getLogger(__name__) | |
25 | ||
26 | # helper for fetching a clone entry for a given volume | |
522d829b | 27 | def get_next_clone_entry(fs_client, volspec, volname, running_jobs): |
92f5a8d4 TL |
28 | log.debug("fetching clone entry for volume '{0}'".format(volname)) |
29 | ||
30 | try: | |
522d829b | 31 | with open_volume_lockless(fs_client, volname) as fs_handle: |
92f5a8d4 | 32 | try: |
522d829b | 33 | with open_clone_index(fs_handle, volspec) as clone_index: |
92f5a8d4 TL |
34 | job = clone_index.get_oldest_clone_entry(running_jobs) |
35 | return 0, job | |
36 | except IndexException as ve: | |
37 | if ve.errno == -errno.ENOENT: | |
38 | return 0, None | |
39 | raise ve | |
40 | except VolumeException as ve: | |
9f95a23c | 41 | log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve)) |
92f5a8d4 | 42 | return ve.errno, None |
92f5a8d4 TL |
43 | |
44 | @contextmanager | |
522d829b TL |
45 | def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type): |
46 | with open_volume(fs_client, volname) as fs_handle: | |
47 | with open_group(fs_handle, volspec, groupname) as group: | |
48 | with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume: | |
92f5a8d4 TL |
49 | yield subvolume |
50 | ||
51 | @contextmanager | |
522d829b TL |
52 | def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type): |
53 | with open_group(fs_handle, volspec, groupname) as group: | |
54 | with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume: | |
92f5a8d4 TL |
55 | yield subvolume |
56 | ||
adb31ebb | 57 | @contextmanager |
522d829b | 58 | def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type): |
adb31ebb TL |
59 | # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return |
60 | # the clone subvolume as the source subvolume | |
61 | if s_groupname == c_groupname and s_subvolname == c_subvolname: | |
62 | yield c_subvolume | |
63 | else: | |
522d829b | 64 | with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume: |
adb31ebb TL |
65 | yield s_subvolume |
66 | ||
67 | ||
68 | @contextmanager | |
522d829b TL |
69 | def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname): |
70 | with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume: | |
adb31ebb TL |
71 | s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume) |
72 | if groupname == s_groupname and subvolname == s_subvolname: | |
73 | # use the same subvolume to avoid metadata overwrites | |
74 | yield (clone_subvolume, clone_subvolume, s_snapname) | |
75 | else: | |
522d829b | 76 | with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume: |
adb31ebb TL |
77 | yield (clone_subvolume, source_subvolume, s_snapname) |
78 | ||
522d829b TL |
79 | def get_clone_state(fs_client, volspec, volname, groupname, subvolname): |
80 | with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume: | |
92f5a8d4 TL |
81 | return subvolume.state |
82 | ||
522d829b TL |
83 | def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state): |
84 | with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume: | |
92f5a8d4 TL |
85 | subvolume.state = (state, True) |
86 | ||
87 | def get_clone_source(clone_subvolume): | |
88 | source = clone_subvolume._get_clone_source() | |
89 | return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot']) | |
90 | ||
adb31ebb TL |
91 | def get_next_state_on_error(errnum): |
92 | if errnum == -errno.EINTR: | |
93 | next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, | |
94 | SubvolumeStates.STATE_INPROGRESS, | |
95 | SubvolumeActions.ACTION_CANCELLED) | |
96 | else: | |
97 | # jump to failed state, on all other errors | |
98 | next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, | |
99 | SubvolumeStates.STATE_INPROGRESS, | |
100 | SubvolumeActions.ACTION_FAILED) | |
101 | return next_state | |
102 | ||
522d829b | 103 | def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel): |
92f5a8d4 | 104 | try: |
9f95a23c | 105 | if should_cancel(): |
adb31ebb TL |
106 | next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, |
107 | SubvolumeStates.STATE_PENDING, | |
108 | SubvolumeActions.ACTION_CANCELLED) | |
9f95a23c | 109 | else: |
adb31ebb TL |
110 | next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, |
111 | SubvolumeStates.STATE_PENDING, | |
112 | SubvolumeActions.ACTION_SUCCESS) | |
92f5a8d4 | 113 | except OpSmException as oe: |
9f95a23c | 114 | raise VolumeException(oe.errno, oe.error_str) |
92f5a8d4 TL |
115 | return (next_state, False) |
116 | ||
9f95a23c TL |
117 | def sync_attrs(fs_handle, target_path, source_statx): |
118 | try: | |
119 | fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"]) | |
120 | fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()), | |
121 | time.mktime(source_statx["mtime"].timetuple()))) | |
f67539c2 | 122 | fs_handle.lchmod(target_path, source_statx["mode"]) |
9f95a23c | 123 | except cephfs.Error as e: |
e306af50 | 124 | log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e)) |
9f95a23c TL |
125 | raise e |
126 | ||
92f5a8d4 TL |
127 | def bulk_copy(fs_handle, source_path, dst_path, should_cancel): |
128 | """ | |
129 | bulk copy data from source to destination -- only directories, symlinks | |
9f95a23c | 130 | and regular files are synced. |
92f5a8d4 TL |
131 | """ |
132 | log.info("copying data from {0} to {1}".format(source_path, dst_path)) | |
133 | def cptree(src_root_path, dst_root_path): | |
134 | log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path)) | |
135 | try: | |
136 | with fs_handle.opendir(src_root_path) as dir_handle: | |
137 | d = fs_handle.readdir(dir_handle) | |
9f95a23c | 138 | while d and not should_cancel(): |
92f5a8d4 TL |
139 | if d.d_name not in (b".", b".."): |
140 | log.debug("d={0}".format(d)) | |
141 | d_full_src = os.path.join(src_root_path, d.d_name) | |
142 | d_full_dst = os.path.join(dst_root_path, d.d_name) | |
9f95a23c TL |
143 | stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE | |
144 | cephfs.CEPH_STATX_UID | | |
145 | cephfs.CEPH_STATX_GID | | |
146 | cephfs.CEPH_STATX_ATIME | | |
147 | cephfs.CEPH_STATX_MTIME | | |
148 | cephfs.CEPH_STATX_SIZE, | |
149 | cephfs.AT_SYMLINK_NOFOLLOW) | |
150 | handled = True | |
151 | mo = stx["mode"] & ~stat.S_IFMT(stx["mode"]) | |
152 | if stat.S_ISDIR(stx["mode"]): | |
92f5a8d4 TL |
153 | log.debug("cptree: (DIR) {0}".format(d_full_src)) |
154 | try: | |
155 | fs_handle.mkdir(d_full_dst, mo) | |
92f5a8d4 TL |
156 | except cephfs.Error as e: |
157 | if not e.args[0] == errno.EEXIST: | |
158 | raise | |
159 | cptree(d_full_src, d_full_dst) | |
9f95a23c | 160 | elif stat.S_ISLNK(stx["mode"]): |
92f5a8d4 TL |
161 | log.debug("cptree: (SYMLINK) {0}".format(d_full_src)) |
162 | target = fs_handle.readlink(d_full_src, 4096) | |
163 | try: | |
9f95a23c | 164 | fs_handle.symlink(target[:stx["size"]], d_full_dst) |
92f5a8d4 TL |
165 | except cephfs.Error as e: |
166 | if not e.args[0] == errno.EEXIST: | |
167 | raise | |
9f95a23c | 168 | elif stat.S_ISREG(stx["mode"]): |
92f5a8d4 | 169 | log.debug("cptree: (REG) {0}".format(d_full_src)) |
9f95a23c | 170 | copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel) |
92f5a8d4 | 171 | else: |
9f95a23c | 172 | handled = False |
e306af50 | 173 | log.warning("cptree: (IGNORE) {0}".format(d_full_src)) |
9f95a23c TL |
174 | if handled: |
175 | sync_attrs(fs_handle, d_full_dst, stx) | |
92f5a8d4 | 176 | d = fs_handle.readdir(dir_handle) |
9f95a23c TL |
177 | stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME | |
178 | cephfs.CEPH_STATX_MTIME, | |
179 | cephfs.AT_SYMLINK_NOFOLLOW) | |
180 | fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()), | |
181 | time.mktime(stx_root["mtime"].timetuple()))) | |
92f5a8d4 TL |
182 | except cephfs.Error as e: |
183 | if not e.args[0] == errno.ENOENT: | |
184 | raise VolumeException(-e.args[0], e.args[1]) | |
185 | cptree(source_path, dst_path) | |
9f95a23c TL |
186 | if should_cancel(): |
187 | raise VolumeException(-errno.EINTR, "clone operation interrupted") | |
92f5a8d4 | 188 | |
20effc67 TL |
189 | def set_quota_on_clone(fs_handle, clone_volumes_pair): |
190 | attrs = {} # type: Dict[str, Union[int, str, None]] | |
191 | src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2]) | |
192 | dst_path = clone_volumes_pair[0].path | |
193 | try: | |
194 | attrs["quota"] = int(fs_handle.getxattr(src_path, | |
195 | 'ceph.quota.max_bytes' | |
196 | ).decode('utf-8')) | |
197 | except cephfs.NoData: | |
198 | attrs["quota"] = None | |
199 | ||
200 | if attrs["quota"] is not None: | |
201 | clone_volumes_pair[0].set_attrs(dst_path, attrs) | |
202 | ||
522d829b TL |
203 | def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel): |
204 | with open_volume_lockless(fs_client, volname) as fs_handle: | |
205 | with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: | |
adb31ebb TL |
206 | src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2]) |
207 | dst_path = clone_volumes[0].path | |
208 | bulk_copy(fs_handle, src_path, dst_path, should_cancel) | |
20effc67 TL |
209 | set_quota_on_clone(fs_handle, clone_volumes) |
210 | ||
211 | def log_clone_failure(volname, groupname, subvolname, ve): | |
212 | if ve.errno == -errno.EINTR: | |
213 | log.info("Clone cancelled: ({0}, {1}, {2})".format(volname, groupname, subvolname)) | |
214 | elif ve.errno == -errno.EDQUOT: | |
215 | log.error("Clone failed: ({0}, {1}, {2}, reason -> Disk quota exceeded)".format(volname, groupname, subvolname)) | |
216 | else: | |
217 | log.error("Clone failed: ({0}, {1}, {2}, reason -> {3})".format(volname, groupname, subvolname, ve)) | |
92f5a8d4 | 218 | |
522d829b | 219 | def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel): |
92f5a8d4 | 220 | try: |
522d829b | 221 | do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel) |
adb31ebb TL |
222 | next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, |
223 | SubvolumeStates.STATE_INPROGRESS, | |
224 | SubvolumeActions.ACTION_SUCCESS) | |
92f5a8d4 | 225 | except VolumeException as ve: |
20effc67 | 226 | log_clone_failure(volname, groupname, subvolname, ve) |
adb31ebb | 227 | next_state = get_next_state_on_error(ve.errno) |
92f5a8d4 | 228 | except OpSmException as oe: |
9f95a23c | 229 | raise VolumeException(oe.errno, oe.error_str) |
92f5a8d4 TL |
230 | return (next_state, False) |
231 | ||
522d829b | 232 | def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel): |
92f5a8d4 | 233 | try: |
522d829b | 234 | with open_volume(fs_client, volname) as fs_handle: |
adb31ebb | 235 | # detach source but leave the clone section intact for later inspection |
522d829b | 236 | with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: |
adb31ebb | 237 | clone_volumes[1].detach_snapshot(clone_volumes[2], index) |
92f5a8d4 TL |
238 | except (MetadataMgrException, VolumeException) as e: |
239 | log.error("failed to detach clone from snapshot: {0}".format(e)) | |
240 | return (None, True) | |
241 | ||
522d829b | 242 | def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel): |
92f5a8d4 | 243 | try: |
522d829b TL |
244 | with open_volume(fs_client, volname) as fs_handle: |
245 | with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes: | |
adb31ebb TL |
246 | clone_volumes[1].detach_snapshot(clone_volumes[2], index) |
247 | clone_volumes[0].remove_clone_source(flush=True) | |
92f5a8d4 TL |
248 | except (MetadataMgrException, VolumeException) as e: |
249 | log.error("failed to detach clone from snapshot: {0}".format(e)) | |
250 | return (None, True) | |
251 | ||
522d829b | 252 | def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay): |
92f5a8d4 TL |
253 | finished = False |
254 | current_state = None | |
255 | try: | |
522d829b | 256 | current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname) |
92f5a8d4 | 257 | log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state)) |
522d829b TL |
258 | if current_state == SubvolumeStates.STATE_PENDING: |
259 | time.sleep(snapshot_clone_delay) | |
260 | log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay)) | |
92f5a8d4 TL |
261 | while not finished: |
262 | handler = state_table.get(current_state, None) | |
263 | if not handler: | |
264 | raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state)) | |
522d829b | 265 | (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel) |
92f5a8d4 TL |
266 | if next_state: |
267 | log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\ | |
268 | current_state, next_state)) | |
522d829b | 269 | set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state) |
92f5a8d4 TL |
270 | current_state = next_state |
271 | except VolumeException as ve: | |
272 | log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\ | |
273 | subvolname, current_state, ve)) | |
274 | ||
522d829b | 275 | def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay): |
92f5a8d4 | 276 | log.info("cloning to subvolume path: {0}".format(clone_path)) |
522d829b | 277 | resolved = resolve(volspec, clone_path) |
92f5a8d4 TL |
278 | |
279 | groupname = resolved[0] | |
280 | subvolname = resolved[1] | |
281 | log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname)) | |
282 | ||
283 | try: | |
284 | log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname)) | |
522d829b | 285 | start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay) |
92f5a8d4 TL |
286 | log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname)) |
287 | except VolumeException as ve: | |
288 | log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve)) | |
289 | ||
290 | class Cloner(AsyncJobs): | |
291 | """ | |
292 | Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume. | |
adb31ebb | 293 | this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as |
92f5a8d4 TL |
294 | the driver. file types supported are directories, symbolic links and regular files. |
295 | """ | |
522d829b | 296 | def __init__(self, volume_client, tp_size, snapshot_clone_delay): |
92f5a8d4 | 297 | self.vc = volume_client |
522d829b | 298 | self.snapshot_clone_delay = snapshot_clone_delay |
92f5a8d4 | 299 | self.state_table = { |
adb31ebb TL |
300 | SubvolumeStates.STATE_PENDING : handle_clone_pending, |
301 | SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress, | |
302 | SubvolumeStates.STATE_COMPLETE : handle_clone_complete, | |
303 | SubvolumeStates.STATE_FAILED : handle_clone_failed, | |
304 | SubvolumeStates.STATE_CANCELED : handle_clone_failed, | |
92f5a8d4 TL |
305 | } |
306 | super(Cloner, self).__init__(volume_client, "cloner", tp_size) | |
307 | ||
f91f0fd5 | 308 | def reconfigure_max_concurrent_clones(self, tp_size): |
f67539c2 | 309 | return super(Cloner, self).reconfigure_max_async_threads(tp_size) |
f91f0fd5 | 310 | |
522d829b TL |
311 | def reconfigure_snapshot_clone_delay(self, timeout): |
312 | self.snapshot_clone_delay = timeout | |
313 | ||
9f95a23c | 314 | def is_clone_cancelable(self, clone_state): |
adb31ebb | 315 | return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state)) |
9f95a23c TL |
316 | |
317 | def get_clone_tracking_index(self, fs_handle, clone_subvolume): | |
318 | with open_clone_index(fs_handle, self.vc.volspec) as index: | |
319 | return index.find_clone_entry_index(clone_subvolume.base_path) | |
320 | ||
adb31ebb TL |
321 | def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx): |
322 | clone_state = SubvolumeStates.from_value(status['state']) | |
9f95a23c TL |
323 | assert self.is_clone_cancelable(clone_state) |
324 | ||
325 | s_groupname = status['source'].get('group', None) | |
326 | s_subvolname = status['source']['subvolume'] | |
327 | s_snapname = status['source']['snapshot'] | |
328 | ||
522d829b TL |
329 | with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume, |
330 | clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume: | |
adb31ebb TL |
331 | next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE, |
332 | clone_state, | |
333 | SubvolumeActions.ACTION_CANCELLED) | |
334 | clone_subvolume.state = (next_state, True) | |
335 | s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8')) | |
9f95a23c TL |
336 | |
337 | def cancel_job(self, volname, job): | |
338 | """ | |
339 | override base class `cancel_job`. interpret @job as (clone, group) tuple. | |
340 | """ | |
341 | clonename = job[0] | |
342 | groupname = job[1] | |
343 | track_idx = None | |
344 | ||
345 | try: | |
522d829b | 346 | with open_volume(self.fs_client, volname) as fs_handle: |
9f95a23c | 347 | with open_group(fs_handle, self.vc.volspec, groupname) as group: |
522d829b | 348 | with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: |
9f95a23c | 349 | status = clone_subvolume.status |
adb31ebb | 350 | clone_state = SubvolumeStates.from_value(status['state']) |
9f95a23c TL |
351 | if not self.is_clone_cancelable(clone_state): |
352 | raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") | |
353 | track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume) | |
354 | if not track_idx: | |
e306af50 | 355 | log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path)) |
9f95a23c | 356 | raise VolumeException(-errno.EINVAL, "error canceling clone") |
20effc67 TL |
357 | clone_job = (track_idx, clone_subvolume.base_path) |
358 | jobs = [j[0] for j in self.jobs[volname]] | |
359 | with lock_timeout_log(self.lock): | |
360 | if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state) and not clone_job in jobs: | |
361 | logging.debug("Cancelling pending job {0}".format(clone_job)) | |
362 | # clone has not started yet -- cancel right away. | |
363 | self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx) | |
364 | return | |
9f95a23c TL |
365 | # cancelling an on-going clone would persist "canceled" state in subvolume metadata. |
366 | # to persist the new state, async cloner accesses the volume in exclusive mode. | |
367 | # accessing the volume in exclusive mode here would lead to deadlock. | |
368 | assert track_idx is not None | |
f67539c2 | 369 | with lock_timeout_log(self.lock): |
522d829b | 370 | with open_volume_lockless(self.fs_client, volname) as fs_handle: |
9f95a23c | 371 | with open_group(fs_handle, self.vc.volspec, groupname) as group: |
522d829b | 372 | with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume: |
9f95a23c TL |
373 | if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)): |
374 | raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)") | |
375 | except (IndexException, MetadataMgrException) as e: | |
376 | log.error("error cancelling clone {0}: ({1})".format(job, e)) | |
377 | raise VolumeException(-errno.EINVAL, "error canceling clone") | |
378 | ||
92f5a8d4 | 379 | def get_next_job(self, volname, running_jobs): |
522d829b | 380 | return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs) |
92f5a8d4 TL |
381 | |
382 | def execute_job(self, volname, job, should_cancel): | |
522d829b | 383 | clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay) |