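# ceph/src/pybind/mgr/volumes/fs/async_cloner.py (ceph 15.2.8)
#
# asynchronous cloner for the mgr/volumes plugin: a pool of worker threads picks
# up queued clone entries and copies data from a subvolume snapshot into a new
# subvolume, driving each clone through a small state machine (roughly
# pending -> in-progress -> complete, with failed/canceled on error or cancellation).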
import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.op_sm import OpSm
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None

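# helpers to open a subvolume either from the volume name or from an already
# opened volume handle; both resolve the group and yield the subvolume object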
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                             need_complete, expected_types) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                         need_complete, expected_types) as subvolume:
            yield subvolume

def get_clone_state(volume_client, volname, groupname, subvolname):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        return subvolume.state

def set_clone_state(volume_client, volname, groupname, subvolname, state):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

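# clone state handlers: each handle_clone_* function below is driven by
# start_clone_sm() and returns a (next_state, finished) tuple; a next_state of
# None leaves the persisted state untouched and finished=True ends the loop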
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR)
        else:
            next_state = OpSm.get_next_state("clone", "pending", 0)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

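# propagate ownership (uid/gid) and timestamps from a source statx result to the
# copied entry; failures are logged and re-raised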
def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise e

def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE  |
                                                          cephfs.CEPH_STATX_UID   |
                                                          cephfs.CEPH_STATX_GID   |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                              cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
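                        # strip the file-type bits from the mode; only the
                        # permission bits are applied to the copied entry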
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME,
                                           cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

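# copy the data of the clone's source snapshot into the clone subvolume's path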
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                src_path = source_subvolume.snapshot_path(s_snapname)
                dst_path = clone_subvolume.path
                bulk_copy(fs_handle, src_path, dst_path, should_cancel)

def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = OpSm.get_next_state("clone", "in-progress", 0)
    except VolumeException as ve:
        # jump to failed state
        next_state = OpSm.get_next_state("clone", "in-progress", ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

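# terminal state handlers: detach the clone from its source snapshot. a failed
# (or canceled) clone keeps its clone-source metadata for later inspection; a
# completed clone removes it.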
def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        # detach source but leave the clone section intact for later inspection
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
                    clone_subvolume.remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

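# drive the clone state machine: look up the handler for the current state, run
# it, persist the transition (if any) and repeat until a handler reports finished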
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
            current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

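# entry point for a single clone job -- resolve the clone path to its group and
# subvolume names and run the state machine for it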
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

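# usage sketch (illustrative, not part of this module): the volumes plugin keeps
# one Cloner instance and feeds it volumes that have queued clone entries, roughly:
#
#   cloner = Cloner(volume_client, tp_size=4)
#   cloner.queue_job(volname)   # queue_job() is inherited from AsyncJobs
#
# worker threads then call get_next_job()/execute_job(), defined at the end of
# this class, to pick up and run individual clones.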
class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    This relies on a simple state machine (which mimics the states from the OpSm class)
    as the driver. Supported file types are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        self.state_table = {
            'pending'     : handle_clone_pending,
            'in-progress' : handle_clone_in_progress,
            'complete'    : handle_clone_complete,
            'failed'      : handle_clone_failed,
            'canceled'    : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def reconfigure_max_concurrent_clones(self, tp_size):
        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)

    def is_clone_cancelable(self, clone_state):
        return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

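    # cancel a clone that has not started copying yet: move it to its -EINTR
    # successor state (canceled) and detach it from the source snapshot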
    def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
        clone_state = status['state']
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume:
                next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR)
                clone_subvolume.state = (next_state, True)
                s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = status['state']
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if OpSm.is_init_state("clone", clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
                            return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with self.lock:
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                         need_complete=False, expected_types=["clone"]) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

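    # AsyncJobs interface: fetch the oldest queued clone entry for a volume and
    # execute it via the clone state machine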
    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)