]>
git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/io/ImageRequestWQ.cc
2758790eb087a87f2d44a49fb495056d879eb35c
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "common/zipkin_trace.h"
7 #include "librbd/ExclusiveLock.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/internal.h"
11 #include "librbd/Utils.h"
12 #include "librbd/exclusive_lock/Policy.h"
13 #include "librbd/io/AioCompletion.h"
14 #include "librbd/io/ImageRequest.h"
16 #define dout_subsys ceph_subsys_rbd
18 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
19 << " " << __func__ << ": "
24 ImageRequestWQ::ImageRequestWQ(ImageCtx
*image_ctx
, const string
&name
,
25 time_t ti
, ThreadPool
*tp
)
26 : ThreadPool::PointerWQ
<ImageRequest
<> >(name
, ti
, 0, tp
),
27 m_image_ctx(*image_ctx
),
28 m_lock(util::unique_lock_name("ImageRequestWQ::m_lock", this)),
29 m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
30 m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
31 m_shutdown(false), m_on_shutdown(nullptr) {
32 CephContext
*cct
= m_image_ctx
.cct
;
33 ldout(cct
, 5) << "ictx=" << image_ctx
<< dendl
;
34 tp
->add_work_queue(this);
37 ssize_t
ImageRequestWQ::read(uint64_t off
, uint64_t len
,
38 ReadResult
&&read_result
, int op_flags
) {
39 CephContext
*cct
= m_image_ctx
.cct
;
40 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
41 << "len = " << len
<< dendl
;
44 AioCompletion
*c
= AioCompletion::create(&cond
);
45 aio_read(c
, off
, len
, std::move(read_result
), op_flags
, false);
49 ssize_t
ImageRequestWQ::write(uint64_t off
, uint64_t len
,
50 bufferlist
&&bl
, int op_flags
) {
51 CephContext
*cct
= m_image_ctx
.cct
;
52 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
53 << "len = " << len
<< dendl
;
55 m_image_ctx
.snap_lock
.get_read();
56 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
57 m_image_ctx
.snap_lock
.put_read();
59 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
64 AioCompletion
*c
= AioCompletion::create(&cond
);
65 aio_write(c
, off
, len
, std::move(bl
), op_flags
, false);
74 ssize_t
ImageRequestWQ::discard(uint64_t off
, uint64_t len
, bool skip_partial_discard
) {
75 CephContext
*cct
= m_image_ctx
.cct
;
76 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
77 << "len = " << len
<< dendl
;
79 m_image_ctx
.snap_lock
.get_read();
80 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
81 m_image_ctx
.snap_lock
.put_read();
83 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
88 AioCompletion
*c
= AioCompletion::create(&cond
);
89 aio_discard(c
, off
, len
, skip_partial_discard
, false);
98 ssize_t
ImageRequestWQ::writesame(uint64_t off
, uint64_t len
, bufferlist
&&bl
,
100 CephContext
*cct
= m_image_ctx
.cct
;
101 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
102 << "len = " << len
<< ", data_len " << bl
.length() << dendl
;
104 m_image_ctx
.snap_lock
.get_read();
105 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
106 m_image_ctx
.snap_lock
.put_read();
108 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
113 AioCompletion
*c
= AioCompletion::create(&cond
);
114 aio_writesame(c
, off
, len
, std::move(bl
), op_flags
, false);
123 void ImageRequestWQ::aio_read(AioCompletion
*c
, uint64_t off
, uint64_t len
,
124 ReadResult
&&read_result
, int op_flags
,
126 CephContext
*cct
= m_image_ctx
.cct
;
127 ZTracer::Trace trace
;
128 if (cct
->_conf
->rbd_blkin_trace_all
) {
129 trace
.init("wq: read", &m_image_ctx
.trace_endpoint
);
130 trace
.event("start");
133 c
->init_time(&m_image_ctx
, AIO_TYPE_READ
);
134 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
135 << "completion=" << c
<< ", off=" << off
<< ", "
136 << "len=" << len
<< ", " << "flags=" << op_flags
<< dendl
;
138 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
139 c
->set_event_notify(true);
142 if (!start_in_flight_op(c
)) {
146 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
148 // if journaling is enabled -- we need to replay the journal because
149 // it might contain an uncommitted write
152 RWLock::RLocker
locker(m_lock
);
153 lock_required
= m_require_lock_on_read
;
156 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty() ||
158 queue(new ImageReadRequest
<>(m_image_ctx
, c
, {{off
, len
}},
159 std::move(read_result
), op_flags
, trace
));
162 ImageRequest
<>::aio_read(&m_image_ctx
, c
, {{off
, len
}},
163 std::move(read_result
), op_flags
, trace
);
164 finish_in_flight_op();
166 trace
.event("finish");
169 void ImageRequestWQ::aio_write(AioCompletion
*c
, uint64_t off
, uint64_t len
,
170 bufferlist
&&bl
, int op_flags
,
172 CephContext
*cct
= m_image_ctx
.cct
;
173 ZTracer::Trace trace
;
174 if (cct
->_conf
->rbd_blkin_trace_all
) {
175 trace
.init("wq: write", &m_image_ctx
.trace_endpoint
);
179 c
->init_time(&m_image_ctx
, AIO_TYPE_WRITE
);
180 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
181 << "completion=" << c
<< ", off=" << off
<< ", "
182 << "len=" << len
<< ", flags=" << op_flags
<< dendl
;
184 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
185 c
->set_event_notify(true);
188 if (!start_in_flight_op(c
)) {
192 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
193 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
194 queue(new ImageWriteRequest
<>(m_image_ctx
, c
, {{off
, len
}},
195 std::move(bl
), op_flags
, trace
));
198 ImageRequest
<>::aio_write(&m_image_ctx
, c
, {{off
, len
}},
199 std::move(bl
), op_flags
, trace
);
200 finish_in_flight_op();
202 trace
.event("finish");
205 void ImageRequestWQ::aio_discard(AioCompletion
*c
, uint64_t off
,
206 uint64_t len
, bool skip_partial_discard
,
208 CephContext
*cct
= m_image_ctx
.cct
;
209 ZTracer::Trace trace
;
210 if (cct
->_conf
->rbd_blkin_trace_all
) {
211 trace
.init("wq: discard", &m_image_ctx
.trace_endpoint
);
215 c
->init_time(&m_image_ctx
, AIO_TYPE_DISCARD
);
216 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
217 << "completion=" << c
<< ", off=" << off
<< ", len=" << len
220 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
221 c
->set_event_notify(true);
224 if (!start_in_flight_op(c
)) {
228 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
229 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
230 queue(new ImageDiscardRequest
<>(m_image_ctx
, c
, off
, len
,
231 skip_partial_discard
, trace
));
234 ImageRequest
<>::aio_discard(&m_image_ctx
, c
, off
, len
,
235 skip_partial_discard
, trace
);
236 finish_in_flight_op();
238 trace
.event("finish");
241 void ImageRequestWQ::aio_flush(AioCompletion
*c
, bool native_async
) {
242 CephContext
*cct
= m_image_ctx
.cct
;
243 ZTracer::Trace trace
;
244 if (cct
->_conf
->rbd_blkin_trace_all
) {
245 trace
.init("wq: flush", &m_image_ctx
.trace_endpoint
);
249 c
->init_time(&m_image_ctx
, AIO_TYPE_FLUSH
);
250 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
251 << "completion=" << c
<< dendl
;
253 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
254 c
->set_event_notify(true);
257 if (!start_in_flight_op(c
)) {
261 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
262 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty()) {
263 queue(new ImageFlushRequest
<>(m_image_ctx
, c
, trace
));
265 ImageRequest
<>::aio_flush(&m_image_ctx
, c
, trace
);
266 finish_in_flight_op();
268 trace
.event("finish");
271 void ImageRequestWQ::aio_writesame(AioCompletion
*c
, uint64_t off
, uint64_t len
,
272 bufferlist
&&bl
, int op_flags
,
274 CephContext
*cct
= m_image_ctx
.cct
;
275 ZTracer::Trace trace
;
276 if (cct
->_conf
->rbd_blkin_trace_all
) {
277 trace
.init("wq: writesame", &m_image_ctx
.trace_endpoint
);
281 c
->init_time(&m_image_ctx
, AIO_TYPE_WRITESAME
);
282 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
283 << "completion=" << c
<< ", off=" << off
<< ", "
284 << "len=" << len
<< ", data_len = " << bl
.length() << ", "
285 << "flags=" << op_flags
<< dendl
;
287 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
288 c
->set_event_notify(true);
291 if (!start_in_flight_op(c
)) {
295 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
296 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
297 queue(new ImageWriteSameRequest
<>(m_image_ctx
, c
, off
, len
, std::move(bl
),
301 ImageRequest
<>::aio_writesame(&m_image_ctx
, c
, off
, len
, std::move(bl
),
303 finish_in_flight_op();
305 trace
.event("finish");
308 void ImageRequestWQ::shut_down(Context
*on_shutdown
) {
309 assert(m_image_ctx
.owner_lock
.is_locked());
312 RWLock::WLocker
locker(m_lock
);
316 CephContext
*cct
= m_image_ctx
.cct
;
317 ldout(cct
, 5) << __func__
<< ": in_flight=" << m_in_flight_ops
.load()
319 if (m_in_flight_ops
> 0) {
320 m_on_shutdown
= on_shutdown
;
325 // ensure that all in-flight IO is flushed
326 m_image_ctx
.flush(on_shutdown
);
329 bool ImageRequestWQ::is_lock_request_needed() const {
330 RWLock::RLocker
locker(m_lock
);
331 return (m_queued_writes
> 0 ||
332 (m_require_lock_on_read
&& m_queued_reads
> 0));
335 int ImageRequestWQ::block_writes() {
336 C_SaferCond cond_ctx
;
337 block_writes(&cond_ctx
);
338 return cond_ctx
.wait();
341 void ImageRequestWQ::block_writes(Context
*on_blocked
) {
342 assert(m_image_ctx
.owner_lock
.is_locked());
343 CephContext
*cct
= m_image_ctx
.cct
;
346 RWLock::WLocker
locker(m_lock
);
348 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
349 << m_write_blockers
<< dendl
;
350 if (!m_write_blocker_contexts
.empty() || m_in_progress_writes
> 0) {
351 m_write_blocker_contexts
.push_back(on_blocked
);
356 // ensure that all in-flight IO is flushed
357 m_image_ctx
.flush(on_blocked
);
360 void ImageRequestWQ::unblock_writes() {
361 CephContext
*cct
= m_image_ctx
.cct
;
363 bool wake_up
= false;
365 RWLock::WLocker
locker(m_lock
);
366 assert(m_write_blockers
> 0);
369 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
370 << m_write_blockers
<< dendl
;
371 if (m_write_blockers
== 0) {
381 void ImageRequestWQ::set_require_lock_on_read() {
382 CephContext
*cct
= m_image_ctx
.cct
;
383 ldout(cct
, 20) << dendl
;
385 RWLock::WLocker
locker(m_lock
);
386 m_require_lock_on_read
= true;
389 void ImageRequestWQ::clear_require_lock_on_read() {
390 CephContext
*cct
= m_image_ctx
.cct
;
391 ldout(cct
, 20) << dendl
;
394 RWLock::WLocker
locker(m_lock
);
395 if (!m_require_lock_on_read
) {
399 m_require_lock_on_read
= false;
404 void *ImageRequestWQ::_void_dequeue() {
405 ImageRequest
<> *peek_item
= front();
407 // no IO ops available or refresh in-progress (IO stalled)
408 if (peek_item
== nullptr || m_refresh_in_progress
) {
412 bool refresh_required
= m_image_ctx
.state
->is_refresh_required();
414 RWLock::RLocker
locker(m_lock
);
415 if (peek_item
->is_write_op()) {
416 if (m_write_blockers
> 0) {
420 // refresh will requeue the op -- don't count it as in-progress
421 if (!refresh_required
) {
422 m_in_progress_writes
++;
424 } else if (m_require_lock_on_read
) {
429 ImageRequest
<> *item
= reinterpret_cast<ImageRequest
<> *>(
430 ThreadPool::PointerWQ
<ImageRequest
<> >::_void_dequeue());
431 assert(peek_item
== item
);
433 if (refresh_required
) {
434 ldout(m_image_ctx
.cct
, 15) << "image refresh required: delaying IO " << item
437 // stall IO until the refresh completes
438 m_refresh_in_progress
= true;
440 get_pool_lock().Unlock();
441 m_image_ctx
.state
->refresh(new C_RefreshFinish(this, item
));
442 get_pool_lock().Lock();
450 void ImageRequestWQ::process(ImageRequest
<> *req
) {
451 CephContext
*cct
= m_image_ctx
.cct
;
452 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
453 << "req=" << req
<< dendl
;
457 finish_queued_op(req
);
458 if (req
->is_write_op()) {
459 finish_in_progress_write();
463 finish_in_flight_op();
466 void ImageRequestWQ::finish_queued_op(ImageRequest
<> *req
) {
467 RWLock::RLocker
locker(m_lock
);
468 if (req
->is_write_op()) {
469 assert(m_queued_writes
> 0);
472 assert(m_queued_reads
> 0);
477 void ImageRequestWQ::finish_in_progress_write() {
478 bool writes_blocked
= false;
480 RWLock::RLocker
locker(m_lock
);
481 assert(m_in_progress_writes
> 0);
482 if (--m_in_progress_writes
== 0 &&
483 !m_write_blocker_contexts
.empty()) {
484 writes_blocked
= true;
488 if (writes_blocked
) {
489 m_image_ctx
.flush(new C_BlockedWrites(this));
493 int ImageRequestWQ::start_in_flight_op(AioCompletion
*c
) {
494 RWLock::RLocker
locker(m_lock
);
497 CephContext
*cct
= m_image_ctx
.cct
;
498 lderr(cct
) << "IO received on closed image" << dendl
;
508 void ImageRequestWQ::finish_in_flight_op() {
509 Context
*on_shutdown
;
511 RWLock::RLocker
locker(m_lock
);
512 if (--m_in_flight_ops
> 0 || !m_shutdown
) {
515 on_shutdown
= m_on_shutdown
;
518 CephContext
*cct
= m_image_ctx
.cct
;
519 ldout(cct
, 5) << "completing shut down" << dendl
;
521 assert(on_shutdown
!= nullptr);
522 m_image_ctx
.flush(on_shutdown
);
525 bool ImageRequestWQ::is_lock_required() const {
526 assert(m_image_ctx
.owner_lock
.is_locked());
527 if (m_image_ctx
.exclusive_lock
== NULL
) {
531 return (!m_image_ctx
.exclusive_lock
->is_lock_owner());
534 void ImageRequestWQ::queue(ImageRequest
<> *req
) {
535 CephContext
*cct
= m_image_ctx
.cct
;
536 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
537 << "req=" << req
<< dendl
;
539 assert(m_image_ctx
.owner_lock
.is_locked());
540 bool write_op
= req
->is_write_op();
541 bool lock_required
= (m_image_ctx
.exclusive_lock
!= nullptr &&
542 ((write_op
&& is_lock_required()) ||
543 (!write_op
&& m_require_lock_on_read
)));
545 if (lock_required
&& !m_image_ctx
.get_exclusive_lock_policy()->may_auto_request_lock()) {
546 lderr(cct
) << "op requires exclusive lock" << dendl
;
549 finish_in_flight_op();
559 ThreadPool::PointerWQ
<ImageRequest
<> >::queue(req
);
562 m_image_ctx
.exclusive_lock
->acquire_lock(nullptr);
566 void ImageRequestWQ::handle_refreshed(int r
, ImageRequest
<> *req
) {
567 CephContext
*cct
= m_image_ctx
.cct
;
568 ldout(cct
, 15) << "resuming IO after image refresh: r=" << r
<< ", "
569 << "req=" << req
<< dendl
;
573 finish_queued_op(req
);
575 finish_in_flight_op();
577 // since IO was stalled for refresh -- original IO order is preserved
578 // if we requeue this op for work queue processing
582 m_refresh_in_progress
= false;
585 // refresh might have enabled exclusive lock -- IO stalled until
586 // we acquire the lock
587 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
588 if (is_lock_required() && is_lock_request_needed()) {
589 m_image_ctx
.exclusive_lock
->acquire_lock(nullptr);
593 void ImageRequestWQ::handle_blocked_writes(int r
) {
596 RWLock::WLocker
locker(m_lock
);
597 contexts
.swap(m_write_blocker_contexts
);
600 for (auto ctx
: contexts
) {
606 } // namespace librbd