// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"
#include "common/errno.h"
#include "common/zipkin_trace.h"
#include "common/Cond.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "common/EventTrace.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
                           << " " << __func__ << ": "

namespace librbd {

using util::create_context_callback;

namespace io {

namespace {

template <typename I>
void flush_image(I& image_ctx, Context* on_finish) {
  auto aio_comp = librbd::io::AioCompletion::create_and_start(
    on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH);
  auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request(
    image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {});
  req->send();
  delete req;
}

} // anonymous namespace

template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;
  ImageDispatchSpec<I> *image_request;

  C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue;
  explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};

template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;
  ImageDispatchSpec<I> *image_request;

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};

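// name assigned to the TokenBucketThrottle instance created below for each
// supported QoS throttle flag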
static std::map<uint64_t, std::string> throttle_flags = {
  { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
  { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
  { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
  { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
  { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
  { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
};

template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(ceph::make_shared_mutex(
      util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;

  SafeTimer *timer;
  ceph::mutex *timer_lock;
  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);

  for (auto flag : throttle_flags) {
    m_throttles.push_back(make_pair(
      flag.first,
      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
  }

  this->register_work_queue();
}

template <typename I>
ImageRequestWQ<I>::~ImageRequestWQ() {
  for (auto t : m_throttles) {
    delete t.second;
  }
}

template <typename I>
ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
                                ReadResult &&read_result, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_read(c, off, len, std::move(read_result), op_flags, false);
  return cond.wait();
}

template <typename I>
ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
                                 bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
                                   uint32_t discard_granularity_bytes) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_discard(c, off, len, discard_granularity_bytes, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
                                     bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << ", data_len " << bl.length() << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_writesame(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  return len;
}

template <typename I>
ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
                                             bufferlist &&cmp_bl,
                                             bufferlist &&bl,
                                             uint64_t *mismatch_off,
                                             int op_flags){
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
                 << off << ", " << "len = " << len << dendl;

  m_image_ctx.image_lock.lock_shared();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.image_lock.unlock_shared();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
                        mismatch_off, op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }

  return len;
}

template <typename I>
int ImageRequestWQ<I>::flush() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_flush(c, false);

  int r = cond.wait();
  if (r < 0) {
    return r;
  }

  return 0;
}

template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageDispatchSpec<I>::create_read_request(
            m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
            trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

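  // record the tid of this write so that later flushes wait for it
  // (see aio_flush() and unblock_flushes())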
  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
    m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len,
                                    uint32_t discard_granularity_bytes,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
    m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
    m_image_ctx, c, FLUSH_SOURCE_USER, trace);

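  // a flush must not overtake writes that are still queued or blocked;
  // defer it until unblock_flushes() finds no earlier outstanding write tids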
  {
    std::lock_guard locker{m_lock};
    if(!m_queued_or_blocked_io_tids.empty()) {
      ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
      m_queued_flushes.emplace(tid, req);
      --m_in_flight_ios;
      return;
    }
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
    m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
    m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
    mismatch_off, op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}

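// returns true if the extent overlaps one already tracked in
// in_flight_image_extents (the IO must be deferred); otherwise adds it to the
// set and returns false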
template <typename I>
bool ImageRequestWQ<I>::block_overlapping_io(
    ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx
                 << "off: " << off << " len: " << len <<dendl;

  if(len == 0) {
    return false;
  }

  if (in_flight_image_extents->empty() ||
      !in_flight_image_extents->intersects(off, len)) {
    in_flight_image_extents->insert(off, len);
    return false;
  }

  return true;
}

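// called after a write-type IO has been dispatched: drops its extent from the
// in-flight set and re-queues any blocked IOs that no longer overlap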
template <typename I>
void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
                                               uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;

  remove_in_flight_write_ios(offset, length, true, tid);

  std::unique_lock locker{m_lock};
  if (!m_blocked_ios.empty()) {
    auto it = m_blocked_ios.begin();
    while (it != m_blocked_ios.end()) {
      auto blocked_io = *it;

      const auto& extents = blocked_io->get_image_extents();
      uint64_t off = extents.size() ? extents.front().first : 0;
      uint64_t len = extents.size() ? extents.front().second : 0;

      if (block_overlapping_io(&m_in_flight_extents, off, len)) {
        break;
      }
      ldout(cct, 20) << "unblocking off: " << off << ", "
                     << "len: " << len << dendl;
      AioCompletion *aio_comp = blocked_io->get_aio_completion();

      m_blocked_ios.erase(it);
      locker.unlock();
      queue_unblocked_io(aio_comp, blocked_io);
      locker.lock();
    }
  }
}

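// dispatches any queued flushes whose tid precedes every write tid that is
// still queued or blocked, preserving flush ordering relative to earlier writes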
template <typename I>
void ImageRequestWQ<I>::unblock_flushes() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
  std::unique_lock locker{m_lock};
  auto io_tid_it = m_queued_or_blocked_io_tids.begin();
  while (true) {
    auto it = m_queued_flushes.begin();
    if (it == m_queued_flushes.end() ||
        (io_tid_it != m_queued_or_blocked_io_tids.end() &&
        *io_tid_it < it->first)) {
      break;
    }

    auto blocked_flush = *it;
    ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;

    AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();

    m_queued_flushes.erase(it);
    locker.unlock();
    queue_unblocked_io(aio_comp, blocked_flush.second);
    locker.lock();
  }
}

template <typename I>
void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
                                           ImageDispatchSpec<I> *req) {
  if (!start_in_flight_io(comp)) {
    return;
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  queue(req);
}

template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  {
    std::unique_lock locker{m_lock};
    ceph_assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_shutdown);
}

template <typename I>
int ImageRequestWQ<I>::block_writes() {
  C_SaferCond cond_ctx;
  block_writes(&cond_ctx);
  return cond_ctx.wait();
}

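// stops new writes from being dispatched; on_blocked is completed once
// in-flight writes have drained and an internal flush has finished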
template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_blocked);
}

template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  Contexts waiter_contexts;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
      std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
    }
  }

  if (wake_up) {
    for (auto ctx : waiter_contexts) {
      ctx->complete(0);
    }
    this->signal();
  }
}

template <typename I>
void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
                   << m_write_blockers << dendl;
    if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
      m_unblocked_write_waiter_contexts.push_back(on_unblocked);
      return;
    }
  }

  on_unblocked->complete(0);
}

template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    std::unique_lock locker{m_lock};
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}

template <typename I>
void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){
  for (auto pair : m_throttles) {
    pair.second->set_schedule_tick_min(tick);
  }
}

template <typename I>
void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
                                        uint64_t limit,
                                        uint64_t burst) {
  CephContext *cct = m_image_ctx.cct;
  TokenBucketThrottle *throttle = nullptr;
  for (auto pair : m_throttles) {
    if (flag == pair.first) {
      throttle = pair.second;
      break;
    }
  }
  ceph_assert(throttle != nullptr);

  int r = throttle->set_limit(limit, burst);
  if (r < 0) {
    lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
               << "burst(" << burst << ") is less than "
               << "limit(" << limit << ")" << dendl;
    // if apply failed, we should at least make sure the limit works.
    throttle->set_limit(limit, 0);
  }

  if (limit)
    m_qos_enabled_flag |= flag;
  else
    m_qos_enabled_flag &= ~flag;
}

template <typename I>
void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;

  ceph_assert(m_io_throttled.load() > 0);
  item->set_throttled(flag);
  if (item->were_all_throttled()) {
    this->requeue_back(item);
    --m_io_throttled;
    this->signal();
  }
}

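// checks the item against every enabled QoS throttle; returns true if any
// throttle held it back (handle_throttle_ready() requeues the item once
// enough tokens become available)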
template <typename I>
bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
  uint64_t tokens = 0;
  uint64_t flag = 0;
  bool blocked = false;
  TokenBucketThrottle* throttle = nullptr;

  for (auto t : m_throttles) {
    flag = t.first;
    if (item->was_throttled(flag))
      continue;

    if (!(m_qos_enabled_flag & flag)) {
      item->set_throttled(flag);
      continue;
    }

    throttle = t.second;
    if (item->tokens_requested(flag, &tokens) &&
        throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
                      &ImageRequestWQ<I>::handle_throttle_ready>(
                        tokens, this, item, flag)) {
      blocked = true;
    } else {
      item->set_throttled(flag);
    }
  }
  return blocked;
}

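// ThreadPool hook: decides whether the item at the front of the queue can be
// dispatched now, or must instead be throttled by QoS, stalled until the
// exclusive lock is acquired, or stalled until the image is refreshed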
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageDispatchSpec<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  if (needs_throttle(peek_item)) {
    ldout(cct, 15) << "throttling IO " << peek_item << dendl;

    ++m_io_throttled;
    // dequeue the throttled item
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    std::shared_lock locker{m_lock};
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required && !peek_item->blocked) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
  ceph_assert(peek_item == item);

  if (lock_required) {
    this->get_pool_lock().unlock();
    m_image_ctx.owner_lock.lock_shared();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
                          item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        Context *ctx = new C_AcquireLock(this, item);
        ctx = create_context_callback<
          Context, &Context::complete>(
            ctx, m_image_ctx.exclusive_lock);
        m_image_ctx.exclusive_lock->acquire_lock(ctx);
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.unlock_shared();
    this->get_pool_lock().lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().lock();
    return nullptr;
  }

  return item;
}

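// dispatches a dequeued request; a write-type op that overlaps an in-flight
// write extent is parked on m_blocked_ios and re-queued later by
// unblock_overlapping_io()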
template <typename I>
void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
                                   bool non_blocking_io) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  //extents are invalidated after the request is sent
  //so gather them ahead of that
  const auto& extents = req->get_image_extents();
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  if (write_op && !req->blocked) {
    std::lock_guard locker{m_lock};
    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
    if (blocked) {
      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
                     << &m_image_ctx << ", "
                     << "off=" << offset << ", len=" << length << dendl;
      req->blocked = true;
      m_blocked_ios.push_back(req);
      return;
    }
  }

  req->start_op();
  req->send();

  if (write_op) {
    if (non_blocking_io) {
      finish_in_flight_write();
    }
    unblock_overlapping_io(offset, length, tid);
    unblock_flushes();
  }
  delete req;
}

template <typename I>
void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  bool write_op = req->is_write_op();

  process_io(req, true);

  finish_queued_io(write_op);
  finish_in_flight_io();
}

template <typename I>
void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
                                                   bool write_op, uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
  {
    std::lock_guard locker{m_lock};
    if (write_op) {
      if (length > 0) {
        if(!m_in_flight_extents.empty()) {
          CephContext *cct = m_image_ctx.cct;
          ldout(cct, 20) << "erasing in flight extents with tid:"
                         << tid << ", offset: " << offset << dendl;
          ImageExtentIntervals extents;
          extents.insert(offset, length);
          ImageExtentIntervals intersect;
          intersect.intersection_of(extents, m_in_flight_extents);
          m_in_flight_extents.subtract(intersect);
        }
      }
      m_queued_or_blocked_io_tids.erase(tid);
    }
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
  std::shared_lock locker{m_lock};
  if (write_op) {
    ceph_assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    ceph_assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    std::shared_lock locker{m_lock};
    ceph_assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }
  if (writes_blocked) {
    flush_image(m_image_ctx, new C_BlockedWrites(this));
  }
}

template <typename I>
int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  std::shared_lock locker{m_lock};

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->fail(-ESHUTDOWN);
    return false;
  }

  if (!m_image_ctx.data_ctx.is_valid()) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "missing data pool" << dendl;

    c->fail(-ENODEV);
    return false;
  }

  m_in_flight_ios++;
  return true;
}

template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    std::shared_lock locker{m_lock};
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}

template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(
    int r, ImageDispatchSpec<I> *req) {
  this->process_finish();
  req->fail(r);

  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  const auto& extents = req->get_image_extents();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  finish_queued_io(write_op);
  remove_in_flight_write_ios(offset, length, write_op, tid);
  delete req;
  finish_in_flight_io();
}

template <typename I>
bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return ((write_op && m_require_lock_on_write) ||
          (!write_op && m_require_lock_on_read));
}

template <typename I>
void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  if (req->is_write_op()) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
}

template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(
    int r, ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_refreshed(
    int r, ImageDispatchSpec<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}

template <typename I>
void ImageRequestWQ<I>::handle_blocked_writes(int r) {
  Contexts contexts;
  {
    std::unique_lock locker{m_lock};
    contexts.swap(m_write_blocker_contexts);
  }

  for (auto ctx : contexts) {
    ctx->complete(0);
  }
}

template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd