1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ObjectRequest.h"
5 #include "common/ceph_context.h"
6 #include "common/dout.h"
7 #include "common/errno.h"
8 #include "common/ceph_mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/Context.h"
11 #include "include/err.h"
12 #include "osd/osd_types.h"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ObjectMap.h"
17 #include "librbd/Utils.h"
18 #include "librbd/io/AioCompletion.h"
19 #include "librbd/io/CopyupRequest.h"
20 #include "librbd/io/ImageRequest.h"
21 #include "librbd/io/ReadResult.h"
23 #include <boost/bind.hpp>
24 #include <boost/optional.hpp>
26 #define dout_subsys ceph_subsys_rbd
28 #define dout_prefix *_dout << "librbd::io::ObjectRequest: " << this \
29 << " " << __func__ << ": " \
30 << data_object_name(this->m_ictx, \
31 this->m_object_no) << " "
36 using librbd::util::data_object_name
;
41 inline bool is_copy_on_read(I
*ictx
, librados::snap_t snap_id
) {
42 std::shared_lock image_locker
{ictx
->image_lock
};
43 return (ictx
->clone_copy_on_read
&&
44 !ictx
->read_only
&& snap_id
== CEPH_NOSNAP
&&
45 (ictx
->exclusive_lock
== nullptr ||
46 ictx
->exclusive_lock
->is_lock_owner()));
49 } // anonymous namespace
53 ObjectRequest
<I
>::create_write(
54 I
*ictx
, uint64_t object_no
, uint64_t object_off
, ceph::bufferlist
&& data
,
55 const ::SnapContext
&snapc
, int op_flags
,
56 const ZTracer::Trace
&parent_trace
, Context
*completion
) {
57 return new ObjectWriteRequest
<I
>(ictx
, object_no
, object_off
,
58 std::move(data
), snapc
, op_flags
,
59 parent_trace
, completion
);
64 ObjectRequest
<I
>::create_discard(
65 I
*ictx
, uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
66 const ::SnapContext
&snapc
, int discard_flags
,
67 const ZTracer::Trace
&parent_trace
, Context
*completion
) {
68 return new ObjectDiscardRequest
<I
>(ictx
, object_no
, object_off
,
69 object_len
, snapc
, discard_flags
,
70 parent_trace
, completion
);
75 ObjectRequest
<I
>::create_write_same(
76 I
*ictx
, uint64_t object_no
, uint64_t object_off
, uint64_t object_len
,
77 ceph::bufferlist
&& data
, const ::SnapContext
&snapc
, int op_flags
,
78 const ZTracer::Trace
&parent_trace
, Context
*completion
) {
79 return new ObjectWriteSameRequest
<I
>(ictx
, object_no
, object_off
,
80 object_len
, std::move(data
), snapc
,
81 op_flags
, parent_trace
, completion
);
86 ObjectRequest
<I
>::create_compare_and_write(
87 I
*ictx
, uint64_t object_no
, uint64_t object_off
,
88 ceph::bufferlist
&& cmp_data
, ceph::bufferlist
&& write_data
,
89 const ::SnapContext
&snapc
, uint64_t *mismatch_offset
, int op_flags
,
90 const ZTracer::Trace
&parent_trace
, Context
*completion
) {
91 return new ObjectCompareAndWriteRequest
<I
>(ictx
, object_no
, object_off
,
93 std::move(write_data
), snapc
,
94 mismatch_offset
, op_flags
,
95 parent_trace
, completion
);
99 ObjectRequest
<I
>::ObjectRequest(
100 I
*ictx
, uint64_t objectno
, uint64_t off
, uint64_t len
,
101 librados::snap_t snap_id
, const char *trace_name
,
102 const ZTracer::Trace
&trace
, Context
*completion
)
103 : m_ictx(ictx
), m_object_no(objectno
), m_object_off(off
),
104 m_object_len(len
), m_snap_id(snap_id
), m_completion(completion
),
105 m_trace(util::create_trace(*ictx
, "", trace
)) {
106 ceph_assert(m_ictx
->data_ctx
.is_valid());
107 if (m_trace
.valid()) {
108 m_trace
.copy_name(trace_name
+ std::string(" ") +
109 data_object_name(ictx
, objectno
));
110 m_trace
.event("start");
114 template <typename I
>
115 void ObjectRequest
<I
>::add_write_hint(I
& image_ctx
,
116 librados::ObjectWriteOperation
*wr
) {
117 if (image_ctx
.enable_alloc_hint
) {
118 wr
->set_alloc_hint2(image_ctx
.get_object_size(),
119 image_ctx
.get_object_size(),
120 image_ctx
.alloc_hint_flags
);
121 } else if (image_ctx
.alloc_hint_flags
!= 0U) {
122 wr
->set_alloc_hint2(0, 0, image_ctx
.alloc_hint_flags
);
126 template <typename I
>
127 bool ObjectRequest
<I
>::compute_parent_extents(Extents
*parent_extents
,
129 ceph_assert(ceph_mutex_is_locked(m_ictx
->image_lock
));
131 m_has_parent
= false;
132 parent_extents
->clear();
134 uint64_t parent_overlap
;
135 int r
= m_ictx
->get_parent_overlap(m_snap_id
, &parent_overlap
);
137 // NOTE: it's possible for a snapshot to be deleted while we are
138 // still reading from it
139 lderr(m_ictx
->cct
) << "failed to retrieve parent overlap: "
140 << cpp_strerror(r
) << dendl
;
144 if (!read_request
&& !m_ictx
->migration_info
.empty()) {
145 parent_overlap
= m_ictx
->migration_info
.overlap
;
148 if (parent_overlap
== 0) {
152 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
, m_object_no
, 0,
153 m_ictx
->layout
.object_size
, *parent_extents
);
154 uint64_t object_overlap
= m_ictx
->prune_parent_extents(*parent_extents
,
156 if (object_overlap
> 0) {
157 ldout(m_ictx
->cct
, 20) << "overlap " << parent_overlap
<< " "
158 << "extents " << *parent_extents
<< dendl
;
159 m_has_parent
= !parent_extents
->empty();
165 template <typename I
>
166 void ObjectRequest
<I
>::async_finish(int r
) {
167 ldout(m_ictx
->cct
, 20) << "r=" << r
<< dendl
;
168 m_ictx
->op_work_queue
->queue(util::create_context_callback
<
169 ObjectRequest
<I
>, &ObjectRequest
<I
>::finish
>(this), r
);
172 template <typename I
>
173 void ObjectRequest
<I
>::finish(int r
) {
174 ldout(m_ictx
->cct
, 20) << "r=" << r
<< dendl
;
175 m_completion
->complete(r
);
181 template <typename I
>
182 ObjectReadRequest
<I
>::ObjectReadRequest(
183 I
*ictx
, uint64_t objectno
, uint64_t offset
, uint64_t len
,
184 librados::snap_t snap_id
, int op_flags
, const ZTracer::Trace
&parent_trace
,
185 bufferlist
* read_data
, ExtentMap
* extent_map
, Context
*completion
)
186 : ObjectRequest
<I
>(ictx
, objectno
, offset
, len
, snap_id
, "read",
187 parent_trace
, completion
),
188 m_op_flags(op_flags
), m_read_data(read_data
), m_extent_map(extent_map
) {
191 template <typename I
>
192 void ObjectReadRequest
<I
>::send() {
193 I
*image_ctx
= this->m_ictx
;
194 ldout(image_ctx
->cct
, 20) << dendl
;
199 template <typename I
>
200 void ObjectReadRequest
<I
>::read_object() {
201 I
*image_ctx
= this->m_ictx
;
203 std::shared_lock image_locker
{image_ctx
->image_lock
};
204 if (image_ctx
->object_map
!= nullptr &&
205 !image_ctx
->object_map
->object_may_exist(this->m_object_no
)) {
206 image_ctx
->op_work_queue
->queue(new LambdaContext([this](int r
) {
213 ldout(image_ctx
->cct
, 20) << dendl
;
215 librados::ObjectReadOperation op
;
216 if (this->m_object_len
>= image_ctx
->sparse_read_threshold_bytes
) {
217 op
.sparse_read(this->m_object_off
, this->m_object_len
, m_extent_map
,
218 m_read_data
, nullptr);
220 op
.read(this->m_object_off
, this->m_object_len
, m_read_data
, nullptr);
222 op
.set_op_flags2(m_op_flags
);
224 librados::AioCompletion
*rados_completion
= util::create_rados_callback
<
225 ObjectReadRequest
<I
>, &ObjectReadRequest
<I
>::handle_read_object
>(this);
226 int flags
= image_ctx
->get_read_flags(this->m_snap_id
);
227 int r
= image_ctx
->data_ctx
.aio_operate(
228 data_object_name(this->m_ictx
, this->m_object_no
), rados_completion
, &op
,
230 (this->m_trace
.valid() ? this->m_trace
.get_info() : nullptr));
233 rados_completion
->release();
236 template <typename I
>
237 void ObjectReadRequest
<I
>::handle_read_object(int r
) {
238 I
*image_ctx
= this->m_ictx
;
239 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
245 lderr(image_ctx
->cct
) << "failed to read from object: "
246 << cpp_strerror(r
) << dendl
;
254 template <typename I
>
255 void ObjectReadRequest
<I
>::read_parent() {
256 I
*image_ctx
= this->m_ictx
;
258 std::shared_lock image_locker
{image_ctx
->image_lock
};
260 // calculate reverse mapping onto the image
261 Extents parent_extents
;
262 Striper::extent_to_file(image_ctx
->cct
, &image_ctx
->layout
,
263 this->m_object_no
, this->m_object_off
,
264 this->m_object_len
, parent_extents
);
266 uint64_t parent_overlap
= 0;
267 uint64_t object_overlap
= 0;
268 int r
= image_ctx
->get_parent_overlap(this->m_snap_id
, &parent_overlap
);
270 object_overlap
= image_ctx
->prune_parent_extents(parent_extents
,
274 if (object_overlap
== 0) {
275 image_locker
.unlock();
277 this->finish(-ENOENT
);
281 ldout(image_ctx
->cct
, 20) << dendl
;
283 auto parent_completion
= AioCompletion::create_and_start
<
284 ObjectReadRequest
<I
>, &ObjectReadRequest
<I
>::handle_read_parent
>(
285 this, util::get_image_ctx(image_ctx
->parent
), AIO_TYPE_READ
);
286 ImageRequest
<I
>::aio_read(image_ctx
->parent
, parent_completion
,
287 std::move(parent_extents
), ReadResult
{m_read_data
},
291 template <typename I
>
292 void ObjectReadRequest
<I
>::handle_read_parent(int r
) {
293 I
*image_ctx
= this->m_ictx
;
294 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
300 lderr(image_ctx
->cct
) << "failed to read parent extents: "
301 << cpp_strerror(r
) << dendl
;
309 template <typename I
>
310 void ObjectReadRequest
<I
>::copyup() {
311 I
*image_ctx
= this->m_ictx
;
312 if (!is_copy_on_read(image_ctx
, this->m_snap_id
)) {
317 image_ctx
->owner_lock
.lock_shared();
318 image_ctx
->image_lock
.lock_shared();
319 Extents parent_extents
;
320 if (!this->compute_parent_extents(&parent_extents
, true) ||
321 (image_ctx
->exclusive_lock
!= nullptr &&
322 !image_ctx
->exclusive_lock
->is_lock_owner())) {
323 image_ctx
->image_lock
.unlock_shared();
324 image_ctx
->owner_lock
.unlock_shared();
329 ldout(image_ctx
->cct
, 20) << dendl
;
331 image_ctx
->copyup_list_lock
.lock();
332 auto it
= image_ctx
->copyup_list
.find(this->m_object_no
);
333 if (it
== image_ctx
->copyup_list
.end()) {
334 // create and kick off a CopyupRequest
335 auto new_req
= CopyupRequest
<I
>::create(
336 image_ctx
, this->m_object_no
, std::move(parent_extents
), this->m_trace
);
338 image_ctx
->copyup_list
[this->m_object_no
] = new_req
;
339 image_ctx
->copyup_list_lock
.unlock();
340 image_ctx
->image_lock
.unlock_shared();
343 image_ctx
->copyup_list_lock
.unlock();
344 image_ctx
->image_lock
.unlock_shared();
347 image_ctx
->owner_lock
.unlock_shared();
353 template <typename I
>
354 AbstractObjectWriteRequest
<I
>::AbstractObjectWriteRequest(
355 I
*ictx
, uint64_t object_no
, uint64_t object_off
, uint64_t len
,
356 const ::SnapContext
&snapc
, const char *trace_name
,
357 const ZTracer::Trace
&parent_trace
, Context
*completion
)
358 : ObjectRequest
<I
>(ictx
, object_no
, object_off
, len
, CEPH_NOSNAP
, trace_name
,
359 parent_trace
, completion
),
360 m_snap_seq(snapc
.seq
.val
)
362 m_snaps
.insert(m_snaps
.end(), snapc
.snaps
.begin(), snapc
.snaps
.end());
364 if (this->m_object_off
== 0 &&
365 this->m_object_len
== ictx
->get_object_size()) {
366 m_full_object
= true;
369 compute_parent_info();
371 ictx
->image_lock
.lock_shared();
372 if (!ictx
->migration_info
.empty()) {
373 m_guarding_migration_write
= true;
375 ictx
->image_lock
.unlock_shared();
378 template <typename I
>
379 void AbstractObjectWriteRequest
<I
>::compute_parent_info() {
380 I
*image_ctx
= this->m_ictx
;
381 std::shared_lock image_locker
{image_ctx
->image_lock
};
383 this->compute_parent_extents(&m_parent_extents
, false);
385 if (!this->has_parent() ||
386 (m_full_object
&& m_snaps
.empty() && !is_post_copyup_write_required())) {
387 m_copyup_enabled
= false;
391 template <typename I
>
392 void AbstractObjectWriteRequest
<I
>::add_write_hint(
393 librados::ObjectWriteOperation
*wr
) {
394 I
*image_ctx
= this->m_ictx
;
395 std::shared_lock image_locker
{image_ctx
->image_lock
};
396 if (image_ctx
->object_map
== nullptr || !this->m_object_may_exist
) {
397 ObjectRequest
<I
>::add_write_hint(*image_ctx
, wr
);
401 template <typename I
>
402 void AbstractObjectWriteRequest
<I
>::send() {
403 I
*image_ctx
= this->m_ictx
;
404 ldout(image_ctx
->cct
, 20) << this->get_op_type() << " "
405 << this->m_object_off
<< "~" << this->m_object_len
408 std::shared_lock image_lock
{image_ctx
->image_lock
};
409 if (image_ctx
->object_map
== nullptr) {
410 m_object_may_exist
= true;
412 // should have been flushed prior to releasing lock
413 ceph_assert(image_ctx
->exclusive_lock
->is_lock_owner());
414 m_object_may_exist
= image_ctx
->object_map
->object_may_exist(
419 if (!m_object_may_exist
&& is_no_op_for_nonexistent_object()) {
420 ldout(image_ctx
->cct
, 20) << "skipping no-op on nonexistent object"
422 this->async_finish(0);
426 pre_write_object_map_update();
429 template <typename I
>
430 void AbstractObjectWriteRequest
<I
>::pre_write_object_map_update() {
431 I
*image_ctx
= this->m_ictx
;
433 image_ctx
->image_lock
.lock_shared();
434 if (image_ctx
->object_map
== nullptr || !is_object_map_update_enabled()) {
435 image_ctx
->image_lock
.unlock_shared();
440 if (!m_object_may_exist
&& m_copyup_enabled
) {
441 // optimization: copyup required
442 image_ctx
->image_lock
.unlock_shared();
447 uint8_t new_state
= this->get_pre_write_object_map_state();
448 ldout(image_ctx
->cct
, 20) << this->m_object_off
<< "~" << this->m_object_len
451 if (image_ctx
->object_map
->template aio_update
<
452 AbstractObjectWriteRequest
<I
>,
453 &AbstractObjectWriteRequest
<I
>::handle_pre_write_object_map_update
>(
454 CEPH_NOSNAP
, this->m_object_no
, new_state
, {}, this->m_trace
, false,
456 image_ctx
->image_lock
.unlock_shared();
460 image_ctx
->image_lock
.unlock_shared();
464 template <typename I
>
465 void AbstractObjectWriteRequest
<I
>::handle_pre_write_object_map_update(int r
) {
466 I
*image_ctx
= this->m_ictx
;
467 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
469 lderr(image_ctx
->cct
) << "failed to update object map: "
470 << cpp_strerror(r
) << dendl
;
478 template <typename I
>
479 void AbstractObjectWriteRequest
<I
>::write_object() {
480 I
*image_ctx
= this->m_ictx
;
481 ldout(image_ctx
->cct
, 20) << dendl
;
483 librados::ObjectWriteOperation write
;
484 if (m_copyup_enabled
) {
485 ldout(image_ctx
->cct
, 20) << "guarding write" << dendl
;
486 if (m_guarding_migration_write
) {
487 cls_client::assert_snapc_seq(
488 &write
, m_snap_seq
, cls::rbd::ASSERT_SNAPC_SEQ_LE_SNAPSET_SEQ
);
490 write
.assert_exists();
494 add_write_hint(&write
);
495 add_write_ops(&write
);
496 ceph_assert(write
.size() != 0);
498 librados::AioCompletion
*rados_completion
= util::create_rados_callback
<
499 AbstractObjectWriteRequest
<I
>,
500 &AbstractObjectWriteRequest
<I
>::handle_write_object
>(this);
501 int r
= image_ctx
->data_ctx
.aio_operate(
502 data_object_name(this->m_ictx
, this->m_object_no
), rados_completion
,
503 &write
, m_snap_seq
, m_snaps
,
504 (this->m_trace
.valid() ? this->m_trace
.get_info() : nullptr));
506 rados_completion
->release();
509 template <typename I
>
510 void AbstractObjectWriteRequest
<I
>::handle_write_object(int r
) {
511 I
*image_ctx
= this->m_ictx
;
512 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
514 r
= filter_write_result(r
);
516 if (m_copyup_enabled
) {
520 } else if (r
== -ERANGE
&& m_guarding_migration_write
) {
521 image_ctx
->image_lock
.lock_shared();
522 m_guarding_migration_write
= !image_ctx
->migration_info
.empty();
523 image_ctx
->image_lock
.unlock_shared();
525 if (m_guarding_migration_write
) {
528 ldout(image_ctx
->cct
, 10) << "migration parent gone, restart io" << dendl
;
529 compute_parent_info();
533 } else if (r
== -EILSEQ
) {
534 ldout(image_ctx
->cct
, 10) << "failed to write object" << dendl
;
538 lderr(image_ctx
->cct
) << "failed to write object: " << cpp_strerror(r
)
544 post_write_object_map_update();
547 template <typename I
>
548 void AbstractObjectWriteRequest
<I
>::copyup() {
549 I
*image_ctx
= this->m_ictx
;
550 ldout(image_ctx
->cct
, 20) << dendl
;
552 ceph_assert(!m_copyup_in_progress
);
553 m_copyup_in_progress
= true;
555 image_ctx
->copyup_list_lock
.lock();
556 auto it
= image_ctx
->copyup_list
.find(this->m_object_no
);
557 if (it
== image_ctx
->copyup_list
.end()) {
558 auto new_req
= CopyupRequest
<I
>::create(
559 image_ctx
, this->m_object_no
, std::move(this->m_parent_extents
),
561 this->m_parent_extents
.clear();
563 // make sure to wait on this CopyupRequest
564 new_req
->append_request(this);
565 image_ctx
->copyup_list
[this->m_object_no
] = new_req
;
567 image_ctx
->copyup_list_lock
.unlock();
570 it
->second
->append_request(this);
571 image_ctx
->copyup_list_lock
.unlock();
575 template <typename I
>
576 void AbstractObjectWriteRequest
<I
>::handle_copyup(int r
) {
577 I
*image_ctx
= this->m_ictx
;
578 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
580 ceph_assert(m_copyup_in_progress
);
581 m_copyup_in_progress
= false;
583 if (r
< 0 && r
!= -ERESTART
) {
584 lderr(image_ctx
->cct
) << "failed to copyup object: " << cpp_strerror(r
)
590 if (r
== -ERESTART
|| is_post_copyup_write_required()) {
595 post_write_object_map_update();
598 template <typename I
>
599 void AbstractObjectWriteRequest
<I
>::post_write_object_map_update() {
600 I
*image_ctx
= this->m_ictx
;
602 image_ctx
->image_lock
.lock_shared();
603 if (image_ctx
->object_map
== nullptr || !is_object_map_update_enabled() ||
604 !is_non_existent_post_write_object_map_state()) {
605 image_ctx
->image_lock
.unlock_shared();
610 ldout(image_ctx
->cct
, 20) << dendl
;
612 // should have been flushed prior to releasing lock
613 ceph_assert(image_ctx
->exclusive_lock
->is_lock_owner());
614 if (image_ctx
->object_map
->template aio_update
<
615 AbstractObjectWriteRequest
<I
>,
616 &AbstractObjectWriteRequest
<I
>::handle_post_write_object_map_update
>(
617 CEPH_NOSNAP
, this->m_object_no
, OBJECT_NONEXISTENT
, OBJECT_PENDING
,
618 this->m_trace
, false, this)) {
619 image_ctx
->image_lock
.unlock_shared();
623 image_ctx
->image_lock
.unlock_shared();
627 template <typename I
>
628 void AbstractObjectWriteRequest
<I
>::handle_post_write_object_map_update(int r
) {
629 I
*image_ctx
= this->m_ictx
;
630 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
632 lderr(image_ctx
->cct
) << "failed to update object map: "
633 << cpp_strerror(r
) << dendl
;
641 template <typename I
>
642 void ObjectWriteRequest
<I
>::add_write_ops(librados::ObjectWriteOperation
*wr
) {
643 if (this->m_full_object
) {
644 wr
->write_full(m_write_data
);
646 wr
->write(this->m_object_off
, m_write_data
);
648 wr
->set_op_flags2(m_op_flags
);
651 template <typename I
>
652 void ObjectWriteSameRequest
<I
>::add_write_ops(
653 librados::ObjectWriteOperation
*wr
) {
654 wr
->writesame(this->m_object_off
, this->m_object_len
, m_write_data
);
655 wr
->set_op_flags2(m_op_flags
);
658 template <typename I
>
659 void ObjectCompareAndWriteRequest
<I
>::add_write_ops(
660 librados::ObjectWriteOperation
*wr
) {
661 wr
->cmpext(this->m_object_off
, m_cmp_bl
, nullptr);
663 if (this->m_full_object
) {
664 wr
->write_full(m_write_bl
);
666 wr
->write(this->m_object_off
, m_write_bl
);
668 wr
->set_op_flags2(m_op_flags
);
671 template <typename I
>
672 int ObjectCompareAndWriteRequest
<I
>::filter_write_result(int r
) const {
673 if (r
<= -MAX_ERRNO
) {
674 I
*image_ctx
= this->m_ictx
;
675 Extents image_extents
;
677 // object extent compare mismatch
678 uint64_t offset
= -MAX_ERRNO
- r
;
679 Striper::extent_to_file(image_ctx
->cct
, &image_ctx
->layout
,
680 this->m_object_no
, offset
, this->m_object_len
,
682 ceph_assert(image_extents
.size() == 1);
684 if (m_mismatch_offset
) {
685 *m_mismatch_offset
= image_extents
[0].first
;
693 } // namespace librbd
695 template class librbd::io::ObjectRequest
<librbd::ImageCtx
>;
696 template class librbd::io::ObjectReadRequest
<librbd::ImageCtx
>;
697 template class librbd::io::AbstractObjectWriteRequest
<librbd::ImageCtx
>;
698 template class librbd::io::ObjectWriteRequest
<librbd::ImageCtx
>;
699 template class librbd::io::ObjectDiscardRequest
<librbd::ImageCtx
>;
700 template class librbd::io::ObjectWriteSameRequest
<librbd::ImageCtx
>;
701 template class librbd::io::ObjectCompareAndWriteRequest
<librbd::ImageCtx
>;