# ceph/src/pybind/mgr/volumes/fs/async_cloner.py
import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.versions.op_sm import SubvolumeOpSm
from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index
from .operations.template import SubvolumeOpType

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None

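# A minimal sketch (not part of the module) of the (errno, job) contract the
# AsyncJobs machinery consumes; the names below are hypothetical:
#
#   ret, job = get_next_clone_entry(volume_client, "vol1", running_jobs=[])
#   if ret == 0 and job is None:
#       pass  # nothing pending: the clone index is empty or absent
#   elif ret == 0:
#       pass  # job is the oldest tracked clone entry not already running
#   else:
#       pass  # ret carries the (negative) errno from the VolumeException
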
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
            yield subvolume

@contextmanager
def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
    # the clone subvolume as the source subvolume
    if s_groupname == c_groupname and s_subvolname == c_subvolname:
        yield c_subvolume
    else:
        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
            yield s_subvolume


@contextmanager
def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
        if groupname == s_groupname and subvolname == s_subvolname:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_subvolume, clone_subvolume, s_snapname)
        else:
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                yield (clone_subvolume, source_subvolume, s_snapname)

def get_clone_state(volume_client, volname, groupname, subvolname):
    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        return subvolume.state

def set_clone_state(volume_client, volname, groupname, subvolname, state):
    with open_at_volume(volume_client, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    # (volume, group, subvolume, snapshot) tuple identifying the clone's source snapshot
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

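# Illustrative sketch of the tuple get_clone_source() returns, assuming a clone
# of snapshot "snap1" of subvolume "sv1" in group "grp1" (values hypothetical;
# the group component is None for a source subvolume outside any group):
#
#   s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
#   # e.g. ("vol1", "grp1", "sv1", "snap1") or ("vol1", None, "sv1", "snap1")
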
def get_next_state_on_error(errnum):
    if errnum == -errno.EINTR:
        # an interrupted clone transitions to the canceled state
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_CANCELLED)
    else:
        # on all other errors, jump to the failed state
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_FAILED)
    return next_state

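# Sketch of the mapping above: only an interruption (EINTR, raised when an
# in-progress clone is canceled) transitions the clone to the canceled state;
# every other errno lands it in the failed state:
#
#   get_next_state_on_error(-errno.EINTR)   # -> canceled state
#   get_next_state_on_error(-errno.ENOENT)  # -> failed state
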
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    # a pending clone is either canceled up front (if requested) or advanced
    # to the in-progress state for copying
    try:
        if should_cancel():
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_CANCELLED)
        else:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_SUCCESS)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def sync_attrs(fs_handle, target_path, source_statx):
    # replay ownership and timestamps from the source onto the copied entry
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise

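# Sketch of the statx dict sync_attrs() consumes (values hypothetical): "uid"
# and "gid" are numeric ids, while "atime"/"mtime" arrive as datetime objects,
# hence the timetuple()/mktime round trip to the epoch seconds lutimes() expects:
#
#   from datetime import datetime
#   source_statx = {"uid": 1000, "gid": 1000,
#                   "atime": datetime(2021, 1, 1), "mtime": datetime(2021, 1, 2)}
#   sync_attrs(fs_handle, b"/volumes/grp1/sv1/some-dir", source_statx)
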
def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                                          cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        # keep only the permission bits; the file-type bits are handled per branch below
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                # tolerate an already existing directory (e.g. on a restarted clone)
                                if e.args[0] != errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if e.args[0] != errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
            stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                      cephfs.CEPH_STATX_MTIME,
                                                      cephfs.AT_SYMLINK_NOFOLLOW)
            fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                              time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if e.args[0] != errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

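# Worked example of the permission mask used in cptree() above: stat.S_IFMT()
# isolates the file-type bits of a mode, so masking with its complement keeps
# only the permission bits to replay on the destination entry:
#
#   import stat
#   mode = 0o100644             # regular file, rw-r--r--
#   mode & ~stat.S_IFMT(mode)   # == 0o644
#   stat.S_ISREG(mode)          # True -- dispatches to the copy_file branch
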
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
            # clone_volumes is a (clone subvolume, source subvolume, snapshot name) triple
            src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
            dst_path = clone_volumes[0].path
            bulk_copy(fs_handle, src_path, dst_path, should_cancel)

def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as ve:
        next_state = get_next_state_on_error(ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            # detach source but leave the clone section intact for later inspection
            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            # detach the source and drop the clone source section from the clone's metadata
            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                clone_volumes[0].remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
            current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

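# Sketch of a successful pass through the loop above (handler names from the
# Cloner state table below; each handler returns a (next_state, finished) pair):
#
#   STATE_PENDING    --handle_clone_pending-->     (STATE_INPROGRESS, False)
#   STATE_INPROGRESS --handle_clone_in_progress--> (STATE_COMPLETE, False)
#   STATE_COMPLETE   --handle_clone_complete-->    (None, True)  # loop exits
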
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    This relies on a simple state machine (mimicking the states from the SubvolumeOpSm
    class) as the driver. The file types supported are directories, symbolic links and
    regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        self.state_table = {
            SubvolumeStates.STATE_PENDING    : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE   : handle_clone_complete,
            SubvolumeStates.STATE_FAILED     : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED   : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)
    def reconfigure_max_concurrent_clones(self, tp_size):
        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)

    def is_clone_cancelable(self, clone_state):
        # a clone can be canceled only while it is still pending or in progress
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                            return
            # canceling an ongoing clone persists the "canceled" state in subvolume metadata.
            # to persist the new state, the async cloner accesses the volume in exclusive
            # mode; doing the same here would deadlock, hence the lockless open below.
            assert track_idx is not None
            with self.lock:
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error canceling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        # job is a (tracking index, clone path) pair of bytes; decode before use
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
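
# Minimal usage sketch (assumptions: a volume_client wired up by the mgr
# volumes module, and the queue_job()/cancel_job() surface inherited from
# AsyncJobs; names and sizes here are hypothetical):
#
#   cloner = Cloner(volume_client, tp_size=4)      # pool of 4 cloner threads
#   cloner.queue_job("vol1")                       # wake a thread to pick the oldest clone entry
#   cloner.cancel_job("vol1", ("clone1", "grp1"))  # job interpreted as a (clone, group) tuple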