ceph.git: ceph/src/librbd/io/ImageRequestWQ.cc (blob @ 2758790eb087a87f2d44a49fb495056d879eb35c)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace io {

ImageRequestWQ::ImageRequestWQ(ImageCtx *image_ctx, const string &name,
                               time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageRequest<> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(util::unique_lock_name("ImageRequestWQ::m_lock", this)),
    m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
    m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
    m_shutdown(false), m_on_shutdown(nullptr) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
  tp->add_work_queue(this);
}

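// ---------------------------------------------------------------------------
// Synchronous wrappers.  read(), write(), discard() and writesame() each bind
// an AioCompletion to a C_SaferCond, hand the request to the matching aio_*()
// entry point, and block the caller until the completion fires.  A minimal,
// illustrative sketch of a hypothetical internal caller -- assuming an open
// ImageCtx *ictx whose io_work_queue member points at this queue -- might
// look like:
//
//   librbd::io::ImageRequestWQ *wq = ictx->io_work_queue;
//
//   bufferlist write_bl;
//   write_bl.append(std::string(4096, 'a'));
//   ssize_t w = wq->write(0, write_bl.length(), std::move(write_bl), 0);
//
//   bufferlist read_bl;
//   librbd::io::ReadResult read_result{&read_bl};
//   ssize_t r = wq->read(0, 4096, std::move(read_result), 0);
//
// Both calls return a negative errno on failure; on success write() returns
// the (possibly clipped) length and read() returns the number of bytes read.
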
ssize_t ImageRequestWQ::read(uint64_t off, uint64_t len,
                             ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

ssize_t ImageRequestWQ::write(uint64_t off, uint64_t len,
                              bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

ssize_t ImageRequestWQ::discard(uint64_t off, uint64_t len, bool skip_partial_discard) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, skip_partial_discard, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

ssize_t ImageRequestWQ::writesame(uint64_t off, uint64_t len, bufferlist &&bl,
                                  int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

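// ---------------------------------------------------------------------------
// Asynchronous entry points.  Each aio_*() method below either dispatches the
// request inline on the caller's thread or wraps it in an ImageRequest<> and
// queues it for the thread pool; queuing is chosen when non-blocking AIO is
// configured, writes are currently blocked, a read/flush must stay ordered
// behind queued writes, or a read requires the exclusive lock first.  As an
// illustrative sketch only (the names wq, off, len and read_result are
// placeholders, not part of this file), a hypothetical asynchronous caller
// could supply any Context-derived callback instead of the C_SaferCond used
// by the synchronous wrappers above:
//
//   struct C_ReadDone : public Context {
//     void finish(int r) override {
//       // r is the byte count on success or a negative errno
//     }
//   };
//
//   auto *comp = librbd::io::AioCompletion::create(new C_ReadDone());
//   wq->aio_read(comp, off, len, std::move(read_result), 0, false);
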
void ImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                              ReadResult &&read_result, int op_flags,
                              bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(&m_image_ctx, AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  bool lock_required;
  {
    RWLock::RLocker locker(m_lock);
    lock_required = m_require_lock_on_read;
  }

  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      lock_required) {
    queue(new ImageReadRequest<>(m_image_ctx, c, {{off, len}},
                                 std::move(read_result), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<>::aio_read(&m_image_ctx, c, {{off, len}},
                             std::move(read_result), op_flags, trace);
    finish_in_flight_op();
  }
  trace.event("finish");
}

void ImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                               bufferlist &&bl, int op_flags,
                               bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(&m_image_ctx, AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(new ImageWriteRequest<>(m_image_ctx, c, {{off, len}},
                                  std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<>::aio_write(&m_image_ctx, c, {{off, len}},
                              std::move(bl), op_flags, trace);
    finish_in_flight_op();
  }
  trace.event("finish");
}

void ImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off,
                                 uint64_t len, bool skip_partial_discard,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(&m_image_ctx, AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(new ImageDiscardRequest<>(m_image_ctx, c, off, len,
                                    skip_partial_discard, trace));
  } else {
    c->start_op();
    ImageRequest<>::aio_discard(&m_image_ctx, c, off, len,
                                skip_partial_discard, trace);
    finish_in_flight_op();
  }
  trace.event("finish");
}

void ImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(&m_image_ctx, AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(new ImageFlushRequest<>(m_image_ctx, c, trace));
  } else {
    ImageRequest<>::aio_flush(&m_image_ctx, c, trace);
    finish_in_flight_op();
  }
  trace.event("finish");
}

void ImageRequestWQ::aio_writesame(AioCompletion *c, uint64_t off, uint64_t len,
                                   bufferlist &&bl, int op_flags,
                                   bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (cct->_conf->rbd_blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(&m_image_ctx, AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_op(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(new ImageWriteSameRequest<>(m_image_ctx, c, off, len, std::move(bl),
                                      op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
                                  op_flags, trace);
    finish_in_flight_op();
  }
  trace.event("finish");
}

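// ---------------------------------------------------------------------------
// Shutdown and in-flight accounting.  Every externally submitted op passes
// through start_in_flight_op()/finish_in_flight_op().  shut_down() marks the
// queue as shut down and, if ops are still in flight, parks the caller's
// context in m_on_shutdown; the final flush is then issued from
// finish_in_flight_op() once the in-flight count drains to zero.
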
void ImageRequestWQ::shut_down(Context *on_shutdown) {
  assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.load()
                  << dendl;
    if (m_in_flight_ops > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_shutdown);
}

bool ImageRequestWQ::is_lock_request_needed() const {
  RWLock::RLocker locker(m_lock);
  return (m_queued_writes > 0 ||
          (m_require_lock_on_read && m_queued_reads > 0));
}

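// ---------------------------------------------------------------------------
// Write blocking.  block_writes() bumps the blocker count and completes its
// context only after in-progress writes have drained and in-flight IO has
// been flushed; unblock_writes() drops the count and re-signals the work
// queue once it reaches zero.  Maintenance paths that need writes quiesced
// (for example before releasing the exclusive lock) typically bracket their
// work as in this illustrative sketch (assumes an ImageCtx *ictx; error
// handling elided):
//
//   // block_writes() asserts that ictx->owner_lock is held by the caller
//   int r = ictx->io_work_queue->block_writes();  // waits for writes to drain
//   if (r == 0) {
//     // ... perform the maintenance step while writes are quiesced ...
//   }
//   ictx->io_work_queue->unblock_writes();
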
int ImageRequestWQ::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

void ImageRequestWQ::block_writes(Context *on_blocked) {
  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_blocked);
}

void ImageRequestWQ::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
    }
  }

  if (wake_up) {
    signal();
  }
}

void ImageRequestWQ::set_require_lock_on_read() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  RWLock::WLocker locker(m_lock);
  m_require_lock_on_read = true;
}

void ImageRequestWQ::clear_require_lock_on_read() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  {
    RWLock::WLocker locker(m_lock);
    if (!m_require_lock_on_read) {
      return;
    }

    m_require_lock_on_read = false;
  }
  signal();
}

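// ---------------------------------------------------------------------------
// ThreadPool hooks.  _void_dequeue() is the work-queue override that pulls
// the next ImageRequest for processing.  It returns nullptr -- stalling the
// queue -- while writes are blocked (for a write op), while reads require the
// exclusive lock (for a read op), or while an image refresh is in flight.
// When a refresh is needed, the dequeued request is handed to a
// C_RefreshFinish and re-injected via handle_refreshed() once the refresh
// completes, preserving the original submission order.
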
void *ImageRequestWQ::_void_dequeue() {
  ImageRequest<> *peek_item = front();

  // no IO ops available or refresh in-progress (IO stalled)
  if (peek_item == nullptr || m_refresh_in_progress) {
    return nullptr;
  }

  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    if (peek_item->is_write_op()) {
      if (m_write_blockers > 0) {
        return nullptr;
      }

      // refresh will requeue the op -- don't count it as in-progress
      if (!refresh_required) {
        m_in_progress_writes++;
      }
    } else if (m_require_lock_on_read) {
      return nullptr;
    }
  }

  ImageRequest<> *item = reinterpret_cast<ImageRequest<> *>(
    ThreadPool::PointerWQ<ImageRequest<> >::_void_dequeue());
  assert(peek_item == item);

  if (refresh_required) {
    ldout(m_image_ctx.cct, 15) << "image refresh required: delaying IO " << item
                               << dendl;

    // stall IO until the refresh completes
    m_refresh_in_progress = true;

    get_pool_lock().Unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    get_pool_lock().Lock();
    return nullptr;
  }

  item->start_op();
  return item;
}

void ImageRequestWQ::process(ImageRequest<> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  req->send();

  finish_queued_op(req);
  if (req->is_write_op()) {
    finish_in_progress_write();
  }
  delete req;

  finish_in_flight_op();
}

void ImageRequestWQ::finish_queued_op(ImageRequest<> *req) {
  RWLock::RLocker locker(m_lock);
  if (req->is_write_op()) {
    assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

void ImageRequestWQ::finish_in_progress_write() {
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    assert(m_in_progress_writes > 0);
    if (--m_in_progress_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  if (writes_blocked) {
    m_image_ctx.flush(new C_BlockedWrites(this));
  }
}

bool ImageRequestWQ::start_in_flight_op(AioCompletion *c) {
  RWLock::RLocker locker(m_lock);

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->fail(-ESHUTDOWN);
    return false;
  }

  m_in_flight_ops++;
  return true;
}

void ImageRequestWQ::finish_in_flight_op() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    if (--m_in_flight_ops > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  assert(on_shutdown != nullptr);
  m_image_ctx.flush(on_shutdown);
}

bool ImageRequestWQ::is_lock_required() const {
  assert(m_image_ctx.owner_lock.is_locked());
  if (m_image_ctx.exclusive_lock == NULL) {
    return false;
  }

  return (!m_image_ctx.exclusive_lock->is_lock_owner());
}

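// ---------------------------------------------------------------------------
// queue() is only reached for requests that could not be dispatched inline.
// If the request needs the exclusive lock and the policy does not allow
// automatically requesting it, the op fails with -EROFS; otherwise the op is
// queued and, when required, an asynchronous lock acquisition is kicked off.
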
void ImageRequestWQ::queue(ImageRequest<> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  assert(m_image_ctx.owner_lock.is_locked());
  bool write_op = req->is_write_op();
  bool lock_required = (m_image_ctx.exclusive_lock != nullptr &&
                        ((write_op && is_lock_required()) ||
                         (!write_op && m_require_lock_on_read)));

  if (lock_required && !m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
    lderr(cct) << "op requires exclusive lock" << dendl;
    req->fail(-EROFS);
    delete req;
    finish_in_flight_op();
    return;
  }

  if (write_op) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageRequest<> >::queue(req);

  if (lock_required) {
    m_image_ctx.exclusive_lock->acquire_lock(nullptr);
  }
}

void ImageRequestWQ::handle_refreshed(int r, ImageRequest<> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", "
                 << "req=" << req << dendl;
  if (r < 0) {
    process_finish();
    req->fail(r);
    finish_queued_op(req);
    delete req;
    finish_in_flight_op();
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    requeue(req);
  }

  m_refresh_in_progress = false;
  signal();

  // refresh might have enabled exclusive lock -- IO stalled until
  // we acquire the lock
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (is_lock_required() && is_lock_request_needed()) {
    m_image_ctx.exclusive_lock->acquire_lock(nullptr);
  }
}

void ImageRequestWQ::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    RWLock::WLocker locker(m_lock);
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

} // namespace io
} // namespace librbd