]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/io/ImageRequestWQ.h" | |
5 | #include "common/errno.h" | |
31f18b77 | 6 | #include "common/zipkin_trace.h" |
7c673cae FG |
7 | #include "librbd/ExclusiveLock.h" |
8 | #include "librbd/ImageCtx.h" | |
9 | #include "librbd/ImageState.h" | |
10 | #include "librbd/internal.h" | |
11 | #include "librbd/Utils.h" | |
12 | #include "librbd/exclusive_lock/Policy.h" | |
13 | #include "librbd/io/AioCompletion.h" | |
14 | #include "librbd/io/ImageRequest.h" | |
15 | ||
16 | #define dout_subsys ceph_subsys_rbd | |
17 | #undef dout_prefix | |
18 | #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ | |
19 | << " " << __func__ << ": " | |
20 | ||
21 | namespace librbd { | |
22 | namespace io { | |
23 | ||
224ce89b WB |
24 | template <typename I> |
25 | struct ImageRequestWQ<I>::C_AcquireLock : public Context { | |
26 | ImageRequestWQ *work_queue; | |
27 | ImageRequest<I> *image_request; | |
28 | ||
29 | C_AcquireLock(ImageRequestWQ *work_queue, ImageRequest<I> *image_request) | |
30 | : work_queue(work_queue), image_request(image_request) { | |
31 | } | |
32 | ||
33 | void finish(int r) override { | |
34 | work_queue->handle_acquire_lock(r, image_request); | |
35 | } | |
36 | }; | |
37 | ||
38 | template <typename I> | |
39 | struct ImageRequestWQ<I>::C_BlockedWrites : public Context { | |
40 | ImageRequestWQ *work_queue; | |
41 | C_BlockedWrites(ImageRequestWQ *_work_queue) | |
42 | : work_queue(_work_queue) { | |
43 | } | |
44 | ||
45 | void finish(int r) override { | |
46 | work_queue->handle_blocked_writes(r); | |
47 | } | |
48 | }; | |
49 | ||
50 | template <typename I> | |
51 | struct ImageRequestWQ<I>::C_RefreshFinish : public Context { | |
52 | ImageRequestWQ *work_queue; | |
53 | ImageRequest<I> *image_request; | |
54 | ||
55 | C_RefreshFinish(ImageRequestWQ *work_queue, | |
56 | ImageRequest<I> *image_request) | |
57 | : work_queue(work_queue), image_request(image_request) { | |
58 | } | |
59 | void finish(int r) override { | |
60 | work_queue->handle_refreshed(r, image_request); | |
61 | } | |
62 | }; | |
63 | ||
// Work queue that funnels all image-level IO requests through a
// ThreadPool, enforcing write blocking, exclusive-lock acquisition and
// refresh stalls before requests are dispatched.
template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageRequest<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
  // attach this queue to the thread pool so its threads start servicing it
  this->register_work_queue();
}
74 | ||
224ce89b WB |
75 | template <typename I> |
76 | ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, | |
77 | ReadResult &&read_result, int op_flags) { | |
7c673cae FG |
78 | CephContext *cct = m_image_ctx.cct; |
79 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
80 | << "len = " << len << dendl; | |
81 | ||
82 | C_SaferCond cond; | |
83 | AioCompletion *c = AioCompletion::create(&cond); | |
84 | aio_read(c, off, len, std::move(read_result), op_flags, false); | |
85 | return cond.wait(); | |
86 | } | |
87 | ||
224ce89b WB |
88 | template <typename I> |
89 | ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len, | |
90 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
91 | CephContext *cct = m_image_ctx.cct; |
92 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
93 | << "len = " << len << dendl; | |
94 | ||
95 | m_image_ctx.snap_lock.get_read(); | |
96 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
97 | m_image_ctx.snap_lock.put_read(); | |
98 | if (r < 0) { | |
99 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
100 | return r; | |
101 | } | |
102 | ||
103 | C_SaferCond cond; | |
104 | AioCompletion *c = AioCompletion::create(&cond); | |
105 | aio_write(c, off, len, std::move(bl), op_flags, false); | |
106 | ||
107 | r = cond.wait(); | |
108 | if (r < 0) { | |
109 | return r; | |
110 | } | |
111 | return len; | |
112 | } | |
113 | ||
224ce89b WB |
114 | template <typename I> |
115 | ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, | |
116 | bool skip_partial_discard) { | |
7c673cae FG |
117 | CephContext *cct = m_image_ctx.cct; |
118 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
119 | << "len = " << len << dendl; | |
120 | ||
121 | m_image_ctx.snap_lock.get_read(); | |
122 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
123 | m_image_ctx.snap_lock.put_read(); | |
124 | if (r < 0) { | |
125 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
126 | return r; | |
127 | } | |
128 | ||
129 | C_SaferCond cond; | |
130 | AioCompletion *c = AioCompletion::create(&cond); | |
131 | aio_discard(c, off, len, skip_partial_discard, false); | |
132 | ||
133 | r = cond.wait(); | |
134 | if (r < 0) { | |
135 | return r; | |
136 | } | |
137 | return len; | |
138 | } | |
139 | ||
224ce89b WB |
140 | template <typename I> |
141 | ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, | |
142 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
143 | CephContext *cct = m_image_ctx.cct; |
144 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
145 | << "len = " << len << ", data_len " << bl.length() << dendl; | |
146 | ||
147 | m_image_ctx.snap_lock.get_read(); | |
148 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
149 | m_image_ctx.snap_lock.put_read(); | |
150 | if (r < 0) { | |
151 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
152 | return r; | |
153 | } | |
154 | ||
155 | C_SaferCond cond; | |
156 | AioCompletion *c = AioCompletion::create(&cond); | |
157 | aio_writesame(c, off, len, std::move(bl), op_flags, false); | |
158 | ||
159 | r = cond.wait(); | |
160 | if (r < 0) { | |
161 | return r; | |
162 | } | |
163 | return len; | |
164 | } | |
165 | ||
224ce89b WB |
// Asynchronous read entry point. Either dispatches the read inline on
// the caller's thread (fast path) or defers it to the work queue when
// ordering against pending/blocked writes must be preserved.
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  // optional blkin/Zipkin tracing spanning the queued lifetime of the op
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    // completion will be delivered via the user's event socket
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closing
  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    // slow path: queue so ordering vs. outstanding writes is preserved
    queue(ImageRequest<I>::create_read_request(
      m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
      trace));
  } else {
    // fast path: issue inline; in-flight count dropped immediately since
    // the request owns its own completion lifecycle from here on
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
206 | ||
224ce89b WB |
// Asynchronous write entry point. Dispatches inline unless non-blocking
// AIO is enabled or writes are currently blocked, in which case the
// request is queued for ordered work-queue processing.
template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    // completion will be delivered via the user's event socket
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closing
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // slow path: defer to the work queue
    queue(ImageRequest<I>::create_write_request(
      m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
  } else {
    // fast path: issue inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
                               std::move(bl), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
243 | ||
224ce89b WB |
// Asynchronous discard entry point; same fast/slow path split as
// aio_write. skip_partial_discard controls whether sub-object-boundary
// trims are suppressed (semantics live in ImageRequest).
template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len, bool skip_partial_discard,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closing
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // slow path: defer to the work queue
    queue(ImageRequest<I>::create_discard_request(
      m_image_ctx, c, off, len, skip_partial_discard, trace));
  } else {
    // fast path: issue inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_discard(&m_image_ctx, c, off, len,
                                 skip_partial_discard, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
280 | ||
224ce89b WB |
// Asynchronous flush entry point. Queued whenever any writes are still
// pending (!writes_empty()) so the flush cannot overtake them; note the
// inline path deliberately does not call c->start_op() (flush has no
// data phase -- matches the other call sites only where data is moved).
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closing
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    // must queue behind any outstanding writes to preserve flush semantics
    queue(ImageRequest<I>::create_flush_request(m_image_ctx, c, trace));
  } else {
    ImageRequest<I>::aio_flush(&m_image_ctx, c, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
311 | ||
224ce89b WB |
// Asynchronous write-same entry point; replicates the pattern buffer
// 'bl' across [off, off+len). Same fast/slow path split as aio_write.
template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closing
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // slow path: defer to the work queue
    queue(ImageRequest<I>::create_writesame_request(
      m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
  } else {
    // fast path: issue inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
                                   op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
349 | ||
224ce89b WB |
// Begin shutting the queue down. If IOs are still in flight, defer the
// notification until the last one drains (see finish_in_flight_io);
// otherwise flush immediately and complete on_shutdown. Caller must
// hold owner_lock.
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      // last finish_in_flight_io() will pick this up and flush
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_shutdown);
}
371 | ||
224ce89b WB |
372 | template <typename I> |
373 | int ImageRequestWQ<I>::block_writes() { | |
7c673cae FG |
374 | C_SaferCond cond_ctx; |
375 | block_writes(&cond_ctx); | |
376 | return cond_ctx.wait(); | |
377 | } | |
378 | ||
224ce89b WB |
// Increment the write-blocker count and notify on_blocked once all
// in-flight writes have drained. Blockers stack: writes stay blocked
// until a matching number of unblock_writes() calls. Caller must hold
// owner_lock.
template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      // writes still draining -- finish_in_flight_write() will flush and
      // complete the queued contexts via C_BlockedWrites
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_blocked);
}
398 | ||
224ce89b WB |
399 | template <typename I> |
400 | void ImageRequestWQ<I>::unblock_writes() { | |
7c673cae FG |
401 | CephContext *cct = m_image_ctx.cct; |
402 | ||
403 | bool wake_up = false; | |
404 | { | |
405 | RWLock::WLocker locker(m_lock); | |
406 | assert(m_write_blockers > 0); | |
407 | --m_write_blockers; | |
408 | ||
409 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
410 | << m_write_blockers << dendl; | |
411 | if (m_write_blockers == 0) { | |
412 | wake_up = true; | |
413 | } | |
414 | } | |
415 | ||
416 | if (wake_up) { | |
224ce89b | 417 | this->signal(); |
7c673cae FG |
418 | } |
419 | } | |
420 | ||
224ce89b WB |
// Toggle the "exclusive lock required" flag for reads, writes, or both.
// Wakes the thread pool on any state change so _void_dequeue can
// re-evaluate whether queued IO now needs (or no longer needs) the lock.
template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      // wake if either flag actually changes
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}
453 | ||
224ce89b WB |
// ThreadPool hook: pull the next request off the queue, or return
// nullptr to stall the pool. Stalls when IO is globally blocked, when
// the exclusive lock must first be acquired, or when the image needs a
// refresh. NOTE: entered with the pool lock held; it is temporarily
// dropped around lock-acquire/refresh initiation (exact order matters).
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageRequest<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  // actually remove the peeked item from the queue
  ImageRequest<I> *item = reinterpret_cast<ImageRequest<I> *>(
    ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
  assert(peek_item == item);

  if (lock_required) {
    // drop the pool lock before taking owner_lock (lock ordering)
    this->get_pool_lock().Unlock();
    m_image_ctx.owner_lock.get_read();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        // policy forbids auto-acquire -- fail the op as read-only
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(-EROFS, item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.put_read();
    this->get_pool_lock().Lock();

    if (lock_required) {
      // item now owned by the acquire path (or already failed)
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().Unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().Lock();
    return nullptr;
  }

  item->start_op();
  return item;
}
530 | ||
224ce89b WB |
531 | template <typename I> |
532 | void ImageRequestWQ<I>::process(ImageRequest<I> *req) { | |
7c673cae FG |
533 | CephContext *cct = m_image_ctx.cct; |
534 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
535 | << "req=" << req << dendl; | |
536 | ||
537 | req->send(); | |
538 | ||
224ce89b | 539 | finish_queued_io(req); |
7c673cae | 540 | if (req->is_write_op()) { |
224ce89b | 541 | finish_in_flight_write(); |
7c673cae FG |
542 | } |
543 | delete req; | |
544 | ||
224ce89b | 545 | finish_in_flight_io(); |
7c673cae FG |
546 | } |
547 | ||
224ce89b WB |
548 | template <typename I> |
549 | void ImageRequestWQ<I>::finish_queued_io(ImageRequest<I> *req) { | |
7c673cae FG |
550 | RWLock::RLocker locker(m_lock); |
551 | if (req->is_write_op()) { | |
552 | assert(m_queued_writes > 0); | |
553 | m_queued_writes--; | |
554 | } else { | |
555 | assert(m_queued_reads > 0); | |
556 | m_queued_reads--; | |
557 | } | |
558 | } | |
559 | ||
224ce89b WB |
560 | template <typename I> |
561 | void ImageRequestWQ<I>::finish_in_flight_write() { | |
7c673cae FG |
562 | bool writes_blocked = false; |
563 | { | |
564 | RWLock::RLocker locker(m_lock); | |
224ce89b WB |
565 | assert(m_in_flight_writes > 0); |
566 | if (--m_in_flight_writes == 0 && | |
7c673cae FG |
567 | !m_write_blocker_contexts.empty()) { |
568 | writes_blocked = true; | |
569 | } | |
570 | } | |
571 | ||
572 | if (writes_blocked) { | |
573 | m_image_ctx.flush(new C_BlockedWrites(this)); | |
574 | } | |
575 | } | |
576 | ||
224ce89b WB |
577 | template <typename I> |
578 | int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) { | |
7c673cae FG |
579 | RWLock::RLocker locker(m_lock); |
580 | ||
581 | if (m_shutdown) { | |
582 | CephContext *cct = m_image_ctx.cct; | |
583 | lderr(cct) << "IO received on closed image" << dendl; | |
584 | ||
224ce89b | 585 | c->get(); |
7c673cae FG |
586 | c->fail(-ESHUTDOWN); |
587 | return false; | |
588 | } | |
589 | ||
224ce89b | 590 | m_in_flight_ios++; |
7c673cae FG |
591 | return true; |
592 | } | |
593 | ||
224ce89b WB |
// Drop the in-flight IO counter. If this was the last IO and a shutdown
// is pending, flush the image and fire the deferred on_shutdown context
// recorded by shut_down().
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    // only proceed when we are the final IO AND a shutdown was requested
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  assert(on_shutdown != nullptr);
  m_image_ctx.flush(on_shutdown);
}
611 | ||
224ce89b WB |
// Fail a request that was already dequeued: tell the pool the work item
// is done, fail the user-visible request, then release the queued and
// in-flight counters (order mirrors the normal process() teardown).
template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(int r, ImageRequest<I> *req) {
  this->process_finish();
  req->fail(r);
  finish_queued_io(req);
  delete req;
  finish_in_flight_io();
}
7c673cae | 620 | |
224ce89b WB |
621 | template <typename I> |
622 | bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { | |
623 | assert(m_lock.is_locked()); | |
624 | return ((write_op && m_require_lock_on_write) || | |
625 | (!write_op && m_require_lock_on_read)); | |
7c673cae FG |
626 | } |
627 | ||
224ce89b WB |
628 | template <typename I> |
629 | void ImageRequestWQ<I>::queue(ImageRequest<I> *req) { | |
630 | assert(m_image_ctx.owner_lock.is_locked()); | |
631 | ||
7c673cae FG |
632 | CephContext *cct = m_image_ctx.cct; |
633 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
634 | << "req=" << req << dendl; | |
635 | ||
224ce89b | 636 | if (req->is_write_op()) { |
7c673cae FG |
637 | m_queued_writes++; |
638 | } else { | |
639 | m_queued_reads++; | |
640 | } | |
641 | ||
224ce89b WB |
642 | ThreadPool::PointerWQ<ImageRequest<I> >::queue(req); |
643 | } | |
7c673cae | 644 | |
224ce89b WB |
// Callback from C_AcquireLock once the exclusive-lock acquire attempt
// completes: requeue the stalled request on success, fail it otherwise,
// then lift the IO stall and wake the pool.
template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  // release the stall taken in _void_dequeue before the acquire started
  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}
662 | ||
224ce89b WB |
// Callback from C_RefreshFinish once the image refresh completes:
// requeue the stalled request on success, fail it otherwise, then lift
// the IO stall and wake the pool.
template <typename I>
void ImageRequestWQ<I>::handle_refreshed(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  // release the stall taken in _void_dequeue before the refresh started
  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}
680 | ||
224ce89b WB |
681 | template <typename I> |
682 | void ImageRequestWQ<I>::handle_blocked_writes(int r) { | |
7c673cae FG |
683 | Contexts contexts; |
684 | { | |
685 | RWLock::WLocker locker(m_lock); | |
686 | contexts.swap(m_write_blocker_contexts); | |
687 | } | |
688 | ||
689 | for (auto ctx : contexts) { | |
690 | ctx->complete(0); | |
691 | } | |
692 | } | |
693 | ||
224ce89b WB |
// explicit instantiation for the concrete ImageCtx-backed work queue
template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
695 | ||
7c673cae FG |
696 | } // namespace io |
697 | } // namespace librbd |