1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ObjectRequest.h"
5 #include "common/ceph_context.h"
6 #include "common/dout.h"
7 #include "common/errno.h"
8 #include "common/Mutex.h"
9 #include "common/RWLock.h"
10 #include "common/WorkQueue.h"
11 #include "include/Context.h"
12 #include "include/err.h"
14 #include "librbd/ExclusiveLock.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/ObjectMap.h"
17 #include "librbd/Utils.h"
18 #include "librbd/io/AioCompletion.h"
19 #include "librbd/io/CopyupRequest.h"
20 #include "librbd/io/ImageRequest.h"
21 #include "librbd/io/ReadResult.h"
23 #include <boost/bind.hpp>
24 #include <boost/optional.hpp>
26 #define dout_subsys ceph_subsys_rbd
28 #define dout_prefix *_dout << "librbd::io::ObjectRequest: " << this \
29 << " " << __func__ << ": "
37 inline bool is_copy_on_read(I
*ictx
, librados::snap_t snap_id
) {
38 RWLock::RLocker
snap_locker(ictx
->snap_lock
);
39 return (ictx
->clone_copy_on_read
&&
40 !ictx
->read_only
&& snap_id
== CEPH_NOSNAP
&&
41 (ictx
->exclusive_lock
== nullptr ||
42 ictx
->exclusive_lock
->is_lock_owner()));
45 } // anonymous namespace
49 ObjectRequest
<I
>::create_write(I
*ictx
, const std::string
&oid
,
50 uint64_t object_no
, uint64_t object_off
,
51 const ceph::bufferlist
&data
,
52 const ::SnapContext
&snapc
, int op_flags
,
53 const ZTracer::Trace
&parent_trace
,
54 Context
*completion
) {
55 return new ObjectWriteRequest
<I
>(ictx
, oid
, object_no
, object_off
, data
,
56 snapc
, op_flags
, parent_trace
, completion
);
61 ObjectRequest
<I
>::create_discard(I
*ictx
, const std::string
&oid
,
62 uint64_t object_no
, uint64_t object_off
,
64 const ::SnapContext
&snapc
,
65 bool disable_clone_remove
,
66 bool update_object_map
,
67 const ZTracer::Trace
&parent_trace
,
68 Context
*completion
) {
69 return new ObjectDiscardRequest
<I
>(ictx
, oid
, object_no
, object_off
,
70 object_len
, snapc
, disable_clone_remove
,
71 update_object_map
, parent_trace
,
77 ObjectRequest
<I
>::create_writesame(I
*ictx
, const std::string
&oid
,
78 uint64_t object_no
, uint64_t object_off
,
80 const ceph::bufferlist
&data
,
81 const ::SnapContext
&snapc
, int op_flags
,
82 const ZTracer::Trace
&parent_trace
,
83 Context
*completion
) {
84 return new ObjectWriteSameRequest
<I
>(ictx
, oid
, object_no
, object_off
,
85 object_len
, data
, snapc
, op_flags
,
86 parent_trace
, completion
);
91 ObjectRequest
<I
>::create_compare_and_write(I
*ictx
, const std::string
&oid
,
94 const ceph::bufferlist
&cmp_data
,
95 const ceph::bufferlist
&write_data
,
96 const ::SnapContext
&snapc
,
97 uint64_t *mismatch_offset
,
99 const ZTracer::Trace
&parent_trace
,
100 Context
*completion
) {
101 return new ObjectCompareAndWriteRequest
<I
>(ictx
, oid
, object_no
, object_off
,
102 cmp_data
, write_data
, snapc
,
103 mismatch_offset
, op_flags
,
104 parent_trace
, completion
);
107 template <typename I
>
108 ObjectRequest
<I
>::ObjectRequest(I
*ictx
, const std::string
&oid
,
109 uint64_t objectno
, uint64_t off
,
110 uint64_t len
, librados::snap_t snap_id
,
111 const char *trace_name
,
112 const ZTracer::Trace
&trace
,
114 : m_ictx(ictx
), m_oid(oid
), m_object_no(objectno
), m_object_off(off
),
115 m_object_len(len
), m_snap_id(snap_id
), m_completion(completion
),
116 m_trace(util::create_trace(*ictx
, "", trace
)) {
117 if (m_trace
.valid()) {
118 m_trace
.copy_name(trace_name
+ std::string(" ") + oid
);
119 m_trace
.event("start");
123 template <typename I
>
124 void ObjectRequest
<I
>::add_write_hint(I
& image_ctx
,
125 librados::ObjectWriteOperation
*wr
) {
126 if (image_ctx
.enable_alloc_hint
) {
127 wr
->set_alloc_hint(image_ctx
.get_object_size(),
128 image_ctx
.get_object_size());
132 template <typename I
>
133 bool ObjectRequest
<I
>::compute_parent_extents(Extents
*parent_extents
) {
134 assert(m_ictx
->snap_lock
.is_locked());
135 assert(m_ictx
->parent_lock
.is_locked());
137 m_has_parent
= false;
138 parent_extents
->clear();
140 uint64_t parent_overlap
;
141 int r
= m_ictx
->get_parent_overlap(m_snap_id
, &parent_overlap
);
143 // NOTE: it's possible for a snapshot to be deleted while we are
144 // still reading from it
145 lderr(m_ictx
->cct
) << "failed to retrieve parent overlap: "
146 << cpp_strerror(r
) << dendl
;
148 } else if (parent_overlap
== 0) {
152 Striper::extent_to_file(m_ictx
->cct
, &m_ictx
->layout
, m_object_no
, 0,
153 m_ictx
->layout
.object_size
, *parent_extents
);
154 uint64_t object_overlap
= m_ictx
->prune_parent_extents(*parent_extents
,
156 if (object_overlap
> 0) {
157 ldout(m_ictx
->cct
, 20) << "overlap " << parent_overlap
<< " "
158 << "extents " << *parent_extents
<< dendl
;
159 m_has_parent
= !parent_extents
->empty();
165 template <typename I
>
166 void ObjectRequest
<I
>::async_finish(int r
) {
167 ldout(m_ictx
->cct
, 20) << "r=" << r
<< dendl
;
168 m_ictx
->op_work_queue
->queue(util::create_context_callback
<
169 ObjectRequest
<I
>, &ObjectRequest
<I
>::finish
>(this), r
);
172 template <typename I
>
173 void ObjectRequest
<I
>::finish(int r
) {
174 ldout(m_ictx
->cct
, 20) << "r=" << r
<< dendl
;
175 m_completion
->complete(r
);
181 template <typename I
>
182 ObjectReadRequest
<I
>::ObjectReadRequest(I
*ictx
, const std::string
&oid
,
183 uint64_t objectno
, uint64_t offset
,
184 uint64_t len
, librados::snap_t snap_id
,
185 int op_flags
, bool cache_initiated
,
186 const ZTracer::Trace
&parent_trace
,
188 : ObjectRequest
<I
>(ictx
, oid
, objectno
, offset
, len
, snap_id
, "read",
189 parent_trace
, completion
),
190 m_op_flags(op_flags
), m_cache_initiated(cache_initiated
) {
193 template <typename I
>
194 void ObjectReadRequest
<I
>::send() {
195 I
*image_ctx
= this->m_ictx
;
196 ldout(image_ctx
->cct
, 20) << dendl
;
198 if (!m_cache_initiated
&& image_ctx
->object_cacher
!= nullptr) {
205 template <typename I
>
206 void ObjectReadRequest
<I
>::read_cache() {
207 I
*image_ctx
= this->m_ictx
;
208 ldout(image_ctx
->cct
, 20) << dendl
;
210 // must use async callback to avoid cache_lock cycle
211 auto cache_ctx
= util::create_async_context_callback(
212 *image_ctx
, util::create_context_callback
<
213 ObjectReadRequest
<I
>, &ObjectReadRequest
<I
>::handle_read_cache
>(this));
214 image_ctx
->aio_read_from_cache(
215 this->m_oid
, this->m_object_no
, &m_read_data
, this->m_object_len
,
216 this->m_object_off
, cache_ctx
, m_op_flags
,
217 (this->m_trace
.valid() ? &this->m_trace
: nullptr));
220 template <typename I
>
221 void ObjectReadRequest
<I
>::handle_read_cache(int r
) {
222 I
*image_ctx
= this->m_ictx
;
223 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
229 lderr(image_ctx
->cct
) << "failed to read from cache: "
230 << cpp_strerror(r
) << dendl
;
238 template <typename I
>
239 void ObjectReadRequest
<I
>::read_object() {
240 I
*image_ctx
= this->m_ictx
;
242 RWLock::RLocker
snap_locker(image_ctx
->snap_lock
);
243 if (image_ctx
->object_map
!= nullptr &&
244 !image_ctx
->object_map
->object_may_exist(this->m_object_no
)) {
245 image_ctx
->op_work_queue
->queue(new FunctionContext([this](int r
) {
252 ldout(image_ctx
->cct
, 20) << dendl
;
254 librados::ObjectReadOperation op
;
255 if (this->m_object_len
>= image_ctx
->sparse_read_threshold_bytes
) {
256 op
.sparse_read(this->m_object_off
, this->m_object_len
, &m_ext_map
,
257 &m_read_data
, nullptr);
259 op
.read(this->m_object_off
, this->m_object_len
, &m_read_data
, nullptr);
261 op
.set_op_flags2(m_op_flags
);
263 librados::AioCompletion
*rados_completion
= util::create_rados_callback
<
264 ObjectReadRequest
<I
>, &ObjectReadRequest
<I
>::handle_read_object
>(this);
265 int flags
= image_ctx
->get_read_flags(this->m_snap_id
);
266 int r
= image_ctx
->data_ctx
.aio_operate(
267 this->m_oid
, rados_completion
, &op
, flags
, nullptr,
268 (this->m_trace
.valid() ? this->m_trace
.get_info() : nullptr));
271 rados_completion
->release();
274 template <typename I
>
275 void ObjectReadRequest
<I
>::handle_read_object(int r
) {
276 I
*image_ctx
= this->m_ictx
;
277 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
283 lderr(image_ctx
->cct
) << "failed to read from object: "
284 << cpp_strerror(r
) << dendl
;
292 template <typename I
>
293 void ObjectReadRequest
<I
>::read_parent() {
294 I
*image_ctx
= this->m_ictx
;
295 if (m_cache_initiated
) {
296 this->finish(-ENOENT
);
300 uint64_t object_overlap
= 0;
301 Extents parent_extents
;
303 RWLock::RLocker
snap_locker(image_ctx
->snap_lock
);
304 RWLock::RLocker
parent_locker(image_ctx
->parent_lock
);
306 // calculate reverse mapping onto the image
307 Striper::extent_to_file(image_ctx
->cct
, &image_ctx
->layout
,
308 this->m_object_no
, this->m_object_off
,
309 this->m_object_len
, parent_extents
);
311 uint64_t parent_overlap
= 0;
312 int r
= image_ctx
->get_parent_overlap(this->m_snap_id
, &parent_overlap
);
314 object_overlap
= image_ctx
->prune_parent_extents(parent_extents
,
319 if (object_overlap
== 0) {
320 this->finish(-ENOENT
);
324 ldout(image_ctx
->cct
, 20) << dendl
;
326 AioCompletion
*parent_completion
= AioCompletion::create_and_start
<
327 ObjectReadRequest
<I
>, &ObjectReadRequest
<I
>::handle_read_parent
>(
328 this, util::get_image_ctx(image_ctx
->parent
), AIO_TYPE_READ
);
329 ImageRequest
<I
>::aio_read(image_ctx
->parent
, parent_completion
,
330 std::move(parent_extents
), ReadResult
{&m_read_data
},
334 template <typename I
>
335 void ObjectReadRequest
<I
>::handle_read_parent(int r
) {
336 I
*image_ctx
= this->m_ictx
;
337 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
343 lderr(image_ctx
->cct
) << "failed to read parent extents: "
344 << cpp_strerror(r
) << dendl
;
352 template <typename I
>
353 void ObjectReadRequest
<I
>::copyup() {
354 I
*image_ctx
= this->m_ictx
;
355 if (!is_copy_on_read(image_ctx
, this->m_snap_id
)) {
360 image_ctx
->owner_lock
.get_read();
361 image_ctx
->snap_lock
.get_read();
362 image_ctx
->parent_lock
.get_read();
363 Extents parent_extents
;
364 if (!this->compute_parent_extents(&parent_extents
) ||
365 (image_ctx
->exclusive_lock
!= nullptr &&
366 !image_ctx
->exclusive_lock
->is_lock_owner())) {
367 image_ctx
->parent_lock
.put_read();
368 image_ctx
->snap_lock
.put_read();
369 image_ctx
->owner_lock
.put_read();
374 ldout(image_ctx
->cct
, 20) << dendl
;
376 Mutex::Locker
copyup_locker(image_ctx
->copyup_list_lock
);
377 auto it
= image_ctx
->copyup_list
.find(this->m_object_no
);
378 if (it
== image_ctx
->copyup_list
.end()) {
379 // create and kick off a CopyupRequest
380 auto new_req
= CopyupRequest
<I
>::create(
381 image_ctx
, this->m_oid
, this->m_object_no
, std::move(parent_extents
),
384 image_ctx
->copyup_list
[this->m_object_no
] = new_req
;
388 image_ctx
->parent_lock
.put_read();
389 image_ctx
->snap_lock
.put_read();
390 image_ctx
->owner_lock
.put_read();
396 template <typename I
>
397 AbstractObjectWriteRequest
<I
>::AbstractObjectWriteRequest(
398 I
*ictx
, const std::string
&oid
, uint64_t object_no
, uint64_t object_off
,
399 uint64_t len
, const ::SnapContext
&snapc
, const char *trace_name
,
400 const ZTracer::Trace
&parent_trace
, Context
*completion
)
401 : ObjectRequest
<I
>(ictx
, oid
, object_no
, object_off
, len
, CEPH_NOSNAP
,
402 trace_name
, parent_trace
, completion
),
403 m_snap_seq(snapc
.seq
.val
)
405 m_snaps
.insert(m_snaps
.end(), snapc
.snaps
.begin(), snapc
.snaps
.end());
408 RWLock::RLocker
snap_locker(ictx
->snap_lock
);
409 RWLock::RLocker
parent_locker(ictx
->parent_lock
);
410 this->compute_parent_extents(&m_parent_extents
);
413 if (this->m_object_off
== 0 &&
414 this->m_object_len
== ictx
->get_object_size()) {
415 m_full_object
= true;
418 if (!this->has_parent() ||
419 (m_full_object
&& m_snaps
.empty() && !is_post_copyup_write_required())) {
420 this->m_copyup_enabled
= false;
424 template <typename I
>
425 void AbstractObjectWriteRequest
<I
>::add_write_hint(
426 librados::ObjectWriteOperation
*wr
) {
427 I
*image_ctx
= this->m_ictx
;
428 RWLock::RLocker
snap_locker(image_ctx
->snap_lock
);
429 if (image_ctx
->object_map
== nullptr || !this->m_object_may_exist
) {
430 ObjectRequest
<I
>::add_write_hint(*image_ctx
, wr
);
434 template <typename I
>
435 void AbstractObjectWriteRequest
<I
>::send() {
436 I
*image_ctx
= this->m_ictx
;
437 ldout(image_ctx
->cct
, 20) << this->get_op_type() << " " << this->m_oid
<< " "
438 << this->m_object_off
<< "~" << this->m_object_len
441 RWLock::RLocker
snap_lock(image_ctx
->snap_lock
);
442 if (image_ctx
->object_map
== nullptr) {
443 m_object_may_exist
= true;
445 // should have been flushed prior to releasing lock
446 assert(image_ctx
->exclusive_lock
->is_lock_owner());
447 m_object_may_exist
= image_ctx
->object_map
->object_may_exist(
452 if (!m_object_may_exist
&& is_no_op_for_nonexistent_object()) {
453 ldout(image_ctx
->cct
, 20) << "skipping no-op on nonexistent object"
455 this->async_finish(0);
459 pre_write_object_map_update();
462 template <typename I
>
463 void AbstractObjectWriteRequest
<I
>::pre_write_object_map_update() {
464 I
*image_ctx
= this->m_ictx
;
466 image_ctx
->snap_lock
.get_read();
467 if (image_ctx
->object_map
== nullptr || !is_object_map_update_enabled()) {
468 image_ctx
->snap_lock
.put_read();
473 if (!m_object_may_exist
&& m_copyup_enabled
) {
474 // optimization: copyup required
475 image_ctx
->snap_lock
.put_read();
480 uint8_t new_state
= this->get_pre_write_object_map_state();
481 ldout(image_ctx
->cct
, 20) << this->m_oid
<< " " << this->m_object_off
482 << "~" << this->m_object_len
<< dendl
;
484 image_ctx
->object_map_lock
.get_write();
485 if (image_ctx
->object_map
->template aio_update
<
486 AbstractObjectWriteRequest
<I
>,
487 &AbstractObjectWriteRequest
<I
>::handle_pre_write_object_map_update
>(
488 CEPH_NOSNAP
, this->m_object_no
, new_state
, {}, this->m_trace
, this)) {
489 image_ctx
->object_map_lock
.put_write();
490 image_ctx
->snap_lock
.put_read();
494 image_ctx
->object_map_lock
.put_write();
495 image_ctx
->snap_lock
.put_read();
499 template <typename I
>
500 void AbstractObjectWriteRequest
<I
>::handle_pre_write_object_map_update(int r
) {
501 I
*image_ctx
= this->m_ictx
;
502 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
508 template <typename I
>
509 void AbstractObjectWriteRequest
<I
>::write_object() {
510 I
*image_ctx
= this->m_ictx
;
511 ldout(image_ctx
->cct
, 20) << dendl
;
513 librados::ObjectWriteOperation write
;
514 if (m_copyup_enabled
) {
515 ldout(image_ctx
->cct
, 20) << "guarding write" << dendl
;
516 write
.assert_exists();
519 add_write_hint(&write
);
520 add_write_ops(&write
);
521 assert(write
.size() != 0);
523 librados::AioCompletion
*rados_completion
= util::create_rados_callback
<
524 AbstractObjectWriteRequest
<I
>,
525 &AbstractObjectWriteRequest
<I
>::handle_write_object
>(this);
526 int r
= image_ctx
->data_ctx
.aio_operate(
527 this->m_oid
, rados_completion
, &write
, m_snap_seq
, m_snaps
,
528 (this->m_trace
.valid() ? this->m_trace
.get_info() : nullptr));
530 rados_completion
->release();
533 template <typename I
>
534 void AbstractObjectWriteRequest
<I
>::handle_write_object(int r
) {
535 I
*image_ctx
= this->m_ictx
;
536 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
538 r
= filter_write_result(r
);
540 if (m_copyup_enabled
) {
544 } else if (r
== -EILSEQ
) {
545 ldout(image_ctx
->cct
, 10) << "failed to write object" << dendl
;
549 lderr(image_ctx
->cct
) << "failed to write object: " << cpp_strerror(r
)
555 post_write_object_map_update();
558 template <typename I
>
559 void AbstractObjectWriteRequest
<I
>::copyup() {
560 I
*image_ctx
= this->m_ictx
;
561 ldout(image_ctx
->cct
, 20) << dendl
;
563 assert(!m_copyup_in_progress
);
564 m_copyup_in_progress
= true;
566 image_ctx
->copyup_list_lock
.Lock();
567 auto it
= image_ctx
->copyup_list
.find(this->m_object_no
);
568 if (it
== image_ctx
->copyup_list
.end()) {
569 auto new_req
= CopyupRequest
<I
>::create(
570 image_ctx
, this->m_oid
, this->m_object_no
,
571 std::move(this->m_parent_extents
), this->m_trace
);
572 this->m_parent_extents
.clear();
574 // make sure to wait on this CopyupRequest
575 new_req
->append_request(this);
576 image_ctx
->copyup_list
[this->m_object_no
] = new_req
;
578 image_ctx
->copyup_list_lock
.Unlock();
581 it
->second
->append_request(this);
582 image_ctx
->copyup_list_lock
.Unlock();
586 template <typename I
>
587 void AbstractObjectWriteRequest
<I
>::handle_copyup(int r
) {
588 I
*image_ctx
= this->m_ictx
;
589 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
591 assert(m_copyup_in_progress
);
592 m_copyup_in_progress
= false;
595 lderr(image_ctx
->cct
) << "failed to copyup object: " << cpp_strerror(r
)
601 if (is_post_copyup_write_required()) {
606 post_write_object_map_update();
609 template <typename I
>
610 void AbstractObjectWriteRequest
<I
>::post_write_object_map_update() {
611 I
*image_ctx
= this->m_ictx
;
613 image_ctx
->snap_lock
.get_read();
614 if (image_ctx
->object_map
== nullptr || !is_object_map_update_enabled() ||
615 !is_non_existent_post_write_object_map_state()) {
616 image_ctx
->snap_lock
.put_read();
621 ldout(image_ctx
->cct
, 20) << dendl
;
623 // should have been flushed prior to releasing lock
624 assert(image_ctx
->exclusive_lock
->is_lock_owner());
625 image_ctx
->object_map_lock
.get_write();
626 if (image_ctx
->object_map
->template aio_update
<
627 AbstractObjectWriteRequest
<I
>,
628 &AbstractObjectWriteRequest
<I
>::handle_post_write_object_map_update
>(
629 CEPH_NOSNAP
, this->m_object_no
, OBJECT_NONEXISTENT
, OBJECT_PENDING
,
630 this->m_trace
, this)) {
631 image_ctx
->object_map_lock
.put_write();
632 image_ctx
->snap_lock
.put_read();
636 image_ctx
->object_map_lock
.put_write();
637 image_ctx
->snap_lock
.put_read();
641 template <typename I
>
642 void AbstractObjectWriteRequest
<I
>::handle_post_write_object_map_update(int r
) {
643 I
*image_ctx
= this->m_ictx
;
644 ldout(image_ctx
->cct
, 20) << "r=" << r
<< dendl
;
650 template <typename I
>
651 void ObjectWriteRequest
<I
>::add_write_ops(librados::ObjectWriteOperation
*wr
) {
652 if (this->m_full_object
) {
653 wr
->write_full(m_write_data
);
655 wr
->write(this->m_object_off
, m_write_data
);
657 wr
->set_op_flags2(m_op_flags
);
660 template <typename I
>
661 void ObjectWriteSameRequest
<I
>::add_write_ops(
662 librados::ObjectWriteOperation
*wr
) {
663 wr
->writesame(this->m_object_off
, this->m_object_len
, m_write_data
);
664 wr
->set_op_flags2(m_op_flags
);
667 template <typename I
>
668 void ObjectCompareAndWriteRequest
<I
>::add_write_ops(
669 librados::ObjectWriteOperation
*wr
) {
670 wr
->cmpext(this->m_object_off
, m_cmp_bl
, nullptr);
672 if (this->m_full_object
) {
673 wr
->write_full(m_write_bl
);
675 wr
->write(this->m_object_off
, m_write_bl
);
677 wr
->set_op_flags2(m_op_flags
);
680 template <typename I
>
681 int ObjectCompareAndWriteRequest
<I
>::filter_write_result(int r
) const {
682 if (r
<= -MAX_ERRNO
) {
683 I
*image_ctx
= this->m_ictx
;
684 Extents image_extents
;
686 // object extent compare mismatch
687 uint64_t offset
= -MAX_ERRNO
- r
;
688 Striper::extent_to_file(image_ctx
->cct
, &image_ctx
->layout
,
689 this->m_object_no
, offset
, this->m_object_len
,
691 assert(image_extents
.size() == 1);
693 if (m_mismatch_offset
) {
694 *m_mismatch_offset
= image_extents
[0].first
;
702 } // namespace librbd
704 template class librbd::io::ObjectRequest
<librbd::ImageCtx
>;
705 template class librbd::io::ObjectReadRequest
<librbd::ImageCtx
>;
706 template class librbd::io::AbstractObjectWriteRequest
<librbd::ImageCtx
>;
707 template class librbd::io::ObjectWriteRequest
<librbd::ImageCtx
>;
708 template class librbd::io::ObjectDiscardRequest
<librbd::ImageCtx
>;
709 template class librbd::io::ObjectWriteSameRequest
<librbd::ImageCtx
>;
710 template class librbd::io::ObjectCompareAndWriteRequest
<librbd::ImageCtx
>;