]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/io/ImageRequestWQ.h" | |
5 | #include "common/errno.h" | |
31f18b77 | 6 | #include "common/zipkin_trace.h" |
11fdf7f2 | 7 | #include "common/Cond.h" |
7c673cae FG |
8 | #include "librbd/ExclusiveLock.h" |
9 | #include "librbd/ImageCtx.h" | |
10 | #include "librbd/ImageState.h" | |
91327a77 | 11 | #include "librbd/ImageWatcher.h" |
7c673cae FG |
12 | #include "librbd/internal.h" |
13 | #include "librbd/Utils.h" | |
14 | #include "librbd/exclusive_lock/Policy.h" | |
15 | #include "librbd/io/AioCompletion.h" | |
16 | #include "librbd/io/ImageRequest.h" | |
11fdf7f2 TL |
17 | #include "librbd/io/ImageDispatchSpec.h" |
18 | #include "common/EventTrace.h" | |
7c673cae FG |
19 | |
20 | #define dout_subsys ceph_subsys_rbd | |
21 | #undef dout_prefix | |
22 | #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ | |
23 | << " " << __func__ << ": " | |
24 | ||
25 | namespace librbd { | |
26 | namespace io { | |
27 | ||
11fdf7f2 TL |
28 | namespace { |
29 | ||
30 | template <typename I> | |
31 | void flush_image(I& image_ctx, Context* on_finish) { | |
32 | auto aio_comp = librbd::io::AioCompletion::create( | |
33 | on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH); | |
34 | auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request( | |
35 | image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {}); | |
36 | req->send(); | |
37 | delete req; | |
38 | } | |
39 | ||
40 | } // anonymous namespace | |
41 | ||
224ce89b WB |
42 | template <typename I> |
43 | struct ImageRequestWQ<I>::C_AcquireLock : public Context { | |
44 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 45 | ImageDispatchSpec<I> *image_request; |
224ce89b | 46 | |
11fdf7f2 | 47 | C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request) |
224ce89b WB |
48 | : work_queue(work_queue), image_request(image_request) { |
49 | } | |
50 | ||
51 | void finish(int r) override { | |
52 | work_queue->handle_acquire_lock(r, image_request); | |
53 | } | |
54 | }; | |
55 | ||
56 | template <typename I> | |
57 | struct ImageRequestWQ<I>::C_BlockedWrites : public Context { | |
58 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 59 | explicit C_BlockedWrites(ImageRequestWQ *_work_queue) |
224ce89b WB |
60 | : work_queue(_work_queue) { |
61 | } | |
62 | ||
63 | void finish(int r) override { | |
64 | work_queue->handle_blocked_writes(r); | |
65 | } | |
66 | }; | |
67 | ||
68 | template <typename I> | |
69 | struct ImageRequestWQ<I>::C_RefreshFinish : public Context { | |
70 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 71 | ImageDispatchSpec<I> *image_request; |
224ce89b WB |
72 | |
73 | C_RefreshFinish(ImageRequestWQ *work_queue, | |
11fdf7f2 | 74 | ImageDispatchSpec<I> *image_request) |
224ce89b WB |
75 | : work_queue(work_queue), image_request(image_request) { |
76 | } | |
77 | void finish(int r) override { | |
78 | work_queue->handle_refreshed(r, image_request); | |
79 | } | |
80 | }; | |
81 | ||
11fdf7f2 TL |
82 | static std::map<uint64_t, std::string> throttle_flags = { |
83 | { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" }, | |
84 | { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" }, | |
85 | { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" }, | |
86 | { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" }, | |
87 | { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" }, | |
88 | { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" } | |
89 | }; | |
90 | ||
224ce89b WB |
91 | template <typename I> |
92 | ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name, | |
93 | time_t ti, ThreadPool *tp) | |
11fdf7f2 | 94 | : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp), |
7c673cae | 95 | m_image_ctx(*image_ctx), |
224ce89b | 96 | m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) { |
7c673cae FG |
97 | CephContext *cct = m_image_ctx.cct; |
98 | ldout(cct, 5) << "ictx=" << image_ctx << dendl; | |
11fdf7f2 TL |
99 | |
100 | SafeTimer *timer; | |
101 | Mutex *timer_lock; | |
102 | ImageCtx::get_timer_instance(cct, &timer, &timer_lock); | |
103 | ||
104 | for (auto flag : throttle_flags) { | |
105 | m_throttles.push_back(make_pair( | |
106 | flag.first, | |
107 | new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock))); | |
108 | } | |
109 | ||
224ce89b | 110 | this->register_work_queue(); |
7c673cae FG |
111 | } |
112 | ||
11fdf7f2 TL |
113 | template <typename I> |
114 | ImageRequestWQ<I>::~ImageRequestWQ() { | |
115 | for (auto t : m_throttles) { | |
116 | delete t.second; | |
117 | } | |
118 | } | |
119 | ||
224ce89b WB |
120 | template <typename I> |
121 | ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, | |
122 | ReadResult &&read_result, int op_flags) { | |
7c673cae FG |
123 | CephContext *cct = m_image_ctx.cct; |
124 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
125 | << "len = " << len << dendl; | |
126 | ||
127 | C_SaferCond cond; | |
128 | AioCompletion *c = AioCompletion::create(&cond); | |
129 | aio_read(c, off, len, std::move(read_result), op_flags, false); | |
130 | return cond.wait(); | |
131 | } | |
132 | ||
224ce89b WB |
133 | template <typename I> |
134 | ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len, | |
135 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
136 | CephContext *cct = m_image_ctx.cct; |
137 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
138 | << "len = " << len << dendl; | |
139 | ||
140 | m_image_ctx.snap_lock.get_read(); | |
141 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
142 | m_image_ctx.snap_lock.put_read(); | |
143 | if (r < 0) { | |
144 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
145 | return r; | |
146 | } | |
147 | ||
148 | C_SaferCond cond; | |
149 | AioCompletion *c = AioCompletion::create(&cond); | |
150 | aio_write(c, off, len, std::move(bl), op_flags, false); | |
151 | ||
152 | r = cond.wait(); | |
153 | if (r < 0) { | |
154 | return r; | |
155 | } | |
156 | return len; | |
157 | } | |
158 | ||
224ce89b WB |
159 | template <typename I> |
160 | ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, | |
11fdf7f2 | 161 | uint32_t discard_granularity_bytes) { |
7c673cae FG |
162 | CephContext *cct = m_image_ctx.cct; |
163 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
164 | << "len = " << len << dendl; | |
165 | ||
166 | m_image_ctx.snap_lock.get_read(); | |
167 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
168 | m_image_ctx.snap_lock.put_read(); | |
169 | if (r < 0) { | |
170 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
171 | return r; | |
172 | } | |
173 | ||
174 | C_SaferCond cond; | |
175 | AioCompletion *c = AioCompletion::create(&cond); | |
11fdf7f2 | 176 | aio_discard(c, off, len, discard_granularity_bytes, false); |
7c673cae FG |
177 | |
178 | r = cond.wait(); | |
179 | if (r < 0) { | |
180 | return r; | |
181 | } | |
182 | return len; | |
183 | } | |
184 | ||
224ce89b WB |
185 | template <typename I> |
186 | ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, | |
187 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
188 | CephContext *cct = m_image_ctx.cct; |
189 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
190 | << "len = " << len << ", data_len " << bl.length() << dendl; | |
191 | ||
192 | m_image_ctx.snap_lock.get_read(); | |
193 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
194 | m_image_ctx.snap_lock.put_read(); | |
195 | if (r < 0) { | |
196 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
197 | return r; | |
198 | } | |
199 | ||
200 | C_SaferCond cond; | |
201 | AioCompletion *c = AioCompletion::create(&cond); | |
202 | aio_writesame(c, off, len, std::move(bl), op_flags, false); | |
203 | ||
204 | r = cond.wait(); | |
205 | if (r < 0) { | |
206 | return r; | |
207 | } | |
208 | return len; | |
209 | } | |
210 | ||
c07f9fc5 FG |
211 | template <typename I> |
212 | ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len, | |
213 | bufferlist &&cmp_bl, | |
214 | bufferlist &&bl, | |
215 | uint64_t *mismatch_off, | |
216 | int op_flags){ | |
217 | CephContext *cct = m_image_ctx.cct; | |
218 | ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" | |
219 | << off << ", " << "len = " << len << dendl; | |
220 | ||
221 | m_image_ctx.snap_lock.get_read(); | |
222 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
223 | m_image_ctx.snap_lock.put_read(); | |
224 | if (r < 0) { | |
225 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
226 | return r; | |
227 | } | |
228 | ||
229 | C_SaferCond cond; | |
230 | AioCompletion *c = AioCompletion::create(&cond); | |
231 | aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), | |
232 | mismatch_off, op_flags, false); | |
233 | ||
234 | r = cond.wait(); | |
235 | if (r < 0) { | |
236 | return r; | |
237 | } | |
238 | ||
239 | return len; | |
240 | } | |
241 | ||
11fdf7f2 TL |
242 | template <typename I> |
243 | int ImageRequestWQ<I>::flush() { | |
244 | CephContext *cct = m_image_ctx.cct; | |
245 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
246 | ||
247 | C_SaferCond cond; | |
248 | AioCompletion *c = AioCompletion::create(&cond); | |
249 | aio_flush(c, false); | |
250 | ||
251 | int r = cond.wait(); | |
252 | if (r < 0) { | |
253 | return r; | |
254 | } | |
255 | ||
256 | return 0; | |
257 | } | |
258 | ||
224ce89b WB |
259 | template <typename I> |
260 | void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len, | |
261 | ReadResult &&read_result, int op_flags, | |
262 | bool native_async) { | |
7c673cae | 263 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 264 | FUNCTRACE(cct); |
31f18b77 | 265 | ZTracer::Trace trace; |
181888fb | 266 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
267 | trace.init("wq: read", &m_image_ctx.trace_endpoint); |
268 | trace.event("start"); | |
269 | } | |
270 | ||
224ce89b | 271 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ); |
7c673cae FG |
272 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
273 | << "completion=" << c << ", off=" << off << ", " | |
274 | << "len=" << len << ", " << "flags=" << op_flags << dendl; | |
275 | ||
276 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
277 | c->set_event_notify(true); | |
278 | } | |
279 | ||
224ce89b | 280 | if (!start_in_flight_io(c)) { |
7c673cae FG |
281 | return; |
282 | } | |
283 | ||
7c673cae FG |
284 | // if journaling is enabled -- we need to replay the journal because |
285 | // it might contain an uncommitted write | |
224ce89b | 286 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); |
7c673cae | 287 | if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() || |
224ce89b | 288 | require_lock_on_read()) { |
11fdf7f2 | 289 | queue(ImageDispatchSpec<I>::create_read_request( |
224ce89b WB |
290 | m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags, |
291 | trace)); | |
7c673cae FG |
292 | } else { |
293 | c->start_op(); | |
224ce89b WB |
294 | ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}}, |
295 | std::move(read_result), op_flags, trace); | |
296 | finish_in_flight_io(); | |
7c673cae | 297 | } |
31f18b77 | 298 | trace.event("finish"); |
7c673cae FG |
299 | } |
300 | ||
224ce89b WB |
301 | template <typename I> |
302 | void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len, | |
303 | bufferlist &&bl, int op_flags, | |
304 | bool native_async) { | |
7c673cae | 305 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 306 | FUNCTRACE(cct); |
31f18b77 | 307 | ZTracer::Trace trace; |
181888fb | 308 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
309 | trace.init("wq: write", &m_image_ctx.trace_endpoint); |
310 | trace.event("init"); | |
311 | } | |
312 | ||
224ce89b | 313 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE); |
7c673cae FG |
314 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
315 | << "completion=" << c << ", off=" << off << ", " | |
316 | << "len=" << len << ", flags=" << op_flags << dendl; | |
317 | ||
318 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
319 | c->set_event_notify(true); | |
320 | } | |
321 | ||
224ce89b | 322 | if (!start_in_flight_io(c)) { |
7c673cae FG |
323 | return; |
324 | } | |
325 | ||
326 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
327 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
11fdf7f2 | 328 | queue(ImageDispatchSpec<I>::create_write_request( |
224ce89b | 329 | m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace)); |
7c673cae FG |
330 | } else { |
331 | c->start_op(); | |
224ce89b WB |
332 | ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}}, |
333 | std::move(bl), op_flags, trace); | |
334 | finish_in_flight_io(); | |
7c673cae | 335 | } |
31f18b77 | 336 | trace.event("finish"); |
7c673cae FG |
337 | } |
338 | ||
224ce89b WB |
339 | template <typename I> |
340 | void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off, | |
11fdf7f2 TL |
341 | uint64_t len, |
342 | uint32_t discard_granularity_bytes, | |
224ce89b | 343 | bool native_async) { |
7c673cae | 344 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 345 | FUNCTRACE(cct); |
31f18b77 | 346 | ZTracer::Trace trace; |
181888fb | 347 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
348 | trace.init("wq: discard", &m_image_ctx.trace_endpoint); |
349 | trace.event("init"); | |
350 | } | |
351 | ||
224ce89b | 352 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD); |
7c673cae FG |
353 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
354 | << "completion=" << c << ", off=" << off << ", len=" << len | |
355 | << dendl; | |
356 | ||
357 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
358 | c->set_event_notify(true); | |
359 | } | |
360 | ||
224ce89b | 361 | if (!start_in_flight_io(c)) { |
7c673cae FG |
362 | return; |
363 | } | |
364 | ||
365 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
366 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
11fdf7f2 TL |
367 | queue(ImageDispatchSpec<I>::create_discard_request( |
368 | m_image_ctx, c, off, len, discard_granularity_bytes, trace)); | |
7c673cae FG |
369 | } else { |
370 | c->start_op(); | |
11fdf7f2 TL |
371 | ImageRequest<I>::aio_discard(&m_image_ctx, c, {{off, len}}, |
372 | discard_granularity_bytes, trace); | |
224ce89b | 373 | finish_in_flight_io(); |
7c673cae | 374 | } |
31f18b77 | 375 | trace.event("finish"); |
7c673cae FG |
376 | } |
377 | ||
224ce89b WB |
378 | template <typename I> |
379 | void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) { | |
7c673cae | 380 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 381 | FUNCTRACE(cct); |
31f18b77 | 382 | ZTracer::Trace trace; |
181888fb | 383 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
384 | trace.init("wq: flush", &m_image_ctx.trace_endpoint); |
385 | trace.event("init"); | |
386 | } | |
387 | ||
224ce89b | 388 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH); |
7c673cae FG |
389 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
390 | << "completion=" << c << dendl; | |
391 | ||
392 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
393 | c->set_event_notify(true); | |
394 | } | |
395 | ||
224ce89b | 396 | if (!start_in_flight_io(c)) { |
7c673cae FG |
397 | return; |
398 | } | |
399 | ||
400 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
401 | if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) { | |
11fdf7f2 TL |
402 | queue(ImageDispatchSpec<I>::create_flush_request( |
403 | m_image_ctx, c, FLUSH_SOURCE_USER, trace)); | |
7c673cae | 404 | } else { |
11fdf7f2 | 405 | ImageRequest<I>::aio_flush(&m_image_ctx, c, FLUSH_SOURCE_USER, trace); |
224ce89b | 406 | finish_in_flight_io(); |
7c673cae | 407 | } |
31f18b77 | 408 | trace.event("finish"); |
7c673cae FG |
409 | } |
410 | ||
224ce89b WB |
411 | template <typename I> |
412 | void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off, | |
413 | uint64_t len, bufferlist &&bl, | |
414 | int op_flags, bool native_async) { | |
7c673cae | 415 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 416 | FUNCTRACE(cct); |
31f18b77 | 417 | ZTracer::Trace trace; |
181888fb | 418 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
419 | trace.init("wq: writesame", &m_image_ctx.trace_endpoint); |
420 | trace.event("init"); | |
421 | } | |
422 | ||
224ce89b | 423 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME); |
7c673cae FG |
424 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
425 | << "completion=" << c << ", off=" << off << ", " | |
426 | << "len=" << len << ", data_len = " << bl.length() << ", " | |
427 | << "flags=" << op_flags << dendl; | |
428 | ||
429 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
430 | c->set_event_notify(true); | |
431 | } | |
432 | ||
224ce89b | 433 | if (!start_in_flight_io(c)) { |
7c673cae FG |
434 | return; |
435 | } | |
436 | ||
437 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
438 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
11fdf7f2 | 439 | queue(ImageDispatchSpec<I>::create_write_same_request( |
224ce89b | 440 | m_image_ctx, c, off, len, std::move(bl), op_flags, trace)); |
7c673cae FG |
441 | } else { |
442 | c->start_op(); | |
11fdf7f2 | 443 | ImageRequest<I>::aio_writesame(&m_image_ctx, c, {{off, len}}, std::move(bl), |
224ce89b WB |
444 | op_flags, trace); |
445 | finish_in_flight_io(); | |
7c673cae | 446 | } |
31f18b77 | 447 | trace.event("finish"); |
7c673cae FG |
448 | } |
449 | ||
c07f9fc5 FG |
450 | template <typename I> |
451 | void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c, | |
452 | uint64_t off, uint64_t len, | |
453 | bufferlist &&cmp_bl, | |
454 | bufferlist &&bl, | |
455 | uint64_t *mismatch_off, | |
456 | int op_flags, bool native_async) { | |
457 | CephContext *cct = m_image_ctx.cct; | |
11fdf7f2 | 458 | FUNCTRACE(cct); |
c07f9fc5 | 459 | ZTracer::Trace trace; |
181888fb | 460 | if (m_image_ctx.blkin_trace_all) { |
c07f9fc5 FG |
461 | trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint); |
462 | trace.event("init"); | |
463 | } | |
464 | ||
465 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE); | |
466 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
467 | << "completion=" << c << ", off=" << off << ", " | |
468 | << "len=" << len << dendl; | |
469 | ||
470 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
471 | c->set_event_notify(true); | |
472 | } | |
473 | ||
474 | if (!start_in_flight_io(c)) { | |
475 | return; | |
476 | } | |
477 | ||
478 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
479 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
11fdf7f2 | 480 | queue(ImageDispatchSpec<I>::create_compare_and_write_request( |
c07f9fc5 FG |
481 | m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl), |
482 | mismatch_off, op_flags, trace)); | |
483 | } else { | |
484 | c->start_op(); | |
485 | ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}}, | |
486 | std::move(cmp_bl), std::move(bl), | |
487 | mismatch_off, op_flags, trace); | |
488 | finish_in_flight_io(); | |
489 | } | |
490 | trace.event("finish"); | |
491 | } | |
492 | ||
224ce89b WB |
493 | template <typename I> |
494 | void ImageRequestWQ<I>::shut_down(Context *on_shutdown) { | |
11fdf7f2 | 495 | ceph_assert(m_image_ctx.owner_lock.is_locked()); |
7c673cae FG |
496 | |
497 | { | |
498 | RWLock::WLocker locker(m_lock); | |
11fdf7f2 | 499 | ceph_assert(!m_shutdown); |
7c673cae FG |
500 | m_shutdown = true; |
501 | ||
502 | CephContext *cct = m_image_ctx.cct; | |
224ce89b | 503 | ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load() |
7c673cae | 504 | << dendl; |
224ce89b | 505 | if (m_in_flight_ios > 0) { |
7c673cae FG |
506 | m_on_shutdown = on_shutdown; |
507 | return; | |
508 | } | |
509 | } | |
510 | ||
511 | // ensure that all in-flight IO is flushed | |
11fdf7f2 | 512 | flush_image(m_image_ctx, on_shutdown); |
7c673cae FG |
513 | } |
514 | ||
224ce89b WB |
515 | template <typename I> |
516 | int ImageRequestWQ<I>::block_writes() { | |
7c673cae FG |
517 | C_SaferCond cond_ctx; |
518 | block_writes(&cond_ctx); | |
519 | return cond_ctx.wait(); | |
520 | } | |
521 | ||
224ce89b WB |
522 | template <typename I> |
523 | void ImageRequestWQ<I>::block_writes(Context *on_blocked) { | |
11fdf7f2 | 524 | ceph_assert(m_image_ctx.owner_lock.is_locked()); |
7c673cae FG |
525 | CephContext *cct = m_image_ctx.cct; |
526 | ||
527 | { | |
528 | RWLock::WLocker locker(m_lock); | |
529 | ++m_write_blockers; | |
530 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
531 | << m_write_blockers << dendl; | |
224ce89b | 532 | if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) { |
7c673cae FG |
533 | m_write_blocker_contexts.push_back(on_blocked); |
534 | return; | |
535 | } | |
536 | } | |
537 | ||
538 | // ensure that all in-flight IO is flushed | |
11fdf7f2 | 539 | flush_image(m_image_ctx, on_blocked); |
7c673cae FG |
540 | } |
541 | ||
224ce89b WB |
542 | template <typename I> |
543 | void ImageRequestWQ<I>::unblock_writes() { | |
7c673cae FG |
544 | CephContext *cct = m_image_ctx.cct; |
545 | ||
546 | bool wake_up = false; | |
11fdf7f2 | 547 | Contexts waiter_contexts; |
7c673cae FG |
548 | { |
549 | RWLock::WLocker locker(m_lock); | |
11fdf7f2 | 550 | ceph_assert(m_write_blockers > 0); |
7c673cae FG |
551 | --m_write_blockers; |
552 | ||
553 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
554 | << m_write_blockers << dendl; | |
555 | if (m_write_blockers == 0) { | |
556 | wake_up = true; | |
11fdf7f2 | 557 | std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); |
7c673cae FG |
558 | } |
559 | } | |
560 | ||
561 | if (wake_up) { | |
11fdf7f2 TL |
562 | for (auto ctx : waiter_contexts) { |
563 | ctx->complete(0); | |
564 | } | |
224ce89b | 565 | this->signal(); |
7c673cae FG |
566 | } |
567 | } | |
568 | ||
11fdf7f2 TL |
569 | template <typename I> |
570 | void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) { | |
571 | ceph_assert(m_image_ctx.owner_lock.is_locked()); | |
572 | CephContext *cct = m_image_ctx.cct; | |
573 | ||
574 | { | |
575 | RWLock::WLocker locker(m_lock); | |
576 | ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers=" | |
577 | << m_write_blockers << dendl; | |
578 | if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { | |
579 | m_unblocked_write_waiter_contexts.push_back(on_unblocked); | |
580 | return; | |
581 | } | |
582 | } | |
583 | ||
584 | on_unblocked->complete(0); | |
585 | } | |
586 | ||
224ce89b WB |
587 | template <typename I> |
588 | void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) { | |
7c673cae FG |
589 | CephContext *cct = m_image_ctx.cct; |
590 | ldout(cct, 20) << dendl; | |
591 | ||
224ce89b | 592 | bool wake_up = false; |
7c673cae FG |
593 | { |
594 | RWLock::WLocker locker(m_lock); | |
224ce89b WB |
595 | switch (direction) { |
596 | case DIRECTION_READ: | |
597 | wake_up = (enabled != m_require_lock_on_read); | |
598 | m_require_lock_on_read = enabled; | |
599 | break; | |
600 | case DIRECTION_WRITE: | |
601 | wake_up = (enabled != m_require_lock_on_write); | |
602 | m_require_lock_on_write = enabled; | |
603 | break; | |
604 | case DIRECTION_BOTH: | |
605 | wake_up = (enabled != m_require_lock_on_read || | |
606 | enabled != m_require_lock_on_write); | |
607 | m_require_lock_on_read = enabled; | |
608 | m_require_lock_on_write = enabled; | |
609 | break; | |
7c673cae | 610 | } |
224ce89b | 611 | } |
7c673cae | 612 | |
224ce89b WB |
613 | // wake up the thread pool whenever the state changes so that |
614 | // we can re-request the lock if required | |
615 | if (wake_up) { | |
616 | this->signal(); | |
7c673cae | 617 | } |
7c673cae FG |
618 | } |
619 | ||
11fdf7f2 TL |
620 | template <typename I> |
621 | void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){ | |
622 | for (auto pair : m_throttles) { | |
623 | pair.second->set_schedule_tick_min(tick); | |
624 | } | |
625 | } | |
626 | ||
627 | template <typename I> | |
628 | void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag, | |
629 | uint64_t limit, | |
630 | uint64_t burst) { | |
631 | CephContext *cct = m_image_ctx.cct; | |
632 | TokenBucketThrottle *throttle = nullptr; | |
633 | for (auto pair : m_throttles) { | |
634 | if (flag == pair.first) { | |
635 | throttle = pair.second; | |
636 | break; | |
637 | } | |
638 | } | |
639 | ceph_assert(throttle != nullptr); | |
640 | ||
641 | int r = throttle->set_limit(limit, burst); | |
642 | if (r < 0) { | |
643 | lderr(cct) << throttle->get_name() << ": invalid qos parameter: " | |
644 | << "burst(" << burst << ") is less than " | |
645 | << "limit(" << limit << ")" << dendl; | |
646 | // if apply failed, we should at least make sure the limit works. | |
647 | throttle->set_limit(limit, 0); | |
648 | } | |
649 | ||
650 | if (limit) | |
651 | m_qos_enabled_flag |= flag; | |
652 | else | |
653 | m_qos_enabled_flag &= ~flag; | |
654 | } | |
655 | ||
656 | template <typename I> | |
657 | void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) { | |
658 | CephContext *cct = m_image_ctx.cct; | |
659 | ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl; | |
660 | ||
661 | ceph_assert(m_io_throttled.load() > 0); | |
662 | item->set_throttled(flag); | |
663 | if (item->were_all_throttled()) { | |
81eedcae | 664 | this->requeue_back(item); |
11fdf7f2 TL |
665 | --m_io_throttled; |
666 | this->signal(); | |
667 | } | |
668 | } | |
669 | ||
670 | template <typename I> | |
671 | bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) { | |
672 | uint64_t tokens = 0; | |
673 | uint64_t flag = 0; | |
674 | bool blocked = false; | |
675 | TokenBucketThrottle* throttle = nullptr; | |
676 | ||
677 | for (auto t : m_throttles) { | |
678 | flag = t.first; | |
679 | if (item->was_throttled(flag)) | |
680 | continue; | |
681 | ||
682 | if (!(m_qos_enabled_flag & flag)) { | |
683 | item->set_throttled(flag); | |
684 | continue; | |
685 | } | |
686 | ||
687 | throttle = t.second; | |
81eedcae TL |
688 | if (item->tokens_requested(flag, &tokens) && |
689 | throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>, | |
11fdf7f2 TL |
690 | &ImageRequestWQ<I>::handle_throttle_ready>( |
691 | tokens, this, item, flag)) { | |
692 | blocked = true; | |
693 | } else { | |
694 | item->set_throttled(flag); | |
695 | } | |
696 | } | |
697 | return blocked; | |
698 | } | |
699 | ||
224ce89b WB |
700 | template <typename I> |
701 | void *ImageRequestWQ<I>::_void_dequeue() { | |
702 | CephContext *cct = m_image_ctx.cct; | |
11fdf7f2 | 703 | ImageDispatchSpec<I> *peek_item = this->front(); |
7c673cae | 704 | |
224ce89b WB |
705 | // no queued IO requests or all IO is blocked/stalled |
706 | if (peek_item == nullptr || m_io_blockers.load() > 0) { | |
7c673cae FG |
707 | return nullptr; |
708 | } | |
709 | ||
11fdf7f2 TL |
710 | if (needs_throttle(peek_item)) { |
711 | ldout(cct, 15) << "throttling IO " << peek_item << dendl; | |
712 | ||
713 | ++m_io_throttled; | |
714 | // dequeue the throttled item | |
715 | ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue(); | |
716 | return nullptr; | |
717 | } | |
718 | ||
224ce89b | 719 | bool lock_required; |
7c673cae FG |
720 | bool refresh_required = m_image_ctx.state->is_refresh_required(); |
721 | { | |
722 | RWLock::RLocker locker(m_lock); | |
224ce89b WB |
723 | bool write_op = peek_item->is_write_op(); |
724 | lock_required = is_lock_required(write_op); | |
725 | if (write_op) { | |
726 | if (!lock_required && m_write_blockers > 0) { | |
727 | // missing lock is not the write blocker | |
7c673cae FG |
728 | return nullptr; |
729 | } | |
730 | ||
224ce89b WB |
731 | if (!lock_required && !refresh_required) { |
732 | // completed ops will requeue the IO -- don't count it as in-progress | |
733 | m_in_flight_writes++; | |
7c673cae | 734 | } |
7c673cae FG |
735 | } |
736 | } | |
737 | ||
11fdf7f2 TL |
738 | auto item = reinterpret_cast<ImageDispatchSpec<I> *>( |
739 | ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue()); | |
740 | ceph_assert(peek_item == item); | |
7c673cae | 741 | |
224ce89b | 742 | if (lock_required) { |
11fdf7f2 | 743 | this->get_pool_lock().unlock(); |
224ce89b WB |
744 | m_image_ctx.owner_lock.get_read(); |
745 | if (m_image_ctx.exclusive_lock != nullptr) { | |
746 | ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl; | |
747 | if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) { | |
748 | lderr(cct) << "op requires exclusive lock" << dendl; | |
91327a77 AA |
749 | fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(), |
750 | item); | |
224ce89b WB |
751 | |
752 | // wake up the IO since we won't be returning a request to process | |
753 | this->signal(); | |
754 | } else { | |
755 | // stall IO until the acquire completes | |
756 | ++m_io_blockers; | |
757 | m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item)); | |
758 | } | |
759 | } else { | |
760 | // raced with the exclusive lock being disabled | |
761 | lock_required = false; | |
762 | } | |
763 | m_image_ctx.owner_lock.put_read(); | |
11fdf7f2 | 764 | this->get_pool_lock().lock(); |
224ce89b WB |
765 | |
766 | if (lock_required) { | |
767 | return nullptr; | |
768 | } | |
769 | } | |
770 | ||
7c673cae | 771 | if (refresh_required) { |
224ce89b | 772 | ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl; |
7c673cae FG |
773 | |
774 | // stall IO until the refresh completes | |
224ce89b | 775 | ++m_io_blockers; |
7c673cae | 776 | |
11fdf7f2 | 777 | this->get_pool_lock().unlock(); |
7c673cae | 778 | m_image_ctx.state->refresh(new C_RefreshFinish(this, item)); |
11fdf7f2 | 779 | this->get_pool_lock().lock(); |
7c673cae FG |
780 | return nullptr; |
781 | } | |
782 | ||
783 | item->start_op(); | |
784 | return item; | |
785 | } | |
786 | ||
224ce89b | 787 | template <typename I> |
11fdf7f2 | 788 | void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) { |
7c673cae FG |
789 | CephContext *cct = m_image_ctx.cct; |
790 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
791 | << "req=" << req << dendl; | |
792 | ||
793 | req->send(); | |
794 | ||
224ce89b | 795 | finish_queued_io(req); |
7c673cae | 796 | if (req->is_write_op()) { |
224ce89b | 797 | finish_in_flight_write(); |
7c673cae FG |
798 | } |
799 | delete req; | |
800 | ||
224ce89b | 801 | finish_in_flight_io(); |
7c673cae FG |
802 | } |
803 | ||
224ce89b | 804 | template <typename I> |
11fdf7f2 | 805 | void ImageRequestWQ<I>::finish_queued_io(ImageDispatchSpec<I> *req) { |
7c673cae FG |
806 | RWLock::RLocker locker(m_lock); |
807 | if (req->is_write_op()) { | |
11fdf7f2 | 808 | ceph_assert(m_queued_writes > 0); |
7c673cae FG |
809 | m_queued_writes--; |
810 | } else { | |
11fdf7f2 | 811 | ceph_assert(m_queued_reads > 0); |
7c673cae FG |
812 | m_queued_reads--; |
813 | } | |
814 | } | |
815 | ||
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  // Retire one in-flight write. If this was the last in-flight write and
  // contexts are waiting for writes to become blocked, start an internal
  // flush whose completion (C_BlockedWrites) will wake those waiters.
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    ceph_assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  // The flush is deliberately issued after m_lock has been released.
  if (writes_blocked) {
    flush_image(m_image_ctx, new C_BlockedWrites(this));
  }
}
832 | ||
224ce89b WB |
833 | template <typename I> |
834 | int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) { | |
7c673cae FG |
835 | RWLock::RLocker locker(m_lock); |
836 | ||
837 | if (m_shutdown) { | |
838 | CephContext *cct = m_image_ctx.cct; | |
839 | lderr(cct) << "IO received on closed image" << dendl; | |
840 | ||
224ce89b | 841 | c->get(); |
7c673cae FG |
842 | c->fail(-ESHUTDOWN); |
843 | return false; | |
844 | } | |
845 | ||
224ce89b | 846 | m_in_flight_ios++; |
7c673cae FG |
847 | return true; |
848 | } | |
849 | ||
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  // Drop one in-flight IO reference. If this was the final in-flight IO
  // and the queue is shutting down, complete the shutdown by flushing the
  // image and then firing the stored shutdown context.
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    // capture the context inside the lock; the flush runs outside of it
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}
867 | ||
template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(
    int r, ImageDispatchSpec<I> *req) {
  // Complete a dequeued request with error r. Mirrors the bookkeeping in
  // process(): notify the work queue the item is finished, fail the
  // request, drop the queued counters, free it, and finally release the
  // in-flight reference (which may complete a pending shutdown).
  this->process_finish();
  req->fail(r);
  finish_queued_io(req);
  delete req;
  finish_in_flight_io();
}
7c673cae | 877 | |
224ce89b WB |
878 | template <typename I> |
879 | bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { | |
11fdf7f2 | 880 | ceph_assert(m_lock.is_locked()); |
224ce89b WB |
881 | return ((write_op && m_require_lock_on_write) || |
882 | (!write_op && m_require_lock_on_read)); | |
7c673cae FG |
883 | } |
884 | ||
224ce89b | 885 | template <typename I> |
11fdf7f2 TL |
886 | void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) { |
887 | ceph_assert(m_image_ctx.owner_lock.is_locked()); | |
224ce89b | 888 | |
7c673cae FG |
889 | CephContext *cct = m_image_ctx.cct; |
890 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
891 | << "req=" << req << dendl; | |
892 | ||
224ce89b | 893 | if (req->is_write_op()) { |
7c673cae FG |
894 | m_queued_writes++; |
895 | } else { | |
896 | m_queued_reads++; | |
897 | } | |
898 | ||
11fdf7f2 | 899 | ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req); |
224ce89b | 900 | } |
7c673cae | 901 | |
template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(
    int r, ImageDispatchSpec<I> *req) {
  // Completion callback for an exclusive-lock acquisition that was
  // stalling this request: fail it on error, otherwise put it back at
  // the head of the queue, then unblock the queue.
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  // drop the IO blocker taken when the lock request was started and wake
  // a worker so the requeued op can be dispatched
  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}
920 | ||
template <typename I>
void ImageRequestWQ<I>::handle_refreshed(
    int r, ImageDispatchSpec<I> *req) {
  // Completion callback for an image refresh that was stalling this
  // request (see the refresh stall in the dequeue path): fail the request
  // on error, otherwise requeue it at the head so ordering is kept.
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue_front(req);
  }

  // drop the IO blocker taken when the refresh was started and wake a
  // worker so the requeued op can be dispatched
  ceph_assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}
939 | ||
224ce89b WB |
940 | template <typename I> |
941 | void ImageRequestWQ<I>::handle_blocked_writes(int r) { | |
7c673cae FG |
942 | Contexts contexts; |
943 | { | |
944 | RWLock::WLocker locker(m_lock); | |
945 | contexts.swap(m_write_blocker_contexts); | |
946 | } | |
947 | ||
948 | for (auto ctx : contexts) { | |
949 | ctx->complete(0); | |
950 | } | |
951 | } | |
952 | ||
// explicit instantiation for the concrete image context type used by librbd
template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd