]>
git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/io/ImageRequestWQ.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "common/zipkin_trace.h"
7 #include "librbd/ExclusiveLock.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/internal.h"
11 #include "librbd/Utils.h"
12 #include "librbd/exclusive_lock/Policy.h"
13 #include "librbd/io/AioCompletion.h"
14 #include "librbd/io/ImageRequest.h"
16 #define dout_subsys ceph_subsys_rbd
18 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
19 << " " << __func__ << ": "
25 struct ImageRequestWQ
<I
>::C_AcquireLock
: public Context
{
26 ImageRequestWQ
*work_queue
;
27 ImageRequest
<I
> *image_request
;
29 C_AcquireLock(ImageRequestWQ
*work_queue
, ImageRequest
<I
> *image_request
)
30 : work_queue(work_queue
), image_request(image_request
) {
33 void finish(int r
) override
{
34 work_queue
->handle_acquire_lock(r
, image_request
);
39 struct ImageRequestWQ
<I
>::C_BlockedWrites
: public Context
{
40 ImageRequestWQ
*work_queue
;
41 C_BlockedWrites(ImageRequestWQ
*_work_queue
)
42 : work_queue(_work_queue
) {
45 void finish(int r
) override
{
46 work_queue
->handle_blocked_writes(r
);
51 struct ImageRequestWQ
<I
>::C_RefreshFinish
: public Context
{
52 ImageRequestWQ
*work_queue
;
53 ImageRequest
<I
> *image_request
;
55 C_RefreshFinish(ImageRequestWQ
*work_queue
,
56 ImageRequest
<I
> *image_request
)
57 : work_queue(work_queue
), image_request(image_request
) {
59 void finish(int r
) override
{
60 work_queue
->handle_refreshed(r
, image_request
);
65 ImageRequestWQ
<I
>::ImageRequestWQ(I
*image_ctx
, const string
&name
,
66 time_t ti
, ThreadPool
*tp
)
67 : ThreadPool::PointerWQ
<ImageRequest
<I
> >(name
, ti
, 0, tp
),
68 m_image_ctx(*image_ctx
),
69 m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
70 CephContext
*cct
= m_image_ctx
.cct
;
71 ldout(cct
, 5) << "ictx=" << image_ctx
<< dendl
;
72 this->register_work_queue();
76 ssize_t ImageRequestWQ
<I
>::read(uint64_t off
, uint64_t len
,
77 ReadResult
&&read_result
, int op_flags
) {
78 CephContext
*cct
= m_image_ctx
.cct
;
79 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
80 << "len = " << len
<< dendl
;
83 AioCompletion
*c
= AioCompletion::create(&cond
);
84 aio_read(c
, off
, len
, std::move(read_result
), op_flags
, false);
89 ssize_t ImageRequestWQ
<I
>::write(uint64_t off
, uint64_t len
,
90 bufferlist
&&bl
, int op_flags
) {
91 CephContext
*cct
= m_image_ctx
.cct
;
92 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
93 << "len = " << len
<< dendl
;
95 m_image_ctx
.snap_lock
.get_read();
96 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
97 m_image_ctx
.snap_lock
.put_read();
99 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
104 AioCompletion
*c
= AioCompletion::create(&cond
);
105 aio_write(c
, off
, len
, std::move(bl
), op_flags
, false);
114 template <typename I
>
115 ssize_t ImageRequestWQ
<I
>::discard(uint64_t off
, uint64_t len
,
116 bool skip_partial_discard
) {
117 CephContext
*cct
= m_image_ctx
.cct
;
118 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
119 << "len = " << len
<< dendl
;
121 m_image_ctx
.snap_lock
.get_read();
122 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
123 m_image_ctx
.snap_lock
.put_read();
125 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
130 AioCompletion
*c
= AioCompletion::create(&cond
);
131 aio_discard(c
, off
, len
, skip_partial_discard
, false);
140 template <typename I
>
141 ssize_t ImageRequestWQ
<I
>::writesame(uint64_t off
, uint64_t len
,
142 bufferlist
&&bl
, int op_flags
) {
143 CephContext
*cct
= m_image_ctx
.cct
;
144 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
145 << "len = " << len
<< ", data_len " << bl
.length() << dendl
;
147 m_image_ctx
.snap_lock
.get_read();
148 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
149 m_image_ctx
.snap_lock
.put_read();
151 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
156 AioCompletion
*c
= AioCompletion::create(&cond
);
157 aio_writesame(c
, off
, len
, std::move(bl
), op_flags
, false);
166 template <typename I
>
167 void ImageRequestWQ
<I
>::aio_read(AioCompletion
*c
, uint64_t off
, uint64_t len
,
168 ReadResult
&&read_result
, int op_flags
,
170 CephContext
*cct
= m_image_ctx
.cct
;
171 ZTracer::Trace trace
;
172 if (cct
->_conf
->rbd_blkin_trace_all
) {
173 trace
.init("wq: read", &m_image_ctx
.trace_endpoint
);
174 trace
.event("start");
177 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_READ
);
178 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
179 << "completion=" << c
<< ", off=" << off
<< ", "
180 << "len=" << len
<< ", " << "flags=" << op_flags
<< dendl
;
182 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
183 c
->set_event_notify(true);
186 if (!start_in_flight_io(c
)) {
190 // if journaling is enabled -- we need to replay the journal because
191 // it might contain an uncommitted write
192 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
193 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty() ||
194 require_lock_on_read()) {
195 queue(ImageRequest
<I
>::create_read_request(
196 m_image_ctx
, c
, {{off
, len
}}, std::move(read_result
), op_flags
,
200 ImageRequest
<I
>::aio_read(&m_image_ctx
, c
, {{off
, len
}},
201 std::move(read_result
), op_flags
, trace
);
202 finish_in_flight_io();
204 trace
.event("finish");
207 template <typename I
>
208 void ImageRequestWQ
<I
>::aio_write(AioCompletion
*c
, uint64_t off
, uint64_t len
,
209 bufferlist
&&bl
, int op_flags
,
211 CephContext
*cct
= m_image_ctx
.cct
;
212 ZTracer::Trace trace
;
213 if (cct
->_conf
->rbd_blkin_trace_all
) {
214 trace
.init("wq: write", &m_image_ctx
.trace_endpoint
);
218 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_WRITE
);
219 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
220 << "completion=" << c
<< ", off=" << off
<< ", "
221 << "len=" << len
<< ", flags=" << op_flags
<< dendl
;
223 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
224 c
->set_event_notify(true);
227 if (!start_in_flight_io(c
)) {
231 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
232 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
233 queue(ImageRequest
<I
>::create_write_request(
234 m_image_ctx
, c
, {{off
, len
}}, std::move(bl
), op_flags
, trace
));
237 ImageRequest
<I
>::aio_write(&m_image_ctx
, c
, {{off
, len
}},
238 std::move(bl
), op_flags
, trace
);
239 finish_in_flight_io();
241 trace
.event("finish");
244 template <typename I
>
245 void ImageRequestWQ
<I
>::aio_discard(AioCompletion
*c
, uint64_t off
,
246 uint64_t len
, bool skip_partial_discard
,
248 CephContext
*cct
= m_image_ctx
.cct
;
249 ZTracer::Trace trace
;
250 if (cct
->_conf
->rbd_blkin_trace_all
) {
251 trace
.init("wq: discard", &m_image_ctx
.trace_endpoint
);
255 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_DISCARD
);
256 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
257 << "completion=" << c
<< ", off=" << off
<< ", len=" << len
260 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
261 c
->set_event_notify(true);
264 if (!start_in_flight_io(c
)) {
268 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
269 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
270 queue(ImageRequest
<I
>::create_discard_request(
271 m_image_ctx
, c
, off
, len
, skip_partial_discard
, trace
));
274 ImageRequest
<I
>::aio_discard(&m_image_ctx
, c
, off
, len
,
275 skip_partial_discard
, trace
);
276 finish_in_flight_io();
278 trace
.event("finish");
281 template <typename I
>
282 void ImageRequestWQ
<I
>::aio_flush(AioCompletion
*c
, bool native_async
) {
283 CephContext
*cct
= m_image_ctx
.cct
;
284 ZTracer::Trace trace
;
285 if (cct
->_conf
->rbd_blkin_trace_all
) {
286 trace
.init("wq: flush", &m_image_ctx
.trace_endpoint
);
290 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_FLUSH
);
291 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
292 << "completion=" << c
<< dendl
;
294 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
295 c
->set_event_notify(true);
298 if (!start_in_flight_io(c
)) {
302 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
303 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty()) {
304 queue(ImageRequest
<I
>::create_flush_request(m_image_ctx
, c
, trace
));
306 ImageRequest
<I
>::aio_flush(&m_image_ctx
, c
, trace
);
307 finish_in_flight_io();
309 trace
.event("finish");
312 template <typename I
>
313 void ImageRequestWQ
<I
>::aio_writesame(AioCompletion
*c
, uint64_t off
,
314 uint64_t len
, bufferlist
&&bl
,
315 int op_flags
, bool native_async
) {
316 CephContext
*cct
= m_image_ctx
.cct
;
317 ZTracer::Trace trace
;
318 if (cct
->_conf
->rbd_blkin_trace_all
) {
319 trace
.init("wq: writesame", &m_image_ctx
.trace_endpoint
);
323 c
->init_time(util::get_image_ctx(&m_image_ctx
), AIO_TYPE_WRITESAME
);
324 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
325 << "completion=" << c
<< ", off=" << off
<< ", "
326 << "len=" << len
<< ", data_len = " << bl
.length() << ", "
327 << "flags=" << op_flags
<< dendl
;
329 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
330 c
->set_event_notify(true);
333 if (!start_in_flight_io(c
)) {
337 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
338 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
339 queue(ImageRequest
<I
>::create_writesame_request(
340 m_image_ctx
, c
, off
, len
, std::move(bl
), op_flags
, trace
));
343 ImageRequest
<I
>::aio_writesame(&m_image_ctx
, c
, off
, len
, std::move(bl
),
345 finish_in_flight_io();
347 trace
.event("finish");
350 template <typename I
>
351 void ImageRequestWQ
<I
>::shut_down(Context
*on_shutdown
) {
352 assert(m_image_ctx
.owner_lock
.is_locked());
355 RWLock::WLocker
locker(m_lock
);
359 CephContext
*cct
= m_image_ctx
.cct
;
360 ldout(cct
, 5) << __func__
<< ": in_flight=" << m_in_flight_ios
.load()
362 if (m_in_flight_ios
> 0) {
363 m_on_shutdown
= on_shutdown
;
368 // ensure that all in-flight IO is flushed
369 m_image_ctx
.flush(on_shutdown
);
372 template <typename I
>
373 int ImageRequestWQ
<I
>::block_writes() {
374 C_SaferCond cond_ctx
;
375 block_writes(&cond_ctx
);
376 return cond_ctx
.wait();
379 template <typename I
>
380 void ImageRequestWQ
<I
>::block_writes(Context
*on_blocked
) {
381 assert(m_image_ctx
.owner_lock
.is_locked());
382 CephContext
*cct
= m_image_ctx
.cct
;
385 RWLock::WLocker
locker(m_lock
);
387 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
388 << m_write_blockers
<< dendl
;
389 if (!m_write_blocker_contexts
.empty() || m_in_flight_writes
> 0) {
390 m_write_blocker_contexts
.push_back(on_blocked
);
395 // ensure that all in-flight IO is flushed
396 m_image_ctx
.flush(on_blocked
);
399 template <typename I
>
400 void ImageRequestWQ
<I
>::unblock_writes() {
401 CephContext
*cct
= m_image_ctx
.cct
;
403 bool wake_up
= false;
405 RWLock::WLocker
locker(m_lock
);
406 assert(m_write_blockers
> 0);
409 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
410 << m_write_blockers
<< dendl
;
411 if (m_write_blockers
== 0) {
421 template <typename I
>
422 void ImageRequestWQ
<I
>::set_require_lock(Direction direction
, bool enabled
) {
423 CephContext
*cct
= m_image_ctx
.cct
;
424 ldout(cct
, 20) << dendl
;
426 bool wake_up
= false;
428 RWLock::WLocker
locker(m_lock
);
431 wake_up
= (enabled
!= m_require_lock_on_read
);
432 m_require_lock_on_read
= enabled
;
434 case DIRECTION_WRITE
:
435 wake_up
= (enabled
!= m_require_lock_on_write
);
436 m_require_lock_on_write
= enabled
;
439 wake_up
= (enabled
!= m_require_lock_on_read
||
440 enabled
!= m_require_lock_on_write
);
441 m_require_lock_on_read
= enabled
;
442 m_require_lock_on_write
= enabled
;
447 // wake up the thread pool whenever the state changes so that
448 // we can re-request the lock if required
454 template <typename I
>
455 void *ImageRequestWQ
<I
>::_void_dequeue() {
456 CephContext
*cct
= m_image_ctx
.cct
;
457 ImageRequest
<I
> *peek_item
= this->front();
459 // no queued IO requests or all IO is blocked/stalled
460 if (peek_item
== nullptr || m_io_blockers
.load() > 0) {
465 bool refresh_required
= m_image_ctx
.state
->is_refresh_required();
467 RWLock::RLocker
locker(m_lock
);
468 bool write_op
= peek_item
->is_write_op();
469 lock_required
= is_lock_required(write_op
);
471 if (!lock_required
&& m_write_blockers
> 0) {
472 // missing lock is not the write blocker
476 if (!lock_required
&& !refresh_required
) {
477 // completed ops will requeue the IO -- don't count it as in-progress
478 m_in_flight_writes
++;
483 ImageRequest
<I
> *item
= reinterpret_cast<ImageRequest
<I
> *>(
484 ThreadPool::PointerWQ
<ImageRequest
<I
> >::_void_dequeue());
485 assert(peek_item
== item
);
488 this->get_pool_lock().Unlock();
489 m_image_ctx
.owner_lock
.get_read();
490 if (m_image_ctx
.exclusive_lock
!= nullptr) {
491 ldout(cct
, 5) << "exclusive lock required: delaying IO " << item
<< dendl
;
492 if (!m_image_ctx
.get_exclusive_lock_policy()->may_auto_request_lock()) {
493 lderr(cct
) << "op requires exclusive lock" << dendl
;
494 fail_in_flight_io(-EROFS
, item
);
496 // wake up the IO since we won't be returning a request to process
499 // stall IO until the acquire completes
501 m_image_ctx
.exclusive_lock
->acquire_lock(new C_AcquireLock(this, item
));
504 // raced with the exclusive lock being disabled
505 lock_required
= false;
507 m_image_ctx
.owner_lock
.put_read();
508 this->get_pool_lock().Lock();
515 if (refresh_required
) {
516 ldout(cct
, 5) << "image refresh required: delaying IO " << item
<< dendl
;
518 // stall IO until the refresh completes
521 this->get_pool_lock().Unlock();
522 m_image_ctx
.state
->refresh(new C_RefreshFinish(this, item
));
523 this->get_pool_lock().Lock();
531 template <typename I
>
532 void ImageRequestWQ
<I
>::process(ImageRequest
<I
> *req
) {
533 CephContext
*cct
= m_image_ctx
.cct
;
534 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
535 << "req=" << req
<< dendl
;
539 finish_queued_io(req
);
540 if (req
->is_write_op()) {
541 finish_in_flight_write();
545 finish_in_flight_io();
548 template <typename I
>
549 void ImageRequestWQ
<I
>::finish_queued_io(ImageRequest
<I
> *req
) {
550 RWLock::RLocker
locker(m_lock
);
551 if (req
->is_write_op()) {
552 assert(m_queued_writes
> 0);
555 assert(m_queued_reads
> 0);
560 template <typename I
>
561 void ImageRequestWQ
<I
>::finish_in_flight_write() {
562 bool writes_blocked
= false;
564 RWLock::RLocker
locker(m_lock
);
565 assert(m_in_flight_writes
> 0);
566 if (--m_in_flight_writes
== 0 &&
567 !m_write_blocker_contexts
.empty()) {
568 writes_blocked
= true;
572 if (writes_blocked
) {
573 m_image_ctx
.flush(new C_BlockedWrites(this));
577 template <typename I
>
578 int ImageRequestWQ
<I
>::start_in_flight_io(AioCompletion
*c
) {
579 RWLock::RLocker
locker(m_lock
);
582 CephContext
*cct
= m_image_ctx
.cct
;
583 lderr(cct
) << "IO received on closed image" << dendl
;
594 template <typename I
>
595 void ImageRequestWQ
<I
>::finish_in_flight_io() {
596 Context
*on_shutdown
;
598 RWLock::RLocker
locker(m_lock
);
599 if (--m_in_flight_ios
> 0 || !m_shutdown
) {
602 on_shutdown
= m_on_shutdown
;
605 CephContext
*cct
= m_image_ctx
.cct
;
606 ldout(cct
, 5) << "completing shut down" << dendl
;
608 assert(on_shutdown
!= nullptr);
609 m_image_ctx
.flush(on_shutdown
);
612 template <typename I
>
613 void ImageRequestWQ
<I
>::fail_in_flight_io(int r
, ImageRequest
<I
> *req
) {
614 this->process_finish();
616 finish_queued_io(req
);
618 finish_in_flight_io();
621 template <typename I
>
622 bool ImageRequestWQ
<I
>::is_lock_required(bool write_op
) const {
623 assert(m_lock
.is_locked());
624 return ((write_op
&& m_require_lock_on_write
) ||
625 (!write_op
&& m_require_lock_on_read
));
628 template <typename I
>
629 void ImageRequestWQ
<I
>::queue(ImageRequest
<I
> *req
) {
630 assert(m_image_ctx
.owner_lock
.is_locked());
632 CephContext
*cct
= m_image_ctx
.cct
;
633 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
634 << "req=" << req
<< dendl
;
636 if (req
->is_write_op()) {
642 ThreadPool::PointerWQ
<ImageRequest
<I
> >::queue(req
);
645 template <typename I
>
646 void ImageRequestWQ
<I
>::handle_acquire_lock(int r
, ImageRequest
<I
> *req
) {
647 CephContext
*cct
= m_image_ctx
.cct
;
648 ldout(cct
, 5) << "r=" << r
<< ", " << "req=" << req
<< dendl
;
651 fail_in_flight_io(r
, req
);
653 // since IO was stalled for acquire -- original IO order is preserved
654 // if we requeue this op for work queue processing
658 assert(m_io_blockers
.load() > 0);
663 template <typename I
>
664 void ImageRequestWQ
<I
>::handle_refreshed(int r
, ImageRequest
<I
> *req
) {
665 CephContext
*cct
= m_image_ctx
.cct
;
666 ldout(cct
, 5) << "resuming IO after image refresh: r=" << r
<< ", "
667 << "req=" << req
<< dendl
;
669 fail_in_flight_io(r
, req
);
671 // since IO was stalled for refresh -- original IO order is preserved
672 // if we requeue this op for work queue processing
676 assert(m_io_blockers
.load() > 0);
681 template <typename I
>
682 void ImageRequestWQ
<I
>::handle_blocked_writes(int r
) {
685 RWLock::WLocker
locker(m_lock
);
686 contexts
.swap(m_write_blocker_contexts
);
689 for (auto ctx
: contexts
) {
694 template class librbd::io::ImageRequestWQ
<librbd::ImageCtx
>;
697 } // namespace librbd