git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/async_cloner.py
810397031f52b22186b5e9bafe11b5350d5765c1

import os
import stat
import time
import errno
import logging
from contextlib import contextmanager
from typing import Optional

import cephfs
from mgr_util import lock_timeout_log

from .async_job import AsyncJobs
from .exception import IndexException, MetadataMgrException, OpSmException, VolumeException
from .fs_util import copy_file
from .operations.versions.op_sm import SubvolumeOpSm
from .operations.versions.subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeActions
from .operations.resolver import resolve
from .operations.volume import open_volume, open_volume_lockless
from .operations.group import open_group
from .operations.subvolume import open_subvol
from .operations.clone_index import open_clone_index
from .operations.template import SubvolumeOpType

log = logging.getLogger(__name__)

# helper for fetching a clone entry for a given volume
def get_next_clone_entry(fs_client, volspec, volname, running_jobs):
    log.debug("fetching clone entry for volume '{0}'".format(volname))
    try:
        with open_volume_lockless(fs_client, volname) as fs_handle:
            try:
                with open_clone_index(fs_handle, volspec) as clone_index:
                    job = clone_index.get_oldest_clone_entry(running_jobs)
                return 0, job
            except IndexException as ve:
                if ve.errno == -errno.ENOENT:
                    return 0, None
                raise ve
    except VolumeException as ve:
        log.error("error fetching clone entry for volume '{0}' ({1})".format(volname, ve))
        return ve.errno, None
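
# The helpers below are generator-based context managers that nest the
# volume/group/subvolume opens and yield the opened subvolume(s), so the clone
# handlers can use a single 'with' block instead of three nested ones.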

@contextmanager
def open_at_volume(fs_client, volspec, volname, groupname, subvolname, op_type):
    with open_volume(fs_client, volname) as fs_handle:
        with open_group(fs_handle, volspec, groupname) as group:
            with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
                yield subvolume

@contextmanager
def open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, op_type):
    with open_group(fs_handle, volspec, groupname) as group:
        with open_subvol(fs_client.mgr, fs_handle, volspec, group, subvolname, op_type) as subvolume:
            yield subvolume

@contextmanager
def open_at_group_unique(fs_client, fs_handle, volspec, s_groupname, s_subvolname, c_subvolume, c_groupname, c_subvolname, op_type):
    # if a snapshot of a retained subvolume is being cloned to recreate the same subvolume, return
    # the clone subvolume as the source subvolume
    if s_groupname == c_groupname and s_subvolname == c_subvolname:
        yield c_subvolume
    else:
        with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, op_type) as s_subvolume:
            yield s_subvolume

@contextmanager
def open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname):
    with open_at_group(fs_client, fs_handle, volspec, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as clone_subvolume:
        s_volname, s_groupname, s_subvolname, s_snapname = get_clone_source(clone_subvolume)
        if groupname == s_groupname and subvolname == s_subvolname:
            # use the same subvolume to avoid metadata overwrites
            yield (clone_subvolume, clone_subvolume, s_snapname)
        else:
            with open_at_group(fs_client, fs_handle, volspec, s_groupname, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as source_subvolume:
                yield (clone_subvolume, source_subvolume, s_snapname)

def get_clone_state(fs_client, volspec, volname, groupname, subvolname):
    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        return subvolume.state

def set_clone_state(fs_client, volspec, volname, groupname, subvolname, state):
    with open_at_volume(fs_client, volspec, volname, groupname, subvolname, SubvolumeOpType.CLONE_INTERNAL) as subvolume:
        subvolume.state = (state, True)

def get_clone_source(clone_subvolume):
    source = clone_subvolume._get_clone_source()
    return (source['volume'], source.get('group', None), source['subvolume'], source['snapshot'])

def get_next_state_on_error(errnum):
    if errnum == -errno.EINTR:
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_CANCELLED)
    else:
        # jump to failed state, on all other errors
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_FAILED)
    return next_state
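
# Each state handler below returns a (next_state, finished) tuple; start_clone_sm()
# uses it to drive the clone state machine until a terminal state is reached.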

def handle_clone_pending(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        if should_cancel():
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_CANCELLED)
        else:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  SubvolumeStates.STATE_PENDING,
                                                  SubvolumeActions.ACTION_SUCCESS)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def sync_attrs(fs_handle, target_path, source_statx):
    try:
        fs_handle.lchown(target_path, source_statx["uid"], source_statx["gid"])
        fs_handle.lutimes(target_path, (time.mktime(source_statx["atime"].timetuple()),
                                        time.mktime(source_statx["mtime"].timetuple())))
        fs_handle.lchmod(target_path, source_statx["mode"])
    except cephfs.Error as e:
        log.warning("error synchronizing attrs for {0} ({1})".format(target_path, e))
        raise VolumeException(-e.args[0], e.args[1])
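
# bulk_copy() walks the snapshot tree with readdir(), dispatches on the entry
# type (directory, symlink, regular file), copies the entry, and then syncs
# ownership, mode and timestamps via sync_attrs(). Cancellation is checked per
# directory entry; an interrupted copy surfaces as -EINTR.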

def bulk_copy(fs_handle, source_path, dst_path, should_cancel):
    """
    bulk copy data from source to destination -- only directories, symlinks
    and regular files are synced.
    """
    log.info("copying data from {0} to {1}".format(source_path, dst_path))
    def cptree(src_root_path, dst_root_path):
        log.debug("cptree: {0} -> {1}".format(src_root_path, dst_root_path))
        try:
            with fs_handle.opendir(src_root_path) as dir_handle:
                d = fs_handle.readdir(dir_handle)
                while d and not should_cancel():
                    if d.d_name not in (b".", b".."):
                        log.debug("d={0}".format(d))
                        d_full_src = os.path.join(src_root_path, d.d_name)
                        d_full_dst = os.path.join(dst_root_path, d.d_name)
                        stx = fs_handle.statx(d_full_src, cephfs.CEPH_STATX_MODE |
                                                          cephfs.CEPH_STATX_UID |
                                                          cephfs.CEPH_STATX_GID |
                                                          cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME |
                                                          cephfs.CEPH_STATX_SIZE,
                                              cephfs.AT_SYMLINK_NOFOLLOW)
                        handled = True
                        mo = stx["mode"] & ~stat.S_IFMT(stx["mode"])
                        if stat.S_ISDIR(stx["mode"]):
                            log.debug("cptree: (DIR) {0}".format(d_full_src))
                            try:
                                fs_handle.mkdir(d_full_dst, mo)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                            cptree(d_full_src, d_full_dst)
                        elif stat.S_ISLNK(stx["mode"]):
                            log.debug("cptree: (SYMLINK) {0}".format(d_full_src))
                            target = fs_handle.readlink(d_full_src, 4096)
                            try:
                                fs_handle.symlink(target[:stx["size"]], d_full_dst)
                            except cephfs.Error as e:
                                if not e.args[0] == errno.EEXIST:
                                    raise
                        elif stat.S_ISREG(stx["mode"]):
                            log.debug("cptree: (REG) {0}".format(d_full_src))
                            copy_file(fs_handle, d_full_src, d_full_dst, mo, cancel_check=should_cancel)
                        else:
                            handled = False
                            log.warning("cptree: (IGNORE) {0}".format(d_full_src))
                        if handled:
                            sync_attrs(fs_handle, d_full_dst, stx)
                    d = fs_handle.readdir(dir_handle)
                stx_root = fs_handle.statx(src_root_path, cephfs.CEPH_STATX_ATIME |
                                                          cephfs.CEPH_STATX_MTIME,
                                           cephfs.AT_SYMLINK_NOFOLLOW)
                fs_handle.lutimes(dst_root_path, (time.mktime(stx_root["atime"].timetuple()),
                                                  time.mktime(stx_root["mtime"].timetuple())))
        except cephfs.Error as e:
            if not e.args[0] == errno.ENOENT:
                raise VolumeException(-e.args[0], e.args[1])
    cptree(source_path, dst_path)
    if should_cancel():
        raise VolumeException(-errno.EINTR, "clone operation interrupted")

def set_quota_on_clone(fs_handle, clone_volumes_pair):
    src_path = clone_volumes_pair[1].snapshot_data_path(clone_volumes_pair[2])
    dst_path = clone_volumes_pair[0].path
    quota = None # type: Optional[int]
    try:
        quota = int(fs_handle.getxattr(src_path, 'ceph.quota.max_bytes').decode('utf-8'))
    except cephfs.NoData:
        pass

    if quota is not None:
        try:
            fs_handle.setxattr(dst_path, 'ceph.quota.max_bytes', str(quota).encode('utf-8'), 0)
        except cephfs.InvalidValue:
            raise VolumeException(-errno.EINVAL, "invalid size specified: '{0}'".format(quota))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    quota_files = None # type: Optional[int]
    try:
        quota_files = int(fs_handle.getxattr(src_path, 'ceph.quota.max_files').decode('utf-8'))
    except cephfs.NoData:
        pass

    if quota_files is not None:
        try:
            fs_handle.setxattr(dst_path, 'ceph.quota.max_files', str(quota_files).encode('utf-8'), 0)
        except cephfs.InvalidValue:
            raise VolumeException(-errno.EINVAL, "invalid file count specified: '{0}'".format(quota_files))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

def do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel):
    with open_volume_lockless(fs_client, volname) as fs_handle:
        with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
            src_path = clone_volumes[1].snapshot_data_path(clone_volumes[2])
            dst_path = clone_volumes[0].path
            bulk_copy(fs_handle, src_path, dst_path, should_cancel)
            set_quota_on_clone(fs_handle, clone_volumes)

def log_clone_failure(volname, groupname, subvolname, ve):
    if ve.errno == -errno.EINTR:
        log.info("Clone cancelled: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    elif ve.errno == -errno.EDQUOT:
        log.error("Clone failed: ({0}, {1}, {2}, reason -> Disk quota exceeded)".format(volname, groupname, subvolname))
    else:
        log.error("Clone failed: ({0}, {1}, {2}, reason -> {3})".format(volname, groupname, subvolname, ve))

def handle_clone_in_progress(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        do_clone(fs_client, volspec, volname, groupname, subvolname, should_cancel)
        next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                              SubvolumeStates.STATE_INPROGRESS,
                                              SubvolumeActions.ACTION_SUCCESS)
    except VolumeException as ve:
        log_clone_failure(volname, groupname, subvolname, ve)
        next_state = get_next_state_on_error(ve.errno)
    except OpSmException as oe:
        raise VolumeException(oe.errno, oe.error_str)
    return (next_state, False)

def handle_clone_failed(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(fs_client, volname) as fs_handle:
            # detach source but leave the clone section intact for later inspection
            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def handle_clone_complete(fs_client, volspec, volname, index, groupname, subvolname, should_cancel):
    try:
        with open_volume(fs_client, volname) as fs_handle:
            with open_clone_subvolume_pair(fs_client, fs_handle, volspec, volname, groupname, subvolname) as clone_volumes:
                clone_volumes[1].detach_snapshot(clone_volumes[2], index)
                clone_volumes[0].remove_clone_source(flush=True)
    except (MetadataMgrException, VolumeException) as e:
        log.error("failed to detach clone from snapshot: {0}".format(e))
    return (None, True)

def start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay):
    finished = False
    current_state = None
    try:
        current_state = get_clone_state(fs_client, volspec, volname, groupname, subvolname)
        log.debug("cloning ({0}, {1}, {2}) -- starting state \"{3}\"".format(volname, groupname, subvolname, current_state))
        if current_state == SubvolumeStates.STATE_PENDING:
            time.sleep(snapshot_clone_delay)
            log.info("Delayed cloning ({0}, {1}, {2}) -- by {3} seconds".format(volname, groupname, subvolname, snapshot_clone_delay))
        while not finished:
            handler = state_table.get(current_state, None)
            if not handler:
                raise VolumeException(-errno.EINVAL, "invalid clone state: \"{0}\"".format(current_state))
            (next_state, finished) = handler(fs_client, volspec, volname, index, groupname, subvolname, should_cancel)
            if next_state:
                log.debug("({0}, {1}, {2}) transition state [\"{3}\" => \"{4}\"]".format(volname, groupname, subvolname,
                                                                                         current_state, next_state))
                set_clone_state(fs_client, volspec, volname, groupname, subvolname, next_state)
                current_state = next_state
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}) (current_state: {3}, reason: {4})".format(volname, groupname,
                                                                                              subvolname, current_state, ve))

def clone(fs_client, volspec, volname, index, clone_path, state_table, should_cancel, snapshot_clone_delay):
    log.info("cloning to subvolume path: {0}".format(clone_path))
    resolved = resolve(volspec, clone_path)

    groupname = resolved[0]
    subvolname = resolved[1]
    log.debug("resolved to [group: {0}, subvolume: {1}]".format(groupname, subvolname))

    try:
        log.info("starting clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
        start_clone_sm(fs_client, volspec, volname, index, groupname, subvolname, state_table, should_cancel, snapshot_clone_delay)
        log.info("finished clone: ({0}, {1}, {2})".format(volname, groupname, subvolname))
    except VolumeException as ve:
        log.error("clone failed for ({0}, {1}, {2}), reason: {3}".format(volname, groupname, subvolname, ve))

class Cloner(AsyncJobs):
    """
    Asynchronous cloner: pool of threads to copy data from a snapshot to a subvolume.
    this relies on a simple state machine (which mimics states from SubvolumeOpSm class) as
    the driver. file types supported are directories, symbolic links and regular files.
    """
    def __init__(self, volume_client, tp_size, snapshot_clone_delay):
        self.vc = volume_client
        self.snapshot_clone_delay = snapshot_clone_delay
        self.state_table = {
            SubvolumeStates.STATE_PENDING    : handle_clone_pending,
            SubvolumeStates.STATE_INPROGRESS : handle_clone_in_progress,
            SubvolumeStates.STATE_COMPLETE   : handle_clone_complete,
            SubvolumeStates.STATE_FAILED     : handle_clone_failed,
            SubvolumeStates.STATE_CANCELED   : handle_clone_failed,
        }
        super(Cloner, self).__init__(volume_client, "cloner", tp_size)
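
    # Note: STATE_FAILED and STATE_CANCELED share handle_clone_failed, which
    # detaches the source snapshot but leaves the clone's metadata in place so
    # the failure or cancellation can still be inspected via clone status.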

    def reconfigure_max_concurrent_clones(self, tp_size):
        return super(Cloner, self).reconfigure_max_async_threads(tp_size)

    def reconfigure_snapshot_clone_delay(self, timeout):
        self.snapshot_clone_delay = timeout

    def is_clone_cancelable(self, clone_state):
        return not (SubvolumeOpSm.is_complete_state(clone_state) or SubvolumeOpSm.is_failed_state(clone_state))

    def get_clone_tracking_index(self, fs_handle, clone_subvolume):
        with open_clone_index(fs_handle, self.vc.volspec) as index:
            return index.find_clone_entry_index(clone_subvolume.base_path)
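
    # The tracking index is the clone's entry name in the clone index directory;
    # below it doubles as the job identity compared against self.jobs and as the
    # handle passed to detach_snapshot() when tearing down a cancelled clone.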

    def _cancel_pending_clone(self, fs_handle, clone_subvolume, clone_subvolname, clone_groupname, status, track_idx):
        clone_state = SubvolumeStates.from_value(status['state'])
        assert self.is_clone_cancelable(clone_state)

        s_groupname = status['source'].get('group', None)
        s_subvolname = status['source']['subvolume']
        s_snapname = status['source']['snapshot']

        with open_at_group_unique(self.fs_client, fs_handle, self.vc.volspec, s_groupname, s_subvolname, clone_subvolume,
                                  clone_groupname, clone_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            next_state = SubvolumeOpSm.transition(SubvolumeTypes.TYPE_CLONE,
                                                  clone_state,
                                                  SubvolumeActions.ACTION_CANCELLED)
            clone_subvolume.state = (next_state, True)
            s_subvolume.detach_snapshot(s_snapname, track_idx.decode('utf-8'))

    def cancel_job(self, volname, job):
        """
        override base class `cancel_job`. interpret @job as (clone, group) tuple.
        """
        clonename = job[0]
        groupname = job[1]
        track_idx = None

        try:
            with open_volume(self.fs_client, volname) as fs_handle:
                with open_group(fs_handle, self.vc.volspec, groupname) as group:
                    with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                        status = clone_subvolume.status
                        clone_state = SubvolumeStates.from_value(status['state'])
                        if not self.is_clone_cancelable(clone_state):
                            raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
                        track_idx = self.get_clone_tracking_index(fs_handle, clone_subvolume)
                        if not track_idx:
                            log.warning("cannot lookup clone tracking index for {0}".format(clone_subvolume.base_path))
                            raise VolumeException(-errno.EINVAL, "error canceling clone")
                        clone_job = (track_idx, clone_subvolume.base_path)
                        jobs = [j[0] for j in self.jobs[volname]]
                        with lock_timeout_log(self.lock):
                            if SubvolumeOpSm.is_init_state(SubvolumeTypes.TYPE_CLONE, clone_state) and not clone_job in jobs:
                                logging.debug("Cancelling pending job {0}".format(clone_job))
                                # clone has not started yet -- cancel right away.
                                self._cancel_pending_clone(fs_handle, clone_subvolume, clonename, groupname, status, track_idx)
                                return
            # cancelling an on-going clone would persist "canceled" state in subvolume metadata.
            # to persist the new state, async cloner accesses the volume in exclusive mode.
            # accessing the volume in exclusive mode here would lead to deadlock.
            assert track_idx is not None
            with lock_timeout_log(self.lock):
                with open_volume_lockless(self.fs_client, volname) as fs_handle:
                    with open_group(fs_handle, self.vc.volspec, groupname) as group:
                        with open_subvol(self.fs_client.mgr, fs_handle, self.vc.volspec, group, clonename, SubvolumeOpType.CLONE_CANCEL) as clone_subvolume:
                            if not self._cancel_job(volname, (track_idx, clone_subvolume.base_path)):
                                raise VolumeException(-errno.EINVAL, "cannot cancel -- clone finished (check clone status)")
        except (IndexException, MetadataMgrException) as e:
            log.error("error cancelling clone {0}: ({1})".format(job, e))
            raise VolumeException(-errno.EINVAL, "error canceling clone")

    def get_next_job(self, volname, running_jobs):
        return get_next_clone_entry(self.fs_client, self.vc.volspec, volname, running_jobs)

    def execute_job(self, volname, job, should_cancel):
        clone(self.fs_client, self.vc.volspec, volname, job[0].decode('utf-8'), job[1].decode('utf-8'), self.state_table, should_cancel, self.snapshot_clone_delay)
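
# Illustrative usage sketch (not part of this module): the volumes plugin is
# expected to construct a single Cloner and hand it volumes that have queued
# clone entries, roughly along these lines. Method names on the AsyncJobs side
# (e.g. queue_job) are assumptions based on how this class overrides
# get_next_job/execute_job/cancel_job.
#
#   cloner = Cloner(volume_client, tp_size=4, snapshot_clone_delay=0)
#   cloner.queue_job(volname)                           # pick up pending clone entries
#   cloner.cancel_job(volname, (clonename, groupname))  # cancel a specific clone
#   cloner.reconfigure_max_concurrent_clones(8)         # resize the thread pool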