# [ceph.git] ceph/src/pybind/mgr/volumes/fs/async_cloner.py (v15.2.4)
import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.op_sm import OpSm
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                return 0, job
            except IndexException as ve:
                # exceptions in this module carry negative errno values
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None

@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                             need_complete, expected_types) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                         need_complete, expected_types) as subvolume:
            yield subvolume

def get_clone_state(volume_client, volname, groupname, subvolname):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        return subvolume.state

def set_clone_state(volume_client, volname, groupname, subvolname, state):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

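# For illustration only: get_clone_source() above reads these keys from the
# clone's stored source metadata; the values shown here are hypothetical:
#
#   {
#       'volume'   : 'vol_a',      # volume hosting the source subvolume
#       'group'    : None,         # optional subvolume group (may be absent)
#       'subvolume': 'subvol_1',   # subvolume the snapshot belongs to
#       'snapshot' : 'snap_1',     # snapshot being cloned
#   }
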
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR)
        else:
            next_state = OpSm.get_next_state("clone", "pending", 0)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise e

def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                                          cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        # permission bits only -- mask off the file-type bits
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if e.args[0] != errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            # trim the readlink buffer to the link size reported by statx()
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if e.args[0] != errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            # propagate ownership and timestamps once the entry is copied
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
            # fix up the root directory's timestamps last, after its contents settle
            stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                      cephfs.CEPH_STATX_MTIME,
                                                      cephfs.AT_SYMLINK_NOFOLLOW)
            fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                              time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if e.args[0] != errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

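# A minimal usage sketch (paths and handle are hypothetical; do_clone() below
# shows the real call site). bulk_copy() walks the snapshot tree and polls
# should_cancel between directory entries:
#
#   bulk_copy(fs_handle,
#             b"/volumes/_nogroup/subvol_1/.snap/snap_1",  # snapshot root (source)
#             b"/volumes/_nogroup/clone_1",                # clone root (destination)
#             lambda: False)                               # never cancel
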
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                src_path = source_subvolume.snapshot_path(s_snapname)
                dst_path = clone_subvolume.path
                bulk_copy(fs_handle, src_path, dst_path, should_cancel)

def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = OpSm.get_next_state("clone", "in-progress", 0)
    except VolumeException as ve:
        # jump to failed state
        next_state = OpSm.get_next_state("clone", "in-progress", ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        # detach source but leave the clone section intact for later inspection
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
                    clone_subvolume.remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

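# Each handle_clone_* handler above follows the contract expected by
# start_clone_sm() below: return a (next_state, finished) tuple, where a
# next_state of None leaves the persisted state untouched and finished=True
# terminates the state machine loop.
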
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: a pool of threads to copy data from a snapshot to a
    subvolume. This relies on a simple state machine (which mimics states from
    the OpSm class) as the driver. Supported file types are directories,
    symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        self.state_table = {
            'pending'     : handle_clone_pending,
            'in-progress' : handle_clone_in_progress,
            'complete'    : handle_clone_complete,
            'failed'      : handle_clone_failed,
            'canceled'    : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def is_clone_cancelable(self, clone_state):
        return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
        clone_state = status['state']
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume:
                next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR)
                clone_subvolume.state = (next_state, True)
                s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        Override the base class `cancel_job`; interpret @job as a (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = status['state']
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if OpSm.is_init_state("clone", clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
                            return
            # canceling an ongoing clone persists a "canceled" state in the subvolume
            # metadata. to persist the new state, the async cloner accesses the volume
            # in exclusive mode; accessing the volume in exclusive mode here as well
            # would lead to deadlock, hence the lockless open below.
            assert track_idx is not None
            with self.lock:
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                         need_complete=False, expected_types=["clone"]) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error canceling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
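
# A minimal usage sketch (hypothetical; the job-queueing entry points come
# from the AsyncJobs base class, which is not shown in this file):
#
#   cloner = Cloner(volume_client, tp_size=4)      # pool of 4 worker threads
#   cloner.cancel_job("vol_a", ("clone_1", None))  # cancel clone "clone_1" in
#                                                  # the default (no) group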