1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
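// ImageRequestWQ is librbd's image-level IO work queue: it accepts AIO
// read/write/discard/flush/writesame/compare-and-write requests, applies QoS
// throttling, and stalls dispatch while writes are blocked, the exclusive
// lock is being acquired, or the image is being refreshed.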
4 #include "librbd/io/ImageRequestWQ.h"
5 #include "common/errno.h"
6 #include "common/zipkin_trace.h"
7 #include "common/Cond.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/ImageWatcher.h"
12 #include "librbd/internal.h"
13 #include "librbd/Utils.h"
14 #include "librbd/exclusive_lock/Policy.h"
15 #include "librbd/io/AioCompletion.h"
16 #include "librbd/io/ImageRequest.h"
17 #include "librbd/io/ImageDispatchSpec.h"
18 #include "common/EventTrace.h"
19
20 #define dout_subsys ceph_subsys_rbd
21 #undef dout_prefix
22 #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
23 << " " << __func__ << ": "
24
25 namespace librbd {
26
27 using util::create_context_callback;
28
29 namespace io {
30
31 namespace {
32
33 template <typename I>
34 void flush_image(I& image_ctx, Context* on_finish) {
35 auto aio_comp = librbd::io::AioCompletion::create_and_start(
36 on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH);
37 auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request(
38 image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {});
39 req->send();
40 delete req;
41 }
42
43 } // anonymous namespace
44
45 template <typename I>
46 struct ImageRequestWQ<I>::C_AcquireLock : public Context {
47 ImageRequestWQ *work_queue;
48 ImageDispatchSpec<I> *image_request;
49
50 C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
51 : work_queue(work_queue), image_request(image_request) {
52 }
53
54 void finish(int r) override {
55 work_queue->handle_acquire_lock(r, image_request);
56 }
57 };
58
59 template <typename I>
60 struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
61 ImageRequestWQ *work_queue;
62 explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
63 : work_queue(_work_queue) {
64 }
65
66 void finish(int r) override {
67 work_queue->handle_blocked_writes(r);
68 }
69 };
70
71 template <typename I>
72 struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
73 ImageRequestWQ *work_queue;
74 ImageDispatchSpec<I> *image_request;
75
76 C_RefreshFinish(ImageRequestWQ *work_queue,
77 ImageDispatchSpec<I> *image_request)
78 : work_queue(work_queue), image_request(image_request) {
79 }
80 void finish(int r) override {
81 work_queue->handle_refreshed(r, image_request);
82 }
83 };
84
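// RBD QoS throttle flags mapped to human-readable throttle names; the
// constructor below instantiates one TokenBucketThrottle per entry.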
85 static std::map<uint64_t, std::string> throttle_flags = {
86 { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
87 { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
88 { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
89 { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
90 { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
91 { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
92 };
93
94 template <typename I>
95 ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
96 time_t ti, ThreadPool *tp)
97 : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
98 m_image_ctx(*image_ctx),
99 m_lock(ceph::make_shared_mutex(
100 util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
101 CephContext *cct = m_image_ctx.cct;
102 ldout(cct, 5) << "ictx=" << image_ctx << dendl;
103
104 SafeTimer *timer;
105 ceph::mutex *timer_lock;
106 ImageCtx::get_timer_instance(cct, &timer, &timer_lock);
107
108 for (auto flag : throttle_flags) {
109 m_throttles.push_back(make_pair(
110 flag.first,
111 new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
112 }
113
114 this->register_work_queue();
115 }
116
117 template <typename I>
118 ImageRequestWQ<I>::~ImageRequestWQ() {
119 for (auto t : m_throttles) {
120 delete t.second;
121 }
122 }
123
124 template <typename I>
125 ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
126 ReadResult &&read_result, int op_flags) {
127 CephContext *cct = m_image_ctx.cct;
128 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
129 << "len = " << len << dendl;
130
131 C_SaferCond cond;
132 AioCompletion *c = AioCompletion::create(&cond);
133 aio_read(c, off, len, std::move(read_result), op_flags, false);
134 return cond.wait();
135 }
136
137 template <typename I>
138 ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
139 bufferlist &&bl, int op_flags) {
140 CephContext *cct = m_image_ctx.cct;
141 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
142 << "len = " << len << dendl;
143
144 m_image_ctx.image_lock.lock_shared();
145 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
146 m_image_ctx.image_lock.unlock_shared();
147 if (r < 0) {
148 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
149 return r;
150 }
151
152 C_SaferCond cond;
153 AioCompletion *c = AioCompletion::create(&cond);
154 aio_write(c, off, len, std::move(bl), op_flags, false);
155
156 r = cond.wait();
157 if (r < 0) {
158 return r;
159 }
160 return len;
161 }
162
163 template <typename I>
164 ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
165 uint32_t discard_granularity_bytes) {
166 CephContext *cct = m_image_ctx.cct;
167 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
168 << "len = " << len << dendl;
169
170 m_image_ctx.image_lock.lock_shared();
171 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
172 m_image_ctx.image_lock.unlock_shared();
173 if (r < 0) {
174 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
175 return r;
176 }
177
178 C_SaferCond cond;
179 AioCompletion *c = AioCompletion::create(&cond);
180 aio_discard(c, off, len, discard_granularity_bytes, false);
181
182 r = cond.wait();
183 if (r < 0) {
184 return r;
185 }
186 return len;
187 }
188
189 template <typename I>
190 ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
191 bufferlist &&bl, int op_flags) {
192 CephContext *cct = m_image_ctx.cct;
193 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
194 << "len = " << len << ", data_len " << bl.length() << dendl;
195
196 m_image_ctx.image_lock.lock_shared();
197 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
198 m_image_ctx.image_lock.unlock_shared();
199 if (r < 0) {
200 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
201 return r;
202 }
203
204 C_SaferCond cond;
205 AioCompletion *c = AioCompletion::create(&cond);
206 aio_writesame(c, off, len, std::move(bl), op_flags, false);
207
208 r = cond.wait();
209 if (r < 0) {
210 return r;
211 }
212 return len;
213 }
214
215 template <typename I>
216 ssize_t ImageRequestWQ<I>::write_zeroes(uint64_t off, uint64_t len,
217 int zero_flags, int op_flags) {
218 auto cct = m_image_ctx.cct;
219 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
220 << "len = " << len << dendl;
221
222 m_image_ctx.image_lock.lock_shared();
223 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
224 m_image_ctx.image_lock.unlock_shared();
225 if (r < 0) {
226 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
227 return r;
228 }
229
230 C_SaferCond ctx;
231 auto aio_comp = io::AioCompletion::create(&ctx);
232 aio_write_zeroes(aio_comp, off, len, zero_flags, op_flags, false);
233
234 r = ctx.wait();
235 if (r < 0) {
236 return r;
237 }
238 return len;
239 }
240
241 template <typename I>
242 ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
243 bufferlist &&cmp_bl,
244 bufferlist &&bl,
245 uint64_t *mismatch_off,
246 int op_flags){
247 CephContext *cct = m_image_ctx.cct;
248 ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
249 << off << ", " << "len = " << len << dendl;
250
251 m_image_ctx.image_lock.lock_shared();
252 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
253 m_image_ctx.image_lock.unlock_shared();
254 if (r < 0) {
255 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
256 return r;
257 }
258
259 C_SaferCond cond;
260 AioCompletion *c = AioCompletion::create(&cond);
261 aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
262 mismatch_off, op_flags, false);
263
264 r = cond.wait();
265 if (r < 0) {
266 return r;
267 }
268
269 return len;
270 }
271
272 template <typename I>
273 int ImageRequestWQ<I>::flush() {
274 CephContext *cct = m_image_ctx.cct;
275 ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
276
277 C_SaferCond cond;
278 AioCompletion *c = AioCompletion::create(&cond);
279 aio_flush(c, false);
280
281 int r = cond.wait();
282 if (r < 0) {
283 return r;
284 }
285
286 return 0;
287 }
288
289 template <typename I>
290 void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
291 ReadResult &&read_result, int op_flags,
292 bool native_async) {
293 CephContext *cct = m_image_ctx.cct;
294 FUNCTRACE(cct);
295 ZTracer::Trace trace;
296 if (m_image_ctx.blkin_trace_all) {
297 trace.init("wq: read", &m_image_ctx.trace_endpoint);
298 trace.event("start");
299 }
300
301 c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
302 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
303 << "completion=" << c << ", off=" << off << ", "
304 << "len=" << len << ", " << "flags=" << op_flags << dendl;
305
306 if (native_async && m_image_ctx.event_socket.is_valid()) {
307 c->set_event_notify(true);
308 }
309
310 if (!start_in_flight_io(c)) {
311 return;
312 }
313
314 // if journaling is enabled -- we need to replay the journal because
315 // it might contain an uncommitted write
316 std::shared_lock owner_locker{m_image_ctx.owner_lock};
317 if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
318 require_lock_on_read()) {
319 queue(ImageDispatchSpec<I>::create_read_request(
320 m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
321 trace));
322 } else {
323 c->start_op();
324 ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
325 std::move(read_result), op_flags, trace);
326 finish_in_flight_io();
327 }
328 trace.event("finish");
329 }
330
331 template <typename I>
332 void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
333 bufferlist &&bl, int op_flags,
334 bool native_async) {
335 CephContext *cct = m_image_ctx.cct;
336 FUNCTRACE(cct);
337 ZTracer::Trace trace;
338 if (m_image_ctx.blkin_trace_all) {
339 trace.init("wq: write", &m_image_ctx.trace_endpoint);
340 trace.event("init");
341 }
342
343 c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
344 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
345 << "completion=" << c << ", off=" << off << ", "
346 << "len=" << len << ", flags=" << op_flags << dendl;
347
348 if (native_async && m_image_ctx.event_socket.is_valid()) {
349 c->set_event_notify(true);
350 }
351
352 if (!start_in_flight_io(c)) {
353 return;
354 }
355
356 auto tid = ++m_last_tid;
357
358 {
359 std::lock_guard locker{m_lock};
360 m_queued_or_blocked_io_tids.insert(tid);
361 }
362
363 ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
364 m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);
365
366 std::shared_lock owner_locker{m_image_ctx.owner_lock};
367 if (m_image_ctx.non_blocking_aio || writes_blocked()) {
368 queue(req);
369 } else {
370 process_io(req, false);
371 finish_in_flight_io();
372 }
373 trace.event("finish");
374 }
375
376 template <typename I>
377 void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
378 uint64_t len,
379 uint32_t discard_granularity_bytes,
380 bool native_async) {
381 CephContext *cct = m_image_ctx.cct;
382 FUNCTRACE(cct);
383 ZTracer::Trace trace;
384 if (m_image_ctx.blkin_trace_all) {
385 trace.init("wq: discard", &m_image_ctx.trace_endpoint);
386 trace.event("init");
387 }
388
389 c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
390 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
391 << "completion=" << c << ", off=" << off << ", len=" << len
392 << dendl;
393
394 if (native_async && m_image_ctx.event_socket.is_valid()) {
395 c->set_event_notify(true);
396 }
397
398 if (!start_in_flight_io(c)) {
399 return;
400 }
401
402 auto tid = ++m_last_tid;
403
404 {
405 std::lock_guard locker{m_lock};
406 m_queued_or_blocked_io_tids.insert(tid);
407 }
408
409 ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
410 m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);
411
412 std::shared_lock owner_locker{m_image_ctx.owner_lock};
413 if (m_image_ctx.non_blocking_aio || writes_blocked()) {
414 queue(req);
415 } else {
416 process_io(req, false);
417 finish_in_flight_io();
418 }
419 trace.event("finish");
420 }
421
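// A user flush is parked in m_queued_flushes while any write with a lower tid
// is still queued or blocked; unblock_flushes() releases it once those writes
// have drained, preserving flush-after-write ordering.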
422 template <typename I>
423 void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
424 CephContext *cct = m_image_ctx.cct;
425 FUNCTRACE(cct);
426 ZTracer::Trace trace;
427 if (m_image_ctx.blkin_trace_all) {
428 trace.init("wq: flush", &m_image_ctx.trace_endpoint);
429 trace.event("init");
430 }
431
432 c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
433 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
434 << "completion=" << c << dendl;
435
436 if (native_async && m_image_ctx.event_socket.is_valid()) {
437 c->set_event_notify(true);
438 }
439
440 if (!start_in_flight_io(c)) {
441 return;
442 }
443
444 auto tid = ++m_last_tid;
445
446 ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
447 m_image_ctx, c, FLUSH_SOURCE_USER, trace);
448
449 {
450 std::lock_guard locker{m_lock};
451 if(!m_queued_or_blocked_io_tids.empty()) {
452 ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
453 m_queued_flushes.emplace(tid, req);
454 --m_in_flight_ios;
455 return;
456 }
457 }
458
459 std::shared_lock owner_locker{m_image_ctx.owner_lock};
460 if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
461 queue(req);
462 } else {
463 process_io(req, false);
464 finish_in_flight_io();
465 }
466 trace.event("finish");
467 }
468
469 template <typename I>
470 void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
471 uint64_t len, bufferlist &&bl,
472 int op_flags, bool native_async) {
473 CephContext *cct = m_image_ctx.cct;
474 FUNCTRACE(cct);
475 ZTracer::Trace trace;
476 if (m_image_ctx.blkin_trace_all) {
477 trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
478 trace.event("init");
479 }
480
481 c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
482 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
483 << "completion=" << c << ", off=" << off << ", "
484 << "len=" << len << ", data_len = " << bl.length() << ", "
485 << "flags=" << op_flags << dendl;
486
487 if (native_async && m_image_ctx.event_socket.is_valid()) {
488 c->set_event_notify(true);
489 }
490
491 if (!start_in_flight_io(c)) {
492 return;
493 }
494
495 auto tid = ++m_last_tid;
496
497 {
498 std::lock_guard locker{m_lock};
499 m_queued_or_blocked_io_tids.insert(tid);
500 }
501
502 ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
503 m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);
504
505 std::shared_lock owner_locker{m_image_ctx.owner_lock};
506 if (m_image_ctx.non_blocking_aio || writes_blocked()) {
507 queue(req);
508 } else {
509 process_io(req, false);
510 finish_in_flight_io();
511 }
512 trace.event("finish");
513 }
514
515
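// write_zeroes is dispatched as a discard request with a discard granularity
// of zero, which enables partial-object zeroing rather than pruning
// sub-granularity ranges.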
516 template <typename I>
517 void ImageRequestWQ<I>::aio_write_zeroes(io::AioCompletion *aio_comp,
518 uint64_t off, uint64_t len,
519 int zero_flags, int op_flags,
520 bool native_async) {
521 auto cct = m_image_ctx.cct;
522 FUNCTRACE(cct);
523 ZTracer::Trace trace;
524 if (m_image_ctx.blkin_trace_all) {
525 trace.init("io: write_zeroes", &m_image_ctx.trace_endpoint);
526 trace.event("init");
527 }
528
529 aio_comp->init_time(util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_DISCARD);
530 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
531 << "completion=" << aio_comp << ", off=" << off << ", "
532 << "len=" << len << dendl;
533
534 if (native_async && m_image_ctx.event_socket.is_valid()) {
535 aio_comp->set_event_notify(true);
536 }
537
538 // validate the supported flags
539 if (zero_flags != 0U) {
540 aio_comp->fail(-EINVAL);
541 return;
542 }
543
544 if (!start_in_flight_io(aio_comp)) {
545 return;
546 }
547
548 // enable partial discard (zeroing) of objects
549 uint32_t discard_granularity_bytes = 0;
550
551 auto tid = ++m_last_tid;
552
553 {
554 std::lock_guard locker{m_lock};
555 m_queued_or_blocked_io_tids.insert(tid);
556 }
557
558 auto req = ImageDispatchSpec<I>::create_discard_request(
559 m_image_ctx, aio_comp, off, len, discard_granularity_bytes, trace, tid);
560
561 std::shared_lock owner_locker{m_image_ctx.owner_lock};
562 if (m_image_ctx.non_blocking_aio || writes_blocked()) {
563 queue(req);
564 } else {
565 process_io(req, false);
566 finish_in_flight_io();
567 }
568 trace.event("finish");
569 }
570
571 template <typename I>
572 void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
573 uint64_t off, uint64_t len,
574 bufferlist &&cmp_bl,
575 bufferlist &&bl,
576 uint64_t *mismatch_off,
577 int op_flags, bool native_async) {
578 CephContext *cct = m_image_ctx.cct;
579 FUNCTRACE(cct);
580 ZTracer::Trace trace;
581 if (m_image_ctx.blkin_trace_all) {
582 trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
583 trace.event("init");
584 }
585
586 c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
587 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
588 << "completion=" << c << ", off=" << off << ", "
589 << "len=" << len << dendl;
590
591 if (native_async && m_image_ctx.event_socket.is_valid()) {
592 c->set_event_notify(true);
593 }
594
595 if (!start_in_flight_io(c)) {
596 return;
597 }
598
599 auto tid = ++m_last_tid;
600
601 {
602 std::lock_guard locker{m_lock};
603 m_queued_or_blocked_io_tids.insert(tid);
604 }
605
606 ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
607 m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
608 mismatch_off, op_flags, trace, tid);
609
610 std::shared_lock owner_locker{m_image_ctx.owner_lock};
611 if (m_image_ctx.non_blocking_aio || writes_blocked()) {
612 queue(req);
613 } else {
614 process_io(req, false);
615 finish_in_flight_io();
616 }
617 trace.event("finish");
618 }
619
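// Returns true if [off, off+len) overlaps an extent that is already being
// written; otherwise the extent is recorded in the in-flight interval set and
// the IO may proceed. Zero-length IO never blocks.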
620 template <typename I>
621 bool ImageRequestWQ<I>::block_overlapping_io(
622 ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) {
623 CephContext *cct = m_image_ctx.cct;
624 ldout(cct, 20) << "ictx=" << &m_image_ctx
625 << "off: " << off << " len: " << len <<dendl;
626
627 if(len == 0) {
628 return false;
629 }
630
631 if (in_flight_image_extents->empty() ||
632 !in_flight_image_extents->intersects(off, len)) {
633 in_flight_image_extents->insert(off, len);
634 return false;
635 }
636
637 return true;
638 }
639
640 template <typename I>
641 void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length,
642 uint64_t tid) {
643 CephContext *cct = m_image_ctx.cct;
644 ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
645
646 remove_in_flight_write_ios(offset, length, true, tid);
647
648 std::unique_lock locker{m_lock};
649 if (!m_blocked_ios.empty()) {
650 auto it = m_blocked_ios.begin();
651 while (it != m_blocked_ios.end()) {
652 auto blocked_io = *it;
653
654 const auto& extents = blocked_io->get_image_extents();
655 uint64_t off = extents.size() ? extents.front().first : 0;
656 uint64_t len = extents.size() ? extents.front().second : 0;
657
658 if (block_overlapping_io(&m_in_flight_extents, off, len)) {
659 break;
660 }
661 ldout(cct, 20) << "unblocking off: " << off << ", "
662 << "len: " << len << dendl;
663 AioCompletion *aio_comp = blocked_io->get_aio_completion();
664
665 m_blocked_ios.erase(it);
666 locker.unlock();
667 queue_unblocked_io(aio_comp, blocked_io);
668 locker.lock();
669 }
670 }
671 }
672
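// Releases queued flushes whose tid is lower than every write tid still
// queued or blocked, i.e. once all writes submitted before the flush have
// completed dispatch.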
673 template <typename I>
674 void ImageRequestWQ<I>::unblock_flushes() {
675 CephContext *cct = m_image_ctx.cct;
676 ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
677 std::unique_lock locker{m_lock};
678 auto io_tid_it = m_queued_or_blocked_io_tids.begin();
679 while (true) {
680 auto it = m_queued_flushes.begin();
681 if (it == m_queued_flushes.end() ||
682 (io_tid_it != m_queued_or_blocked_io_tids.end() &&
683 *io_tid_it < it->first)) {
684 break;
685 }
686
687 auto blocked_flush = *it;
688 ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl;
689
690 AioCompletion *aio_comp = blocked_flush.second->get_aio_completion();
691
692 m_queued_flushes.erase(it);
693 locker.unlock();
694 queue_unblocked_io(aio_comp, blocked_flush.second);
695 locker.lock();
696 }
697 }
698
699 template <typename I>
700 void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp,
701 ImageDispatchSpec<I> *req) {
702 if (!start_in_flight_io(comp)) {
703 return;
704 }
705
706 std::shared_lock owner_locker{m_image_ctx.owner_lock};
707 queue(req);
708 }
709
710 template <typename I>
711 void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
712 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
713
714 {
715 std::unique_lock locker{m_lock};
716 ceph_assert(!m_shutdown);
717 m_shutdown = true;
718
719 CephContext *cct = m_image_ctx.cct;
720 ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
721 << dendl;
722 if (m_in_flight_ios > 0) {
723 m_on_shutdown = on_shutdown;
724 return;
725 }
726 }
727
728 // ensure that all in-flight IO is flushed
729 flush_image(m_image_ctx, on_shutdown);
730 }
731
732 template <typename I>
733 int ImageRequestWQ<I>::block_writes() {
734 C_SaferCond cond_ctx;
735 block_writes(&cond_ctx);
736 return cond_ctx.wait();
737 }
738
739 template <typename I>
740 void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
741 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
742 CephContext *cct = m_image_ctx.cct;
743
744 {
745 std::unique_lock locker{m_lock};
746 ++m_write_blockers;
747 ldout(cct, 5) << &m_image_ctx << ", " << "num="
748 << m_write_blockers << dendl;
749 if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
750 m_write_blocker_contexts.push_back(on_blocked);
751 return;
752 }
753 }
754
755 // ensure that all in-flight IO is flushed
756 flush_image(m_image_ctx, on_blocked);
757 }
758
759 template <typename I>
760 void ImageRequestWQ<I>::unblock_writes() {
761 CephContext *cct = m_image_ctx.cct;
762
763 bool wake_up = false;
764 Contexts waiter_contexts;
765 {
766 std::unique_lock locker{m_lock};
767 ceph_assert(m_write_blockers > 0);
768 --m_write_blockers;
769
770 ldout(cct, 5) << &m_image_ctx << ", " << "num="
771 << m_write_blockers << dendl;
772 if (m_write_blockers == 0) {
773 wake_up = true;
774 std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
775 }
776 }
777
778 if (wake_up) {
779 for (auto ctx : waiter_contexts) {
780 ctx->complete(0);
781 }
782 this->signal();
783 }
784 }
785
786 template <typename I>
787 void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
788 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
789 CephContext *cct = m_image_ctx.cct;
790
791 {
792 std::unique_lock locker{m_lock};
793 ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
794 << m_write_blockers << dendl;
795 if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
796 m_unblocked_write_waiter_contexts.push_back(on_unblocked);
797 return;
798 }
799 }
800
801 on_unblocked->complete(0);
802 }
803
804 template <typename I>
805 void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
806 CephContext *cct = m_image_ctx.cct;
807 ldout(cct, 20) << dendl;
808
809 bool wake_up = false;
810 {
811 std::unique_lock locker{m_lock};
812 switch (direction) {
813 case DIRECTION_READ:
814 wake_up = (enabled != m_require_lock_on_read);
815 m_require_lock_on_read = enabled;
816 break;
817 case DIRECTION_WRITE:
818 wake_up = (enabled != m_require_lock_on_write);
819 m_require_lock_on_write = enabled;
820 break;
821 case DIRECTION_BOTH:
822 wake_up = (enabled != m_require_lock_on_read ||
823 enabled != m_require_lock_on_write);
824 m_require_lock_on_read = enabled;
825 m_require_lock_on_write = enabled;
826 break;
827 }
828 }
829
830 // wake up the thread pool whenever the state changes so that
831 // we can re-request the lock if required
832 if (wake_up) {
833 this->signal();
834 }
835 }
836
837 template <typename I>
838 void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){
839 for (auto pair : m_throttles) {
840 pair.second->set_schedule_tick_min(tick);
841 }
842 }
843
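// Applies a limit/burst pair to the throttle matching `flag`. If the burst is
// rejected (burst < limit) it is reset to zero so the base limit still takes
// effect; m_qos_enabled_flag tracks which QoS throttles are active.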
844 template <typename I>
845 void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
846 uint64_t limit,
847 uint64_t burst) {
848 CephContext *cct = m_image_ctx.cct;
849 TokenBucketThrottle *throttle = nullptr;
850 for (auto pair : m_throttles) {
851 if (flag == pair.first) {
852 throttle = pair.second;
853 break;
854 }
855 }
856 ceph_assert(throttle != nullptr);
857
858 int r = throttle->set_limit(limit, burst);
859 if (r < 0) {
860 lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
861 << "burst(" << burst << ") is less than "
862 << "limit(" << limit << ")" << dendl;
863 // if apply failed, we should at least make sure the limit works.
864 throttle->set_limit(limit, 0);
865 }
866
867 if (limit)
868 m_qos_enabled_flag |= flag;
869 else
870 m_qos_enabled_flag &= ~flag;
871 }
872
873 template <typename I>
874 void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item,
875 uint64_t flag) {
876 CephContext *cct = m_image_ctx.cct;
877 ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;
878
879 std::lock_guard pool_locker{this->get_pool_lock()};
880 ceph_assert(m_io_throttled.load() > 0);
881 item->set_throttled(flag);
882 if (item->were_all_throttled()) {
883 this->requeue_back(pool_locker, item);
884 --m_io_throttled;
885 this->signal(pool_locker);
886 }
887 }
888
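// Checks the request against every enabled QoS throttle. Returns true if at
// least one throttle could not grant the requested tokens immediately and
// instead registered handle_throttle_ready() to requeue the request later.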
889 template <typename I>
890 bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
891 uint64_t tokens = 0;
892 uint64_t flag = 0;
893 bool blocked = false;
894 TokenBucketThrottle* throttle = nullptr;
895
896 for (auto t : m_throttles) {
897 flag = t.first;
898 if (item->was_throttled(flag))
899 continue;
900
901 if (!(m_qos_enabled_flag & flag)) {
902 item->set_throttled(flag);
903 continue;
904 }
905
906 throttle = t.second;
907 if (item->tokens_requested(flag, &tokens) &&
908 throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
909 &ImageRequestWQ<I>::handle_throttle_ready>(
910 tokens, this, item, flag)) {
911 blocked = true;
912 } else {
913 item->set_throttled(flag);
914 }
915 }
916 return blocked;
917 }
918
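// ThreadPool dequeue hook: returns nullptr to stall the worker while the
// front request is throttled, writes are blocked, the exclusive lock is being
// acquired, or an image refresh is pending; otherwise the dequeued request is
// handed to process().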
919 template <typename I>
920 void *ImageRequestWQ<I>::_void_dequeue() {
921 CephContext *cct = m_image_ctx.cct;
922 ImageDispatchSpec<I> *peek_item = this->front();
923
924 // no queued IO requests or all IO is blocked/stalled
925 if (peek_item == nullptr || m_io_blockers.load() > 0) {
926 return nullptr;
927 }
928
929 if (needs_throttle(peek_item)) {
930 ldout(cct, 15) << "throttling IO " << peek_item << dendl;
931
932 ++m_io_throttled;
933 // dequeue the throttled item
934 ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
935 return nullptr;
936 }
937
938 bool lock_required;
939 bool refresh_required = m_image_ctx.state->is_refresh_required();
940 {
941 std::shared_lock locker{m_lock};
942 bool write_op = peek_item->is_write_op();
943 lock_required = is_lock_required(write_op);
944 if (write_op) {
945 if (!lock_required && m_write_blockers > 0) {
946 // missing lock is not the write blocker
947 return nullptr;
948 }
949
950 if (!lock_required && !refresh_required && !peek_item->blocked) {
951 // completed ops will requeue the IO -- don't count it as in-progress
952 m_in_flight_writes++;
953 }
954 }
955 }
956
957 auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
958 ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
959 ceph_assert(peek_item == item);
960
961 if (lock_required) {
962 this->get_pool_lock().unlock();
963 m_image_ctx.owner_lock.lock_shared();
964 if (m_image_ctx.exclusive_lock != nullptr) {
965 ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
966 if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
967 lderr(cct) << "op requires exclusive lock" << dendl;
968 fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
969 item);
970
971 // wake up the IO since we won't be returning a request to process
972 this->signal();
973 } else {
974 // stall IO until the acquire completes
975 ++m_io_blockers;
976 Context *ctx = new C_AcquireLock(this, item);
977 ctx = create_context_callback<
978 Context, &Context::complete>(
979 ctx, m_image_ctx.exclusive_lock);
980 m_image_ctx.exclusive_lock->acquire_lock(ctx);
981 }
982 } else {
983 // raced with the exclusive lock being disabled
984 lock_required = false;
985 }
986 m_image_ctx.owner_lock.unlock_shared();
987 this->get_pool_lock().lock();
988
989 if (lock_required) {
990 return nullptr;
991 }
992 }
993
994 if (refresh_required) {
995 ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;
996
997 // stall IO until the refresh completes
998 ++m_io_blockers;
999
1000 this->get_pool_lock().unlock();
1001 m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
1002 this->get_pool_lock().lock();
1003 return nullptr;
1004 }
1005
1006 return item;
1007 }
1008
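// Dispatches a single request. Writes that overlap an in-flight write extent
// are parked in m_blocked_ios and re-queued by unblock_overlapping_io() once
// the conflicting write has been dispatched.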
1009 template <typename I>
1010 void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
1011 bool non_blocking_io) {
1012 CephContext *cct = m_image_ctx.cct;
1013 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
1014 << "req=" << req << dendl;
1015
1016 //extents are invalidated after the request is sent
1017 //so gather them ahead of that
1018 const auto& extents = req->get_image_extents();
1019 bool write_op = req->is_write_op();
1020 uint64_t tid = req->get_tid();
1021 uint64_t offset = extents.size() ? extents.front().first : 0;
1022 uint64_t length = extents.size() ? extents.front().second : 0;
1023
1024 if (write_op && !req->blocked) {
1025 std::lock_guard locker{m_lock};
1026 bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
1027 if (blocked) {
1028 ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
1029 << &m_image_ctx << ", "
1030 << "off=" << offset << ", len=" << length << dendl;
1031 req->blocked = true;
1032 m_blocked_ios.push_back(req);
1033 return;
1034 }
1035 }
1036
1037 req->start_op();
1038 req->send();
1039
1040 if (write_op) {
1041 if (non_blocking_io) {
1042 finish_in_flight_write();
1043 }
1044 unblock_overlapping_io(offset, length, tid);
1045 unblock_flushes();
1046 }
1047 delete req;
1048 }
1049
1050 template <typename I>
1051 void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
1052 CephContext *cct = m_image_ctx.cct;
1053 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
1054 << "req=" << req << dendl;
1055
1056 bool write_op = req->is_write_op();
1057
1058 process_io(req, true);
1059
1060 finish_queued_io(write_op);
1061 finish_in_flight_io();
1062 }
1063
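// Drops the given write extent from the in-flight interval set and removes
// its tid from the queued/blocked tid set so dependent flushes can proceed.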
1064 template <typename I>
1065 void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length,
1066 bool write_op, uint64_t tid) {
1067 CephContext *cct = m_image_ctx.cct;
1068 ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
1069 {
1070 std::lock_guard locker{m_lock};
1071 if (write_op) {
1072 if (length > 0) {
1073 if(!m_in_flight_extents.empty()) {
1074 CephContext *cct = m_image_ctx.cct;
1075 ldout(cct, 20) << "erasing in flight extents with tid:"
1076 << tid << ", offset: " << offset << dendl;
1077 ImageExtentIntervals extents;
1078 extents.insert(offset, length);
1079 ImageExtentIntervals intersect;
1080 intersect.intersection_of(extents, m_in_flight_extents);
1081 m_in_flight_extents.subtract(intersect);
1082 }
1083 }
1084 m_queued_or_blocked_io_tids.erase(tid);
1085 }
1086 }
1087 }
1088
1089 template <typename I>
1090 void ImageRequestWQ<I>::finish_queued_io(bool write_op) {
1091 std::shared_lock locker{m_lock};
1092 if (write_op) {
1093 ceph_assert(m_queued_writes > 0);
1094 m_queued_writes--;
1095 } else {
1096 ceph_assert(m_queued_reads > 0);
1097 m_queued_reads--;
1098 }
1099 }
1100
1101 template <typename I>
1102 void ImageRequestWQ<I>::finish_in_flight_write() {
1103 bool writes_blocked = false;
1104 {
1105 std::shared_lock locker{m_lock};
1106 ceph_assert(m_in_flight_writes > 0);
1107 if (--m_in_flight_writes == 0 &&
1108 !m_write_blocker_contexts.empty()) {
1109 writes_blocked = true;
1110 }
1111 }
1112 if (writes_blocked) {
1113 flush_image(m_image_ctx, new C_BlockedWrites(this));
1114 }
1115 }
1116
1117 template <typename I>
1118 bool ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
1119 std::shared_lock locker{m_lock};
1120
1121 if (m_shutdown) {
1122 CephContext *cct = m_image_ctx.cct;
1123 lderr(cct) << "IO received on closed image" << dendl;
1124
1125 c->fail(-ESHUTDOWN);
1126 return false;
1127 }
1128
1129 if (!m_image_ctx.data_ctx.is_valid()) {
1130 CephContext *cct = m_image_ctx.cct;
1131 lderr(cct) << "missing data pool" << dendl;
1132
1133 c->fail(-ENODEV);
1134 return false;
1135 }
1136
1137 m_in_flight_ios++;
1138 return true;
1139 }
1140
1141 template <typename I>
1142 void ImageRequestWQ<I>::finish_in_flight_io() {
1143 Context *on_shutdown;
1144 {
1145 std::shared_lock locker{m_lock};
1146 if (--m_in_flight_ios > 0 || !m_shutdown) {
1147 return;
1148 }
1149 on_shutdown = m_on_shutdown;
1150 }
1151
1152 CephContext *cct = m_image_ctx.cct;
1153 ldout(cct, 5) << "completing shut down" << dendl;
1154
1155 ceph_assert(on_shutdown != nullptr);
1156 flush_image(m_image_ctx, on_shutdown);
1157 }
1158
1159 template <typename I>
1160 void ImageRequestWQ<I>::fail_in_flight_io(
1161 int r, ImageDispatchSpec<I> *req) {
1162 this->process_finish();
1163 req->fail(r);
1164
1165 bool write_op = req->is_write_op();
1166 uint64_t tid = req->get_tid();
1167 const auto& extents = req->get_image_extents();
1168 uint64_t offset = extents.size() ? extents.front().first : 0;
1169 uint64_t length = extents.size() ? extents.front().second : 0;
1170
1171 finish_queued_io(write_op);
1172 remove_in_flight_write_ios(offset, length, write_op, tid);
1173 delete req;
1174 finish_in_flight_io();
1175 }
1176
1177 template <typename I>
1178 bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
1179 ceph_assert(ceph_mutex_is_locked(m_lock));
1180 return ((write_op && m_require_lock_on_write) ||
1181 (!write_op && m_require_lock_on_read));
1182 }
1183
1184 template <typename I>
1185 void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
1186 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
1187
1188 CephContext *cct = m_image_ctx.cct;
1189 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
1190 << "req=" << req << dendl;
1191
1192 if (req->is_write_op()) {
1193 m_queued_writes++;
1194 } else {
1195 m_queued_reads++;
1196 }
1197
1198 ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
1199 }
1200
1201 template <typename I>
1202 void ImageRequestWQ<I>::handle_acquire_lock(
1203 int r, ImageDispatchSpec<I> *req) {
1204 CephContext *cct = m_image_ctx.cct;
1205 ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;
1206
1207 if (r < 0) {
1208 fail_in_flight_io(r, req);
1209 } else {
1210 // since IO was stalled for acquire -- original IO order is preserved
1211 // if we requeue this op for work queue processing
1212 this->requeue_front(req);
1213 }
1214
1215 ceph_assert(m_io_blockers.load() > 0);
1216 --m_io_blockers;
1217 this->signal();
1218 }
1219
1220 template <typename I>
1221 void ImageRequestWQ<I>::handle_refreshed(
1222 int r, ImageDispatchSpec<I> *req) {
1223 CephContext *cct = m_image_ctx.cct;
1224 ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
1225 << "req=" << req << dendl;
1226 if (r < 0) {
1227 fail_in_flight_io(r, req);
1228 } else {
1229 // since IO was stalled for refresh -- original IO order is preserved
1230 // if we requeue this op for work queue processing
1231 this->requeue_front(req);
1232 }
1233
1234 ceph_assert(m_io_blockers.load() > 0);
1235 --m_io_blockers;
1236 this->signal();
1237 }
1238
1239 template <typename I>
1240 void ImageRequestWQ<I>::handle_blocked_writes(int r) {
1241 Contexts contexts;
1242 {
1243 std::unique_lock locker{m_lock};
1244 contexts.swap(m_write_blocker_contexts);
1245 }
1246
1247 for (auto ctx : contexts) {
1248 ctx->complete(0);
1249 }
1250 }
1251
1252 template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
1253
1254 } // namespace io
1255 } // namespace librbd