1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/AioCompletion.h"
7 #include "common/ceph_context.h"
8 #include "common/dout.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11 #include "common/WorkQueue.h"
13 #include "librbd/ImageCtx.h"
14 #include "librbd/internal.h"
15 #include "librbd/Journal.h"
16 #include "librbd/Types.h"
19 #include "tracing/librbd.h"
// Defines tracepoint() as an empty variadic macro, so the tracepoint(...)
// calls in this file expand to nothing.
// NOTE(review): presumably this is the fallback used when tracing support is
// compiled out -- confirm against the surrounding preprocessor conditional.
21 #define tracepoint(...)
// Logging subsystem for this file (presumably consumed by ldout/lderr).
24 #define dout_subsys ceph_subsys_rbd
// Prefix for every log line emitted here: class name, the address of the
// AioCompletion instance (this) and the name of the calling function.
26 #define dout_prefix *_dout << "librbd::io::AioCompletion: " << this \
27 << " " << __func__ << ": "
// Waits until this completion reaches AIO_STATE_COMPLETE.
//
// Acquires `lock` through a std::unique_lock and loops while `state` is not
// AIO_STATE_COMPLETE. The exit tracepoint reports 0.
// NOTE(review): the loop body and the return statement are not visible in
// this view; presumably the loop waits on a condition variable and the
// function returns 0 -- confirm.
32 int AioCompletion::wait_for_complete() {
33 tracepoint(librbd
, aio_wait_for_complete_enter
, this);
// Hold the lock while polling the state.
35 std::unique_lock
<std::mutex
> locker(lock
);
36 while (state
!= AIO_STATE_COMPLETE
) {
40 tracepoint(librbd
, aio_wait_for_complete_exit
, 0);
// Finalizes the result of this request.
//
// Requires an attached ImageCtx (asserted). Snapshots `error_rval` into a
// local, logs the result `r` at debug level 20 and, when `r` is non-negative
// and the request is an AIO_TYPE_READ, calls
// read_result.assemble_result(cct).
// NOTE(review): how the snapshotted error value is folded into the result is
// not visible in this view -- confirm.
44 void AioCompletion::finalize() {
45 ceph_assert(ictx
!= nullptr);
46 CephContext
*cct
= ictx
->cct
;
48 // finalize any pending error results since we won't be
49 // atomically incrementing rval anymore
50 int err_r
= error_rval
;
56 ldout(cct
, 20) << "r=" << r
<< dendl
;
// Only successful reads assemble a read result.
57 if (r
>= 0 && aio_type
== AIO_TYPE_READ
) {
58 read_result
.assemble_result(cct
);
// Runs the completion path for this request.
//
// Visible steps, in order:
//   1. Assert that an ImageCtx is attached and emit the enter tracepoint.
//   2. If the image has perf counters, compute the time elapsed since
//      `start_time` and add it to a latency counter selected by `aio_type`;
//      an unexpected type is logged as an error.
//   3. For AIO_TYPE_CLOSE, or AIO_TYPE_OPEN with a negative result, clear
//      `external_callback`.
//   4. Move `state` to AIO_STATE_CALLBACK, invoke the user callback and/or
//      the event-socket notification, then move `state` to
//      AIO_STATE_COMPLETE.
//   5. Take `lock`, finish the tracked async op if one was started, and emit
//      the exit tracepoint.
// NOTE(review): the switch header, several case labels, the else arms of the
// callback dispatch and the statements executed under `lock` are not visible
// in this view -- confirm against the full source before relying on details.
62 void AioCompletion::complete() {
63 ceph_assert(ictx
!= nullptr);
64 CephContext
*cct
= ictx
->cct
;
67 tracepoint(librbd
, aio_complete_enter
, this, r
);
// Latency accounting is skipped when the image has no perf counters.
68 if (ictx
->perfcounter
!= nullptr) {
// Time elapsed since start_time was recorded.
69 ceph::timespan elapsed
= coarse_mono_clock::now() - start_time
;
71 case AIO_TYPE_GENERIC
:
76 ictx
->perfcounter
->tinc(l_librbd_rd_latency
, elapsed
); break;
78 ictx
->perfcounter
->tinc(l_librbd_wr_latency
, elapsed
); break;
79 case AIO_TYPE_DISCARD
:
80 ictx
->perfcounter
->tinc(l_librbd_discard_latency
, elapsed
); break;
82 ictx
->perfcounter
->tinc(l_librbd_flush_latency
, elapsed
); break;
83 case AIO_TYPE_WRITESAME
:
84 ictx
->perfcounter
->tinc(l_librbd_ws_latency
, elapsed
); break;
85 case AIO_TYPE_COMPARE_AND_WRITE
:
86 ictx
->perfcounter
->tinc(l_librbd_cmp_latency
, elapsed
); break;
// Reached for an aio_type with no latency counter of its own.
88 lderr(cct
) << "completed invalid aio_type: " << aio_type
<< dendl
;
// Close requests and failed open requests take the direct callback path.
93 if ((aio_type
== AIO_TYPE_CLOSE
) ||
94 (aio_type
== AIO_TYPE_OPEN
&& r
< 0)) {
95 // must destroy ImageCtx prior to invoking callback
98 external_callback
= false;
// Mark that the callback phase has begun.
101 state
= AIO_STATE_CALLBACK
;
// external_callback selects the serialized external-callback path.
103 if (external_callback
) {
104 complete_external_callback();
106 complete_cb(rbd_comp
, complete_arg
);
107 complete_event_socket();
110 complete_event_socket();
// Callback phase finished.
112 state
= AIO_STATE_COMPLETE
;
115 std::unique_lock
<std::mutex
> locker(lock
);
119 // note: possible for image to be closed after op marked finished
120 if (async_op
.started()) {
121 async_op
.finish_op();
123 tracepoint(librbd
, aio_complete_exit
);
// Initializes timing for this request.
//
// Acts only when no ImageCtx is attached yet (ictx == nullptr). In that case
// it records the current coarse monotonic clock value in `start_time`.
// NOTE(review): presumably `i` and `t` are stored as the ImageCtx and request
// type in the same branch; those assignments are not visible here -- confirm.
//
// @param i image context associated with the request
// @param t type of the request
126 void AioCompletion::init_time(ImageCtx
*i
, aio_type_t t
) {
127 if (ictx
== nullptr) {
130 start_time
= coarse_mono_clock::now();
// Starts tracking this request as an async operation on the image.
//
// Requires an attached ImageCtx (asserted). Open and close requests are
// special-cased; the existing comment says they need no tracking.
// NOTE(review): that branch presumably returns early; its body is not
// visible here -- confirm.
// Otherwise asserts that `async_op` has not been started yet and starts it
// against *ictx.
134 void AioCompletion::start_op() {
135 ceph_assert(ictx
!= nullptr);
137 if (aio_type
== AIO_TYPE_OPEN
|| aio_type
== AIO_TYPE_CLOSE
) {
138 // no need to track async open/close operations
// Starting the same async op twice is a programming error.
142 ceph_assert(!async_op
.started());
143 async_op
.start_op(*ictx
);
// Schedules completion of this request on the image's op work queue.
//
// Attempts to change `pending_count` from the value held in `zero` to 1 with
// compare_exchange_strong, then asserts that `zero` is still 0. A failed
// exchange overwrites `zero` with the observed count, so the assertion then
// fails.
// NOTE(review): `zero` is declared on a line not visible here; presumably a
// local initialized to 0 -- confirm.
// Finally queues a newly allocated C_AioRequest wrapping `this` on
// ictx->op_work_queue, passing 0 as the second argument.
146 void AioCompletion::queue_complete() {
148 pending_count
.compare_exchange_strong(zero
, 1);
149 ceph_assert(zero
== 0);
151 // ensure completion fires in clean lock context
152 ictx
->op_work_queue
->queue(new C_AioRequest(this), 0);
// Blocks this completion.
//
// Emits a debug-level (20) log line and asserts that the completion has not
// been armed yet (`was_armed` is false).
// NOTE(review): the statements after the assertion are not visible here;
// presumably they take an extra reference on `pending_count` so the request
// cannot complete until unblock() -- confirm.
//
// @param cct context used for logging
155 void AioCompletion::block(CephContext
* cct
) {
156 ldout(cct
, 20) << dendl
;
157 ceph_assert(!was_armed
);
// Unblocks this completion.
//
// Emits a debug-level (20) log line and asserts that the completion was
// armed. Post-decrements `pending_count`, capturing the value it held before
// the decrement, and asserts that value was positive. A previous value of 1
// means this call dropped the count to zero.
// NOTE(review): the body of the final branch is not visible here; presumably
// it queues the completion -- confirm.
//
// @param cct context used for logging
163 void AioCompletion::unblock(CephContext
* cct
) {
164 ldout(cct
, 20) << dendl
;
165 ceph_assert(was_armed
);
// previous_pending_count is the value before the decrement.
167 uint32_t previous_pending_count
= pending_count
--;
168 ceph_assert(previous_pending_count
> 0);
170 if (previous_pending_count
== 1) {
// Fails this request with error code `r`.
//
// Requires an attached ImageCtx (asserted). Logs the textual form of `r`
// (via cpp_strerror) as an error, asserts that the completion has not been
// armed, then loads `pending_count` and special-cases a count of zero.
// NOTE(review): the statements that store the error and the body of the
// zero-count branch are not visible here -- confirm.
//
// @param r error code describing the failure
176 void AioCompletion::fail(int r
)
178 ceph_assert(ictx
!= nullptr);
179 CephContext
*cct
= ictx
->cct
;
180 lderr(cct
) << cpp_strerror(r
) << dendl
;
182 ceph_assert(!was_armed
);
187 uint32_t previous_pending_count
= pending_count
.load();
188 if (previous_pending_count
== 0) {
// Registers the number of sub-requests this completion must wait for.
//
// Requires an attached ImageCtx and an un-armed completion (both asserted).
// Logs the requested count at debug level 20, then atomically adds `count`
// to `pending_count`; fetch_add returns the value held before the addition.
// The final branch handles the case where nothing was pending before and
// nothing was added.
// NOTE(review): the body of that branch is not visible here; presumably it
// queues the completion immediately -- confirm.
//
// @param count number of sub-requests to wait for
193 void AioCompletion::set_request_count(uint32_t count
) {
194 ceph_assert(ictx
!= nullptr);
195 CephContext
*cct
= ictx
->cct
;
197 ceph_assert(!was_armed
);
200 ldout(cct
, 20) << "pending=" << count
<< dendl
;
201 uint32_t previous_pending_count
= pending_count
.fetch_add(count
);
202 if (previous_pending_count
== 0 && count
== 0) {
// Records the result `r` of one finished sub-request.
//
// Post-decrements the member `pending_count`, asserts the previous value was
// positive, and keeps the remaining count in a local that is also named
// `pending_count` (it shadows the member for the rest of the function).
// In the visible `else if` arm, a result other than -EEXIST is stored into
// `error_rval` with compare_exchange_strong, which only succeeds while
// `error_rval` still equals `zero`.
// Logs the callback pointer and the remaining count at debug level 20, then
// special-cases a remaining count of zero.
// NOTE(review): the first `if` arm, the declaration of `zero` and the body of
// the zero-count branch are not visible here -- confirm.
//
// @param r result of the finished sub-request
207 void AioCompletion::complete_request(ssize_t r
)
209 uint32_t previous_pending_count
= pending_count
--;
210 ceph_assert(previous_pending_count
> 0);
// Local copy of the remaining count; shadows the member of the same name.
211 auto pending_count
= previous_pending_count
- 1;
213 ceph_assert(ictx
!= nullptr);
214 CephContext
*cct
= ictx
->cct
;
218 } else if (r
!= -EEXIST
) {
219 // might race w/ another thread setting an error code but
222 error_rval
.compare_exchange_strong(zero
, r
);
225 ldout(cct
, 20) << "cb=" << complete_cb
<< ", "
226 << "pending=" << pending_count
<< dendl
;
227 if (pending_count
== 0) {
// Reports whether this request has left the pending state.
//
// `done` is true for any state other than AIO_STATE_PENDING, so it is also
// true while the state is AIO_STATE_CALLBACK. The value is passed to the
// exit tracepoint.
// NOTE(review): the return statement is not visible here; presumably it
// returns `done` -- confirm.
234 bool AioCompletion::is_complete() {
235 tracepoint(librbd
, aio_is_complete_enter
, this);
236 bool done
= (this->state
!= AIO_STATE_PENDING
);
237 tracepoint(librbd
, aio_is_complete_exit
, done
);
// Returns the result of this request.
//
// Emits enter/exit tracepoints; the exit tracepoint reports `r`.
// NOTE(review): the statement that produces `r` and the return statement are
// not visible here; presumably `r` is read from the member `rval` and
// returned -- confirm.
241 ssize_t
AioCompletion::get_return_value() {
242 tracepoint(librbd
, aio_get_return_value_enter
, this);
244 tracepoint(librbd
, aio_get_return_value_exit
, r
);
// Invokes user callbacks through a queue shared by the whole image, so that
// (per the existing comment) external users never see concurrent callbacks.
//
// Visible steps, in order:
//   1. Push `this` onto ictx->external_callback_completions.
//   2. Atomically set ictx->external_callback_in_progress to true; a previous
//      value of true means another thread is already draining the queue.
//   3. Pop queued completions and, for each one, call its complete_cb and
//      then complete_event_socket().
//   4. Store false into external_callback_in_progress, then re-check whether
//      the queue is empty to detect an entry pushed between the last failed
//      pop and the reset.
// NOTE(review): the early return in step 2 and the outer retry loop implied
// by step 4 are not visible here -- confirm. The ordering of these atomic
// operations is significant; do not reorder.
248 void AioCompletion::complete_external_callback() {
249 // ensure librbd external users never experience concurrent callbacks
250 // from multiple librbd-internal threads.
251 ictx
->external_callback_completions
.push(this);
// exchange() returns the previous flag value.
254 if (ictx
->external_callback_in_progress
.exchange(true)) {
255 // another thread is concurrently invoking external callbacks
259 AioCompletion
* aio_comp
;
// Drain every completion queued so far, including ones from other threads.
260 while (ictx
->external_callback_completions
.pop(aio_comp
)) {
261 aio_comp
->complete_cb(aio_comp
->rbd_comp
, aio_comp
->complete_arg
);
262 aio_comp
->complete_event_socket();
265 ictx
->external_callback_in_progress
.store(false);
266 if (ictx
->external_callback_completions
.empty()) {
267 // queue still empty implies we didn't have a race between the last failed
268 // pop and resetting the in-progress state
// Signals this completion through the image's event socket.
//
// Does nothing unless an ImageCtx is attached, `event_notify` is set and the
// image's event socket is valid. When all three hold, pushes `this` onto
// ictx->event_socket_completions and then calls notify() on the socket.
274 void AioCompletion::complete_event_socket() {
275 if (ictx
!= nullptr && event_notify
&& ictx
->event_socket
.is_valid()) {
// Queue the completion before notifying the socket.
276 ictx
->event_socket_completions
.push(this);
277 ictx
->event_socket
.notify();
282 } // namespace librbd