1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
#include "librbd/io/CopyupRequest.h"
#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/Mutex.h"
#include "librbd/AsyncObjectThrottle.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ObjectMap.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
#include "librbd/io/ObjectRequest.h"
#include "librbd/io/ReadResult.h"

#include <boost/bind.hpp>
#include <boost/lambda/bind.hpp>
#include <boost/lambda/construct.hpp>

#include <map>
#include <string>
#include <utility>
#include <vector>
24 #define dout_subsys ceph_subsys_rbd
26 #define dout_prefix *_dout << "librbd::io::CopyupRequest: " << this \
27 << " " << __func__ << ": "
34 class UpdateObjectMap
: public C_AsyncObjectThrottle
<> {
36 UpdateObjectMap(AsyncObjectThrottle
<> &throttle
, ImageCtx
*image_ctx
,
37 uint64_t object_no
, const std::vector
<uint64_t> *snap_ids
,
38 const ZTracer::Trace
&trace
, size_t snap_id_idx
)
39 : C_AsyncObjectThrottle(throttle
, *image_ctx
), m_object_no(object_no
),
40 m_snap_ids(*snap_ids
), m_trace(trace
), m_snap_id_idx(snap_id_idx
)
45 uint64_t snap_id
= m_snap_ids
[m_snap_id_idx
];
46 if (snap_id
== CEPH_NOSNAP
) {
47 RWLock::RLocker
snap_locker(m_image_ctx
.snap_lock
);
48 RWLock::WLocker
object_map_locker(m_image_ctx
.object_map_lock
);
49 assert(m_image_ctx
.exclusive_lock
->is_lock_owner());
50 assert(m_image_ctx
.object_map
!= nullptr);
51 bool sent
= m_image_ctx
.object_map
->aio_update
<Context
>(
52 CEPH_NOSNAP
, m_object_no
, OBJECT_EXISTS
, {}, m_trace
, this);
53 return (sent
? 0 : 1);
56 uint8_t state
= OBJECT_EXISTS
;
57 if (m_image_ctx
.test_features(RBD_FEATURE_FAST_DIFF
) &&
58 m_snap_id_idx
+ 1 < m_snap_ids
.size()) {
59 state
= OBJECT_EXISTS_CLEAN
;
62 RWLock::RLocker
snap_locker(m_image_ctx
.snap_lock
);
63 RWLock::RLocker
object_map_locker(m_image_ctx
.object_map_lock
);
64 if (m_image_ctx
.object_map
== nullptr) {
68 bool sent
= m_image_ctx
.object_map
->aio_update
<Context
>(
69 snap_id
, m_object_no
, state
, {}, m_trace
, this);
76 const std::vector
<uint64_t> &m_snap_ids
;
77 const ZTracer::Trace
&m_trace
;
81 } // anonymous namespace
84 CopyupRequest::CopyupRequest(ImageCtx
*ictx
, const std::string
&oid
,
85 uint64_t objectno
, Extents
&&image_extents
,
86 const ZTracer::Trace
&parent_trace
)
87 : m_ictx(ictx
), m_oid(oid
), m_object_no(objectno
),
88 m_image_extents(image_extents
),
89 m_trace(util::create_trace(*m_ictx
, "copy-up", parent_trace
)),
90 m_state(STATE_READ_FROM_PARENT
)
92 m_async_op
.start_op(*m_ictx
);
95 CopyupRequest::~CopyupRequest() {
96 assert(m_pending_requests
.empty());
97 m_async_op
.finish_op();
100 void CopyupRequest::append_request(ObjectRequest
<> *req
) {
101 ldout(m_ictx
->cct
, 20) << req
<< dendl
;
102 m_pending_requests
.push_back(req
);
105 void CopyupRequest::complete_requests(int r
) {
106 while (!m_pending_requests
.empty()) {
107 vector
<ObjectRequest
<> *>::iterator it
= m_pending_requests
.begin();
108 ObjectRequest
<> *req
= *it
;
109 ldout(m_ictx
->cct
, 20) << "completing request " << req
<< dendl
;
111 m_pending_requests
.erase(it
);
115 bool CopyupRequest::send_copyup() {
116 bool add_copyup_op
= !m_copyup_data
.is_zero();
117 bool copy_on_read
= m_pending_requests
.empty();
118 if (!add_copyup_op
&& copy_on_read
) {
119 // copyup empty object to prevent future CoR attempts
120 m_copyup_data
.clear();
121 add_copyup_op
= true;
124 ldout(m_ictx
->cct
, 20) << "oid " << m_oid
<< dendl
;
125 m_state
= STATE_COPYUP
;
127 m_ictx
->snap_lock
.get_read();
128 ::SnapContext snapc
= m_ictx
->snapc
;
129 m_ictx
->snap_lock
.put_read();
131 std::vector
<librados::snap_t
> snaps
;
138 if (copy_on_read
|| (!snapc
.snaps
.empty() && add_copyup_op
)) {
139 assert(add_copyup_op
);
140 add_copyup_op
= false;
142 librados::ObjectWriteOperation copyup_op
;
143 copyup_op
.exec("rbd", "copyup", m_copyup_data
);
145 // send only the copyup request with a blank snapshot context so that
146 // all snapshots are detected from the parent for this object. If
147 // this is a CoW request, a second request will be created for the
148 // actual modification.
151 ldout(m_ictx
->cct
, 20) << "copyup with empty snapshot context" << dendl
;
152 librados::AioCompletion
*comp
= util::create_rados_callback(this);
154 librados::Rados
rados(m_ictx
->data_ctx
);
155 r
= rados
.ioctx_create2(m_ictx
->data_ctx
.get_id(), m_data_ctx
);
158 r
= m_data_ctx
.aio_operate(
159 m_oid
, comp
, ©up_op
, 0, snaps
,
160 (m_trace
.valid() ? m_trace
.get_info() : nullptr));
166 librados::ObjectWriteOperation write_op
;
168 // CoW did not need to handle existing snapshots
169 write_op
.exec("rbd", "copyup", m_copyup_data
);
172 // merge all pending write ops into this single RADOS op
173 for (size_t i
=0; i
<m_pending_requests
.size(); ++i
) {
174 ObjectRequest
<> *req
= m_pending_requests
[i
];
175 ldout(m_ictx
->cct
, 20) << "add_copyup_ops " << req
<< dendl
;
176 bool set_hints
= (i
== 0);
177 req
->add_copyup_ops(&write_op
, set_hints
);
179 assert(write_op
.size() != 0);
181 snaps
.insert(snaps
.end(), snapc
.snaps
.begin(), snapc
.snaps
.end());
182 librados::AioCompletion
*comp
= util::create_rados_callback(this);
183 r
= m_ictx
->data_ctx
.aio_operate(
184 m_oid
, comp
, &write_op
, snapc
.seq
, snaps
,
185 (m_trace
.valid() ? m_trace
.get_info() : nullptr));
192 bool CopyupRequest::is_copyup_required() {
194 for (const ObjectRequest
<> *req
: m_pending_requests
) {
195 if (!req
->is_op_payload_empty()) {
201 return (m_copyup_data
.is_zero() && noop
);
204 void CopyupRequest::send()
206 m_state
= STATE_READ_FROM_PARENT
;
207 AioCompletion
*comp
= AioCompletion::create_and_start(
208 this, m_ictx
, AIO_TYPE_READ
);
210 ldout(m_ictx
->cct
, 20) << "completion " << comp
212 << ", extents " << m_image_extents
214 ImageRequest
<>::aio_read(m_ictx
->parent
, comp
, std::move(m_image_extents
),
215 ReadResult
{&m_copyup_data
}, 0, m_trace
);
218 void CopyupRequest::complete(int r
)
220 if (should_complete(r
)) {
221 complete_requests(r
);
226 bool CopyupRequest::should_complete(int r
)
228 CephContext
*cct
= m_ictx
->cct
;
229 ldout(cct
, 20) << "oid " << m_oid
230 << ", r " << r
<< dendl
;
232 uint64_t pending_copyups
;
234 case STATE_READ_FROM_PARENT
:
235 ldout(cct
, 20) << "READ_FROM_PARENT" << dendl
;
237 if (r
>= 0 || r
== -ENOENT
) {
238 if (is_copyup_required()) {
239 ldout(cct
, 20) << "nop, skipping" << dendl
;
243 return send_object_map_head();
247 case STATE_OBJECT_MAP_HEAD
:
248 ldout(cct
, 20) << "OBJECT_MAP_HEAD" << dendl
;
250 return send_object_map();
252 case STATE_OBJECT_MAP
:
253 ldout(cct
, 20) << "OBJECT_MAP" << dendl
;
255 return send_copyup();
258 // invoked via a finisher in librados, so thread safe
259 pending_copyups
= --m_pending_copyups
;
260 ldout(cct
, 20) << "COPYUP (" << pending_copyups
<< " pending)"
263 // hide the -ENOENT error if this is the last op
264 if (pending_copyups
== 0) {
265 complete_requests(0);
268 complete_requests(r
);
270 return (pending_copyups
== 0);
273 lderr(cct
) << "invalid state: " << m_state
<< dendl
;
280 void CopyupRequest::remove_from_list()
282 Mutex::Locker
l(m_ictx
->copyup_list_lock
);
284 map
<uint64_t, CopyupRequest
*>::iterator it
=
285 m_ictx
->copyup_list
.find(m_object_no
);
286 assert(it
!= m_ictx
->copyup_list
.end());
287 m_ictx
->copyup_list
.erase(it
);
290 bool CopyupRequest::send_object_map_head() {
291 CephContext
*cct
= m_ictx
->cct
;
292 ldout(cct
, 20) << dendl
;
294 m_state
= STATE_OBJECT_MAP_HEAD
;
297 RWLock::RLocker
owner_locker(m_ictx
->owner_lock
);
298 RWLock::RLocker
snap_locker(m_ictx
->snap_lock
);
299 if (m_ictx
->object_map
!= nullptr) {
300 bool copy_on_read
= m_pending_requests
.empty();
301 assert(m_ictx
->exclusive_lock
->is_lock_owner());
303 RWLock::WLocker
object_map_locker(m_ictx
->object_map_lock
);
304 if (!m_ictx
->snaps
.empty()) {
305 m_snap_ids
.insert(m_snap_ids
.end(), m_ictx
->snaps
.begin(),
306 m_ictx
->snaps
.end());
309 (*m_ictx
->object_map
)[m_object_no
] != OBJECT_EXISTS
) {
310 m_snap_ids
.insert(m_snap_ids
.begin(), CEPH_NOSNAP
);
311 object_map_locker
.unlock();
312 snap_locker
.unlock();
313 owner_locker
.unlock();
314 return send_object_map();
317 bool may_update
= false;
318 uint8_t new_state
, current_state
;
320 vector
<ObjectRequest
<> *>::reverse_iterator r_it
= m_pending_requests
.rbegin();
321 for (; r_it
!= m_pending_requests
.rend(); ++r_it
) {
322 ObjectRequest
<> *req
= *r_it
;
323 if (!req
->pre_object_map_update(&new_state
)) {
327 current_state
= (*m_ictx
->object_map
)[m_object_no
];
328 ldout(cct
, 20) << req
->get_op_type() << " object no "
329 << m_object_no
<< " current state "
330 << stringify(static_cast<uint32_t>(current_state
))
331 << " new state " << stringify(static_cast<uint32_t>(new_state
))
337 if (may_update
&& (new_state
!= current_state
) &&
338 m_ictx
->object_map
->aio_update
<CopyupRequest
>(
339 CEPH_NOSNAP
, m_object_no
, new_state
, current_state
, m_trace
,
346 return send_object_map();
349 bool CopyupRequest::send_object_map() {
350 // avoid possible recursive lock attempts
351 if (m_snap_ids
.empty()) {
352 // no object map update required
353 return send_copyup();
355 // update object maps for HEAD and all existing snapshots
356 ldout(m_ictx
->cct
, 20) << "oid " << m_oid
<< dendl
;
357 m_state
= STATE_OBJECT_MAP
;
359 RWLock::RLocker
owner_locker(m_ictx
->owner_lock
);
360 AsyncObjectThrottle
<>::ContextFactory
context_factory(
361 boost::lambda::bind(boost::lambda::new_ptr
<UpdateObjectMap
>(),
362 boost::lambda::_1
, m_ictx
, m_object_no
, &m_snap_ids
, m_trace
,
364 AsyncObjectThrottle
<> *throttle
= new AsyncObjectThrottle
<>(
365 NULL
, *m_ictx
, context_factory
, util::create_context_callback(this),
366 NULL
, 0, m_snap_ids
.size());
367 throttle
->start_ops(m_ictx
->concurrent_management_ops
);
373 } // namespace librbd