// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace io {

ImageRequestWQ::ImageRequestWQ(ImageCtx *image_ctx, const string &name,
                               time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageRequest<> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(util::unique_lock_name("ImageRequestWQ::m_lock", this)),
    m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
    m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
    m_shutdown(false), m_on_shutdown(nullptr) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
  tp->add_work_queue(this);
}

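// Synchronous entry points: each wraps its aio_* counterpart with a
// C_SaferCond completion and blocks the caller until that request has
// completed, returning a negative error code on failure.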
ssize_t ImageRequestWQ::read(uint64_t off, uint64_t len,
                             ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

ssize_t ImageRequestWQ::write(uint64_t off, uint64_t len,
                              bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

ssize_t ImageRequestWQ::discard(uint64_t off, uint64_t len,
                                bool skip_partial_discard) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, skip_partial_discard, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

ssize_t ImageRequestWQ::writesame(uint64_t off, uint64_t len, bufferlist &&bl,
                                  int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

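// Asynchronous entry points. Each request is either dispatched inline or
// queued to the work queue: queuing happens when non-blocking AIO is
// configured, writes are currently blocked, or (for reads) the exclusive
// lock must be acquired first so that any journaled writes are replayed.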
void ImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                              ReadResult &&read_result, int op_flags,
                              bool native_async) {
  c->init_time(&m_image_ctx, AIO_TYPE_READ);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  bool lock_required;
  {
    RWLock::RLocker locker(m_lock);
    lock_required = m_require_lock_on_read;
  }

  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      lock_required) {
    queue(new ImageReadRequest<>(m_image_ctx, c, {{off, len}},
                                 std::move(read_result), op_flags));
  } else {
    c->start_op();
    ImageRequest<>::aio_read(&m_image_ctx, c, {{off, len}},
                             std::move(read_result), op_flags);
    finish_in_flight_op();
  }
}

void ImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                               bufferlist &&bl, int op_flags,
                               bool native_async) {
  c->init_time(&m_image_ctx, AIO_TYPE_WRITE);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(new ImageWriteRequest<>(m_image_ctx, c, {{off, len}},
                                  std::move(bl), op_flags));
  } else {
    c->start_op();
    ImageRequest<>::aio_write(&m_image_ctx, c, {{off, len}},
                              std::move(bl), op_flags);
    finish_in_flight_op();
  }
}

void ImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
                                 uint64_t len, bool skip_partial_discard,
                                 bool native_async) {
  c->init_time(&m_image_ctx, AIO_TYPE_DISCARD);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(new ImageDiscardRequest<>(m_image_ctx, c, off, len,
                                    skip_partial_discard));
  } else {
    c->start_op();
    ImageRequest<>::aio_discard(&m_image_ctx, c, off, len,
                                skip_partial_discard);
    finish_in_flight_op();
  }
}

void ImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
  c->init_time(&m_image_ctx, AIO_TYPE_FLUSH);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(new ImageFlushRequest<>(m_image_ctx, c));
  } else {
    ImageRequest<>::aio_flush(&m_image_ctx, c);
    finish_in_flight_op();
  }
}

void ImageRequestWQ::aio_writesame(AioCompletion *c, uint64_t off, uint64_t len,
                                   bufferlist &&bl, int op_flags,
                                   bool native_async) {
  c->init_time(&m_image_ctx, AIO_TYPE_WRITESAME);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(new ImageWriteSameRequest<>(m_image_ctx, c, off, len, std::move(bl),
                                      op_flags));
  } else {
    c->start_op();
    ImageRequest<>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
                                  op_flags);
    finish_in_flight_op();
  }
}

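// Shut-down waits for all in-flight operations to drain: if any are still
// outstanding the callback is stashed in m_on_shutdown and invoked from
// finish_in_flight_op() once the last one completes; otherwise the image
// is flushed immediately.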
void ImageRequestWQ::shut_down(Context *on_shutdown) {
  assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.load()
                  << dendl;
    if (m_in_flight_ops > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_shutdown);
}

bool ImageRequestWQ::is_lock_request_needed() const {
  RWLock::RLocker locker(m_lock);
  return (m_queued_writes > 0 ||
          (m_require_lock_on_read && m_queued_reads > 0));
}

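// Write blocking is reference counted: block_writes() bumps
// m_write_blockers and completes its callback only once no writes are in
// progress, while unblock_writes() drops the count and re-signals the
// work queue when it reaches zero.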
int ImageRequestWQ::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

void ImageRequestWQ::block_writes(Context *on_blocked) {
  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_blocked);
}

void ImageRequestWQ::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
    }
  }

  if (wake_up) {
    signal();
  }
}

void ImageRequestWQ::set_require_lock_on_read() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  RWLock::WLocker locker(m_lock);
  m_require_lock_on_read = true;
}

void ImageRequestWQ::clear_require_lock_on_read() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  {
    RWLock::WLocker locker(m_lock);
    if (!m_require_lock_on_read) {
      return;
    }

    m_require_lock_on_read = false;
  }
  signal();
}

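// _void_dequeue() gates what the thread pool may pick up: it returns
// nullptr (leaving the item queued) while writes are blocked, a read
// requires the exclusive lock, or an image refresh is pending; a required
// refresh is started here and stalls the queue until it finishes.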
void *ImageRequestWQ::_void_dequeue() {
  ImageRequest<> *peek_item = front();

  // no IO ops available or refresh in-progress (IO stalled)
  if (peek_item == nullptr || m_refresh_in_progress) {
    return nullptr;
  }

  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    if (peek_item->is_write_op()) {
      if (m_write_blockers > 0) {
        return nullptr;
      }

      // refresh will requeue the op -- don't count it as in-progress
      if (!refresh_required) {
        m_in_progress_writes++;
      }
    } else if (m_require_lock_on_read) {
      return nullptr;
    }
  }

  ImageRequest<> *item = reinterpret_cast<ImageRequest<> *>(
    ThreadPool::PointerWQ<ImageRequest<> >::_void_dequeue());
  assert(peek_item == item);

  if (refresh_required) {
    ldout(m_image_ctx.cct, 15) << "image refresh required: delaying IO " << item
                               << dendl;

    // stall IO until the refresh completes
    m_refresh_in_progress = true;

    get_pool_lock().Unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    get_pool_lock().Lock();
    return nullptr;
  }

  item->start_op();
  return item;
}

void ImageRequestWQ::process(ImageRequest<> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  req->send();

  finish_queued_op(req);
  if (req->is_write_op()) {
    finish_in_progress_write();
  }
  delete req;

  finish_in_flight_op();
}

void ImageRequestWQ::finish_queued_op(ImageRequest<> *req) {
  RWLock::RLocker locker(m_lock);
  if (req->is_write_op()) {
    assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

void ImageRequestWQ::finish_in_progress_write() {
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    assert(m_in_progress_writes > 0);
    if (--m_in_progress_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  if (writes_blocked) {
    m_image_ctx.flush(new C_BlockedWrites(this));
  }
}

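// In-flight accounting: every externally submitted op passes through
// start_in_flight_op(); the matching finish_in_flight_op() completes a
// deferred shut_down() once the count drops to zero after shutdown has
// been requested.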
bool ImageRequestWQ::start_in_flight_op(AioCompletion *c) {
  RWLock::RLocker locker(m_lock);

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->fail(-ESHUTDOWN);
    return false;
  }

  m_in_flight_ops++;
  return true;
}

void ImageRequestWQ::finish_in_flight_op() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    if (--m_in_flight_ops > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  assert(on_shutdown != nullptr);
  m_image_ctx.flush(on_shutdown);
}

bool ImageRequestWQ::is_lock_required() const {
  assert(m_image_ctx.owner_lock.is_locked());
  if (m_image_ctx.exclusive_lock == NULL) {
    return false;
  }

  return (!m_image_ctx.exclusive_lock->is_lock_owner());
}

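// queue() is the slow path for requests that cannot be dispatched inline.
// If the op needs the exclusive lock and the policy allows auto-requesting
// it, lock acquisition is kicked off after the request is queued;
// otherwise the request fails with -EROFS.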
void ImageRequestWQ::queue(ImageRequest<> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  assert(m_image_ctx.owner_lock.is_locked());
  bool write_op = req->is_write_op();
  bool lock_required = (m_image_ctx.exclusive_lock != nullptr &&
                        ((write_op && is_lock_required()) ||
                         (!write_op && m_require_lock_on_read)));

  if (lock_required &&
      !m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
    lderr(cct) << "op requires exclusive lock" << dendl;
    req->fail(-EROFS);
    delete req;
    finish_in_flight_op();
    return;
  }

  if (write_op) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageRequest<> >::queue(req);

  if (lock_required) {
    m_image_ctx.exclusive_lock->acquire_lock(nullptr);
  }
}

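// Completion callback for the refresh started in _void_dequeue(): on
// failure the stalled request is failed and retired; on success it is
// requeued so the original submission order is preserved, and the queue
// is unstalled.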
void ImageRequestWQ::handle_refreshed(int r, ImageRequest<> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", "
                 << "req=" << req << dendl;
  if (r < 0) {
    process_finish();
    req->fail(r);
    finish_queued_op(req);
    delete req;
    finish_in_flight_op();
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    requeue(req);
  }

  m_refresh_in_progress = false;
  signal();

  // refresh might have enabled exclusive lock -- IO stalled until
  // we acquire the lock
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (is_lock_required() && is_lock_request_needed()) {
    m_image_ctx.exclusive_lock->acquire_lock(nullptr);
  }
}

void ImageRequestWQ::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    RWLock::WLocker locker(m_lock);
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

} // namespace io
} // namespace librbd