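"""
Asynchronous subvolume cloning: a small state machine (driven by OpSm) copies
data from a subvolume snapshot into a target subvolume, with per-state handlers
and a thread-pool based Cloner to schedule and cancel clone jobs.
"""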
import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.op_sm import OpSm
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None

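# context managers that nest open_volume/open_group/open_subvol and yield the
# opened subvolume to the caller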
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                             need_complete, expected_types) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                         need_complete, expected_types) as subvolume:
            yield subvolume

def get_clone_state(volume_client, volname, groupname, subvolname):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        return subvolume.state

def set_clone_state(volume_client, volname, groupname, subvolname, state):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        subvolume.state = (state, True)

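# unpack a clone's source metadata into (volume, group, subvolume, snapshot)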
def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

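# clone state-machine handlers: each takes (volume_client, volname, index,
# groupname, subvolname, should_cancel) and returns a (next_state, finished) tuple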
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR)
        else:
            next_state = OpSm.get_next_state("clone", "pending", 0)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

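# mirror ownership and timestamps from the source statx result onto the target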
def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise e

def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                              cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
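                        # mask off the file-type bits, keeping only the permission bits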
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME,
                                           cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

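# copy data from the source snapshot into the clone subvolume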
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                src_path = source_subvolume.snapshot_path(s_snapname)
                dst_path = clone_subvolume.path
                bulk_copy(fs_handle, src_path, dst_path, should_cancel)

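# run the copy; on failure the exception's errno (e.g. -EINTR on cancellation)
# picks the next state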
def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = OpSm.get_next_state("clone", "in-progress", 0)
    except VolumeException as ve:
        # jump to failed state
        next_state = OpSm.get_next_state("clone", "in-progress", ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        # detach source but leave the clone section intact for later inspection
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

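# clone finished: detach the source snapshot and drop the clone source metadata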
def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
                    clone_subvolume.remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

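# drive the state machine: dispatch the current state to its handler and persist
# each transition until a handler signals completion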
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

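# job entry point: resolve the clone path to (group, subvolume) and run the state machine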
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    this relies on a simple state machine (which mimics states from OpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        self.state_table = {
            'pending'     : handle_clone_pending,
            'in-progress' : handle_clone_in_progress,
            'complete'    : handle_clone_complete,
            'failed'      : handle_clone_failed,
            'canceled'    : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

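    # a clone is cancelable only while it has not reached a final or failed state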
    def is_clone_cancelable(self, clone_state):
        return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))

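    # look up the clone-index entry that tracks this clone subvolume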
    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

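    # cancel a clone that has not started yet: advance its state with -EINTR and
    # detach the source snapshot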
    def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
        clone_state = status['state']
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume:
                next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR)
                clone_subvolume.state = (next_state, True)
                s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = status['state']
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if OpSm.is_init_state("clone", clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
                            return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with self.lock:
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                         need_complete=False, expected_types=["clone"]) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

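    # AsyncJobs interface: fetch the next queued clone entry and run it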
    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)