]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | import os |
2 | import stat | |
3 | import time | |
4 | import errno | |
5 | import logging | |
6 | from contextlib import contextmanager | |
1d09f67e | 7 | from typing import Optional |
92f5a8d4 TL |
8 | |
9 | import cephfs | |
f67539c2 | 10 | from mgr_util import lock_timeout_log |
92f5a8d4 TL |
11 | |
12 | from .async_job import AsyncJobs | |
13 | from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException | |
14 | from .fs_util import copy_file | |
adb31ebb TL |
15 | from .operations.versions.op_sm import SubvolumeOpSm |
16 | from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions | |
92f5a8d4 TL |
17 | from .operations.resolver import resolve |
18 | from .operations.volume import open_volume, open_volume_lockless | |
19 | from .operations.group import open_group | |
20 | from .operations.subvolume import open_subvol | |
21 | from .operations.clone_index import open_clone_index | |
adb31ebb | 22 | from .operations.template import SubvolumeOpType |
92f5a8d4 TL |
23 | |
24 | log = logging.getLogger(__name__) | |
25 | ||
26 | # helper for fetching a clone entry for a given volume | |
def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
    """
    Fetch the oldest clone entry from the clone index of volume `volname`.

    Returns a `(retcode, job)` tuple:
      * `(0, job)`  -- `job` is whatever `get_oldest_clone_entry(running_jobs)` returned
      * `(0, None)` -- an IndexException with errno ENOENT was raised
      * `(ve.errno, None)` -- a VolumeException was raised (it is logged)

    An IndexException with any other errno is re-raised.
    """
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volspec) as clone_index:
                    oldest = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, oldest
            except IndexException as ie:
                # only ENOENT is treated as "nothing to clone"
                if ie.errno != -errno.ENOENT:
                    raise ie
                return 0, None
    except VolumeException as vol_err:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, vol_err))
        return vol_err.errno, None
92f5a8d4 TL |
43 | |
@contextmanager
def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
    """
    Context manager: open volume `volname`, then group `groupname` on that volume handle,
    then subvolume `subvolname` for `op_type`, and yield the opened subvolume.
    """
    with open_volume(fs_client, volname) as fs_handle, \
            open_group(fs_handle, volspec, groupname) as group, \
            open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvol:
        yield subvol
50 | ||
@contextmanager
def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
    """
    Context manager: using an already opened volume handle `fs_handle`, open group
    `groupname`, then subvolume `subvolname` for `op_type`, and yield the opened subvolume.
    """
    with open_group(fs_handle, volspec, groupname) as group, \
            open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvol:
        yield subvol
56 | ||
@contextmanager
def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    """
    Context manager yielding the source subvolume of a clone.

    if a snapshot of a retained subvolume is being cloned to recreate the same subvolume
    (source and clone group/subvolume names match), the already opened clone subvolume
    `c_subvolume` is yielded as the source. otherwise the source subvolume is opened via
    `open_at_group` with `op_type` and yielded.
    """
    source_is_clone = (s_groupname == c_groupname and s_subvolname == c_subvolname)
    if source_is_clone:
        yield c_subvolume
        return

    with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as source:
        yield source
66 | ||
67 | ||
@contextmanager
def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
    """
    Context manager yielding `(clone_subvolume, source_subvolume, source_snapshot_name)`.

    The clone subvolume is opened with CLONE_INTERNAL and its clone source is read via
    `get_clone_source`. When the source group/subvolume names equal the clone's, the clone
    subvolume object is yielded in both positions; otherwise the source is opened
    separately with CLONE_SOURCE.
    """
    with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_sv:
        _src_vol, src_group, src_subvol, src_snap = get_clone_source(clone_sv)
        if groupname != src_group or subvolname != src_subvol:
            with open_at_group(fs_client, fs_handle, volspec, src_group, src_subvol, SubvolumeOpType.CLONE_SOURCE) as source_sv:
                yield (clone_sv, source_sv, src_snap)
        else:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_sv, clone_sv, src_snap)
78 | ||
522d829b TL |
def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
    """Open the subvolume with CLONE_INTERNAL and return its `state` attribute."""
    op_type = SubvolumeOpType.CLONE_INTERNAL
    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type) as clone_sv:
        return clone_sv.state
82 | ||
522d829b TL |
def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
    """
    Open the subvolume with CLONE_INTERNAL and assign `(state, True)` to its `state`
    attribute.
    """
    op_type = SubvolumeOpType.CLONE_INTERNAL
    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type) as clone_sv:
        new_value = (state, True)
        clone_sv.state = new_value
86 | ||
def get_clone_source(clone_subvolume):
    """
    Return the clone source of `clone_subvolume` as a
    `(volume, group, subvolume, snapshot)` tuple.

    The values come from the mapping returned by `clone_subvolume._get_clone_source()`;
    `group` is None when the mapping has no 'group' key.
    """
    src = clone_subvolume._get_clone_source()
    volume = src['volume']
    group = src.get('group')
    subvolume = src['subvolume']
    snapshot = src['snapshot']
    return (volume, group, subvolume, snapshot)
90 | ||
adb31ebb TL |
def get_next_state_on_error(errnum):
    """
    Return the state an in-progress clone transitions to after an error.

    `-errno.EINTR` selects ACTION_CANCELLED; every other error number selects
    ACTION_FAILED. The resulting state is computed by `SubvolumeOpSm.transition`.
    """
    if errnum == -errno.EINTR:
        action = SubvolumeActions.ACTION_CANCELLED
    else:
        # jump to failed state, on all other errors
        action = SubvolumeActions.ACTION_FAILED
    return SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                    SubvolumeStates.STATE_INPROGRESS,
                                    action)
102 | ||
def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    """
    State handler for a clone in the pending state.

    If `should_cancel()` is true, the next state is the ACTION_CANCELLED transition from
    STATE_PENDING and an EINTR failure is recorded through `update_clone_failure_status`;
    otherwise the next state is the ACTION_SUCCESS transition.

    Returns `(next_state, False)`. An OpSmException is converted to a VolumeException.
    `index` is not used by this handler.
    """
    try:
        cancelled = should_cancel()
        action = SubvolumeActions.ACTION_CANCELLED if cancelled else SubvolumeActions.ACTION_SUCCESS
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_PENDING,
                                              action)
        if cancelled:
            interrupt = VolumeException(-errno.EINTR, "user interrupted clone operation")
            update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, interrupt)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)
118 | ||
9f95a23c TL |
def sync_attrs(fs_handle, target_path, source_statx):
    """
    Apply ownership, access/modification times and mode from `source_statx` to
    `target_path` using the l* (lchown/lutimes/lchmod) calls of `fs_handle`.

    `source_statx` is indexed with the keys "uid", "gid", "atime", "mtime" and "mode".
    A cephfs.Error is logged as a warning and re-raised.
    """
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        atime_secs = time.mktime(source_statx["atime"].timetuple())
        mtime_secs = time.mktime(source_statx["mtime"].timetuple())
        fs_handle.lutimes(target_path, (atime_secs, mtime_secs))
        fs_handle.lchmod(target_path, source_statx["mode"])
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise e
128 | ||
92f5a8d4 TL |
def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.

    The tree under `source_path` is walked recursively. Entries of any other file type
    are logged and skipped. `should_cancel()` is polled before each directory entry and
    once more after the walk; when it is true after the walk a VolumeException with
    -EINTR is raised. A cephfs.Error other than ENOENT raised while walking a directory
    is converted into a VolumeException; ENOENT is ignored.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))

    # statx fields requested for every directory entry / for the directory itself
    entry_want = (cephfs.CEPH_STATX_MODE |
                  cephfs.CEPH_STATX_UID |
                  cephfs.CEPH_STATX_GID |
                  cephfs.CEPH_STATX_ATIME |
                  cephfs.CEPH_STATX_MTIME |
                  cephfs.CEPH_STATX_SIZE)
    root_want = cephfs.CEPH_STATX_ATIME | cephfs.CEPH_STATX_MTIME

    def create_unless_exists(create, *args):
        # call create(*args); an EEXIST cephfs error is tolerated, anything else propagates
        try:
            create(*args)
        except cephfs.Error as e:
            if e.args[0] != errno.EEXIST:
                raise

    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                entry = fs_handle.readdir(dir_handle)
                while entry and not should_cancel():
                    if entry.d_name not in (b".", b".."):
                        log.debug("d={0}".format(entry))
                        src_entry = os.path.join(src_root_path, entry.d_name)
                        dst_entry = os.path.join(dst_root_path, entry.d_name)
                        stx = fs_handle.statx(src_entry, entry_want, cephfs.AT_SYMLINK_NOFOLLOW)
                        mode = stx["mode"]
                        # mode bits with the file-type bits masked out
                        perms = mode & ~stat.S_IFMT(mode)
                        handled = True
                        if stat.S_ISDIR(mode):
                            log.debug("cptree: (DIR) {0}".format(src_entry))
                            create_unless_exists(fs_handle.mkdir, dst_entry, perms)
                            cptree(src_entry, dst_entry)
                        elif stat.S_ISLNK(mode):
                            log.debug("cptree: (SYMLINK) {0}".format(src_entry))
                            target = fs_handle.readlink(src_entry, 4096)
                            create_unless_exists(fs_handle.symlink, target[:stx["size"]], dst_entry)
                        elif stat.S_ISREG(mode):
                            log.debug("cptree: (REG) {0}".format(src_entry))
                            copy_file(fs_handle, src_entry, dst_entry, perms, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(src_entry))
                        if handled:
                            sync_attrs(fs_handle, dst_entry, stx)
                    entry = fs_handle.readdir(dir_handle)
                # carry the source directory's atime/mtime over to the destination directory
                stx_root = fs_handle.statx(src_root_path, root_want, cephfs.AT_SYMLINK_NOFOLLOW)
                root_atime = time.mktime(stx_root["atime"].timetuple())
                root_mtime = time.mktime(stx_root["mtime"].timetuple())
                fs_handle.lutimes(dst_root_path, (root_atime, root_mtime))
        except cephfs.Error as e:
            if e.args[0] != errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])

    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "user interrupted clone operation")
92f5a8d4 | 190 | |
def set_quota_on_clone(fs_handle, clone_volumes_pair):
    """
    Copy the byte and file quotas from the source snapshot to the clone.

    `clone_volumes_pair` is indexed as `[0]` clone subvolume, `[1]` source subvolume and
    `[2]` snapshot name. For 'ceph.quota.max_bytes' and then 'ceph.quota.max_files', the
    xattr is read from the snapshot data path and, when present (no cephfs.NoData), set
    on the clone path.

    Raises VolumeException with -EINVAL on cephfs.InvalidValue, or with the negated cephfs
    error number on any other cephfs.Error raised while setting an xattr.
    """
    src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2])
    dst_path = clone_volumes_pair[0].path

    quota_xattrs = (
        ('ceph.quota.max_bytes', "invalid size specified: '{0}'"),
        ('ceph.quota.max_files', "invalid file count specified: '{0}'"),
    )
    for xattr_name, invalid_msg in quota_xattrs:
        try:
            limit = int(fs_handle.getxattr(src_path, xattr_name).decode('utf-8'))
        except cephfs.NoData:
            # quota not set on the source -- nothing to carry over
            continue

        try:
            fs_handle.setxattr(dst_path, xattr_name, str(limit).encode('utf-8'), 0)
        except cephfs.InvalidValue:
            raise VolumeException(-errno.EINVAL, invalid_msg.format(limit))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])
20effc67 | 221 | |
522d829b TL |
def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
    """
    Copy the source snapshot's data into the clone subvolume, then copy the quotas.

    The volume is opened lockless and the clone/source pair is opened via
    `open_clone_subvolume_pair`; `bulk_copy` then `set_quota_on_clone` are run on it.
    """
    with open_volume_lockless(fs_client, volname) as fs_handle, \
            open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
        clone_sv, source_sv, snapname = clone_volumes
        src_path = source_sv.snapshot_data_path(snapname)
        dst_path = clone_sv.path
        bulk_copy(fs_handle, src_path, dst_path, should_cancel)
        set_quota_on_clone(fs_handle, clone_volumes)
229 | ||
33c7a0ef TL |
def update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, ve):
    """
    Record a clone failure on the clone subvolume via `add_clone_failure`.

    The recorded error number is `-ve.errno`. For `ve.errno == -errno.EINTR` the recorded
    message is the fixed "user interrupted clone operation"; otherwise it is
    `ve.error_str`.
    """
    with open_volume_lockless(fs_client, volname) as fs_handle, \
            open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
        clone_sv = clone_volumes[0]
        if ve.errno != -errno.EINTR:
            clone_sv.add_clone_failure(-ve.errno, ve.error_str)
        else:
            clone_sv.add_clone_failure(-ve.errno, "user interrupted clone operation")
237 | ||
20effc67 TL |
def log_clone_failure(volname, groupname, subvolname, ve):
    """
    Log the outcome of a failed clone, keyed on `ve.errno`:
    -EINTR is logged at info level as a cancellation, -EDQUOT at error level with a
    fixed quota message, anything else at error level with `ve` as the reason.
    """
    ident = (volname, groupname, subvolname)
    if ve.errno == -errno.EINTR:
        log.info("Clone cancelled: ({0}, {1}, {2})".format(*ident))
        return
    if ve.errno == -errno.EDQUOT:
        log.error("Clone failed: ({0}, {1}, {2}, reason -> Disk quota exceeded)".format(*ident))
        return
    log.error("Clone failed: ({0}, {1}, {2}, reason -> {3})".format(volname, groupname, subvolname, ve))
92f5a8d4 | 245 | |
def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    """
    State handler for a clone in the in-progress state: run `do_clone`.

    On success the next state is the ACTION_SUCCESS transition from STATE_INPROGRESS.
    On a VolumeException the failure is recorded on the clone, logged, and the next state
    comes from `get_next_state_on_error`. An OpSmException is converted to a
    VolumeException.

    Returns `(next_state, False)`. `index` is not used by this handler.
    """
    try:
        do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
        following = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                             SubvolumeStates.STATE_INPROGRESS,
                                             SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as clone_err:
        update_clone_failure_status(fs_client, volspec, volname, groupname, subvolname, clone_err)
        log_clone_failure(volname, groupname, subvolname, clone_err)
        following = get_next_state_on_error(clone_err.errno)
    except OpSmException as sm_err:
        raise VolumeException(sm_err.errno, sm_err.error_str)
    return (following, False)
259 | ||
def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    """
    State handler for a failed (or canceled) clone: detach the clone from its source
    snapshot using tracking `index`.

    MetadataMgrException and VolumeException are logged and swallowed.
    Always returns `(None, True)`. `should_cancel` is not used by this handler.
    """
    try:
        with open_volume(fs_client, volname) as fs_handle, \
                open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
            # detach source but leave the clone section intact for later inspection
            source_sv, snapname = clone_volumes[1], clone_volumes[2]
            source_sv.detach_snapshot(snapname, index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
269 | ||
def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    """
    State handler for a completed clone: detach the clone from its source snapshot using
    tracking `index`, then remove the clone source from the clone subvolume (flushed).

    MetadataMgrException and VolumeException are logged and swallowed.
    Always returns `(None, True)`. `should_cancel` is not used by this handler.
    """
    try:
        with open_volume(fs_client, volname) as fs_handle, \
                open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
            clone_sv, source_sv, snapname = clone_volumes
            source_sv.detach_snapshot(snapname, index)
            clone_sv.remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
279 | ||
def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
    """
    Drive the clone state machine for one clone until a handler reports it is finished.

    The starting state is read from the subvolume. When it is STATE_PENDING the function
    first sleeps for `snapshot_clone_delay` seconds. Then the handler for the current state
    is looked up in `state_table` and called; a truthy `next_state` returned by the handler
    is persisted with `set_clone_state` and becomes the current state.

    A state with no handler raises VolumeException(-EINVAL). Any VolumeException is
    logged here and not propagated.
    """
    current_state = None
    try:
        current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        if current_state == SubvolumeStates.STATE_PENDING:
            time.sleep(snapshot_clone_delay)
            log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))

        done = False
        while not done:
            handler = state_table.get(current_state)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            next_state, done = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
            if not next_state:
                # handler requested no transition
                continue
            log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(
                volname, groupname, subvolname, current_state, next_state))
            set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state)
            current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(
            volname, groupname, subvolname, current_state, ve))
302 | ||
def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
    """
    Resolve `clone_path` to a (group, subvolume) pair and run the clone state machine
    for it via `start_clone_sm`.

    A VolumeException raised from `start_clone_sm` is logged and not propagated.
    """
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volspec, clone_path)
    groupname, subvolname = resolved[0], resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    ident = (volname, groupname, subvolname)
    try:
        log.info("starting clone: ({0}, {1}, {2})".format(*ident))
        start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname,
                       state_table, should_cancel, snapshot_clone_delay)
        log.info("finished clone: ({0}, {1}, {2})".format(*ident))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
317 | ||
class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
        """
        Store the volume client and clone delay, build the state -> handler table used by
        the clone state machine, then initialise the AsyncJobs base with name "cloner" and
        `tp_size`.
        """
        self.vc = volume_client
        self.snapshot_clone_delay = snapshot_clone_delay
        # canceled clones share the failed handler
        self.state_table = {
            SubvolumeStates.STATE_PENDING      : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS   : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE     : handle_clone_complete,
            SubvolumeStates.STATE_FAILED       : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED     : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def reconfigure_max_concurrent_clones(self, tp_size):
        """Forward `tp_size` to the base class `reconfigure_max_async_threads`."""
        return super(Cloner, self).reconfigure_max_async_threads(tp_size)

    def reconfigure_snapshot_clone_delay(self, timeout):
        """Replace the delay value later passed to `clone` by `execute_job`."""
        self.snapshot_clone_delay = timeout

    def is_clone_cancelable(self, clone_state):
        """Return True unless `clone_state` is a complete or failed state per SubvolumeOpSm."""
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        """Look up the clone index entry for `clone_subvolume.base_path`."""
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        """
        Cancel a clone directly on its subvolume metadata.

        The clone state is moved through the ACTION_CANCELLED transition and persisted, an
        EINTR failure is recorded on the clone, and the source subvolume detaches the
        snapshot for tracking index `track_idx` (decoded as utf-8). The source
        group/subvolume/snapshot names are read from `status['source']`.
        """
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume,
                                  clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            clone_subvolume.add_clone_failure(errno.EINTR, "user interrupted clone operation")
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.

        Raises VolumeException(-EINVAL) when the clone is not in a cancelable state, when
        its tracking index cannot be found, when `_cancel_job` reports failure, or when an
        IndexException/MetadataMgrException occurs.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.fs_client, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        clone_job = (track_idx, clone_subvolume.base_path)
                        # NOTE(review): self.jobs is read before self.lock is taken -- confirm
                        # whether this snapshot of the job list is meant to be unlocked.
                        jobs = [j[0] for j in self.jobs[volname]]
                        with lock_timeout_log(self.lock):
                            if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state) and clone_job not in jobs:
                                # use the module logger, like every other message in this file
                                log.debug("Cancelling pending job {0}".format(clone_job))
                                # clone has not started yet -- cancel right away.
                                self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                                return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with lock_timeout_log(self.lock):
                with open_volume_lockless(self.fs_client, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        """Return `(retcode, job)` from `get_next_clone_entry` for `volname`."""
        return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        """
        Run `clone` for `job`: `job[0]` (tracking index) and `job[1]` (clone path) are
        utf-8 decoded before being passed on.
        """
        clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
92f5a8d4 TL |
410 | |
411 | def execute_job(self, volname, job, should_cancel): | |
522d829b | 412 | clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay) |