]> git.proxmox.com Git - ceph.git/blame - ceph/src/pybind/mgr/volumes/fs/async_cloner.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / async_cloner.py
CommitLineData
92f5a8d4
TL
1import os
2import stat
3import time
4import errno
5import logging
6from contextlib import contextmanager
7
8import cephfs
f67539c2 9from mgr_util import lock_timeout_log
92f5a8d4
TL
10
11from .async_job import AsyncJobs
12from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
13from .fs_util import copy_file
adb31ebb
TL
14from .operations.versions.op_sm import SubvolumeOpSm
15from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
92f5a8d4
TL
16from .operations.resolver import resolve
17from .operations.volume import open_volume, open_volume_lockless
18from .operations.group import open_group
19from .operations.subvolume import open_subvol
20from .operations.clone_index import open_clone_index
adb31ebb 21from .operations.template import SubvolumeOpType
92f5a8d4
TL
22
23log = logging.getLogger(__name__)
24
25# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    """Fetch the oldest queued clone entry for a volume.

    Returns (0, job) on success, (0, None) when the clone index has no
    eligible entry, or (errno, None) when the volume cannot be opened.
    """
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    return 0, clone_index.get_oldest_clone_entry(running_jobs)
            except IndexException as ie:
                # a missing index simply means there is nothing to clone yet
                if ie.errno == -errno.ENOENT:
                    return 0, None
                raise ie
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None
92f5a8d4
TL
42
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
    """Open and yield subvolume `subvolname` of group `groupname` in volume
    `volname`, with the volume handle owned by this context."""
    with open_volume(volume_client, volname) as fs_handle, \
         open_group(fs_handle, volume_client.volspec, groupname) as group, \
         open_subvol(volume_client.mgr, fs_handle, volume_client.volspec,
                     group, subvolname, op_type) as subvolume:
        yield subvolume
49
@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
    """Open and yield subvolume `subvolname` of group `groupname`, reusing an
    already open volume handle `fs_handle`."""
    with open_group(fs_handle, volume_client.volspec, groupname) as group, \
         open_subvol(volume_client.mgr, fs_handle, volume_client.volspec,
                     group, subvolname, op_type) as subvolume:
        yield subvolume
55
adb31ebb
TL
@contextmanager
def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    """Open and yield the clone's source subvolume, reusing the already-open
    clone subvolume when source and clone are one and the same."""
    if (s_groupname, s_subvolname) == (c_groupname, c_subvolname):
        # a snapshot of a retained subvolume is being cloned to recreate the
        # same subvolume -- return the clone subvolume as the source
        yield c_subvolume
    else:
        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as source:
            yield source
65
66
@contextmanager
def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
    """Yield (clone_subvolume, source_subvolume, snapshot_name) for a clone.

    When the clone target is also its own source (a retained subvolume being
    recreated from one of its snapshots), the clone subvolume handle is
    yielded in both slots so that both refer to the same metadata manager.
    """
    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
        # NOTE(review): s_volname is unused here -- presumably clone source and
        # target always live in the same volume; confirm against callers
        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
        if groupname == s_groupname and subvolname == s_subvolname:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_subvolume, clone_subvolume, s_snapname)
        else:
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                yield (clone_subvolume, source_subvolume, s_snapname)
77
def get_clone_state(volume_client, volname, groupname, subvolname):
    """Read and return the clone subvolume's current state."""
    with open_at_volume(volume_client, volname, groupname, subvolname,
                        SubvolumeOpType.CLONE_INTERNAL) as subvol:
        return subvol.state
81
def set_clone_state(volume_client, volname, groupname, subvolname, state):
    """Persist `state` as the clone subvolume's current state."""
    with open_at_volume(volume_client, volname, groupname, subvolname,
                        SubvolumeOpType.CLONE_INTERNAL) as subvol:
        # (state, True): the boolean is passed through to the state setter --
        # presumably forces the metadata to be flushed; confirm in the
        # subvolume versions module
        subvol.state = (state, True)
85
def get_clone_source(clone_subvolume):
    """Return a (volume, group, subvolume, snapshot) tuple describing where
    the given clone subvolume is being cloned from; group may be None."""
    src = clone_subvolume._get_clone_source()
    return src['volume'], src.get('group', None), src['subvolume'], src['snapshot']
89
adb31ebb
TL
def get_next_state_on_error(errnum):
    """Map an error from an in-progress clone to the follow-up state:
    -EINTR means the copy was cancelled; any other error marks it failed."""
    if errnum == -errno.EINTR:
        action = SubvolumeActions.ACTION_CANCELLED
    else:
        # jump to failed state, on all other errors
        action = SubvolumeActions.ACTION_FAILED
    return SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                    SubvolumeStates.STATE_INPROGRESS,
                                    action)
101
92f5a8d4
TL
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for a pending clone: advance it to in-progress, or to
    cancelled when a cancel was requested before any work started.

    Returns (next_state, finished); pending is never a terminal state here.
    """
    try:
        action = SubvolumeActions.ACTION_CANCELLED if should_cancel() \
            else SubvolumeActions.ACTION_SUCCESS
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_PENDING,
                                              action)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)
115
9f95a23c
TL
def sync_attrs(fs_handle, target_path, source_statx):
    """Copy ownership, timestamps and mode from a statx result to target_path.

    The l* variants are used throughout so a symlink's own attributes are
    updated rather than its target's.

    :param fs_handle: cephfs handle used for the attribute calls
    :param target_path: path whose attributes are updated
    :param source_statx: statx dict with "uid", "gid", "mode" and "atime"/
                         "mtime" objects exposing timetuple()
    :raises cephfs.Error: re-raised after logging a warning
    """
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
        fs_handle.lchmod(target_path, source_statx["mode"])
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        # bare raise preserves the original traceback ("raise e" would make the
        # traceback point at this re-raise site instead of the failing call)
        raise
125
92f5a8d4
TL
def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.

    :param fs_handle: cephfs handle used for all filesystem operations
    :param source_path: root of the snapshot data being copied from
    :param dst_path: root of the clone subvolume being copied into
    :param should_cancel: zero-argument callable polled between directory
                          entries; a truthy return aborts the copy
    :raises VolumeException: -EINTR when cancelled, or the mapped cephfs error
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        # recursively copy one directory level; the readdir loop stops as soon
        # as a cancel is requested
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        # single statx fetches everything needed both for the
                        # copy itself and for the attribute sync afterwards
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                                          cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        # permission bits only -- mask off the file-type bits
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                # EEXIST is tolerated -- presumably so an
                                # interrupted copy can be re-run over a
                                # partially populated tree; confirm
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            # read into a 4096-byte buffer, then trim to the
                            # actual target length reported by statx
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            # sockets, fifos, devices etc. are skipped
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        # attributes are synced only for entry types that were
                        # actually copied
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                # restore this directory's own timestamps last, since creating
                # children above will have bumped the destination's mtime
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                           cephfs.CEPH_STATX_MTIME,
                                           cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            # a vanished source entry (ENOENT) is ignored; anything else is
            # surfaced as a VolumeException with a negated errno
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    # a cancel observed mid-copy exits cptree silently; report it here so the
    # state machine can transition the clone to cancelled
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")
92f5a8d4
TL
187
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    """Copy the clone's source snapshot data into the clone subvolume."""
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_clone_subvolume_pair(volume_client, fs_handle, volname,
                                       groupname, subvolname) as (clone_subvol, source_subvol, snapname):
            bulk_copy(fs_handle,
                      source_subvol.snapshot_data_path(snapname),
                      clone_subvol.path,
                      should_cancel)
92f5a8d4
TL
194
def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for an in-progress clone: run the data copy, then pick
    the follow-up state -- complete on success, cancelled or failed on error.

    Returns (next_state, finished); in-progress is never terminal here.
    """
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as verr:
        # copy was interrupted or failed -- map the errno onto the next state
        next_state = get_next_state_on_error(verr.errno)
    except OpSmException as ose:
        raise VolumeException(ose.errno, ose.error_str)
    return (next_state, False)
206
def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for a failed (or canceled) clone: detach the source
    snapshot, keeping the clone metadata section for later inspection.

    Terminal -- always returns (None, True); detach errors are only logged.
    """
    try:
        with open_volume(volume_client, volname) as fs_handle:
            # detach source but leave the clone section intact for later inspection
            with open_clone_subvolume_pair(volume_client, fs_handle, volname,
                                           groupname, subvolname) as (_clone_subvol, source_subvol, snapname):
                source_subvol.detach_snapshot(snapname, index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
216
def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for a completed clone: detach the source snapshot and
    remove the clone-source metadata from the target subvolume.

    Terminal -- always returns (None, True); cleanup errors are only logged.
    """
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_clone_subvolume_pair(volume_client, fs_handle, volname,
                                           groupname, subvolname) as (clone_subvol, source_subvol, snapname):
                source_subvol.detach_snapshot(snapname, index)
                clone_subvol.remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
226
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    """Drive the clone state machine for one clone until a handler reports it
    finished.

    Each iteration looks up the handler for the current state in `state_table`
    and calls it; handlers return (next_state, finished). Terminal handlers
    return None for next_state, which skips the metadata update. A
    VolumeException is logged rather than re-raised so a failed clone does not
    take down the worker thread.
    """
    finished = False
    current_state = None
    try:
        # the starting state comes from the subvolume's persisted metadata --
        # presumably this lets a clone interrupted by a mgr restart resume
        # from where it left off; confirm against AsyncJobs requeue behavior
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
                          current_state, next_state))
                # persist the transition before acting on the new state
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
                  subvolname, current_state, ve))
246
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    """Entry point for a cloner worker: resolve the clone path to its group
    and subvolume, then run the clone state machine to completion.

    Failures are logged, never raised, so the worker thread survives.
    """
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)
    groupname, subvolname = resolved[0], resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
261
class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        # map each clone state to its handler; CANCELED shares the FAILED
        # handler since both only detach the source snapshot
        self.vc = volume_client
        self.state_table = {
            SubvolumeStates.STATE_PENDING : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE : handle_clone_complete,
            SubvolumeStates.STATE_FAILED : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def reconfigure_max_concurrent_clones(self, tp_size):
        # resize the worker thread pool; delegates to the AsyncJobs base class
        return super(Cloner, self).reconfigure_max_async_threads(tp_size)

    def is_clone_cancelable(self, clone_state):
        # a clone can be cancelled unless it has already reached a
        # complete or failed state
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        # look up the clone-index entry that tracks this clone's base path
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        # cancel a clone that has not started copying yet: flip its state to
        # cancelled and detach it from the source snapshot
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        # open_at_group_unique reuses clone_subvolume when the clone is its
        # own source, avoiding a second open of the same subvolume
        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            # track_idx is bytes (from the index lookup); detach expects str
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.

        :raises VolumeException: EINVAL when the clone has already finished or
                                 the cancel could not be carried out
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                            return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with lock_timeout_log(self.lock):
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(self.vc.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        # AsyncJobs hook: pick the oldest clone entry not already running
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        # AsyncJobs hook: job entries are bytes (tracking index, clone path)
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)