]>
git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/io/ImageRequestWQ.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "librbd/ExclusiveLock.h"
7 #include "librbd/ImageCtx.h"
8 #include "librbd/ImageState.h"
9 #include "librbd/internal.h"
10 #include "librbd/Utils.h"
11 #include "librbd/exclusive_lock/Policy.h"
12 #include "librbd/io/AioCompletion.h"
13 #include "librbd/io/ImageRequest.h"
15 #define dout_subsys ceph_subsys_rbd
17 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
18 << " " << __func__ << ": "
23 ImageRequestWQ::ImageRequestWQ(ImageCtx
*image_ctx
, const string
&name
,
24 time_t ti
, ThreadPool
*tp
)
25 : ThreadPool::PointerWQ
<ImageRequest
<> >(name
, ti
, 0, tp
),
26 m_image_ctx(*image_ctx
),
27 m_lock(util::unique_lock_name("ImageRequestWQ::m_lock", this)),
28 m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
29 m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
30 m_shutdown(false), m_on_shutdown(nullptr) {
31 CephContext
*cct
= m_image_ctx
.cct
;
32 ldout(cct
, 5) << "ictx=" << image_ctx
<< dendl
;
33 tp
->add_work_queue(this);
// Read entry point returning ssize_t: logs the request, creates an
// AioCompletion bound to `&cond` and forwards everything to aio_read() with a
// trailing `false` argument.
// NOTE(review): the declaration of `cond` and the function's return statement
// are not present in this view -- presumably the call waits on `cond` and
// returns its result; confirm against the complete source.
36 ssize_t
ImageRequestWQ::read(uint64_t off
, uint64_t len
,
37 ReadResult
&&read_result
, int op_flags
) {
38 CephContext
*cct
= m_image_ctx
.cct
;
39 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
40 << "len = " << len
<< dendl
;
// hand the request to the asynchronous path; the completion is tied to `cond`
43 AioCompletion
*c
= AioCompletion::create(&cond
);
44 aio_read(c
, off
, len
, std::move(read_result
), op_flags
, false);
// Write entry point returning ssize_t: logs the request, clips the extent via
// clip_io() while holding snap_lock for read, then creates an AioCompletion
// bound to `&cond` and forwards to aio_write() with a trailing `false`.
// NOTE(review): the condition guarding the "invalid IO request" error log, the
// declaration of `cond`, and the return statements are missing from this view
// -- presumably the error path is taken when clip_io() reports a failure in
// `r`; confirm against the complete source.
48 ssize_t
ImageRequestWQ::write(uint64_t off
, uint64_t len
,
49 bufferlist
&&bl
, int op_flags
) {
50 CephContext
*cct
= m_image_ctx
.cct
;
51 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
52 << "len = " << len
<< dendl
;
// clip_io() receives `&len`, so it may adjust the length; snap_lock is held
// for read only around that call
54 m_image_ctx
.snap_lock
.get_read();
55 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
56 m_image_ctx
.snap_lock
.put_read();
58 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
63 AioCompletion
*c
= AioCompletion::create(&cond
);
64 aio_write(c
, off
, len
, std::move(bl
), op_flags
, false);
// Discard entry point returning ssize_t: logs the request, clips the extent
// via clip_io() while holding snap_lock for read, then creates an
// AioCompletion bound to `&cond` and forwards to aio_discard() with a trailing
// `false`.
// NOTE(review): the condition guarding the error log, the declaration of
// `cond`, and the return statements are missing from this view; confirm
// against the complete source.
73 ssize_t
ImageRequestWQ::discard(uint64_t off
, uint64_t len
, bool skip_partial_discard
) {
74 CephContext
*cct
= m_image_ctx
.cct
;
75 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
76 << "len = " << len
<< dendl
;
// clip_io() receives `&len`, so it may adjust the length; snap_lock is held
// for read only around that call
78 m_image_ctx
.snap_lock
.get_read();
79 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
80 m_image_ctx
.snap_lock
.put_read();
82 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
87 AioCompletion
*c
= AioCompletion::create(&cond
);
88 aio_discard(c
, off
, len
, skip_partial_discard
, false);
// Write-same entry point returning ssize_t: logs the request (including the
// length of `bl`), clips the extent via clip_io() while holding snap_lock for
// read, then creates an AioCompletion bound to `&cond` and forwards to
// aio_writesame() with a trailing `false`.
// NOTE(review): the end of the parameter list is missing from this view
// (`op_flags` is used below but not declared in the visible signature), as are
// the error-branch condition, the declaration of `cond`, and the return
// statements; confirm against the complete source.
97 ssize_t
ImageRequestWQ::writesame(uint64_t off
, uint64_t len
, bufferlist
&&bl
,
99 CephContext
*cct
= m_image_ctx
.cct
;
100 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", off=" << off
<< ", "
101 << "len = " << len
<< ", data_len " << bl
.length() << dendl
;
// clip_io() receives `&len`, so it may adjust the length; snap_lock is held
// for read only around that call
103 m_image_ctx
.snap_lock
.get_read();
104 int r
= clip_io(util::get_image_ctx(&m_image_ctx
), off
, &len
);
105 m_image_ctx
.snap_lock
.put_read();
107 lderr(cct
) << "invalid IO request: " << cpp_strerror(r
) << dendl
;
112 AioCompletion
*c
= AioCompletion::create(&cond
);
113 aio_writesame(c
, off
, len
, std::move(bl
), op_flags
, false);
// Asynchronous read: stamps the completion as AIO_TYPE_READ, logs the request,
// optionally enables event notification, registers the op as in-flight, and
// then either queues an ImageReadRequest on this work queue or issues
// ImageRequest<>::aio_read() directly followed by finish_in_flight_op().
// NOTE(review): several original lines are missing from this view -- the end
// of the parameter list (`native_async` is used but not declared here), the
// body taken when start_in_flight_op() fails, the declaration of
// `lock_required`, the last operand of the queue-or-dispatch condition, and
// the separator between the queued and direct paths. The two paths are
// presumably an if/else; confirm against the complete source.
122 void ImageRequestWQ::aio_read(AioCompletion
*c
, uint64_t off
, uint64_t len
,
123 ReadResult
&&read_result
, int op_flags
,
125 c
->init_time(&m_image_ctx
, AIO_TYPE_READ
);
126 CephContext
*cct
= m_image_ctx
.cct
;
127 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
128 << "completion=" << c
<< ", off=" << off
<< ", "
129 << "len=" << len
<< ", " << "flags=" << op_flags
<< dendl
;
// event notification is only enabled when the image's event socket is valid
131 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
132 c
->set_event_notify(true);
135 if (!start_in_flight_op(c
)) {
139 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
141 // if journaling is enabled -- we need to replay the journal because
142 // it might contain an uncommitted write
// m_require_lock_on_read is sampled under m_lock
145 RWLock::RLocker
locker(m_lock
);
146 lock_required
= m_require_lock_on_read
;
149 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty() ||
151 queue(new ImageReadRequest
<>(m_image_ctx
, c
, {{off
, len
}},
152 std::move(read_result
), op_flags
));
155 ImageRequest
<>::aio_read(&m_image_ctx
, c
, {{off
, len
}},
156 std::move(read_result
), op_flags
);
157 finish_in_flight_op();
// Asynchronous write: stamps the completion as AIO_TYPE_WRITE, logs the
// request, optionally enables event notification, registers the op as
// in-flight, and then -- holding owner_lock for read -- either queues an
// ImageWriteRequest (when non_blocking_aio is set or writes are blocked) or
// issues ImageRequest<>::aio_write() directly followed by
// finish_in_flight_op().
// NOTE(review): the end of the parameter list (`native_async` is used but not
// declared here), the body taken when start_in_flight_op() fails, and the
// separator between the queued and direct paths are missing from this view;
// the two paths are presumably an if/else -- confirm against the complete
// source.
161 void ImageRequestWQ::aio_write(AioCompletion
*c
, uint64_t off
, uint64_t len
,
162 bufferlist
&&bl
, int op_flags
,
164 c
->init_time(&m_image_ctx
, AIO_TYPE_WRITE
);
165 CephContext
*cct
= m_image_ctx
.cct
;
166 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
167 << "completion=" << c
<< ", off=" << off
<< ", "
168 << "len=" << len
<< ", flags=" << op_flags
<< dendl
;
// event notification is only enabled when the image's event socket is valid
170 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
171 c
->set_event_notify(true);
174 if (!start_in_flight_op(c
)) {
178 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
179 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
180 queue(new ImageWriteRequest
<>(m_image_ctx
, c
, {{off
, len
}},
181 std::move(bl
), op_flags
));
184 ImageRequest
<>::aio_write(&m_image_ctx
, c
, {{off
, len
}},
185 std::move(bl
), op_flags
);
186 finish_in_flight_op();
// Asynchronous discard: stamps the completion as AIO_TYPE_DISCARD, logs the
// request, optionally enables event notification, registers the op as
// in-flight, and then -- holding owner_lock for read -- either queues an
// ImageDiscardRequest (when non_blocking_aio is set or writes are blocked) or
// issues ImageRequest<>::aio_discard() directly followed by
// finish_in_flight_op().
// NOTE(review): the end of the parameter list (`native_async` is used but not
// declared here), the tail of the log statement, the body taken when
// start_in_flight_op() fails, and the separator between the queued and direct
// paths are missing from this view; confirm against the complete source.
190 void ImageRequestWQ::aio_discard(AioCompletion
*c
, uint64_t off
,
191 uint64_t len
, bool skip_partial_discard
,
193 c
->init_time(&m_image_ctx
, AIO_TYPE_DISCARD
);
194 CephContext
*cct
= m_image_ctx
.cct
;
195 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
196 << "completion=" << c
<< ", off=" << off
<< ", len=" << len
// event notification is only enabled when the image's event socket is valid
199 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
200 c
->set_event_notify(true);
203 if (!start_in_flight_op(c
)) {
207 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
208 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
209 queue(new ImageDiscardRequest
<>(m_image_ctx
, c
, off
, len
, skip_partial_discard
));
212 ImageRequest
<>::aio_discard(&m_image_ctx
, c
, off
, len
, skip_partial_discard
);
213 finish_in_flight_op();
// Asynchronous flush: stamps the completion as AIO_TYPE_FLUSH, logs it,
// optionally enables event notification, registers the op as in-flight, and
// then -- holding owner_lock for read -- either queues an ImageFlushRequest
// (when non_blocking_aio is set, writes are blocked, or writes_empty() is
// false) or issues ImageRequest<>::aio_flush() directly followed by
// finish_in_flight_op().
// NOTE(review): the body taken when start_in_flight_op() fails and the
// separator between the queued and direct paths are missing from this view;
// the two paths are presumably an if/else -- confirm against the complete
// source.
217 void ImageRequestWQ::aio_flush(AioCompletion
*c
, bool native_async
) {
218 c
->init_time(&m_image_ctx
, AIO_TYPE_FLUSH
);
219 CephContext
*cct
= m_image_ctx
.cct
;
220 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
221 << "completion=" << c
<< dendl
;
// event notification is only enabled when the image's event socket is valid
223 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
224 c
->set_event_notify(true);
227 if (!start_in_flight_op(c
)) {
231 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
232 if (m_image_ctx
.non_blocking_aio
|| writes_blocked() || !writes_empty()) {
233 queue(new ImageFlushRequest
<>(m_image_ctx
, c
));
235 ImageRequest
<>::aio_flush(&m_image_ctx
, c
);
236 finish_in_flight_op();
// Asynchronous write-same: stamps the completion as AIO_TYPE_WRITESAME, logs
// the request (including the length of `bl`), optionally enables event
// notification, registers the op as in-flight, and then -- holding owner_lock
// for read -- either queues an ImageWriteSameRequest (when non_blocking_aio is
// set or writes are blocked) or issues ImageRequest<>::aio_writesame()
// directly followed by finish_in_flight_op().
// NOTE(review): the end of the parameter list (`native_async` is used but not
// declared here), the body taken when start_in_flight_op() fails, the trailing
// arguments of both request constructions, and the separator between the
// queued and direct paths are missing from this view; confirm against the
// complete source.
240 void ImageRequestWQ::aio_writesame(AioCompletion
*c
, uint64_t off
, uint64_t len
,
241 bufferlist
&&bl
, int op_flags
,
243 c
->init_time(&m_image_ctx
, AIO_TYPE_WRITESAME
);
244 CephContext
*cct
= m_image_ctx
.cct
;
245 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
246 << "completion=" << c
<< ", off=" << off
<< ", "
247 << "len=" << len
<< ", data_len = " << bl
.length() << ", "
248 << "flags=" << op_flags
<< dendl
;
// event notification is only enabled when the image's event socket is valid
250 if (native_async
&& m_image_ctx
.event_socket
.is_valid()) {
251 c
->set_event_notify(true);
254 if (!start_in_flight_op(c
)) {
258 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
259 if (m_image_ctx
.non_blocking_aio
|| writes_blocked()) {
260 queue(new ImageWriteSameRequest
<>(m_image_ctx
, c
, off
, len
, std::move(bl
),
264 ImageRequest
<>::aio_writesame(&m_image_ctx
, c
, off
, len
, std::move(bl
),
266 finish_in_flight_op();
// Begins shutting the queue down. The caller must hold owner_lock (asserted).
// Under m_lock (write), the number of in-flight ops is logged; if any are
// still in flight, `on_shutdown` is stored in m_on_shutdown. The visible tail
// flushes the image with `on_shutdown` as the completion.
// NOTE(review): lines between these steps are missing from this view
// (e.g. the end of the log statement and whatever follows the assignment to
// m_on_shutdown -- presumably an early return so the flush only runs when no
// ops are in flight); confirm against the complete source.
270 void ImageRequestWQ::shut_down(Context
*on_shutdown
) {
271 assert(m_image_ctx
.owner_lock
.is_locked());
274 RWLock::WLocker
locker(m_lock
);
278 CephContext
*cct
= m_image_ctx
.cct
;
279 ldout(cct
, 5) << __func__
<< ": in_flight=" << m_in_flight_ops
.load()
281 if (m_in_flight_ops
> 0) {
282 m_on_shutdown
= on_shutdown
;
287 // ensure that all in-flight IO is flushed
288 m_image_ctx
.flush(on_shutdown
);
291 bool ImageRequestWQ::is_lock_request_needed() const {
292 RWLock::RLocker
locker(m_lock
);
293 return (m_queued_writes
> 0 ||
294 (m_require_lock_on_read
&& m_queued_reads
> 0));
297 int ImageRequestWQ::block_writes() {
298 C_SaferCond cond_ctx
;
299 block_writes(&cond_ctx
);
300 return cond_ctx
.wait();
// Asynchronous variant of block_writes(): the caller must hold owner_lock
// (asserted). Under m_lock (write), the current m_write_blockers value is
// logged; if other blocker contexts are already pending or writes are still
// in progress, `on_blocked` is appended to m_write_blocker_contexts. The
// visible tail flushes the image with `on_blocked` as the completion.
// NOTE(review): lines are missing from this view between these steps --
// presumably m_write_blockers is incremented before the log line and the
// deferred branch returns early instead of falling through to the flush;
// confirm against the complete source.
303 void ImageRequestWQ::block_writes(Context
*on_blocked
) {
304 assert(m_image_ctx
.owner_lock
.is_locked());
305 CephContext
*cct
= m_image_ctx
.cct
;
308 RWLock::WLocker
locker(m_lock
);
310 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
311 << m_write_blockers
<< dendl
;
312 if (!m_write_blocker_contexts
.empty() || m_in_progress_writes
> 0) {
313 m_write_blocker_contexts
.push_back(on_blocked
);
318 // ensure that all in-flight IO is flushed
319 m_image_ctx
.flush(on_blocked
);
// Releases one write blocker. Under m_lock (write) the function asserts that
// at least one blocker is outstanding, logs m_write_blockers, and tests
// whether the count has reached zero.
// NOTE(review): the statement that changes m_write_blockers, the body of the
// `== 0` branch, and every use of `wake_up` are missing from this view --
// presumably the count is decremented and worker threads are signalled once
// it reaches zero; confirm against the complete source.
322 void ImageRequestWQ::unblock_writes() {
323 CephContext
*cct
= m_image_ctx
.cct
;
325 bool wake_up
= false;
327 RWLock::WLocker
locker(m_lock
);
328 assert(m_write_blockers
> 0);
331 ldout(cct
, 5) << &m_image_ctx
<< ", " << "num="
332 << m_write_blockers
<< dendl
;
333 if (m_write_blockers
== 0) {
// Marks reads as requiring the lock: emits a level-20 log entry, then sets
// m_require_lock_on_read to true while holding m_lock for write.
// NOTE(review): one original line between the log statement and the locker is
// not present in this view (likely blank); confirm against the full source.
343 void ImageRequestWQ::set_require_lock_on_read() {
344 CephContext
*cct
= m_image_ctx
.cct
;
345 ldout(cct
, 20) << dendl
;
347 RWLock::WLocker
locker(m_lock
);
348 m_require_lock_on_read
= true;
// Clears the "reads require the lock" flag: emits a level-20 log entry, takes
// m_lock for write, checks whether the flag is already clear, and otherwise
// sets m_require_lock_on_read to false.
// NOTE(review): the body of the `!m_require_lock_on_read` branch (presumably
// an early return) and everything after the assignment are missing from this
// view; confirm against the complete source.
351 void ImageRequestWQ::clear_require_lock_on_read() {
352 CephContext
*cct
= m_image_ctx
.cct
;
353 ldout(cct
, 20) << dendl
;
356 RWLock::WLocker
locker(m_lock
);
357 if (!m_require_lock_on_read
) {
361 m_require_lock_on_read
= false;
// Dequeue hook for the thread pool: peeks at the front request and decides
// whether it may be handed to a worker. Visible checks, in order:
//   * nothing queued, or a refresh is already in progress;
//   * for write ops, whether write blockers are outstanding (writes are
//     counted in m_in_progress_writes only when no refresh is required);
//   * for other ops, whether reads currently require the lock.
// The item is then actually dequeued from the base PointerWQ and asserted to
// be the peeked item. If a refresh is required, m_refresh_in_progress is set
// and the refresh is started with a C_RefreshFinish callback; the pool lock is
// released around the refresh() call and re-acquired afterwards.
// NOTE(review): the bodies of the early-exit branches and the final return
// statement(s) are missing from this view (presumably nullptr when stalled,
// the item otherwise); confirm against the complete source.
366 void *ImageRequestWQ::_void_dequeue() {
367 ImageRequest
<> *peek_item
= front();
369 // no IO ops available or refresh in-progress (IO stalled)
370 if (peek_item
== nullptr || m_refresh_in_progress
) {
374 bool refresh_required
= m_image_ctx
.state
->is_refresh_required();
376 RWLock::RLocker
locker(m_lock
);
377 if (peek_item
->is_write_op()) {
378 if (m_write_blockers
> 0) {
382 // refresh will requeue the op -- don't count it as in-progress
383 if (!refresh_required
) {
384 m_in_progress_writes
++;
386 } else if (m_require_lock_on_read
) {
391 ImageRequest
<> *item
= reinterpret_cast<ImageRequest
<> *>(
392 ThreadPool::PointerWQ
<ImageRequest
<> >::_void_dequeue());
393 assert(peek_item
== item
);
395 if (refresh_required
) {
396 ldout(m_image_ctx
.cct
, 15) << "image refresh required: delaying IO " << item
399 // stall IO until the refresh completes
400 m_refresh_in_progress
= true;
// the pool lock is dropped while refresh() is invoked and retaken right after
402 get_pool_lock().Unlock();
403 m_image_ctx
.state
->refresh(new C_RefreshFinish(this, item
));
404 get_pool_lock().Lock();
// Worker-thread handler for a dequeued request: logs it, then performs the
// bookkeeping visible here -- finish_queued_op(), finish_in_progress_write()
// for write ops, and finish_in_flight_op().
// NOTE(review): the lines that actually dispatch `req` (between the log and
// finish_queued_op()) and any cleanup of `req` are missing from this view;
// confirm against the complete source.
412 void ImageRequestWQ::process(ImageRequest
<> *req
) {
413 CephContext
*cct
= m_image_ctx
.cct
;
414 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
415 << "req=" << req
<< dendl
;
419 finish_queued_op(req
);
420 if (req
->is_write_op()) {
421 finish_in_progress_write();
425 finish_in_flight_op();
// Bookkeeping for a request leaving the queue. Under m_lock (read) it asserts
// that the matching counter is positive: m_queued_writes for write ops,
// m_queued_reads otherwise.
// NOTE(review): the statements that modify the counters (presumably
// decrements) and the else separator are missing from this view; confirm
// against the complete source.
428 void ImageRequestWQ::finish_queued_op(ImageRequest
<> *req
) {
429 RWLock::RLocker
locker(m_lock
);
430 if (req
->is_write_op()) {
431 assert(m_queued_writes
> 0);
434 assert(m_queued_reads
> 0);
// Called when an in-progress write finishes. Under m_lock (read) it asserts
// the counter is positive and decrements m_in_progress_writes; if that was the
// last in-progress write and blocker contexts are waiting, it records that
// fact and then flushes the image with a C_BlockedWrites completion.
// NOTE(review): the braces that scope the locker are missing from this view,
// so whether the flush is issued after m_lock is released cannot be confirmed
// here.
439 void ImageRequestWQ::finish_in_progress_write() {
440 bool writes_blocked
= false;
442 RWLock::RLocker
locker(m_lock
);
443 assert(m_in_progress_writes
> 0);
444 if (--m_in_progress_writes
== 0 &&
445 !m_write_blocker_contexts
.empty()) {
446 writes_blocked
= true;
450 if (writes_blocked
) {
451 m_image_ctx
.flush(new C_BlockedWrites(this));
// Registers a new in-flight op for completion `c`. Takes m_lock for read; the
// visible error path logs "IO received on closed image".
// NOTE(review): most of this function is missing from this view -- the
// condition guarding the error log (presumably m_shutdown), how `c` is failed,
// the counter update, and the return values. Callers in this file treat a
// zero/false result as "op rejected"; confirm against the complete source.
455 int ImageRequestWQ::start_in_flight_op(AioCompletion
*c
) {
456 RWLock::RLocker
locker(m_lock
);
459 CephContext
*cct
= m_image_ctx
.cct
;
460 lderr(cct
) << "IO received on closed image" << dendl
;
// Marks one in-flight op as finished. Under m_lock (read) it decrements
// m_in_flight_ops and tests whether ops remain or shutdown has not been
// requested. Otherwise it picks up m_on_shutdown, logs
// "completing shut down", asserts the context is non-null and flushes the
// image with it as the completion.
// NOTE(review): the body of the `> 0 || !m_shutdown` branch (presumably an
// early return) and the braces scoping the locker are missing from this view;
// confirm against the complete source.
470 void ImageRequestWQ::finish_in_flight_op() {
471 Context
*on_shutdown
;
473 RWLock::RLocker
locker(m_lock
);
474 if (--m_in_flight_ops
> 0 || !m_shutdown
) {
477 on_shutdown
= m_on_shutdown
;
480 CephContext
*cct
= m_image_ctx
.cct
;
481 ldout(cct
, 5) << "completing shut down" << dendl
;
483 assert(on_shutdown
!= nullptr);
484 m_image_ctx
.flush(on_shutdown
);
// Reports whether the exclusive lock still has to be acquired. The caller must
// hold owner_lock (asserted). When an exclusive_lock object exists, the result
// is true exactly when this client is not the lock owner.
// NOTE(review): the body of the null-check branch is missing from this view
// (presumably `return false`); confirm against the complete source. The check
// compares against NULL while the rest of the file uses nullptr.
487 bool ImageRequestWQ::is_lock_required() const {
488 assert(m_image_ctx
.owner_lock
.is_locked());
489 if (m_image_ctx
.exclusive_lock
== NULL
) {
493 return (!m_image_ctx
.exclusive_lock
->is_lock_owner());
// Enqueues `req` on the underlying PointerWQ. The caller must hold owner_lock
// (asserted). The function first works out whether the op needs the exclusive
// lock: an exclusive_lock object must exist and either a write op needs the
// lock per is_lock_required() or a read arrives while reads require the lock.
// If the lock is needed but the policy forbids requesting it automatically,
// an error is logged and finish_in_flight_op() is called. On the visible
// success path the request is pushed onto the base queue, and acquire_lock()
// is invoked on the exclusive lock.
// NOTE(review): lines are missing from this view -- how `req` is failed on the
// error path, the queued read/write counter updates, and the condition that
// guards acquire_lock() (presumably `lock_required`); confirm against the
// complete source.
496 void ImageRequestWQ::queue(ImageRequest
<> *req
) {
497 CephContext
*cct
= m_image_ctx
.cct
;
498 ldout(cct
, 20) << "ictx=" << &m_image_ctx
<< ", "
499 << "req=" << req
<< dendl
;
501 assert(m_image_ctx
.owner_lock
.is_locked());
502 bool write_op
= req
->is_write_op();
503 bool lock_required
= (m_image_ctx
.exclusive_lock
!= nullptr &&
504 ((write_op
&& is_lock_required()) ||
505 (!write_op
&& m_require_lock_on_read
)));
507 if (lock_required
&& !m_image_ctx
.get_exclusive_lock_policy()->may_auto_request_lock()) {
508 lderr(cct
) << "op requires exclusive lock" << dendl
;
511 finish_in_flight_op();
521 ThreadPool::PointerWQ
<ImageRequest
<> >::queue(req
);
524 m_image_ctx
.exclusive_lock
->acquire_lock(nullptr);
// Callback run when the image refresh started by _void_dequeue() completes
// with result `r` for the stalled request `req`. The visible code logs the
// event, contains a path that calls finish_queued_op() and
// finish_in_flight_op(), clears m_refresh_in_progress, and finally -- holding
// owner_lock for read -- requests the exclusive lock when is_lock_required()
// and is_lock_request_needed() are both true.
// NOTE(review): the conditions separating the failure path from the requeue
// path, the requeue call itself, and any locking/signalling around
// m_refresh_in_progress are missing from this view; presumably the
// finish_* calls belong to the `r < 0` path. Confirm against the complete
// source.
528 void ImageRequestWQ::handle_refreshed(int r
, ImageRequest
<> *req
) {
529 CephContext
*cct
= m_image_ctx
.cct
;
530 ldout(cct
, 15) << "resuming IO after image refresh: r=" << r
<< ", "
531 << "req=" << req
<< dendl
;
535 finish_queued_op(req
);
537 finish_in_flight_op();
539 // since IO was stalled for refresh -- original IO order is preserved
540 // if we requeue this op for work queue processing
544 m_refresh_in_progress
= false;
547 // refresh might have enabled exclusive lock -- IO stalled until
548 // we acquire the lock
549 RWLock::RLocker
owner_locker(m_image_ctx
.owner_lock
);
550 if (is_lock_required() && is_lock_request_needed()) {
551 m_image_ctx
.exclusive_lock
->acquire_lock(nullptr);
// Completion handler for the flush issued once writes are blocked. Under
// m_lock (write) it swaps the pending m_write_blocker_contexts into a local
// `contexts` container, then iterates over those contexts.
// NOTE(review): the declaration of `contexts` and the loop body are missing
// from this view -- presumably each context is completed (it is not visible
// whether `r` or a fixed value is passed); confirm against the complete
// source.
555 void ImageRequestWQ::handle_blocked_writes(int r
) {
558 RWLock::WLocker
locker(m_lock
);
559 contexts
.swap(m_write_blocker_contexts
);
562 for (auto ctx
: contexts
) {
568 } // namespace librbd