]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/io/ImageRequestWQ.h" | |
5 | #include "common/errno.h" | |
31f18b77 | 6 | #include "common/zipkin_trace.h" |
11fdf7f2 | 7 | #include "common/Cond.h" |
7c673cae FG |
8 | #include "librbd/ExclusiveLock.h" |
9 | #include "librbd/ImageCtx.h" | |
10 | #include "librbd/ImageState.h" | |
91327a77 | 11 | #include "librbd/ImageWatcher.h" |
7c673cae FG |
12 | #include "librbd/internal.h" |
13 | #include "librbd/Utils.h" | |
14 | #include "librbd/exclusive_lock/Policy.h" | |
15 | #include "librbd/io/AioCompletion.h" | |
16 | #include "librbd/io/ImageRequest.h" | |
11fdf7f2 TL |
17 | #include "librbd/io/ImageDispatchSpec.h" |
18 | #include "common/EventTrace.h" | |
7c673cae FG |
19 | |
20 | #define dout_subsys ceph_subsys_rbd | |
21 | #undef dout_prefix | |
22 | #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ | |
23 | << " " << __func__ << ": " | |
24 | ||
25 | namespace librbd { | |
9f95a23c TL |
26 | |
27 | using util::create_context_callback; | |
28 | ||
7c673cae FG |
29 | namespace io { |
30 | ||
11fdf7f2 TL |
31 | namespace { |
32 | ||
33 | template <typename I> | |
34 | void flush_image(I& image_ctx, Context* on_finish) { | |
494da23a | 35 | auto aio_comp = librbd::io::AioCompletion::create_and_start( |
11fdf7f2 TL |
36 | on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH); |
37 | auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request( | |
38 | image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {}); | |
39 | req->send(); | |
40 | delete req; | |
41 | } | |
42 | ||
43 | } // anonymous namespace | |
44 | ||
224ce89b WB |
45 | template <typename I> |
46 | struct ImageRequestWQ<I>::C_AcquireLock : public Context { | |
47 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 48 | ImageDispatchSpec<I> *image_request; |
224ce89b | 49 | |
11fdf7f2 | 50 | C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request) |
224ce89b WB |
51 | : work_queue(work_queue), image_request(image_request) { |
52 | } | |
53 | ||
54 | void finish(int r) override { | |
55 | work_queue->handle_acquire_lock(r, image_request); | |
56 | } | |
57 | }; | |
58 | ||
59 | template <typename I> | |
60 | struct ImageRequestWQ<I>::C_BlockedWrites : public Context { | |
61 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 62 | explicit C_BlockedWrites(ImageRequestWQ *_work_queue) |
224ce89b WB |
63 | : work_queue(_work_queue) { |
64 | } | |
65 | ||
66 | void finish(int r) override { | |
67 | work_queue->handle_blocked_writes(r); | |
68 | } | |
69 | }; | |
70 | ||
71 | template <typename I> | |
72 | struct ImageRequestWQ<I>::C_RefreshFinish : public Context { | |
73 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 74 | ImageDispatchSpec<I> *image_request; |
224ce89b WB |
75 | |
76 | C_RefreshFinish(ImageRequestWQ *work_queue, | |
11fdf7f2 | 77 | ImageDispatchSpec<I> *image_request) |
224ce89b WB |
78 | : work_queue(work_queue), image_request(image_request) { |
79 | } | |
80 | void finish(int r) override { | |
81 | work_queue->handle_refreshed(r, image_request); | |
82 | } | |
83 | }; | |
84 | ||
11fdf7f2 TL |
85 | static std::map<uint64_t, std::string> throttle_flags = { |
86 | { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" }, | |
87 | { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" }, | |
88 | { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" }, | |
89 | { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" }, | |
90 | { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" }, | |
91 | { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" } | |
92 | }; | |
93 | ||
// Construct the image IO work queue: wires it into the shared thread pool,
// creates one (initially disabled) QoS throttle per supported flag, and
// registers the queue so the pool will start draining it.
template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(ceph::make_shared_mutex(
      util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;

  // All throttles share the process-wide timer instance and its lock.
  SafeTimer *timer;
  ceph::mutex *timer_lock;
  ImageCtx::get_timer_instance(cct, &timer, &timer_lock);

  // Limits start at 0/0 (disabled) until apply_qos_limit() configures them.
  for (auto flag : throttle_flags) {
    m_throttles.push_back(make_pair(
      flag.first,
      new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock)));
  }

  this->register_work_queue();
}
116 | ||
11fdf7f2 TL |
117 | template <typename I> |
118 | ImageRequestWQ<I>::~ImageRequestWQ() { | |
119 | for (auto t : m_throttles) { | |
120 | delete t.second; | |
121 | } | |
122 | } | |
123 | ||
224ce89b WB |
124 | template <typename I> |
125 | ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, | |
126 | ReadResult &&read_result, int op_flags) { | |
7c673cae FG |
127 | CephContext *cct = m_image_ctx.cct; |
128 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
129 | << "len = " << len << dendl; | |
130 | ||
131 | C_SaferCond cond; | |
132 | AioCompletion *c = AioCompletion::create(&cond); | |
133 | aio_read(c, off, len, std::move(read_result), op_flags, false); | |
134 | return cond.wait(); | |
135 | } | |
136 | ||
224ce89b WB |
137 | template <typename I> |
138 | ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len, | |
139 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
140 | CephContext *cct = m_image_ctx.cct; |
141 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
142 | << "len = " << len << dendl; | |
143 | ||
9f95a23c | 144 | m_image_ctx.image_lock.lock_shared(); |
7c673cae | 145 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 146 | m_image_ctx.image_lock.unlock_shared(); |
7c673cae FG |
147 | if (r < 0) { |
148 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
149 | return r; | |
150 | } | |
151 | ||
152 | C_SaferCond cond; | |
153 | AioCompletion *c = AioCompletion::create(&cond); | |
154 | aio_write(c, off, len, std::move(bl), op_flags, false); | |
155 | ||
156 | r = cond.wait(); | |
157 | if (r < 0) { | |
158 | return r; | |
159 | } | |
160 | return len; | |
161 | } | |
162 | ||
224ce89b WB |
163 | template <typename I> |
164 | ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, | |
11fdf7f2 | 165 | uint32_t discard_granularity_bytes) { |
7c673cae FG |
166 | CephContext *cct = m_image_ctx.cct; |
167 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
168 | << "len = " << len << dendl; | |
169 | ||
9f95a23c | 170 | m_image_ctx.image_lock.lock_shared(); |
7c673cae | 171 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 172 | m_image_ctx.image_lock.unlock_shared(); |
7c673cae FG |
173 | if (r < 0) { |
174 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
175 | return r; | |
176 | } | |
177 | ||
178 | C_SaferCond cond; | |
179 | AioCompletion *c = AioCompletion::create(&cond); | |
11fdf7f2 | 180 | aio_discard(c, off, len, discard_granularity_bytes, false); |
7c673cae FG |
181 | |
182 | r = cond.wait(); | |
183 | if (r < 0) { | |
184 | return r; | |
185 | } | |
186 | return len; | |
187 | } | |
188 | ||
224ce89b WB |
189 | template <typename I> |
190 | ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, | |
191 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
192 | CephContext *cct = m_image_ctx.cct; |
193 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
194 | << "len = " << len << ", data_len " << bl.length() << dendl; | |
195 | ||
9f95a23c | 196 | m_image_ctx.image_lock.lock_shared(); |
7c673cae | 197 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 198 | m_image_ctx.image_lock.unlock_shared(); |
7c673cae FG |
199 | if (r < 0) { |
200 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
201 | return r; | |
202 | } | |
203 | ||
204 | C_SaferCond cond; | |
205 | AioCompletion *c = AioCompletion::create(&cond); | |
206 | aio_writesame(c, off, len, std::move(bl), op_flags, false); | |
207 | ||
208 | r = cond.wait(); | |
209 | if (r < 0) { | |
210 | return r; | |
211 | } | |
212 | return len; | |
213 | } | |
214 | ||
c07f9fc5 FG |
215 | template <typename I> |
216 | ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len, | |
217 | bufferlist &&cmp_bl, | |
218 | bufferlist &&bl, | |
219 | uint64_t *mismatch_off, | |
220 | int op_flags){ | |
221 | CephContext *cct = m_image_ctx.cct; | |
222 | ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" | |
223 | << off << ", " << "len = " << len << dendl; | |
224 | ||
9f95a23c | 225 | m_image_ctx.image_lock.lock_shared(); |
c07f9fc5 | 226 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 227 | m_image_ctx.image_lock.unlock_shared(); |
c07f9fc5 FG |
228 | if (r < 0) { |
229 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
230 | return r; | |
231 | } | |
232 | ||
233 | C_SaferCond cond; | |
234 | AioCompletion *c = AioCompletion::create(&cond); | |
235 | aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), | |
236 | mismatch_off, op_flags, false); | |
237 | ||
238 | r = cond.wait(); | |
239 | if (r < 0) { | |
240 | return r; | |
241 | } | |
242 | ||
243 | return len; | |
244 | } | |
245 | ||
11fdf7f2 TL |
246 | template <typename I> |
247 | int ImageRequestWQ<I>::flush() { | |
248 | CephContext *cct = m_image_ctx.cct; | |
249 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
250 | ||
251 | C_SaferCond cond; | |
252 | AioCompletion *c = AioCompletion::create(&cond); | |
253 | aio_flush(c, false); | |
254 | ||
255 | int r = cond.wait(); | |
256 | if (r < 0) { | |
257 | return r; | |
258 | } | |
259 | ||
260 | return 0; | |
261 | } | |
262 | ||
// Asynchronous read entry point.  Reads normally bypass the queue and are
// dispatched inline; they are queued only when ordering matters (pending
// writes, blocked writes, non-blocking mode, or a read that requires the
// exclusive lock -- e.g. journal replay may hold uncommitted writes).
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // Fails (completing `c` with an error) if the queue is shutting down.
  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    queue(ImageDispatchSpec<I>::create_read_request(
            m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
            trace));
  } else {
    // Fast path: dispatch inline and drop the in-flight accounting here
    // (queued requests are accounted when processed instead).
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
304 | ||
// Asynchronous write entry point.  Each write is assigned a monotonic tid
// which is tracked in m_queued_or_blocked_io_tids so that later flushes can
// be ordered behind it (see aio_flush/unblock_flushes).
template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  // Register the tid before building the request so a concurrent flush
  // observes this write as pending.
  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request(
    m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    // Inline dispatch; in-flight accounting is released immediately since
    // process_io() performs the dispatch synchronously.
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}
349 | ||
// Asynchronous discard entry point.  Mirrors aio_write(): the discard gets a
// tid recorded in m_queued_or_blocked_io_tids so flushes order behind it.
template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len,
                                    uint32_t discard_granularity_bytes,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request(
    m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}
395 | ||
// Asynchronous flush entry point.  A flush must not overtake earlier writes:
// if any write/discard tid is still queued or blocked, the flush is parked in
// m_queued_flushes and released later by unblock_flushes().
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request(
    m_image_ctx, c, FLUSH_SOURCE_USER, trace);

  {
    std::lock_guard locker{m_lock};
    if(!m_queued_or_blocked_io_tids.empty()) {
      // Earlier IO is still outstanding: park the flush.  The in-flight
      // counter bumped by start_in_flight_io() is rolled back here; it is
      // re-established by queue_unblocked_io() when unblock_flushes()
      // eventually releases this request.
      ldout(cct, 20) << "queueing flush, tid: " << tid << dendl;
      m_queued_flushes.emplace(tid, req);
      --m_in_flight_ios;
      return;
    }
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}
442 | ||
// Asynchronous write-same entry point.  Mirrors aio_write(): tid is tracked
// so flushes order behind this operation.
template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request(
    m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}
488 | ||
// Asynchronous compare-and-write entry point.  Mirrors aio_write(): tid is
// tracked so flushes order behind this operation.
template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  FUNCTRACE(cct);
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  if (!start_in_flight_io(c)) {
    return;
  }

  auto tid = ++m_last_tid;

  {
    std::lock_guard locker{m_lock};
    m_queued_or_blocked_io_tids.insert(tid);
  }

  ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request(
    m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
    mismatch_off, op_flags, trace, tid);

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    queue(req);
  } else {
    process_io(req, false);
    finish_in_flight_io();
  }
  trace.event("finish");
}
537 | ||
9f95a23c TL |
538 | template <typename I> |
539 | bool ImageRequestWQ<I>::block_overlapping_io( | |
540 | ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) { | |
541 | CephContext *cct = m_image_ctx.cct; | |
542 | ldout(cct, 20) << "ictx=" << &m_image_ctx | |
543 | << "off: " << off << " len: " << len <<dendl; | |
544 | ||
545 | if(len == 0) { | |
546 | return false; | |
547 | } | |
548 | ||
549 | if (in_flight_image_extents->empty() || | |
550 | !in_flight_image_extents->intersects(off, len)) { | |
551 | in_flight_image_extents->insert(off, len); | |
552 | return false; | |
553 | } | |
554 | ||
555 | return true; | |
556 | } | |
557 | ||
558 | template <typename I> | |
559 | void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length, | |
560 | uint64_t tid) { | |
561 | CephContext *cct = m_image_ctx.cct; | |
562 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
563 | ||
564 | remove_in_flight_write_ios(offset, length, true, tid); | |
565 | ||
566 | std::unique_lock locker{m_lock}; | |
567 | if (!m_blocked_ios.empty()) { | |
568 | auto it = m_blocked_ios.begin(); | |
569 | while (it != m_blocked_ios.end()) { | |
570 | auto blocked_io = *it; | |
571 | ||
572 | const auto& extents = blocked_io->get_image_extents(); | |
573 | uint64_t off = extents.size() ? extents.front().first : 0; | |
574 | uint64_t len = extents.size() ? extents.front().second : 0; | |
575 | ||
576 | if (block_overlapping_io(&m_in_flight_extents, off, len)) { | |
577 | break; | |
578 | } | |
579 | ldout(cct, 20) << "unblocking off: " << off << ", " | |
580 | << "len: " << len << dendl; | |
581 | AioCompletion *aio_comp = blocked_io->get_aio_completion(); | |
582 | ||
583 | m_blocked_ios.erase(it); | |
584 | locker.unlock(); | |
585 | queue_unblocked_io(aio_comp, blocked_io); | |
586 | locker.lock(); | |
587 | } | |
588 | } | |
589 | } | |
590 | ||
591 | template <typename I> | |
592 | void ImageRequestWQ<I>::unblock_flushes() { | |
593 | CephContext *cct = m_image_ctx.cct; | |
594 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
595 | std::unique_lock locker{m_lock}; | |
596 | auto io_tid_it = m_queued_or_blocked_io_tids.begin(); | |
597 | while (true) { | |
598 | auto it = m_queued_flushes.begin(); | |
599 | if (it == m_queued_flushes.end() || | |
600 | (io_tid_it != m_queued_or_blocked_io_tids.end() && | |
601 | *io_tid_it < it->first)) { | |
602 | break; | |
603 | } | |
604 | ||
605 | auto blocked_flush = *it; | |
606 | ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl; | |
607 | ||
608 | AioCompletion *aio_comp = blocked_flush.second->get_aio_completion(); | |
609 | ||
610 | m_queued_flushes.erase(it); | |
611 | locker.unlock(); | |
612 | queue_unblocked_io(aio_comp, blocked_flush.second); | |
613 | locker.lock(); | |
614 | } | |
615 | } | |
616 | ||
617 | template <typename I> | |
618 | void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp, | |
619 | ImageDispatchSpec<I> *req) { | |
620 | if (!start_in_flight_io(comp)) { | |
621 | return; | |
622 | } | |
623 | ||
624 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
625 | queue(req); | |
626 | } | |
627 | ||
// Begin queue shutdown.  If IOs are still in flight, completion of the last
// one is responsible for invoking m_on_shutdown; otherwise an internal flush
// is issued immediately and `on_shutdown` fires when it completes.
// Caller must hold the image owner lock.
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  {
    std::unique_lock locker{m_lock};
    ceph_assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      // Defer: the final in-flight IO will trigger the shutdown callback.
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_shutdown);
}
649 | ||
224ce89b WB |
650 | template <typename I> |
651 | int ImageRequestWQ<I>::block_writes() { | |
7c673cae FG |
652 | C_SaferCond cond_ctx; |
653 | block_writes(&cond_ctx); | |
654 | return cond_ctx.wait(); | |
655 | } | |
656 | ||
// Increment the write-blocker count and invoke `on_blocked` once no writes
// are in flight.  If writes are still draining (or earlier blockers are
// still waiting), the callback is parked; otherwise an internal flush runs
// and fires it on completion.  Caller must hold the image owner lock.
template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      // Park behind earlier blockers / in-flight writes; completion of the
      // last write is expected to fire these contexts.
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  flush_image(m_image_ctx, on_blocked);
}
676 | ||
// Drop one write-blocker reference.  When the count reaches zero, complete
// any wait_on_writes_unblocked() waiters and signal the thread pool so
// queued writes resume.  Waiter contexts are completed outside m_lock to
// avoid re-entrancy into this queue while the lock is held.
template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  Contexts waiter_contexts;
  {
    std::unique_lock locker{m_lock};
    ceph_assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
      std::swap(waiter_contexts, m_unblocked_write_waiter_contexts);
    }
  }

  if (wake_up) {
    for (auto ctx : waiter_contexts) {
      ctx->complete(0);
    }
    this->signal();
  }
}
703 | ||
// Invoke `on_unblocked` once no write blockers remain.  If blockers (or
// earlier waiters) exist, the context is parked and completed later by
// unblock_writes(); otherwise it completes immediately.
// Caller must hold the image owner lock.
template <typename I>
void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
  CephContext *cct = m_image_ctx.cct;

  {
    std::unique_lock locker{m_lock};
    ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers="
                   << m_write_blockers << dendl;
    if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) {
      m_unblocked_write_waiter_contexts.push_back(on_unblocked);
      return;
    }
  }

  // Completed outside m_lock to avoid callback re-entrancy under the lock.
  on_unblocked->complete(0);
}
721 | ||
// Enable/disable the "exclusive lock required" flag for reads, writes, or
// both.  The thread pool is signalled on any state change so queued IO can
// re-evaluate whether it must first acquire the lock.
template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    std::unique_lock locker{m_lock};
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      // Wake if either flag actually changes.
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}
754 | ||
11fdf7f2 TL |
755 | template <typename I> |
756 | void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){ | |
757 | for (auto pair : m_throttles) { | |
758 | pair.second->set_schedule_tick_min(tick); | |
759 | } | |
760 | } | |
761 | ||
762 | template <typename I> | |
763 | void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag, | |
764 | uint64_t limit, | |
765 | uint64_t burst) { | |
766 | CephContext *cct = m_image_ctx.cct; | |
767 | TokenBucketThrottle *throttle = nullptr; | |
768 | for (auto pair : m_throttles) { | |
769 | if (flag == pair.first) { | |
770 | throttle = pair.second; | |
771 | break; | |
772 | } | |
773 | } | |
774 | ceph_assert(throttle != nullptr); | |
775 | ||
776 | int r = throttle->set_limit(limit, burst); | |
777 | if (r < 0) { | |
778 | lderr(cct) << throttle->get_name() << ": invalid qos parameter: " | |
779 | << "burst(" << burst << ") is less than " | |
780 | << "limit(" << limit << ")" << dendl; | |
781 | // if apply failed, we should at least make sure the limit works. | |
782 | throttle->set_limit(limit, 0); | |
783 | } | |
784 | ||
785 | if (limit) | |
786 | m_qos_enabled_flag |= flag; | |
787 | else | |
788 | m_qos_enabled_flag &= ~flag; | |
789 | } | |
790 | ||
// Callback from a TokenBucketThrottle once the tokens requested for `flag`
// become available.  When every enabled throttle has released the request,
// it is requeued (at the back) and the pool is signalled to resume dispatch.
template <typename I>
void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl;

  // A request can only become ready if it was previously counted throttled.
  ceph_assert(m_io_throttled.load() > 0);
  item->set_throttled(flag);
  if (item->were_all_throttled()) {
    this->requeue_back(item);
    --m_io_throttled;
    this->signal();
  }
}
804 | ||
// Check `item` against every enabled QoS throttle.  Returns true if at least
// one throttle could not immediately grant the requested tokens (the request
// will be re-dispatched via handle_throttle_ready()); false if the item may
// proceed now.  Disabled or already-satisfied flags are marked as throttled
// so were_all_throttled() only waits on the flags that actually deferred.
template <typename I>
bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) {
  uint64_t tokens = 0;
  uint64_t flag = 0;
  bool blocked = false;
  TokenBucketThrottle* throttle = nullptr;

  for (auto t : m_throttles) {
    flag = t.first;
    if (item->was_throttled(flag))
      continue;

    if (!(m_qos_enabled_flag & flag)) {
      // Throttle dimension disabled -- treat as immediately satisfied.
      item->set_throttled(flag);
      continue;
    }

    throttle = t.second;
    // throttle->get() returns true when the tokens are NOT yet available and
    // the handle_throttle_ready() callback has been scheduled.
    if (item->tokens_requested(flag, &tokens) &&
        throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>,
          &ImageRequestWQ<I>::handle_throttle_ready>(
            tokens, this, item, flag)) {
      blocked = true;
    } else {
      item->set_throttled(flag);
    }
  }
  return blocked;
}
834 | ||
224ce89b WB |
// Work-queue hook that selects the next request to dispatch.  Returns
// nullptr when the queue is empty or IO must stall (throttle, exclusive
// lock acquisition, or image refresh); in the stall cases the item is
// still removed from the queue and ownership passes to the async
// completion path that will requeue or fail it.
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageDispatchSpec<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  if (needs_throttle(peek_item)) {
    ldout(cct, 15) << "throttling IO " << peek_item << dendl;

    ++m_io_throttled;
    // dequeue the throttled item; the throttle callback requeues it later
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    std::shared_lock locker{m_lock};
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required && !peek_item->blocked) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
  ceph_assert(peek_item == item);

  if (lock_required) {
    // NOTE(review): pool lock is dropped before taking owner_lock --
    // presumably to respect lock ordering; confirm against ThreadPool
    this->get_pool_lock().unlock();
    m_image_ctx.owner_lock.lock_shared();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
                          item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        Context *ctx = new C_AcquireLock(this, item);
        ctx = create_context_callback<
          Context, &Context::complete>(
            ctx, m_image_ctx.exclusive_lock);
        m_image_ctx.exclusive_lock->acquire_lock(ctx);
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.unlock_shared();
    this->get_pool_lock().lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().lock();
    return nullptr;
  }

  return item;
}
924 | ||
// Executes a dequeued request.  A write op that overlaps an in-flight
// write extent is parked on m_blocked_ios (flagged `blocked`) and retried
// later instead of being sent.  Ownership of `req` is taken: it is deleted
// on return unless it was parked.
// @param req              request to execute
// @param non_blocking_io  true when invoked from a worker thread; the
//                         in-flight write counter is released here in that case
template <typename I>
void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
                                   bool non_blocking_io) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  // the request's extents are invalidated once send() runs, so capture
  // everything needed for post-send accounting up front
  const auto& extents = req->get_image_extents();
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  if (write_op && !req->blocked) {
    std::lock_guard locker{m_lock};
    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
    if (blocked) {
      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
                     << &m_image_ctx << ", "
                     << "off=" << offset << ", len=" << length << dendl;
      // parked: retried via m_blocked_ios when the overlap clears
      req->blocked = true;
      m_blocked_ios.push_back(req);
      return;
    }
  }

  req->start_op();
  req->send();

  if (write_op) {
    if (non_blocking_io) {
      finish_in_flight_write();
    }
    // release any IO waiting on this extent/tid, then any pending flushes
    unblock_overlapping_io(offset, length, tid);
    unblock_flushes();
  }
  delete req;
}
7c673cae | 965 | |
9f95a23c TL |
966 | template <typename I> |
967 | void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) { | |
968 | CephContext *cct = m_image_ctx.cct; | |
969 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
970 | << "req=" << req << dendl; | |
971 | ||
972 | bool write_op = req->is_write_op(); | |
973 | ||
974 | process_io(req, true); | |
975 | ||
976 | finish_queued_io(write_op); | |
224ce89b | 977 | finish_in_flight_io(); |
7c673cae FG |
978 | } |
979 | ||
224ce89b | 980 | template <typename I> |
9f95a23c TL |
981 | void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length, |
982 | bool write_op, uint64_t tid) { | |
983 | CephContext *cct = m_image_ctx.cct; | |
984 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
985 | { | |
986 | std::lock_guard locker{m_lock}; | |
987 | if (write_op) { | |
988 | if (length > 0) { | |
989 | if(!m_in_flight_extents.empty()) { | |
990 | CephContext *cct = m_image_ctx.cct; | |
991 | ldout(cct, 20) << "erasing in flight extents with tid:" | |
992 | << tid << ", offset: " << offset << dendl; | |
993 | ImageExtentIntervals extents; | |
994 | extents.insert(offset, length); | |
995 | ImageExtentIntervals intersect; | |
996 | intersect.intersection_of(extents, m_in_flight_extents); | |
997 | m_in_flight_extents.subtract(intersect); | |
998 | } | |
999 | } | |
1000 | m_queued_or_blocked_io_tids.erase(tid); | |
1001 | } | |
1002 | } | |
1003 | } | |
1004 | ||
1005 | template <typename I> | |
1006 | void ImageRequestWQ<I>::finish_queued_io(bool write_op) { | |
1007 | std::shared_lock locker{m_lock}; | |
1008 | if (write_op) { | |
11fdf7f2 | 1009 | ceph_assert(m_queued_writes > 0); |
7c673cae FG |
1010 | m_queued_writes--; |
1011 | } else { | |
11fdf7f2 | 1012 | ceph_assert(m_queued_reads > 0); |
7c673cae FG |
1013 | m_queued_reads--; |
1014 | } | |
1015 | } | |
1016 | ||
224ce89b WB |
1017 | template <typename I> |
1018 | void ImageRequestWQ<I>::finish_in_flight_write() { | |
7c673cae FG |
1019 | bool writes_blocked = false; |
1020 | { | |
9f95a23c | 1021 | std::shared_lock locker{m_lock}; |
11fdf7f2 | 1022 | ceph_assert(m_in_flight_writes > 0); |
224ce89b | 1023 | if (--m_in_flight_writes == 0 && |
7c673cae FG |
1024 | !m_write_blocker_contexts.empty()) { |
1025 | writes_blocked = true; | |
1026 | } | |
1027 | } | |
7c673cae | 1028 | if (writes_blocked) { |
11fdf7f2 | 1029 | flush_image(m_image_ctx, new C_BlockedWrites(this)); |
7c673cae FG |
1030 | } |
1031 | } | |
1032 | ||
224ce89b WB |
// Accounts a new in-flight IO before it is queued.  Rejects the IO
// up-front when the queue is shut down (-ESHUTDOWN) or the image's data
// pool is invalid (-ENODEV), failing the completion directly.
// @param c  the user's AIO completion, failed here on rejection
// @return true (as int) when the IO may proceed -- the int return type is
//         historical; callers treat it as a bool
template <typename I>
int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  std::shared_lock locker{m_lock};

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    c->fail(-ESHUTDOWN);
    return false;
  }

  if (!m_image_ctx.data_ctx.is_valid()) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "missing data pool" << dendl;

    c->fail(-ENODEV);
    return false;
  }

  m_in_flight_ios++;
  return true;
}
1056 | ||
224ce89b WB |
// Releases one in-flight IO.  When the final in-flight IO drains while a
// shutdown is pending, flushes the image and chains the shutdown callback
// onto the flush completion.
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    std::shared_lock locker{m_lock};
    // bail unless this was the last IO *and* shutdown was requested;
    // note the decrement always happens
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  // flush runs outside m_lock; on_shutdown fires when it completes
  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}
1074 | ||
// Fails a request that has already been accounted as in-flight: completes
// it with error `r`, then unwinds the queued / in-flight-extent / tid
// accounting and frees the request.
// @param r    negative errno to fail the request with
// @param req  the request (ownership taken; deleted here)
template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(
    int r, ImageDispatchSpec<I> *req) {
  this->process_finish();
  req->fail(r);

  // NOTE(review): these fields are read after fail() -- assumes fail()
  // leaves the spec object itself intact; confirm against ImageDispatchSpec
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  const auto& extents = req->get_image_extents();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  finish_queued_io(write_op);
  remove_in_flight_write_ios(offset, length, write_op, tid);
  delete req;
  finish_in_flight_io();
}
7c673cae | 1092 | |
224ce89b WB |
1093 | template <typename I> |
1094 | bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { | |
9f95a23c | 1095 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
224ce89b WB |
1096 | return ((write_op && m_require_lock_on_write) || |
1097 | (!write_op && m_require_lock_on_read)); | |
7c673cae FG |
1098 | } |
1099 | ||
224ce89b | 1100 | template <typename I> |
11fdf7f2 | 1101 | void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) { |
9f95a23c | 1102 | ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); |
224ce89b | 1103 | |
7c673cae FG |
1104 | CephContext *cct = m_image_ctx.cct; |
1105 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
1106 | << "req=" << req << dendl; | |
1107 | ||
224ce89b | 1108 | if (req->is_write_op()) { |
7c673cae FG |
1109 | m_queued_writes++; |
1110 | } else { | |
1111 | m_queued_reads++; | |
1112 | } | |
1113 | ||
11fdf7f2 | 1114 | ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req); |
224ce89b | 1115 | } |
7c673cae | 1116 | |
224ce89b | 1117 | template <typename I> |
11fdf7f2 TL |
1118 | void ImageRequestWQ<I>::handle_acquire_lock( |
1119 | int r, ImageDispatchSpec<I> *req) { | |
224ce89b WB |
1120 | CephContext *cct = m_image_ctx.cct; |
1121 | ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl; | |
1122 | ||
1123 | if (r < 0) { | |
1124 | fail_in_flight_io(r, req); | |
1125 | } else { | |
1126 | // since IO was stalled for acquire -- original IO order is preserved | |
1127 | // if we requeue this op for work queue processing | |
81eedcae | 1128 | this->requeue_front(req); |
7c673cae | 1129 | } |
224ce89b | 1130 | |
11fdf7f2 | 1131 | ceph_assert(m_io_blockers.load() > 0); |
224ce89b WB |
1132 | --m_io_blockers; |
1133 | this->signal(); | |
7c673cae FG |
1134 | } |
1135 | ||
224ce89b | 1136 | template <typename I> |
11fdf7f2 TL |
1137 | void ImageRequestWQ<I>::handle_refreshed( |
1138 | int r, ImageDispatchSpec<I> *req) { | |
7c673cae | 1139 | CephContext *cct = m_image_ctx.cct; |
224ce89b WB |
1140 | ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", " |
1141 | << "req=" << req << dendl; | |
7c673cae | 1142 | if (r < 0) { |
224ce89b | 1143 | fail_in_flight_io(r, req); |
7c673cae FG |
1144 | } else { |
1145 | // since IO was stalled for refresh -- original IO order is preserved | |
1146 | // if we requeue this op for work queue processing | |
81eedcae | 1147 | this->requeue_front(req); |
7c673cae FG |
1148 | } |
1149 | ||
11fdf7f2 | 1150 | ceph_assert(m_io_blockers.load() > 0); |
224ce89b WB |
1151 | --m_io_blockers; |
1152 | this->signal(); | |
7c673cae FG |
1153 | } |
1154 | ||
224ce89b WB |
1155 | template <typename I> |
1156 | void ImageRequestWQ<I>::handle_blocked_writes(int r) { | |
7c673cae FG |
1157 | Contexts contexts; |
1158 | { | |
9f95a23c | 1159 | std::unique_lock locker{m_lock}; |
7c673cae FG |
1160 | contexts.swap(m_write_blocker_contexts); |
1161 | } | |
1162 | ||
1163 | for (auto ctx : contexts) { | |
1164 | ctx->complete(0); | |
1165 | } | |
1166 | } | |
1167 | ||
224ce89b WB |
// explicit instantiation for the concrete image context type used by librbd
template class librbd::io::ImageRequestWQ<librbd::ImageCtx>;

} // namespace io
} // namespace librbd