import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs
from mgr_util import lock_timeout_log

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.versions.op_sm import SubvolumeOpSm
from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index
from .operations.template import SubvolumeOpType

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None

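# convenience wrappers: open a subvolume either by volume name (open_at_volume)
# or via an already-open filesystem handle (open_at_group)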
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(volume_client.mgr, fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
            yield subvolume

@contextmanager
def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
    # the clone subvolume as the source subvolume
    if s_groupname == c_groupname and s_subvolname == c_subvolname:
        yield c_subvolume
    else:
        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
            yield s_subvolume


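# yields a (clone subvolume, source subvolume, snapshot name) triple; when a
# retained subvolume is recreated from its own snapshot, the clone subvolume
# doubles as the source so its metadata is not reopened and overwritten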
@contextmanager
def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
        if groupname == s_groupname and subvolname == s_subvolname:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_subvolume, clone_subvolume, s_snapname)
        else:
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                yield (clone_subvolume, source_subvolume, s_snapname)

def get_clone_state(volume_client, volname, groupname, subvolname):
    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        return subvolume.state

def set_clone_state(volume_client, volname, groupname, subvolname, state):
    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

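# map a copy error to the next clone state: a cancelled copy surfaces as
# -errno.EINTR and lands in the canceled state, everything else goes to failed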
def get_next_state_on_error(errnum):
    if errnum == -errno.EINTR:
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_CANCELLED)
    else:
        # jump to failed state, on all other errors
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_FAILED)
    return next_state

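# state handlers: each returns a (next_state, finished) tuple; `finished`
# terminates the state machine loop in start_clone_sm()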
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_CANCELLED)
        else:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_SUCCESS)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

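# propagate ownership, timestamps and mode from a source statx result to the
# copied entry; the l* variants act on symlinks themselves, not their targets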
def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
        fs_handle.lchmod(target_path, source_statx["mode"])
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise e

def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                              cephfs.CEPH_STATX_UID |
                                              cephfs.CEPH_STATX_GID |
                                              cephfs.CEPH_STATX_ATIME |
                                              cephfs.CEPH_STATX_MTIME |
                                              cephfs.CEPH_STATX_SIZE,
                                              cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        # permission bits only -- mask out the file type bits
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                # finally, mirror the source root's timestamps onto the destination root
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                           cephfs.CEPH_STATX_MTIME,
                                           cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

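# copy the source snapshot's data into the clone subvolume's data path. the
# volume is opened lockless so the potentially long-running copy is not done
# holding the volume open in exclusive mode (see also cancel_job below)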
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
            src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
            dst_path = clone_volumes[0].path
            bulk_copy(fs_handle, src_path, dst_path, should_cancel)

def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as ve:
        next_state = get_next_state_on_error(ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

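# terminal state handlers: both detach the clone from the source snapshot and
# return (None, True) to stop the state machine; only a completed clone also
# drops its clone-source metadata -- failed/canceled clones keep it around
# for later inspection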
def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            # detach source but leave the clone section intact for later inspection
            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                clone_volumes[0].remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

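# drive a single clone through the state machine: fetch the persisted state,
# dispatch to the matching handler from `state_table`, persist the state the
# handler returns, and repeat until a handler reports completion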
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

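# entry point for a queued clone job: resolve the clone path (typically of the
# form "/volumes/<group>/<subvolume>", per the volume spec) to a group and
# subvolume name, then run the state machine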
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    This relies on a simple state machine (which mirrors the states from the
    SubvolumeOpSm class) as the driver. File types supported are directories,
    symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        self.state_table = {
            SubvolumeStates.STATE_PENDING : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE : handle_clone_complete,
            SubvolumeStates.STATE_FAILED : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED : handle_clone_failed,  # canceled clones share the failure path
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def reconfigure_max_concurrent_clones(self, tp_size):
        return super(Cloner, self).reconfigure_max_async_threads(tp_size)

    def is_clone_cancelable(self, clone_state):
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

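    # cancel a clone that has not started copying yet: flip it straight to the
    # canceled state and detach the source snapshot via its tracking index entry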
    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        Override base class `cancel_job`; interpret @job as a (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                            return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with lock_timeout_log(self.lock):
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

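    # AsyncJobs hooks: hand the thread pool the oldest queued clone entry and
    # execute it; a job is an (index, clone path) pair of byte strings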
    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
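
# a minimal usage sketch (hypothetical wiring -- in-tree, the volumes module's
# VolumeClient constructs the cloner, passing itself as `volume_client` and
# exposing the `mgr` and `volspec` attributes used above):
#
#   cloner = Cloner(volume_client, tp_size=4)     # pool of 4 cloner threads
#   cloner.reconfigure_max_concurrent_clones(8)   # resize the pool at runtime
#   cloner.cancel_job("vol1", ("clone1", None))   # cancel a clone in the default group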