]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/io/ImageRequestWQ.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / librbd / io / ImageRequestWQ.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/io/ImageRequestWQ.h"
5#include "common/errno.h"
31f18b77 6#include "common/zipkin_trace.h"
11fdf7f2 7#include "common/Cond.h"
7c673cae
FG
8#include "librbd/ExclusiveLock.h"
9#include "librbd/ImageCtx.h"
10#include "librbd/ImageState.h"
91327a77 11#include "librbd/ImageWatcher.h"
7c673cae
FG
12#include "librbd/internal.h"
13#include "librbd/Utils.h"
14#include "librbd/exclusive_lock/Policy.h"
15#include "librbd/io/AioCompletion.h"
16#include "librbd/io/ImageRequest.h"
11fdf7f2
TL
17#include "librbd/io/ImageDispatchSpec.h"
18#include "common/EventTrace.h"
7c673cae
FG
19
20#define dout_subsys ceph_subsys_rbd
21#undef dout_prefix
22#define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \
23 << " " << __func__ << ": "
24
25namespace librbd {
26namespace io {
27
11fdf7f2
TL
namespace {

// Issue an internal (FLUSH_SOURCE_INTERNAL) flush against the image and
// invoke on_finish once all in-flight IO has been flushed.  The dispatch
// spec's send() completes synchronously with respect to the spec object
// itself (completion is delivered via the AioCompletion), so the spec is
// deleted immediately after send() returns -- same pattern as process().
template <typename I>
void flush_image(I& image_ctx, Context* on_finish) {
  auto aio_comp = librbd::io::AioCompletion::create(
    on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH);
  auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request(
    image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {});
  req->send();
  delete req;
}

} // anonymous namespace
41
224ce89b
WB
// Completion context fired when an exclusive-lock acquisition attempt
// finishes; forwards the result (and the stalled request) back to the
// work queue via handle_acquire_lock().
template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;          // owning queue (not owned here)
  ImageDispatchSpec<I> *image_request; // request stalled on the lock

  C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};
55
// Completion context fired once all in-flight writes have been flushed
// after block_writes(); notifies the queue via handle_blocked_writes().
template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue; // owning queue (not owned here)
  explicit C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};
67
// Completion context fired when an image refresh finishes; forwards the
// result (and the request that was stalled behind the refresh) back to
// the work queue via handle_refreshed().
template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;          // owning queue (not owned here)
  ImageDispatchSpec<I> *image_request; // request stalled on the refresh

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageDispatchSpec<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};
81
11fdf7f2
TL
// Maps each RBD QoS throttle flag bit to the human-readable name used when
// constructing the corresponding TokenBucketThrottle in the constructor.
static std::map<uint64_t, std::string> throttle_flags = {
  { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" },
  { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" },
  { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" },
  { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" },
  { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" },
  { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" }
};
90
224ce89b
WB
// Construct the image IO work queue: creates one (initially disabled,
// limit/burst == 0) TokenBucketThrottle per QoS flag, all sharing the
// per-CephContext timer, and registers the queue with the thread pool.
template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;

  // shared timer instance used by every throttle for scheduling ticks
  SafeTimer *timer;
  Mutex *timer_lock;
  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);

  for (auto flag : throttle_flags) {
    m_throttles.push_back(make_pair(
      flag.first,
      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
  }

  this->register_work_queue();
}
112
11fdf7f2
TL
113template <typename I>
114ImageRequestWQ<I>::~ImageRequestWQ() {
115 for (auto t : m_throttles) {
116 delete t.second;
117 }
118}
119
224ce89b
WB
120template <typename I>
121ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len,
122 ReadResult &&read_result, int op_flags) {
7c673cae
FG
123 CephContext *cct = m_image_ctx.cct;
124 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
125 << "len = " << len << dendl;
126
127 C_SaferCond cond;
128 AioCompletion *c = AioCompletion::create(&cond);
129 aio_read(c, off, len, std::move(read_result), op_flags, false);
130 return cond.wait();
131}
132
224ce89b
WB
133template <typename I>
134ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
135 bufferlist &&bl, int op_flags) {
7c673cae
FG
136 CephContext *cct = m_image_ctx.cct;
137 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
138 << "len = " << len << dendl;
139
140 m_image_ctx.snap_lock.get_read();
141 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
142 m_image_ctx.snap_lock.put_read();
143 if (r < 0) {
144 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
145 return r;
146 }
147
148 C_SaferCond cond;
149 AioCompletion *c = AioCompletion::create(&cond);
150 aio_write(c, off, len, std::move(bl), op_flags, false);
151
152 r = cond.wait();
153 if (r < 0) {
154 return r;
155 }
156 return len;
157}
158
224ce89b
WB
159template <typename I>
160ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len,
11fdf7f2 161 uint32_t discard_granularity_bytes) {
7c673cae
FG
162 CephContext *cct = m_image_ctx.cct;
163 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
164 << "len = " << len << dendl;
165
166 m_image_ctx.snap_lock.get_read();
167 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
168 m_image_ctx.snap_lock.put_read();
169 if (r < 0) {
170 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
171 return r;
172 }
173
174 C_SaferCond cond;
175 AioCompletion *c = AioCompletion::create(&cond);
11fdf7f2 176 aio_discard(c, off, len, discard_granularity_bytes, false);
7c673cae
FG
177
178 r = cond.wait();
179 if (r < 0) {
180 return r;
181 }
182 return len;
183}
184
224ce89b
WB
185template <typename I>
186ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len,
187 bufferlist &&bl, int op_flags) {
7c673cae
FG
188 CephContext *cct = m_image_ctx.cct;
189 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
190 << "len = " << len << ", data_len " << bl.length() << dendl;
191
192 m_image_ctx.snap_lock.get_read();
193 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
194 m_image_ctx.snap_lock.put_read();
195 if (r < 0) {
196 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
197 return r;
198 }
199
200 C_SaferCond cond;
201 AioCompletion *c = AioCompletion::create(&cond);
202 aio_writesame(c, off, len, std::move(bl), op_flags, false);
203
204 r = cond.wait();
205 if (r < 0) {
206 return r;
207 }
208 return len;
209}
210
c07f9fc5
FG
211template <typename I>
212ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len,
213 bufferlist &&cmp_bl,
214 bufferlist &&bl,
215 uint64_t *mismatch_off,
216 int op_flags){
217 CephContext *cct = m_image_ctx.cct;
218 ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off="
219 << off << ", " << "len = " << len << dendl;
220
221 m_image_ctx.snap_lock.get_read();
222 int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
223 m_image_ctx.snap_lock.put_read();
224 if (r < 0) {
225 lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
226 return r;
227 }
228
229 C_SaferCond cond;
230 AioCompletion *c = AioCompletion::create(&cond);
231 aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl),
232 mismatch_off, op_flags, false);
233
234 r = cond.wait();
235 if (r < 0) {
236 return r;
237 }
238
239 return len;
240}
241
11fdf7f2
TL
242template <typename I>
243int ImageRequestWQ<I>::flush() {
244 CephContext *cct = m_image_ctx.cct;
245 ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl;
246
247 C_SaferCond cond;
248 AioCompletion *c = AioCompletion::create(&cond);
249 aio_flush(c, false);
250
251 int r = cond.wait();
252 if (r < 0) {
253 return r;
254 }
255
256 return 0;
257}
258
224ce89b
WB
// Asynchronous read entry point.  Either dispatches the read directly or,
// when the queue is in non-blocking mode, writes are pending/blocked, or a
// read requires the exclusive lock, defers it to the work queue so ordering
// against writes is preserved.
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion (and returns false) if the image is shut down
  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageDispatchSpec<I>::create_read_request(
            m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
            trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
300
224ce89b
WB
// Asynchronous write entry point.  Dispatches directly unless the queue is
// in non-blocking mode or writes are currently blocked, in which case the
// request is deferred to the work queue.
template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion (and returns false) if the image is shut down
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageDispatchSpec<I>::create_write_request(
            m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
                               std::move(bl), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
338
224ce89b
WB
// Asynchronous discard entry point.  Dispatches directly unless the queue
// is in non-blocking mode or writes are blocked, in which case the request
// is deferred to the work queue.
template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len,
                                    uint32_t discard_granularity_bytes,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion (and returns false) if the image is shut down
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageDispatchSpec<I>::create_discard_request(
            m_image_ctx, c, off, len, discard_granularity_bytes, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}},
                                 discard_granularity_bytes, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
377
224ce89b
WB
// Asynchronous flush entry point.  Flushes must also be queued when any
// writes are still queued (!writes_empty()) so they don't overtake them.
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion (and returns false) if the image is shut down
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(ImageDispatchSpec<I>::create_flush_request(
            m_image_ctx, c, FLUSH_SOURCE_USER, trace));
  } else {
    ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
410
224ce89b
WB
// Asynchronous write-same entry point.  Dispatches directly unless the
// queue is in non-blocking mode or writes are blocked, in which case the
// request is deferred to the work queue.
template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion (and returns false) if the image is shut down
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageDispatchSpec<I>::create_write_same_request(
            m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl),
                                   op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
449
c07f9fc5
FG
// Asynchronous compare-and-write entry point.  Dispatches directly unless
// the queue is in non-blocking mode or writes are blocked, in which case
// the request is deferred to the work queue.
template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion (and returns false) if the image is shut down
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(ImageDispatchSpec<I>::create_compare_and_write_request(
            m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
            mismatch_off, op_flags, trace));
  } else {
    c->start_op();
    ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
                                           std::move(cmp_bl), std::move(bl),
                                           mismatch_off, op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
492
224ce89b
WB
// Begin shutting down the queue.  If IOs are still in flight, on_shutdown
// is stashed and fired later by finish_in_flight_io(); otherwise all
// in-flight IO is flushed immediately.  Caller must hold owner_lock.
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  ceph_assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    ceph_assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      // deferred: last finish_in_flight_io() will complete the shutdown
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_shutdown);
}
514
224ce89b
WB
515template <typename I>
516int ImageRequestWQ<I>::block_writes() {
7c673cae
FG
517 C_SaferCond cond_ctx;
518 block_writes(&cond_ctx);
519 return cond_ctx.wait();
520}
521
224ce89b
WB
// Increment the write-blocker count; on_blocked fires once every in-flight
// write has drained and been flushed.  Blockers nest -- each call must be
// paired with unblock_writes().  Caller must hold owner_lock.
template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  ceph_assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      // writes still draining: C_BlockedWrites will complete the contexts
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_blocked);
}
541
224ce89b
WB
// Drop one write blocker.  When the last blocker is released, any contexts
// queued via wait_on_writes_unblocked() are completed (outside m_lock) and
// the worker threads are signalled to resume dequeuing.
template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  Contexts waiter_contexts;
  {
    RWLock::WLocker locker(m_lock);
    ceph_assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
      std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
    }
  }

  if (wake_up) {
    for (auto ctx : waiter_contexts) {
      ctx->complete(0);
    }
    this->signal();
  }
}
568
11fdf7f2
TL
// Invoke on_unblocked once no write blockers remain (immediately if none
// are active and no earlier waiters are queued -- waiters are kept FIFO).
// Caller must hold owner_lock.
template <typename I>
void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
  ceph_assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
                   << m_write_blockers << dendl;
    if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
      m_unblocked_write_waiter_contexts.push_back(on_unblocked);
      return;
    }
  }

  on_unblocked->complete(0);
}
586
224ce89b
WB
// Enable/disable the "exclusive lock required" flag for reads, writes, or
// both.  On any state change the thread pool is signalled so queued IO can
// re-evaluate (and possibly re-request) the lock.
template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}
619
11fdf7f2
TL
620template <typename I>
621void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){
622 for (auto pair : m_throttles) {
623 pair.second->set_schedule_tick_min(tick);
624 }
625}
626
// Apply a limit/burst pair to the throttle identified by flag, and track
// which throttles are active in m_qos_enabled_flag (limit == 0 disables).
template <typename I>
void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag,
                                        uint64_t limit,
                                        uint64_t burst) {
  CephContext *cct = m_image_ctx.cct;
  TokenBucketThrottle *throttle = nullptr;
  for (auto pair : m_throttles) {
    if (flag == pair.first) {
      throttle = pair.second;
      break;
    }
  }
  ceph_assert(throttle != nullptr);

  int r = throttle->set_limit(limit, burst);
  if (r < 0) {
    lderr(cct) << throttle->get_name() << ": invalid qos parameter: "
               << "burst(" << burst << ") is less than "
               << "limit(" << limit << ")" << dendl;
    // if apply failed, we should at least make sure the limit works.
    throttle->set_limit(limit, 0);
  }

  // record whether this throttle participates in needs_throttle()
  if (limit)
    m_qos_enabled_flag |= flag;
  else
    m_qos_enabled_flag &= ~flag;
}
655
// Callback from a TokenBucketThrottle once tokens for `flag` are available.
// When the item has cleared every enabled throttle it is requeued at the
// back of the work queue and the workers are signalled.
template <typename I>
void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;

  ceph_assert(m_io_throttled.load() > 0);
  item->set_throttled(flag);
  if (item->were_all_throttled()) {
    this->requeue_back(item);
    --m_io_throttled;
    this->signal();
  }
}
669
// Check the item against every enabled QoS throttle it has not yet passed.
// Returns true if at least one throttle blocked it (the throttle will call
// handle_throttle_ready() later); disabled throttles are marked as passed.
template <typename I>
bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
  uint64_t tokens = 0;
  uint64_t flag = 0;
  bool blocked = false;
  TokenBucketThrottle* throttle = nullptr;

  for (auto t : m_throttles) {
    flag = t.first;
    if (item->was_throttled(flag))
      continue;

    if (!(m_qos_enabled_flag & flag)) {
      // throttle not active -- treat as already passed
      item->set_throttled(flag);
      continue;
    }

    throttle = t.second;
    if (item->tokens_requested(flag, &tokens) &&
        throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
                      &ImageRequestWQ<I>::handle_throttle_ready>(
                        tokens, this, item, flag)) {
      blocked = true;
    } else {
      item->set_throttled(flag);
    }
  }
  return blocked;
}
699
224ce89b
WB
// Work-queue hook invoked (with the pool lock held) to pick the next item.
// May return nullptr to stall dispatch while the item waits on QoS tokens,
// the exclusive lock, or an image refresh; stalled items are requeued by
// the respective completion handlers to preserve IO ordering.
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageDispatchSpec<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  if (needs_throttle(peek_item)) {
    ldout(cct, 15) << "throttling IO " << peek_item << dendl;

    ++m_io_throttled;
    // dequeue the throttled item
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
  ceph_assert(peek_item == item);

  if (lock_required) {
    // pool lock is dropped before taking owner_lock to respect lock order
    this->get_pool_lock().unlock();
    m_image_ctx.owner_lock.get_read();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
                          item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.put_read();
    this->get_pool_lock().lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().lock();
    return nullptr;
  }

  item->start_op();
  return item;
}
786
224ce89b 787template <typename I>
11fdf7f2 788void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
7c673cae
FG
789 CephContext *cct = m_image_ctx.cct;
790 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
791 << "req=" << req << dendl;
792
793 req->send();
794
224ce89b 795 finish_queued_io(req);
7c673cae 796 if (req->is_write_op()) {
224ce89b 797 finish_in_flight_write();
7c673cae
FG
798 }
799 delete req;
800
224ce89b 801 finish_in_flight_io();
7c673cae
FG
802}
803
224ce89b 804template <typename I>
11fdf7f2 805void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) {
7c673cae
FG
806 RWLock::RLocker locker(m_lock);
807 if (req->is_write_op()) {
11fdf7f2 808 ceph_assert(m_queued_writes > 0);
7c673cae
FG
809 m_queued_writes--;
810 } else {
11fdf7f2 811 ceph_assert(m_queued_reads > 0);
7c673cae
FG
812 m_queued_reads--;
813 }
814}
815
224ce89b
WB
// Decrement the in-flight write count; when it reaches zero and callers
// are waiting in block_writes(), flush the image and notify them via
// C_BlockedWrites.
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    ceph_assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  if (writes_blocked) {
    flush_image(m_image_ctx, new C_BlockedWrites(this));
  }
}
832
224ce89b
WB
// Register a new in-flight IO.  Returns true-like on success; if the image
// is shut down the completion is failed with -ESHUTDOWN and false-like is
// returned so the caller must not dispatch.
// NOTE(review): declared `int` here but returns bool literals and every
// caller tests it as a bool -- confirm the declaration in the header.
template <typename I>
int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  RWLock::RLocker locker(m_lock);

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    // extra ref since fail() may release the caller's reference
    c->get();
    c->fail(-ESHUTDOWN);
    return false;
  }

  m_in_flight_ios++;
  return true;
}
849
224ce89b
WB
// Drop an in-flight IO reference.  The last IO to finish after shut_down()
// was requested completes the deferred shutdown by flushing the image into
// the stored m_on_shutdown context.
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}
867
224ce89b 868template <typename I>
11fdf7f2
TL
869void ImageRequestWQ<I>::fail_in_flight_io(
870 int r, ImageDispatchSpec<I> *req) {
224ce89b
WB
871 this->process_finish();
872 req->fail(r);
873 finish_queued_io(req);
874 delete req;
875 finish_in_flight_io();
876}
7c673cae 877
224ce89b
WB
878template <typename I>
879bool ImageRequestWQ<I>::is_lock_required(bool write_op) const {
11fdf7f2 880 ceph_assert(m_lock.is_locked());
224ce89b
WB
881 return ((write_op && m_require_lock_on_write) ||
882 (!write_op && m_require_lock_on_read));
7c673cae
FG
883}
884
224ce89b 885template <typename I>
11fdf7f2
TL
886void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) {
887 ceph_assert(m_image_ctx.owner_lock.is_locked());
224ce89b 888
7c673cae
FG
889 CephContext *cct = m_image_ctx.cct;
890 ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
891 << "req=" << req << dendl;
892
224ce89b 893 if (req->is_write_op()) {
7c673cae
FG
894 m_queued_writes++;
895 } else {
896 m_queued_reads++;
897 }
898
11fdf7f2 899 ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req);
224ce89b 900}
7c673cae 901
224ce89b 902template <typename I>
11fdf7f2
TL
903void ImageRequestWQ<I>::handle_acquire_lock(
904 int r, ImageDispatchSpec<I> *req) {
224ce89b
WB
905 CephContext *cct = m_image_ctx.cct;
906 ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;
907
908 if (r < 0) {
909 fail_in_flight_io(r, req);
910 } else {
911 // since IO was stalled for acquire -- original IO order is preserved
912 // if we requeue this op for work queue processing
81eedcae 913 this->requeue_front(req);
7c673cae 914 }
224ce89b 915
11fdf7f2 916 ceph_assert(m_io_blockers.load() > 0);
224ce89b
WB
917 --m_io_blockers;
918 this->signal();
7c673cae
FG
919}
920
224ce89b 921template <typename I>
11fdf7f2
TL
922void ImageRequestWQ<I>::handle_refreshed(
923 int r, ImageDispatchSpec<I> *req) {
7c673cae 924 CephContext *cct = m_image_ctx.cct;
224ce89b
WB
925 ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
926 << "req=" << req << dendl;
7c673cae 927 if (r < 0) {
224ce89b 928 fail_in_flight_io(r, req);
7c673cae
FG
929 } else {
930 // since IO was stalled for refresh -- original IO order is preserved
931 // if we requeue this op for work queue processing
81eedcae 932 this->requeue_front(req);
7c673cae
FG
933 }
934
11fdf7f2 935 ceph_assert(m_io_blockers.load() > 0);
224ce89b
WB
936 --m_io_blockers;
937 this->signal();
7c673cae
FG
938}
939
224ce89b
WB
940template <typename I>
941void ImageRequestWQ<I>::handle_blocked_writes(int r) {
7c673cae
FG
942 Contexts contexts;
943 {
944 RWLock::WLocker locker(m_lock);
945 contexts.swap(m_write_blocker_contexts);
946 }
947
948 for (auto ctx : contexts) {
949 ctx->complete(0);
950 }
951}
952
224ce89b
WB
953template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;
954
7c673cae
FG
955} // namespace io
956} // namespace librbd