# ceph/src/pybind/mgr/volumes/fs/async_cloner.py
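#
# Asynchronous clone machinery for the mgr/volumes plugin: a pool of worker
# threads picks clone entries off a per-volume clone index and drives each
# clone through a small state machine
# (pending -> in-progress -> complete/failed/canceled).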

import os
import stat
import time
import errno
import logging
from contextlib import contextmanager

import cephfs

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.op_sm import OpSm
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(volume_client, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))

    try:
        with open_volume_lockless(volume_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volume_client.volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                    return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None
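
# convenience wrappers: open a subvolume through the volume -> group ->
# subvolume chain, either starting from a volume name (open_at_volume) or
# from an already open filesystem handle (open_at_group).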
@contextmanager
def open_at_volume(volume_client, volname, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_volume(volume_client, volname) as fs_handle:
        with open_group(fs_handle, volume_client.volspec, groupname) as group:
            with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                             need_complete, expected_types) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(volume_client, fs_handle, groupname, subvolname, need_complete=False, expected_types=[]):
    with open_group(fs_handle, volume_client.volspec, groupname) as group:
        with open_subvol(fs_handle, volume_client.volspec, group, subvolname,
                         need_complete, expected_types) as subvolume:
            yield subvolume

def get_clone_state(volume_client, volname, groupname, subvolname):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        return subvolume.state

def set_clone_state(volume_client, volname, groupname, subvolname, state):
    with open_at_volume(volume_client, volname, groupname, subvolname) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])
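
# per-state handlers: each takes the clone job context and returns a
# (next_state, finished) tuple that start_clone_sm() uses to drive the
# state machine.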
def handle_clone_pending(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = OpSm.get_next_state("clone", "pending", -errno.EINTR)
        else:
            next_state = OpSm.get_next_state("clone", "pending", 0)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise VolumeException(-e.args[0], e.args[1])

def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                              cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        # permission bits only, with the file-type bits masked off
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                # statx 'size' is the symlink target length
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
            stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                      cephfs.CEPH_STATX_MTIME,
                                       cephfs.AT_SYMLINK_NOFOLLOW)
            fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                              time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

def do_clone(volume_client, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(volume_client, volname) as fs_handle:
        with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
            s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
            with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                src_path = source_subvolume.snapshot_path(s_snapname)
                dst_path = clone_subvolume.path
                bulk_copy(fs_handle, src_path, dst_path, should_cancel)

def handle_clone_in_progress(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(volume_client, volname, groupname, subvolname, should_cancel)
        next_state = OpSm.get_next_state("clone", "in-progress", 0)
    except VolumeException as ve:
        # jump to failed state
        next_state = OpSm.get_next_state("clone", "in-progress", ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def handle_clone_failed(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        # detach source but leave the clone section intact for later inspection
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    source_subvolume.detach_snapshot(s_snapname, index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def handle_clone_complete(volume_client, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(volume_client, volname) as fs_handle:
            with open_at_group(volume_client, fs_handle, groupname, subvolname) as clone_subvolume:
                s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
                with open_at_group(volume_client, fs_handle, s_groupname, s_subvolname) as source_subvolume:
                    # detach the source snapshot and drop the clone source metadata
                    source_subvolume.detach_snapshot(s_snapname, index)
                    clone_subvolume.remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)
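
# drive a clone through the state machine until a handler reports
# finished=True or a VolumeException aborts the loop.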
def start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(volume_client, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(volume_client, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                          current_state, next_state))
                set_clone_state(volume_client, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                  subvolname, current_state, ve))

def clone(volume_client, volname, index, clone_path, state_table, should_cancel):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volume_client.volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(volume_client, volname, index, groupname, subvolname, state_table, should_cancel)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    this relies on a simple state machine (which mimics states from OpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size):
        self.vc = volume_client
        self.state_table = {
            'pending'     : handle_clone_pending,
            'in-progress' : handle_clone_in_progress,
            'complete'    : handle_clone_complete,
            'failed'      : handle_clone_failed,
            'canceled'    : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)

    def is_clone_cancelable(self, clone_state):
        return not (OpSm.is_final_state(clone_state) or OpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, status, track_idx):
        clone_state = status['state']
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_group(fs_handle, self.vc.volspec, s_groupname) as s_group:
            with open_subvol(fs_handle, self.vc.volspec, s_group, s_subvolname) as s_subvolume:
                next_state = OpSm.get_next_state("clone", clone_state, -errno.EINTR)
                clone_subvolume.state = (next_state, True)
                s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = status['state']
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        if OpSm.is_init_state("clone", clone_state):
                            # clone has not started yet -- cancel right away.
                            self._cancel_pending_clone(fs_handle, clone_subvolume, status, track_idx)
                            return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with open_volume_lockless(self.vc, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(fs_handle, self.vc.volspec, group, clonename,
                                     need_complete=False, expected_types=["clone"]) as clone_subvolume:
                        if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.vc, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.vc, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel)
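
# Rough usage sketch (hypothetical wiring -- the real caller lives in the
# mgr/volumes VolumeClient; `queue_job` is assumed to be provided by the
# AsyncJobs base class):
#
#   cloner = Cloner(volume_client, tp_size=4)            # 4 copier threads
#   cloner.queue_job(volname)                            # pick up pending clone entries
#   cloner.cancel_job(volname, (clonename, groupname))   # cancel a queued/running clone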