# ceph/src/pybind/mgr/volumes/fs/async_cloner.py
# (source: ceph.git mirror, commit 61928ec2d0f4230774e341422b84ae8bd09877b6)
import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.versions.op_sm import SubvolumeOpSm
from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index
from .operations.template import SubvolumeOpType
# module-level logger, named after this module
log = logging.getLogger(__name__)
# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    """Return (errno, job) for the oldest queued clone entry not already running.

    Returns (0, None) when the clone index does not exist (nothing queued);
    on a volume-open failure the VolumeException's errno is returned with a
    None job so the caller can decide how to proceed.
    """
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                return 0, job
            except IndexException as ve:
                # a missing clone index just means no clones are queued yet
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, op_type):
    """Open volume, group and subvolume in order and yield the subvolume.

    The volume is opened in exclusive mode (open_volume); op_type is the
    operation being performed, validated by open_subvol.
    """
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
                yield subvolume
@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, op_type):
    """Yield the subvolume at (groupname, subvolname) using an already-open fs handle."""
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(fs_handle, volume_client.volspec, group, subvolname, op_type) as subvolume:
            yield subvolume
@contextmanager
def open_at_group_unique(volume_client, fs_handle, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    """Yield the source subvolume, reusing the already-open clone subvolume when
    source and clone name the same subvolume (avoids opening it twice)."""
    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
    # the clone subvolume as the source subvolume
    if s_groupname == c_groupname and s_subvolname == c_subvolname:
        yield c_subvolume
    else:
        with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, op_type) as s_subvolume:
            yield s_subvolume
@contextmanager
def open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname):
    """Yield (clone_subvolume, source_subvolume, snapshot_name) for an in-flight clone.

    The source triple is read from the clone's own metadata (get_clone_source).
    """
    with open_at_group(volume_client, fs_handle, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
        if groupname == s_groupname and subvolname == s_subvolname:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_subvolume, clone_subvolume, s_snapname)
        else:
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                yield (clone_subvolume, source_subvolume, s_snapname)
def get_clone_state(volume_client, volname, groupname, subvolname):
    """Return the current state of the named clone subvolume."""
    with open_at_volume(volume_client, volname, groupname, subvolname,
                        SubvolumeOpType.CLONE_INTERNAL) as clone_sv:
        return clone_sv.state
def set_clone_state(volume_client, volname, groupname, subvolname, state):
    """Update the named clone subvolume's state."""
    with open_at_volume(volume_client, volname, groupname, subvolname,
                        SubvolumeOpType.CLONE_INTERNAL) as clone_sv:
        # second tuple element looks like a flush/persist flag --
        # NOTE(review): confirm against the subvolume `state` setter
        clone_sv.state = (state, True)
def get_clone_source(clone_subvolume):
    """Return (volume, group, subvolume, snapshot) identifying the clone's source.

    `group` defaults to None when the source metadata carries no group entry.
    """
    src = clone_subvolume._get_clone_source()
    return src['volume'], src.get('group', None), src['subvolume'], src['snapshot']
def get_next_state_on_error(errnum):
    """Map an error from an in-progress clone to the next state-machine state.

    EINTR is treated as a deliberate cancellation; anything else fails the clone.
    """
    if errnum == -errno.EINTR:
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_CANCELLED)
    else:
        # jump to failed state, on all other errors
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_FAILED)
    return next_state
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for STATE_PENDING: move to in-progress, or cancel if asked.

    Returns (next_state, finished); pending never terminates the state machine,
    hence finished is always False.
    """
    try:
        if should_cancel():
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_CANCELLED)
        else:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_SUCCESS)
    except OpSmException as oe:
        # surface state-machine errors as VolumeException for the caller
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)
def sync_attrs(fs_handle, target_path, source_statx):
    """Copy ownership (uid/gid) and atime/mtime from source_statx onto target_path.

    Operates on the link itself (lchown/lutimes), so symlink attrs are preserved
    without following them.
    """
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise VolumeException(-e.args[0], e.args[1])
def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        # recursively copy the tree rooted at src_root_path into dst_root_path,
        # checking should_cancel() between directory entries
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE  |
                                                          cephfs.CEPH_STATX_UID   |
                                                          cephfs.CEPH_STATX_GID   |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                                          cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        # permission bits only -- strip the file-type bits
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                # an existing destination directory is fine (resumed copy)
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            # readlink returns a buffer; trim to the link's actual size
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            # sockets, fifos, devices etc. are skipped on purpose
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            # propagate ownership and timestamps after the entry is copied
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                # finally, mirror the root directory's own timestamps
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME,
                                                          cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            # a vanished source subtree (ENOENT) is tolerated; anything else aborts
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")
def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    """Copy the source snapshot's data into the clone subvolume."""
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_clone_subvolume_pair(volume_client, fs_handle, volname,
                                       groupname, subvolname) as (clone_sv, source_sv, snapname):
            bulk_copy(fs_handle,
                      source_sv.snapshot_data_path(snapname),
                      clone_sv.path,
                      should_cancel)
def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for STATE_INPROGRESS: run the data copy and pick the next state.

    Returns (next_state, finished); the follow-up (complete/failed/canceled)
    handler performs the terminal bookkeeping, so finished is False here.
    """
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as ve:
        # EINTR maps to canceled, everything else to failed
        next_state = get_next_state_on_error(ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)
def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for failed/canceled clones: detach the source snapshot.

    Best-effort -- errors are logged, and the job is reported finished either way.
    """
    try:
        with open_volume(volume_client, volname) as fs_handle:
            # detach source but leave the clone section intact for later inspection
            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    """State handler for STATE_COMPLETE: detach the source snapshot and drop
    the clone-source metadata from the (now fully cloned) subvolume.

    Best-effort -- errors are logged, and the job is reported finished either way.
    """
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_clone_subvolume_pair(volume_client, fs_handle, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                clone_volumes[0].remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    """Drive the clone state machine for one clone until a handler reports finished.

    Each iteration looks up the handler for the current state, runs it, persists
    the transition, and repeats; VolumeException terminates the loop with a log.
    """
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            # terminal handlers return next_state=None -- nothing left to persist
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,\
                          current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,\
                  subvolname, current_state, ve))
def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    """Resolve clone_path to (group, subvolume) and run the clone state machine.

    Top-level per-job entry point used by Cloner.execute_job; failures are
    logged, not raised, so the worker thread keeps running.
    """
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname  = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))
class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        # clone state -> handler; every handler returns (next_state, finished)
        self.state_table = {
            SubvolumeStates.STATE_PENDING    : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE   : handle_clone_complete,
            SubvolumeStates.STATE_FAILED     : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED   : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def reconfigure_max_concurrent_clones(self, tp_size):
        # resize the underlying worker thread pool
        super(Cloner, self).reconfigure_max_concurrent_clones("cloner", tp_size)

    def is_clone_cancelable(self, clone_state):
        # a clone is cancelable only while it has neither completed nor failed
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        # look up the clone-index entry that tracks this clone subvolume
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        # cancel a clone that has not started copying yet: flip its state to
        # canceled and detach the source snapshot right away
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_at_group_unique(self.vc, fs_handle, s_groupname, s_subvolname, clone_subvolume, clone_groupname,
                                  clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                            return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with self.lock:
                with open_volume_lockless(self.vc, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        # AsyncJobs hook: fetch the next queued clone entry for this volume
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        # AsyncJobs hook: job is a (tracking-index, clone-path) pair of bytes
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)