// Source: ceph/src/librbd/io/AioCompletion.cc
// Imported from the Ceph 15.2.0 "Octopus" release (via git.proxmox.com mirror).
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/io/AioCompletion.h"
5 #include <errno.h>
6
7 #include "common/ceph_context.h"
8 #include "common/dout.h"
9 #include "common/errno.h"
10 #include "common/perf_counters.h"
11 #include "common/WorkQueue.h"
12
13 #include "librbd/ImageCtx.h"
14 #include "librbd/internal.h"
15 #include "librbd/Journal.h"
16 #include "librbd/Types.h"
17
18 #ifdef WITH_LTTNG
19 #include "tracing/librbd.h"
20 #else
21 #define tracepoint(...)
22 #endif
23
24 #define dout_subsys ceph_subsys_rbd
25 #undef dout_prefix
26 #define dout_prefix *_dout << "librbd::io::AioCompletion: " << this \
27 << " " << __func__ << ": "
28
29 namespace librbd {
30 namespace io {
31
32 int AioCompletion::wait_for_complete() {
33 tracepoint(librbd, aio_wait_for_complete_enter, this);
34 {
35 std::unique_lock<std::mutex> locker(lock);
36 while (state != AIO_STATE_COMPLETE) {
37 cond.wait(locker);
38 }
39 }
40 tracepoint(librbd, aio_wait_for_complete_exit, 0);
41 return 0;
42 }
43
44 void AioCompletion::finalize() {
45 ceph_assert(ictx != nullptr);
46 CephContext *cct = ictx->cct;
47
48 // finalize any pending error results since we won't be
49 // atomically incrementing rval anymore
50 int err_r = error_rval;
51 if (err_r < 0) {
52 rval = err_r;
53 }
54
55 ssize_t r = rval;
56 ldout(cct, 20) << "r=" << r << dendl;
57 if (r >= 0 && aio_type == AIO_TYPE_READ) {
58 read_result.assemble_result(cct);
59 }
60 }
61
// Finish the user-visible side of this completion: record latency in the
// per-op-type perf counters, invoke the user callback (directly or through
// the serialized external-callback path), signal the event socket and any
// wait_for_complete() waiters, and finish the in-flight op tracker.
// NOTE(review): statement order here is load-bearing — see inline comments.
void AioCompletion::complete() {
  ceph_assert(ictx != nullptr);
  CephContext *cct = ictx->cct;

  ssize_t r = rval;
  tracepoint(librbd, aio_complete_enter, this, r);
  if (ictx->perfcounter != nullptr) {
    // Elapsed wall time since init_time() stamped start_time.
    ceph::timespan elapsed = coarse_mono_clock::now() - start_time;
    switch (aio_type) {
    case AIO_TYPE_GENERIC:
    case AIO_TYPE_OPEN:
    case AIO_TYPE_CLOSE:
      // No latency counters for generic/lifecycle ops.
      break;
    case AIO_TYPE_READ:
      ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed); break;
    case AIO_TYPE_WRITE:
      ictx->perfcounter->tinc(l_librbd_wr_latency, elapsed); break;
    case AIO_TYPE_DISCARD:
      ictx->perfcounter->tinc(l_librbd_discard_latency, elapsed); break;
    case AIO_TYPE_FLUSH:
      ictx->perfcounter->tinc(l_librbd_flush_latency, elapsed); break;
    case AIO_TYPE_WRITESAME:
      ictx->perfcounter->tinc(l_librbd_ws_latency, elapsed); break;
    case AIO_TYPE_COMPARE_AND_WRITE:
      ictx->perfcounter->tinc(l_librbd_cmp_latency, elapsed); break;
    default:
      lderr(cct) << "completed invalid aio_type: " << aio_type << dendl;
      break;
    }
  }

  if ((aio_type == AIO_TYPE_CLOSE) ||
      (aio_type == AIO_TYPE_OPEN && r < 0)) {
    // must destroy ImageCtx prior to invoking callback
    // (applies to a completed close or a failed open); the external
    // callback path dereferences ictx, so it must be disabled too.
    delete ictx;
    ictx = nullptr;
    external_callback = false;
  }

  // Publish CALLBACK before invoking the user callback so is_complete()
  // already reports done while the callback runs.
  state = AIO_STATE_CALLBACK;
  if (complete_cb) {
    if (external_callback) {
      // Serialized dispatch — librbd guarantees external users never see
      // concurrent callbacks from multiple internal threads.
      complete_external_callback();
    } else {
      complete_cb(rbd_comp, complete_arg);
      complete_event_socket();
    }
  } else {
    complete_event_socket();
  }
  state = AIO_STATE_COMPLETE;

  {
    // Wake wait_for_complete() waiters after the terminal state is set.
    std::unique_lock<std::mutex> locker(lock);
    cond.notify_all();
  }

  // note: possible for image to be closed after op marked finished
  if (async_op.started()) {
    async_op.finish_op();
  }
  tracepoint(librbd, aio_complete_exit);
}
125
126 void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
127 if (ictx == nullptr) {
128 ictx = i;
129 aio_type = t;
130 start_time = coarse_mono_clock::now();
131 }
132 }
133
134 void AioCompletion::start_op() {
135 ceph_assert(ictx != nullptr);
136
137 if (aio_type == AIO_TYPE_OPEN || aio_type == AIO_TYPE_CLOSE) {
138 // no need to track async open/close operations
139 return;
140 }
141
142 ceph_assert(!async_op.started());
143 async_op.start_op(*ictx);
144 }
145
146 void AioCompletion::queue_complete() {
147 uint32_t zero = 0;
148 pending_count.compare_exchange_strong(zero, 1);
149 ceph_assert(zero == 0);
150
151 // ensure completion fires in clean lock context
152 ictx->op_work_queue->queue(new C_AioRequest(this), 0);
153 }
154
155 void AioCompletion::block(CephContext* cct) {
156 ldout(cct, 20) << dendl;
157 ceph_assert(!was_armed);
158
159 get();
160 ++pending_count;
161 }
162
163 void AioCompletion::unblock(CephContext* cct) {
164 ldout(cct, 20) << dendl;
165 ceph_assert(was_armed);
166
167 uint32_t previous_pending_count = pending_count--;
168 ceph_assert(previous_pending_count > 0);
169
170 if (previous_pending_count == 1) {
171 queue_complete();
172 }
173 put();
174 }
175
176 void AioCompletion::fail(int r)
177 {
178 ceph_assert(ictx != nullptr);
179 CephContext *cct = ictx->cct;
180 lderr(cct) << cpp_strerror(r) << dendl;
181
182 ceph_assert(!was_armed);
183 was_armed = true;
184
185 error_rval = r;
186
187 uint32_t previous_pending_count = pending_count.load();
188 if (previous_pending_count == 0) {
189 queue_complete();
190 }
191 }
192
193 void AioCompletion::set_request_count(uint32_t count) {
194 ceph_assert(ictx != nullptr);
195 CephContext *cct = ictx->cct;
196
197 ceph_assert(!was_armed);
198 was_armed = true;
199
200 ldout(cct, 20) << "pending=" << count << dendl;
201 uint32_t previous_pending_count = pending_count.fetch_add(count);
202 if (previous_pending_count == 0 && count == 0) {
203 queue_complete();
204 }
205 }
206
207 void AioCompletion::complete_request(ssize_t r)
208 {
209 uint32_t previous_pending_count = pending_count--;
210 ceph_assert(previous_pending_count > 0);
211 auto pending_count = previous_pending_count - 1;
212
213 ceph_assert(ictx != nullptr);
214 CephContext *cct = ictx->cct;
215
216 if (r > 0) {
217 rval += r;
218 } else if (r != -EEXIST) {
219 // might race w/ another thread setting an error code but
220 // first one wins
221 int zero = 0;
222 error_rval.compare_exchange_strong(zero, r);
223 }
224
225 ldout(cct, 20) << "cb=" << complete_cb << ", "
226 << "pending=" << pending_count << dendl;
227 if (pending_count == 0) {
228 finalize();
229 complete();
230 }
231 put();
232 }
233
234 bool AioCompletion::is_complete() {
235 tracepoint(librbd, aio_is_complete_enter, this);
236 bool done = (this->state != AIO_STATE_PENDING);
237 tracepoint(librbd, aio_is_complete_exit, done);
238 return done;
239 }
240
241 ssize_t AioCompletion::get_return_value() {
242 tracepoint(librbd, aio_get_return_value_enter, this);
243 ssize_t r = rval;
244 tracepoint(librbd, aio_get_return_value_exit, r);
245 return r;
246 }
247
// Dispatch the user callback through a per-image queue so that at most one
// internal thread at a time runs external callbacks. Any thread may enqueue;
// whichever thread wins the in_progress flag drains the queue on behalf of
// all of them. NOTE(review): the exchange/pop/store/empty ordering below is
// the correctness argument — do not reorder.
void AioCompletion::complete_external_callback() {
  // ensure librbd external users never experience concurrent callbacks
  // from multiple librbd-internal threads.
  ictx->external_callback_completions.push(this);

  while (true) {
    if (ictx->external_callback_in_progress.exchange(true)) {
      // another thread is concurrently invoking external callbacks
      // (it is obligated to drain our enqueued entry as well)
      break;
    }

    // We own the in-progress flag: drain everything currently queued.
    AioCompletion* aio_comp;
    while (ictx->external_callback_completions.pop(aio_comp)) {
      aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
      aio_comp->complete_event_socket();
    }

    // Release the flag, then re-check: a producer may have pushed between
    // our last failed pop and this store without claiming the flag.
    ictx->external_callback_in_progress.store(false);
    if (ictx->external_callback_completions.empty()) {
      // queue still empty implies we didn't have a race between the last failed
      // pop and resetting the in-progress state
      break;
    }
  }
}
273
274 void AioCompletion::complete_event_socket() {
275 if (ictx != nullptr && event_notify && ictx->event_socket.is_valid()) {
276 ictx->event_socket_completions.push(this);
277 ictx->event_socket.notify();
278 }
279 }
280
281 } // namespace io
282 } // namespace librbd