// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/io/ImageRequestWQ.h"

#include <map>
#include <string>
#include <utility>

#include "common/Cond.h"
#include "common/errno.h"
#include "common/EventTrace.h"
#include "common/zipkin_trace.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageDispatchSpec.h"
#include "librbd/io/ImageRequest.h"
19 | |
20 | #define dout_subsys ceph_subsys_rbd | |
21 | #undef dout_prefix | |
22 | #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ | |
23 | << " " << __func__ << ": " | |
24 | ||
25 | namespace librbd { | |
9f95a23c TL |
26 | |
27 | using util::create_context_callback; | |
28 | ||
7c673cae FG |
29 | namespace io { |
30 | ||
11fdf7f2 TL |
31 | namespace { |
32 | ||
33 | template <typename I> | |
34 | void flush_image(I& image_ctx, Context* on_finish) { | |
494da23a | 35 | auto aio_comp = librbd::io::AioCompletion::create_and_start( |
11fdf7f2 TL |
36 | on_finish, util::get_image_ctx(&image_ctx), librbd::io::AIO_TYPE_FLUSH); |
37 | auto req = librbd::io::ImageDispatchSpec<I>::create_flush_request( | |
38 | image_ctx, aio_comp, librbd::io::FLUSH_SOURCE_INTERNAL, {}); | |
39 | req->send(); | |
40 | delete req; | |
41 | } | |
42 | ||
43 | } // anonymous namespace | |
44 | ||
224ce89b WB |
45 | template <typename I> |
46 | struct ImageRequestWQ<I>::C_AcquireLock : public Context { | |
47 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 48 | ImageDispatchSpec<I> *image_request; |
224ce89b | 49 | |
11fdf7f2 | 50 | C_AcquireLock(ImageRequestWQ *work_queue, ImageDispatchSpec<I> *image_request) |
224ce89b WB |
51 | : work_queue(work_queue), image_request(image_request) { |
52 | } | |
53 | ||
54 | void finish(int r) override { | |
55 | work_queue->handle_acquire_lock(r, image_request); | |
56 | } | |
57 | }; | |
58 | ||
59 | template <typename I> | |
60 | struct ImageRequestWQ<I>::C_BlockedWrites : public Context { | |
61 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 62 | explicit C_BlockedWrites(ImageRequestWQ *_work_queue) |
224ce89b WB |
63 | : work_queue(_work_queue) { |
64 | } | |
65 | ||
66 | void finish(int r) override { | |
67 | work_queue->handle_blocked_writes(r); | |
68 | } | |
69 | }; | |
70 | ||
71 | template <typename I> | |
72 | struct ImageRequestWQ<I>::C_RefreshFinish : public Context { | |
73 | ImageRequestWQ *work_queue; | |
11fdf7f2 | 74 | ImageDispatchSpec<I> *image_request; |
224ce89b WB |
75 | |
76 | C_RefreshFinish(ImageRequestWQ *work_queue, | |
11fdf7f2 | 77 | ImageDispatchSpec<I> *image_request) |
224ce89b WB |
78 | : work_queue(work_queue), image_request(image_request) { |
79 | } | |
80 | void finish(int r) override { | |
81 | work_queue->handle_refreshed(r, image_request); | |
82 | } | |
83 | }; | |
84 | ||
11fdf7f2 TL |
85 | static std::map<uint64_t, std::string> throttle_flags = { |
86 | { RBD_QOS_IOPS_THROTTLE, "rbd_qos_iops_throttle" }, | |
87 | { RBD_QOS_BPS_THROTTLE, "rbd_qos_bps_throttle" }, | |
88 | { RBD_QOS_READ_IOPS_THROTTLE, "rbd_qos_read_iops_throttle" }, | |
89 | { RBD_QOS_WRITE_IOPS_THROTTLE, "rbd_qos_write_iops_throttle" }, | |
90 | { RBD_QOS_READ_BPS_THROTTLE, "rbd_qos_read_bps_throttle" }, | |
91 | { RBD_QOS_WRITE_BPS_THROTTLE, "rbd_qos_write_bps_throttle" } | |
92 | }; | |
93 | ||
224ce89b WB |
94 | template <typename I> |
95 | ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name, | |
96 | time_t ti, ThreadPool *tp) | |
11fdf7f2 | 97 | : ThreadPool::PointerWQ<ImageDispatchSpec<I> >(name, ti, 0, tp), |
7c673cae | 98 | m_image_ctx(*image_ctx), |
9f95a23c TL |
99 | m_lock(ceph::make_shared_mutex( |
100 | util::unique_lock_name("ImageRequestWQ<I>::m_lock", this))) { | |
7c673cae FG |
101 | CephContext *cct = m_image_ctx.cct; |
102 | ldout(cct, 5) << "ictx=" << image_ctx << dendl; | |
11fdf7f2 TL |
103 | |
104 | SafeTimer *timer; | |
9f95a23c | 105 | ceph::mutex *timer_lock; |
11fdf7f2 TL |
106 | ImageCtx::get_timer_instance(cct, &timer, &timer_lock); |
107 | ||
108 | for (auto flag : throttle_flags) { | |
109 | m_throttles.push_back(make_pair( | |
110 | flag.first, | |
111 | new TokenBucketThrottle(cct, flag.second, 0, 0, timer, timer_lock))); | |
112 | } | |
113 | ||
224ce89b | 114 | this->register_work_queue(); |
7c673cae FG |
115 | } |
116 | ||
11fdf7f2 TL |
117 | template <typename I> |
118 | ImageRequestWQ<I>::~ImageRequestWQ() { | |
119 | for (auto t : m_throttles) { | |
120 | delete t.second; | |
121 | } | |
122 | } | |
123 | ||
224ce89b WB |
124 | template <typename I> |
125 | ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, | |
126 | ReadResult &&read_result, int op_flags) { | |
7c673cae FG |
127 | CephContext *cct = m_image_ctx.cct; |
128 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
129 | << "len = " << len << dendl; | |
130 | ||
131 | C_SaferCond cond; | |
132 | AioCompletion *c = AioCompletion::create(&cond); | |
133 | aio_read(c, off, len, std::move(read_result), op_flags, false); | |
134 | return cond.wait(); | |
135 | } | |
136 | ||
224ce89b WB |
137 | template <typename I> |
138 | ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len, | |
139 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
140 | CephContext *cct = m_image_ctx.cct; |
141 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
142 | << "len = " << len << dendl; | |
143 | ||
9f95a23c | 144 | m_image_ctx.image_lock.lock_shared(); |
7c673cae | 145 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 146 | m_image_ctx.image_lock.unlock_shared(); |
7c673cae FG |
147 | if (r < 0) { |
148 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
149 | return r; | |
150 | } | |
151 | ||
152 | C_SaferCond cond; | |
153 | AioCompletion *c = AioCompletion::create(&cond); | |
154 | aio_write(c, off, len, std::move(bl), op_flags, false); | |
155 | ||
156 | r = cond.wait(); | |
157 | if (r < 0) { | |
158 | return r; | |
159 | } | |
160 | return len; | |
161 | } | |
162 | ||
224ce89b WB |
163 | template <typename I> |
164 | ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, | |
11fdf7f2 | 165 | uint32_t discard_granularity_bytes) { |
7c673cae FG |
166 | CephContext *cct = m_image_ctx.cct; |
167 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
168 | << "len = " << len << dendl; | |
169 | ||
9f95a23c | 170 | m_image_ctx.image_lock.lock_shared(); |
7c673cae | 171 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 172 | m_image_ctx.image_lock.unlock_shared(); |
7c673cae FG |
173 | if (r < 0) { |
174 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
175 | return r; | |
176 | } | |
177 | ||
178 | C_SaferCond cond; | |
179 | AioCompletion *c = AioCompletion::create(&cond); | |
11fdf7f2 | 180 | aio_discard(c, off, len, discard_granularity_bytes, false); |
7c673cae FG |
181 | |
182 | r = cond.wait(); | |
183 | if (r < 0) { | |
184 | return r; | |
185 | } | |
186 | return len; | |
187 | } | |
188 | ||
224ce89b WB |
189 | template <typename I> |
190 | ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, | |
191 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
192 | CephContext *cct = m_image_ctx.cct; |
193 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
194 | << "len = " << len << ", data_len " << bl.length() << dendl; | |
195 | ||
9f95a23c | 196 | m_image_ctx.image_lock.lock_shared(); |
7c673cae | 197 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 198 | m_image_ctx.image_lock.unlock_shared(); |
7c673cae FG |
199 | if (r < 0) { |
200 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
201 | return r; | |
202 | } | |
203 | ||
204 | C_SaferCond cond; | |
205 | AioCompletion *c = AioCompletion::create(&cond); | |
206 | aio_writesame(c, off, len, std::move(bl), op_flags, false); | |
207 | ||
208 | r = cond.wait(); | |
209 | if (r < 0) { | |
210 | return r; | |
211 | } | |
212 | return len; | |
213 | } | |
214 | ||
f6b5b4d7 TL |
215 | template <typename I> |
216 | ssize_t ImageRequestWQ<I>::write_zeroes(uint64_t off, uint64_t len, | |
217 | int zero_flags, int op_flags) { | |
218 | auto cct = m_image_ctx.cct; | |
219 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
220 | << "len = " << len << dendl; | |
221 | ||
222 | m_image_ctx.image_lock.lock_shared(); | |
223 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
224 | m_image_ctx.image_lock.unlock_shared(); | |
225 | if (r < 0) { | |
226 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
227 | return r; | |
228 | } | |
229 | ||
230 | C_SaferCond ctx; | |
231 | auto aio_comp = io::AioCompletion::create(&ctx); | |
232 | aio_write_zeroes(aio_comp, off, len, zero_flags, op_flags, false); | |
233 | ||
234 | r = ctx.wait(); | |
235 | if (r < 0) { | |
236 | return r; | |
237 | } | |
238 | return len; | |
239 | } | |
240 | ||
c07f9fc5 FG |
241 | template <typename I> |
242 | ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len, | |
243 | bufferlist &&cmp_bl, | |
244 | bufferlist &&bl, | |
245 | uint64_t *mismatch_off, | |
246 | int op_flags){ | |
247 | CephContext *cct = m_image_ctx.cct; | |
248 | ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" | |
249 | << off << ", " << "len = " << len << dendl; | |
250 | ||
9f95a23c | 251 | m_image_ctx.image_lock.lock_shared(); |
c07f9fc5 | 252 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); |
9f95a23c | 253 | m_image_ctx.image_lock.unlock_shared(); |
c07f9fc5 FG |
254 | if (r < 0) { |
255 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
256 | return r; | |
257 | } | |
258 | ||
259 | C_SaferCond cond; | |
260 | AioCompletion *c = AioCompletion::create(&cond); | |
261 | aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), | |
262 | mismatch_off, op_flags, false); | |
263 | ||
264 | r = cond.wait(); | |
265 | if (r < 0) { | |
266 | return r; | |
267 | } | |
268 | ||
269 | return len; | |
270 | } | |
271 | ||
11fdf7f2 TL |
272 | template <typename I> |
273 | int ImageRequestWQ<I>::flush() { | |
274 | CephContext *cct = m_image_ctx.cct; | |
275 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
276 | ||
277 | C_SaferCond cond; | |
278 | AioCompletion *c = AioCompletion::create(&cond); | |
279 | aio_flush(c, false); | |
280 | ||
281 | int r = cond.wait(); | |
282 | if (r < 0) { | |
283 | return r; | |
284 | } | |
285 | ||
286 | return 0; | |
287 | } | |
288 | ||
224ce89b WB |
289 | template <typename I> |
290 | void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len, | |
291 | ReadResult &&read_result, int op_flags, | |
292 | bool native_async) { | |
7c673cae | 293 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 294 | FUNCTRACE(cct); |
31f18b77 | 295 | ZTracer::Trace trace; |
181888fb | 296 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
297 | trace.init("wq: read", &m_image_ctx.trace_endpoint); |
298 | trace.event("start"); | |
299 | } | |
300 | ||
224ce89b | 301 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ); |
7c673cae FG |
302 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
303 | << "completion=" << c << ", off=" << off << ", " | |
304 | << "len=" << len << ", " << "flags=" << op_flags << dendl; | |
305 | ||
306 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
307 | c->set_event_notify(true); | |
308 | } | |
309 | ||
224ce89b | 310 | if (!start_in_flight_io(c)) { |
7c673cae FG |
311 | return; |
312 | } | |
313 | ||
7c673cae FG |
314 | // if journaling is enabled -- we need to replay the journal because |
315 | // it might contain an uncommitted write | |
9f95a23c | 316 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; |
7c673cae | 317 | if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() || |
224ce89b | 318 | require_lock_on_read()) { |
11fdf7f2 | 319 | queue(ImageDispatchSpec<I>::create_read_request( |
224ce89b WB |
320 | m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags, |
321 | trace)); | |
7c673cae FG |
322 | } else { |
323 | c->start_op(); | |
224ce89b WB |
324 | ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}}, |
325 | std::move(read_result), op_flags, trace); | |
326 | finish_in_flight_io(); | |
7c673cae | 327 | } |
31f18b77 | 328 | trace.event("finish"); |
7c673cae FG |
329 | } |
330 | ||
224ce89b WB |
331 | template <typename I> |
332 | void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len, | |
333 | bufferlist &&bl, int op_flags, | |
334 | bool native_async) { | |
7c673cae | 335 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 336 | FUNCTRACE(cct); |
31f18b77 | 337 | ZTracer::Trace trace; |
181888fb | 338 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
339 | trace.init("wq: write", &m_image_ctx.trace_endpoint); |
340 | trace.event("init"); | |
341 | } | |
342 | ||
224ce89b | 343 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE); |
7c673cae FG |
344 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
345 | << "completion=" << c << ", off=" << off << ", " | |
346 | << "len=" << len << ", flags=" << op_flags << dendl; | |
347 | ||
348 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
349 | c->set_event_notify(true); | |
350 | } | |
351 | ||
224ce89b | 352 | if (!start_in_flight_io(c)) { |
7c673cae FG |
353 | return; |
354 | } | |
355 | ||
9f95a23c TL |
356 | auto tid = ++m_last_tid; |
357 | ||
358 | { | |
359 | std::lock_guard locker{m_lock}; | |
360 | m_queued_or_blocked_io_tids.insert(tid); | |
361 | } | |
362 | ||
363 | ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_request( | |
364 | m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace, tid); | |
365 | ||
366 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
7c673cae | 367 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { |
9f95a23c | 368 | queue(req); |
7c673cae | 369 | } else { |
9f95a23c | 370 | process_io(req, false); |
224ce89b | 371 | finish_in_flight_io(); |
7c673cae | 372 | } |
31f18b77 | 373 | trace.event("finish"); |
7c673cae FG |
374 | } |
375 | ||
224ce89b WB |
376 | template <typename I> |
377 | void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off, | |
11fdf7f2 TL |
378 | uint64_t len, |
379 | uint32_t discard_granularity_bytes, | |
224ce89b | 380 | bool native_async) { |
7c673cae | 381 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 382 | FUNCTRACE(cct); |
31f18b77 | 383 | ZTracer::Trace trace; |
181888fb | 384 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
385 | trace.init("wq: discard", &m_image_ctx.trace_endpoint); |
386 | trace.event("init"); | |
387 | } | |
388 | ||
224ce89b | 389 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD); |
7c673cae FG |
390 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
391 | << "completion=" << c << ", off=" << off << ", len=" << len | |
392 | << dendl; | |
393 | ||
394 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
395 | c->set_event_notify(true); | |
396 | } | |
397 | ||
224ce89b | 398 | if (!start_in_flight_io(c)) { |
7c673cae FG |
399 | return; |
400 | } | |
401 | ||
9f95a23c TL |
402 | auto tid = ++m_last_tid; |
403 | ||
404 | { | |
405 | std::lock_guard locker{m_lock}; | |
406 | m_queued_or_blocked_io_tids.insert(tid); | |
407 | } | |
408 | ||
409 | ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_discard_request( | |
410 | m_image_ctx, c, off, len, discard_granularity_bytes, trace, tid); | |
411 | ||
412 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
7c673cae | 413 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { |
9f95a23c | 414 | queue(req); |
7c673cae | 415 | } else { |
9f95a23c | 416 | process_io(req, false); |
224ce89b | 417 | finish_in_flight_io(); |
7c673cae | 418 | } |
31f18b77 | 419 | trace.event("finish"); |
7c673cae FG |
420 | } |
421 | ||
224ce89b WB |
422 | template <typename I> |
423 | void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) { | |
7c673cae | 424 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 425 | FUNCTRACE(cct); |
31f18b77 | 426 | ZTracer::Trace trace; |
181888fb | 427 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
428 | trace.init("wq: flush", &m_image_ctx.trace_endpoint); |
429 | trace.event("init"); | |
430 | } | |
431 | ||
224ce89b | 432 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH); |
7c673cae FG |
433 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
434 | << "completion=" << c << dendl; | |
435 | ||
436 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
437 | c->set_event_notify(true); | |
438 | } | |
439 | ||
224ce89b | 440 | if (!start_in_flight_io(c)) { |
7c673cae FG |
441 | return; |
442 | } | |
443 | ||
9f95a23c TL |
444 | auto tid = ++m_last_tid; |
445 | ||
446 | ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_flush_request( | |
447 | m_image_ctx, c, FLUSH_SOURCE_USER, trace); | |
448 | ||
449 | { | |
450 | std::lock_guard locker{m_lock}; | |
451 | if(!m_queued_or_blocked_io_tids.empty()) { | |
452 | ldout(cct, 20) << "queueing flush, tid: " << tid << dendl; | |
453 | m_queued_flushes.emplace(tid, req); | |
454 | --m_in_flight_ios; | |
455 | return; | |
456 | } | |
457 | } | |
458 | ||
459 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
7c673cae | 460 | if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) { |
9f95a23c | 461 | queue(req); |
7c673cae | 462 | } else { |
9f95a23c | 463 | process_io(req, false); |
224ce89b | 464 | finish_in_flight_io(); |
7c673cae | 465 | } |
31f18b77 | 466 | trace.event("finish"); |
7c673cae FG |
467 | } |
468 | ||
224ce89b WB |
469 | template <typename I> |
470 | void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off, | |
471 | uint64_t len, bufferlist &&bl, | |
472 | int op_flags, bool native_async) { | |
7c673cae | 473 | CephContext *cct = m_image_ctx.cct; |
11fdf7f2 | 474 | FUNCTRACE(cct); |
31f18b77 | 475 | ZTracer::Trace trace; |
181888fb | 476 | if (m_image_ctx.blkin_trace_all) { |
31f18b77 FG |
477 | trace.init("wq: writesame", &m_image_ctx.trace_endpoint); |
478 | trace.event("init"); | |
479 | } | |
480 | ||
224ce89b | 481 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME); |
7c673cae FG |
482 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
483 | << "completion=" << c << ", off=" << off << ", " | |
484 | << "len=" << len << ", data_len = " << bl.length() << ", " | |
485 | << "flags=" << op_flags << dendl; | |
486 | ||
487 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
488 | c->set_event_notify(true); | |
489 | } | |
490 | ||
224ce89b | 491 | if (!start_in_flight_io(c)) { |
7c673cae FG |
492 | return; |
493 | } | |
494 | ||
9f95a23c TL |
495 | auto tid = ++m_last_tid; |
496 | ||
497 | { | |
498 | std::lock_guard locker{m_lock}; | |
499 | m_queued_or_blocked_io_tids.insert(tid); | |
500 | } | |
501 | ||
502 | ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_write_same_request( | |
503 | m_image_ctx, c, off, len, std::move(bl), op_flags, trace, tid); | |
504 | ||
505 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
f6b5b4d7 TL |
506 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { |
507 | queue(req); | |
508 | } else { | |
509 | process_io(req, false); | |
510 | finish_in_flight_io(); | |
511 | } | |
512 | trace.event("finish"); | |
513 | } | |
514 | ||
515 | ||
516 | template <typename I> | |
517 | void ImageRequestWQ<I>::aio_write_zeroes(io::AioCompletion *aio_comp, | |
518 | uint64_t off, uint64_t len, | |
519 | int zero_flags, int op_flags, | |
520 | bool native_async) { | |
521 | auto cct = m_image_ctx.cct; | |
522 | FUNCTRACE(cct); | |
523 | ZTracer::Trace trace; | |
524 | if (m_image_ctx.blkin_trace_all) { | |
525 | trace.init("io: write_zeroes", &m_image_ctx.trace_endpoint); | |
526 | trace.event("init"); | |
527 | } | |
528 | ||
529 | aio_comp->init_time(util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_DISCARD); | |
530 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
531 | << "completion=" << aio_comp << ", off=" << off << ", " | |
532 | << "len=" << len << dendl; | |
533 | ||
534 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
535 | aio_comp->set_event_notify(true); | |
536 | } | |
537 | ||
538 | // validate the supported flags | |
539 | if (zero_flags != 0U) { | |
540 | aio_comp->fail(-EINVAL); | |
541 | return; | |
542 | } | |
543 | ||
544 | if (!start_in_flight_io(aio_comp)) { | |
545 | return; | |
546 | } | |
547 | ||
548 | // enable partial discard (zeroing) of objects | |
549 | uint32_t discard_granularity_bytes = 0; | |
550 | ||
551 | auto tid = ++m_last_tid; | |
552 | ||
553 | { | |
554 | std::lock_guard locker{m_lock}; | |
555 | m_queued_or_blocked_io_tids.insert(tid); | |
556 | } | |
557 | ||
558 | auto req = ImageDispatchSpec<I>::create_discard_request( | |
559 | m_image_ctx, aio_comp, off, len, discard_granularity_bytes, trace, tid); | |
560 | ||
561 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
7c673cae | 562 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { |
9f95a23c | 563 | queue(req); |
7c673cae | 564 | } else { |
9f95a23c | 565 | process_io(req, false); |
224ce89b | 566 | finish_in_flight_io(); |
7c673cae | 567 | } |
31f18b77 | 568 | trace.event("finish"); |
7c673cae FG |
569 | } |
570 | ||
c07f9fc5 FG |
571 | template <typename I> |
572 | void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c, | |
573 | uint64_t off, uint64_t len, | |
574 | bufferlist &&cmp_bl, | |
575 | bufferlist &&bl, | |
576 | uint64_t *mismatch_off, | |
577 | int op_flags, bool native_async) { | |
578 | CephContext *cct = m_image_ctx.cct; | |
11fdf7f2 | 579 | FUNCTRACE(cct); |
c07f9fc5 | 580 | ZTracer::Trace trace; |
181888fb | 581 | if (m_image_ctx.blkin_trace_all) { |
c07f9fc5 FG |
582 | trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint); |
583 | trace.event("init"); | |
584 | } | |
585 | ||
586 | c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE); | |
587 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
588 | << "completion=" << c << ", off=" << off << ", " | |
589 | << "len=" << len << dendl; | |
590 | ||
591 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
592 | c->set_event_notify(true); | |
593 | } | |
594 | ||
595 | if (!start_in_flight_io(c)) { | |
596 | return; | |
597 | } | |
598 | ||
9f95a23c TL |
599 | auto tid = ++m_last_tid; |
600 | ||
601 | { | |
602 | std::lock_guard locker{m_lock}; | |
603 | m_queued_or_blocked_io_tids.insert(tid); | |
604 | } | |
605 | ||
606 | ImageDispatchSpec<I> *req = ImageDispatchSpec<I>::create_compare_and_write_request( | |
c07f9fc5 | 607 | m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl), |
9f95a23c TL |
608 | mismatch_off, op_flags, trace, tid); |
609 | ||
610 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
611 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
612 | queue(req); | |
c07f9fc5 | 613 | } else { |
9f95a23c | 614 | process_io(req, false); |
c07f9fc5 FG |
615 | finish_in_flight_io(); |
616 | } | |
617 | trace.event("finish"); | |
618 | } | |
619 | ||
9f95a23c TL |
620 | template <typename I> |
621 | bool ImageRequestWQ<I>::block_overlapping_io( | |
622 | ImageExtentIntervals* in_flight_image_extents, uint64_t off, uint64_t len) { | |
623 | CephContext *cct = m_image_ctx.cct; | |
624 | ldout(cct, 20) << "ictx=" << &m_image_ctx | |
625 | << "off: " << off << " len: " << len <<dendl; | |
626 | ||
627 | if(len == 0) { | |
628 | return false; | |
629 | } | |
630 | ||
631 | if (in_flight_image_extents->empty() || | |
632 | !in_flight_image_extents->intersects(off, len)) { | |
633 | in_flight_image_extents->insert(off, len); | |
634 | return false; | |
635 | } | |
636 | ||
637 | return true; | |
638 | } | |
639 | ||
640 | template <typename I> | |
641 | void ImageRequestWQ<I>::unblock_overlapping_io(uint64_t offset, uint64_t length, | |
642 | uint64_t tid) { | |
643 | CephContext *cct = m_image_ctx.cct; | |
644 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
645 | ||
646 | remove_in_flight_write_ios(offset, length, true, tid); | |
647 | ||
648 | std::unique_lock locker{m_lock}; | |
649 | if (!m_blocked_ios.empty()) { | |
650 | auto it = m_blocked_ios.begin(); | |
651 | while (it != m_blocked_ios.end()) { | |
652 | auto blocked_io = *it; | |
653 | ||
654 | const auto& extents = blocked_io->get_image_extents(); | |
655 | uint64_t off = extents.size() ? extents.front().first : 0; | |
656 | uint64_t len = extents.size() ? extents.front().second : 0; | |
657 | ||
658 | if (block_overlapping_io(&m_in_flight_extents, off, len)) { | |
659 | break; | |
660 | } | |
661 | ldout(cct, 20) << "unblocking off: " << off << ", " | |
662 | << "len: " << len << dendl; | |
663 | AioCompletion *aio_comp = blocked_io->get_aio_completion(); | |
664 | ||
665 | m_blocked_ios.erase(it); | |
666 | locker.unlock(); | |
667 | queue_unblocked_io(aio_comp, blocked_io); | |
668 | locker.lock(); | |
669 | } | |
670 | } | |
671 | } | |
672 | ||
673 | template <typename I> | |
674 | void ImageRequestWQ<I>::unblock_flushes() { | |
675 | CephContext *cct = m_image_ctx.cct; | |
676 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
677 | std::unique_lock locker{m_lock}; | |
678 | auto io_tid_it = m_queued_or_blocked_io_tids.begin(); | |
679 | while (true) { | |
680 | auto it = m_queued_flushes.begin(); | |
681 | if (it == m_queued_flushes.end() || | |
682 | (io_tid_it != m_queued_or_blocked_io_tids.end() && | |
683 | *io_tid_it < it->first)) { | |
684 | break; | |
685 | } | |
686 | ||
687 | auto blocked_flush = *it; | |
688 | ldout(cct, 20) << "unblocking flush: tid " << blocked_flush.first << dendl; | |
689 | ||
690 | AioCompletion *aio_comp = blocked_flush.second->get_aio_completion(); | |
691 | ||
692 | m_queued_flushes.erase(it); | |
693 | locker.unlock(); | |
694 | queue_unblocked_io(aio_comp, blocked_flush.second); | |
695 | locker.lock(); | |
696 | } | |
697 | } | |
698 | ||
699 | template <typename I> | |
700 | void ImageRequestWQ<I>::queue_unblocked_io(AioCompletion *comp, | |
701 | ImageDispatchSpec<I> *req) { | |
702 | if (!start_in_flight_io(comp)) { | |
703 | return; | |
704 | } | |
705 | ||
706 | std::shared_lock owner_locker{m_image_ctx.owner_lock}; | |
707 | queue(req); | |
708 | } | |
709 | ||
224ce89b WB |
710 | template <typename I> |
711 | void ImageRequestWQ<I>::shut_down(Context *on_shutdown) { | |
9f95a23c | 712 | ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); |
7c673cae FG |
713 | |
714 | { | |
9f95a23c | 715 | std::unique_lock locker{m_lock}; |
11fdf7f2 | 716 | ceph_assert(!m_shutdown); |
7c673cae FG |
717 | m_shutdown = true; |
718 | ||
719 | CephContext *cct = m_image_ctx.cct; | |
224ce89b | 720 | ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load() |
7c673cae | 721 | << dendl; |
224ce89b | 722 | if (m_in_flight_ios > 0) { |
7c673cae FG |
723 | m_on_shutdown = on_shutdown; |
724 | return; | |
725 | } | |
726 | } | |
727 | ||
728 | // ensure that all in-flight IO is flushed | |
11fdf7f2 | 729 | flush_image(m_image_ctx, on_shutdown); |
7c673cae FG |
730 | } |
731 | ||
224ce89b WB |
732 | template <typename I> |
733 | int ImageRequestWQ<I>::block_writes() { | |
7c673cae FG |
734 | C_SaferCond cond_ctx; |
735 | block_writes(&cond_ctx); | |
736 | return cond_ctx.wait(); | |
737 | } | |
738 | ||
224ce89b WB |
739 | template <typename I> |
740 | void ImageRequestWQ<I>::block_writes(Context *on_blocked) { | |
9f95a23c | 741 | ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); |
7c673cae FG |
742 | CephContext *cct = m_image_ctx.cct; |
743 | ||
744 | { | |
9f95a23c | 745 | std::unique_lock locker{m_lock}; |
7c673cae FG |
746 | ++m_write_blockers; |
747 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
748 | << m_write_blockers << dendl; | |
224ce89b | 749 | if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) { |
7c673cae FG |
750 | m_write_blocker_contexts.push_back(on_blocked); |
751 | return; | |
752 | } | |
753 | } | |
754 | ||
755 | // ensure that all in-flight IO is flushed | |
11fdf7f2 | 756 | flush_image(m_image_ctx, on_blocked); |
7c673cae FG |
757 | } |
758 | ||
224ce89b WB |
759 | template <typename I> |
760 | void ImageRequestWQ<I>::unblock_writes() { | |
7c673cae FG |
761 | CephContext *cct = m_image_ctx.cct; |
762 | ||
763 | bool wake_up = false; | |
11fdf7f2 | 764 | Contexts waiter_contexts; |
7c673cae | 765 | { |
9f95a23c | 766 | std::unique_lock locker{m_lock}; |
11fdf7f2 | 767 | ceph_assert(m_write_blockers > 0); |
7c673cae FG |
768 | --m_write_blockers; |
769 | ||
770 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
771 | << m_write_blockers << dendl; | |
772 | if (m_write_blockers == 0) { | |
773 | wake_up = true; | |
11fdf7f2 | 774 | std::swap(waiter_contexts, m_unblocked_write_waiter_contexts); |
7c673cae FG |
775 | } |
776 | } | |
777 | ||
778 | if (wake_up) { | |
11fdf7f2 TL |
779 | for (auto ctx : waiter_contexts) { |
780 | ctx->complete(0); | |
781 | } | |
224ce89b | 782 | this->signal(); |
7c673cae FG |
783 | } |
784 | } | |
785 | ||
11fdf7f2 TL |
786 | template <typename I> |
787 | void ImageRequestWQ<I>::wait_on_writes_unblocked(Context *on_unblocked) { | |
9f95a23c | 788 | ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); |
11fdf7f2 TL |
789 | CephContext *cct = m_image_ctx.cct; |
790 | ||
791 | { | |
9f95a23c | 792 | std::unique_lock locker{m_lock}; |
11fdf7f2 TL |
793 | ldout(cct, 20) << &m_image_ctx << ", " << "write_blockers=" |
794 | << m_write_blockers << dendl; | |
795 | if (!m_unblocked_write_waiter_contexts.empty() || m_write_blockers > 0) { | |
796 | m_unblocked_write_waiter_contexts.push_back(on_unblocked); | |
797 | return; | |
798 | } | |
799 | } | |
800 | ||
801 | on_unblocked->complete(0); | |
802 | } | |
803 | ||
224ce89b WB |
804 | template <typename I> |
805 | void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) { | |
7c673cae FG |
806 | CephContext *cct = m_image_ctx.cct; |
807 | ldout(cct, 20) << dendl; | |
808 | ||
224ce89b | 809 | bool wake_up = false; |
7c673cae | 810 | { |
9f95a23c | 811 | std::unique_lock locker{m_lock}; |
224ce89b WB |
812 | switch (direction) { |
813 | case DIRECTION_READ: | |
814 | wake_up = (enabled != m_require_lock_on_read); | |
815 | m_require_lock_on_read = enabled; | |
816 | break; | |
817 | case DIRECTION_WRITE: | |
818 | wake_up = (enabled != m_require_lock_on_write); | |
819 | m_require_lock_on_write = enabled; | |
820 | break; | |
821 | case DIRECTION_BOTH: | |
822 | wake_up = (enabled != m_require_lock_on_read || | |
823 | enabled != m_require_lock_on_write); | |
824 | m_require_lock_on_read = enabled; | |
825 | m_require_lock_on_write = enabled; | |
826 | break; | |
7c673cae | 827 | } |
224ce89b | 828 | } |
7c673cae | 829 | |
224ce89b WB |
830 | // wake up the thread pool whenever the state changes so that |
831 | // we can re-request the lock if required | |
832 | if (wake_up) { | |
833 | this->signal(); | |
7c673cae | 834 | } |
7c673cae FG |
835 | } |
836 | ||
11fdf7f2 TL |
837 | template <typename I> |
838 | void ImageRequestWQ<I>::apply_qos_schedule_tick_min(uint64_t tick){ | |
839 | for (auto pair : m_throttles) { | |
840 | pair.second->set_schedule_tick_min(tick); | |
841 | } | |
842 | } | |
843 | ||
844 | template <typename I> | |
845 | void ImageRequestWQ<I>::apply_qos_limit(const uint64_t flag, | |
846 | uint64_t limit, | |
847 | uint64_t burst) { | |
848 | CephContext *cct = m_image_ctx.cct; | |
849 | TokenBucketThrottle *throttle = nullptr; | |
850 | for (auto pair : m_throttles) { | |
851 | if (flag == pair.first) { | |
852 | throttle = pair.second; | |
853 | break; | |
854 | } | |
855 | } | |
856 | ceph_assert(throttle != nullptr); | |
857 | ||
858 | int r = throttle->set_limit(limit, burst); | |
859 | if (r < 0) { | |
860 | lderr(cct) << throttle->get_name() << ": invalid qos parameter: " | |
861 | << "burst(" << burst << ") is less than " | |
862 | << "limit(" << limit << ")" << dendl; | |
863 | // if apply failed, we should at least make sure the limit works. | |
864 | throttle->set_limit(limit, 0); | |
865 | } | |
866 | ||
867 | if (limit) | |
868 | m_qos_enabled_flag |= flag; | |
869 | else | |
870 | m_qos_enabled_flag &= ~flag; | |
871 | } | |
872 | ||
873 | template <typename I> | |
874 | void ImageRequestWQ<I>::handle_throttle_ready(int r, ImageDispatchSpec<I> *item, uint64_t flag) { | |
875 | CephContext *cct = m_image_ctx.cct; | |
876 | ldout(cct, 15) << "r=" << r << ", " << "req=" << item << dendl; | |
877 | ||
878 | ceph_assert(m_io_throttled.load() > 0); | |
879 | item->set_throttled(flag); | |
880 | if (item->were_all_throttled()) { | |
81eedcae | 881 | this->requeue_back(item); |
11fdf7f2 TL |
882 | --m_io_throttled; |
883 | this->signal(); | |
884 | } | |
885 | } | |
886 | ||
887 | template <typename I> | |
888 | bool ImageRequestWQ<I>::needs_throttle(ImageDispatchSpec<I> *item) { | |
889 | uint64_t tokens = 0; | |
890 | uint64_t flag = 0; | |
891 | bool blocked = false; | |
892 | TokenBucketThrottle* throttle = nullptr; | |
893 | ||
894 | for (auto t : m_throttles) { | |
895 | flag = t.first; | |
896 | if (item->was_throttled(flag)) | |
897 | continue; | |
898 | ||
899 | if (!(m_qos_enabled_flag & flag)) { | |
900 | item->set_throttled(flag); | |
901 | continue; | |
902 | } | |
903 | ||
904 | throttle = t.second; | |
81eedcae TL |
905 | if (item->tokens_requested(flag, &tokens) && |
906 | throttle->get<ImageRequestWQ<I>, ImageDispatchSpec<I>, | |
11fdf7f2 TL |
907 | &ImageRequestWQ<I>::handle_throttle_ready>( |
908 | tokens, this, item, flag)) { | |
909 | blocked = true; | |
910 | } else { | |
911 | item->set_throttled(flag); | |
912 | } | |
913 | } | |
914 | return blocked; | |
915 | } | |
916 | ||
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  // Work-queue hook deciding whether the front request may be dispatched.
  // Returns nullptr to stall the worker; may also consume the front item
  // without returning it (throttle/lock/refresh paths re-queue it later).
  CephContext *cct = m_image_ctx.cct;
  ImageDispatchSpec<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  if (needs_throttle(peek_item)) {
    ldout(cct, 15) << "throttling IO " << peek_item << dendl;

    ++m_io_throttled;
    // dequeue the throttled item; handle_throttle_ready() will requeue it
    // once every pending throttle grants its tokens
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue();
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    std::shared_lock locker{m_lock};
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required && !peek_item->blocked) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  // actually pop the item we peeked at; it must still be at the front since
  // only worker threads dequeue
  auto item = reinterpret_cast<ImageDispatchSpec<I> *>(
    ThreadPool::PointerWQ<ImageDispatchSpec<I> >::_void_dequeue());
  ceph_assert(peek_item == item);

  if (lock_required) {
    // NOTE: lock ordering -- the pool lock is dropped before taking
    // owner_lock and re-taken afterwards; callers expect it held on return
    this->get_pool_lock().unlock();
    m_image_ctx.owner_lock.lock_shared();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        // policy forbids auto-acquisition: fail the IO immediately
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(m_image_ctx.exclusive_lock->get_unlocked_op_error(),
                          item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        Context *ctx = new C_AcquireLock(this, item);
        ctx = create_context_callback<
          Context, &Context::complete>(
            ctx, m_image_ctx.exclusive_lock);
        m_image_ctx.exclusive_lock->acquire_lock(ctx);
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.unlock_shared();
    this->get_pool_lock().lock();

    if (lock_required) {
      // item is now owned by the acquire-lock continuation
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().lock();
    return nullptr;
  }

  return item;
}
1006 | ||
template <typename I>
void ImageRequestWQ<I>::process_io(ImageDispatchSpec<I> *req,
                                   bool non_blocking_io) {
  // Dispatch a dequeued request, serializing overlapping writes.  On the
  // normal path the request is sent and deleted here; an overlapping write
  // is instead parked on m_blocked_ios and re-processed later.
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  // extents are invalidated after the request is sent,
  // so gather everything we need from the request ahead of that
  const auto& extents = req->get_image_extents();
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  // a request with req->blocked set already passed the overlap check once
  // and is being resumed -- don't re-run (or re-park) it
  if (write_op && !req->blocked) {
    std::lock_guard locker{m_lock};
    bool blocked = block_overlapping_io(&m_in_flight_extents, offset, length);
    if (blocked) {
      ldout(cct, 20) << "blocking overlapping IO: " << "ictx="
                     << &m_image_ctx << ", "
                     << "off=" << offset << ", len=" << length << dendl;
      req->blocked = true;
      m_blocked_ios.push_back(req);
      return;
    }
  }

  req->start_op();
  req->send();

  if (write_op) {
    if (non_blocking_io) {
      finish_in_flight_write();
    }
    // release the extent reservation and wake any flushes waiting on it
    unblock_overlapping_io(offset, length, tid);
    unblock_flushes();
  }
  // req must not be touched after this point
  delete req;
}
7c673cae | 1047 | |
template <typename I>
void ImageRequestWQ<I>::process(ImageDispatchSpec<I> *req) {
  // Worker-thread entry point for a dequeued request.
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  // capture before process_io(): it may delete req
  bool write_op = req->is_write_op();

  process_io(req, true);

  finish_queued_io(write_op);
  finish_in_flight_io();
}
1061 | ||
224ce89b | 1062 | template <typename I> |
9f95a23c TL |
1063 | void ImageRequestWQ<I>::remove_in_flight_write_ios(uint64_t offset, uint64_t length, |
1064 | bool write_op, uint64_t tid) { | |
1065 | CephContext *cct = m_image_ctx.cct; | |
1066 | ldout(cct, 20) << "ictx=" << &m_image_ctx << dendl; | |
1067 | { | |
1068 | std::lock_guard locker{m_lock}; | |
1069 | if (write_op) { | |
1070 | if (length > 0) { | |
1071 | if(!m_in_flight_extents.empty()) { | |
1072 | CephContext *cct = m_image_ctx.cct; | |
1073 | ldout(cct, 20) << "erasing in flight extents with tid:" | |
1074 | << tid << ", offset: " << offset << dendl; | |
1075 | ImageExtentIntervals extents; | |
1076 | extents.insert(offset, length); | |
1077 | ImageExtentIntervals intersect; | |
1078 | intersect.intersection_of(extents, m_in_flight_extents); | |
1079 | m_in_flight_extents.subtract(intersect); | |
1080 | } | |
1081 | } | |
1082 | m_queued_or_blocked_io_tids.erase(tid); | |
1083 | } | |
1084 | } | |
1085 | } | |
1086 | ||
1087 | template <typename I> | |
1088 | void ImageRequestWQ<I>::finish_queued_io(bool write_op) { | |
1089 | std::shared_lock locker{m_lock}; | |
1090 | if (write_op) { | |
11fdf7f2 | 1091 | ceph_assert(m_queued_writes > 0); |
7c673cae FG |
1092 | m_queued_writes--; |
1093 | } else { | |
11fdf7f2 | 1094 | ceph_assert(m_queued_reads > 0); |
7c673cae FG |
1095 | m_queued_reads--; |
1096 | } | |
1097 | } | |
1098 | ||
224ce89b WB |
1099 | template <typename I> |
1100 | void ImageRequestWQ<I>::finish_in_flight_write() { | |
7c673cae FG |
1101 | bool writes_blocked = false; |
1102 | { | |
9f95a23c | 1103 | std::shared_lock locker{m_lock}; |
11fdf7f2 | 1104 | ceph_assert(m_in_flight_writes > 0); |
224ce89b | 1105 | if (--m_in_flight_writes == 0 && |
7c673cae FG |
1106 | !m_write_blocker_contexts.empty()) { |
1107 | writes_blocked = true; | |
1108 | } | |
1109 | } | |
7c673cae | 1110 | if (writes_blocked) { |
11fdf7f2 | 1111 | flush_image(m_image_ctx, new C_BlockedWrites(this)); |
7c673cae FG |
1112 | } |
1113 | } | |
1114 | ||
224ce89b WB |
1115 | template <typename I> |
1116 | int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) { | |
9f95a23c | 1117 | std::shared_lock locker{m_lock}; |
7c673cae FG |
1118 | |
1119 | if (m_shutdown) { | |
1120 | CephContext *cct = m_image_ctx.cct; | |
1121 | lderr(cct) << "IO received on closed image" << dendl; | |
1122 | ||
1123 | c->fail(-ESHUTDOWN); | |
1124 | return false; | |
1125 | } | |
1126 | ||
eafe8130 TL |
1127 | if (!m_image_ctx.data_ctx.is_valid()) { |
1128 | CephContext *cct = m_image_ctx.cct; | |
1129 | lderr(cct) << "missing data pool" << dendl; | |
1130 | ||
eafe8130 TL |
1131 | c->fail(-ENODEV); |
1132 | return false; | |
1133 | } | |
1134 | ||
224ce89b | 1135 | m_in_flight_ios++; |
7c673cae FG |
1136 | return true; |
1137 | } | |
1138 | ||
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  // Retire one in-flight IO.  The last IO to drain after shut_down() was
  // requested flushes the image and then fires the shutdown callback.
  Context *on_shutdown;
  {
    std::shared_lock locker{m_lock};
    // decrement must happen under the lock so shut_down() sees a consistent
    // count; return early unless we are the final IO of a shutdown
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  ceph_assert(on_shutdown != nullptr);
  flush_image(m_image_ctx, on_shutdown);
}
1156 | ||
template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(
    int r, ImageDispatchSpec<I> *req) {
  // Complete a dequeued request with an error and unwind all of the
  // queued/in-flight accounting that was taken on its behalf.
  this->process_finish();
  req->fail(r);

  // gather everything we need from req before it is deleted below
  bool write_op = req->is_write_op();
  uint64_t tid = req->get_tid();
  const auto& extents = req->get_image_extents();
  uint64_t offset = extents.size() ? extents.front().first : 0;
  uint64_t length = extents.size() ? extents.front().second : 0;

  finish_queued_io(write_op);
  remove_in_flight_write_ios(offset, length, write_op, tid);
  delete req;
  finish_in_flight_io();
}
7c673cae | 1174 | |
224ce89b WB |
1175 | template <typename I> |
1176 | bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { | |
9f95a23c | 1177 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
224ce89b WB |
1178 | return ((write_op && m_require_lock_on_write) || |
1179 | (!write_op && m_require_lock_on_read)); | |
7c673cae FG |
1180 | } |
1181 | ||
224ce89b | 1182 | template <typename I> |
11fdf7f2 | 1183 | void ImageRequestWQ<I>::queue(ImageDispatchSpec<I> *req) { |
9f95a23c | 1184 | ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock)); |
224ce89b | 1185 | |
7c673cae FG |
1186 | CephContext *cct = m_image_ctx.cct; |
1187 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
1188 | << "req=" << req << dendl; | |
1189 | ||
224ce89b | 1190 | if (req->is_write_op()) { |
7c673cae FG |
1191 | m_queued_writes++; |
1192 | } else { | |
1193 | m_queued_reads++; | |
1194 | } | |
1195 | ||
11fdf7f2 | 1196 | ThreadPool::PointerWQ<ImageDispatchSpec<I> >::queue(req); |
224ce89b | 1197 | } |
7c673cae | 1198 | |
224ce89b | 1199 | template <typename I> |
11fdf7f2 TL |
1200 | void ImageRequestWQ<I>::handle_acquire_lock( |
1201 | int r, ImageDispatchSpec<I> *req) { | |
224ce89b WB |
1202 | CephContext *cct = m_image_ctx.cct; |
1203 | ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl; | |
1204 | ||
1205 | if (r < 0) { | |
1206 | fail_in_flight_io(r, req); | |
1207 | } else { | |
1208 | // since IO was stalled for acquire -- original IO order is preserved | |
1209 | // if we requeue this op for work queue processing | |
81eedcae | 1210 | this->requeue_front(req); |
7c673cae | 1211 | } |
224ce89b | 1212 | |
11fdf7f2 | 1213 | ceph_assert(m_io_blockers.load() > 0); |
224ce89b WB |
1214 | --m_io_blockers; |
1215 | this->signal(); | |
7c673cae FG |
1216 | } |
1217 | ||
224ce89b | 1218 | template <typename I> |
11fdf7f2 TL |
1219 | void ImageRequestWQ<I>::handle_refreshed( |
1220 | int r, ImageDispatchSpec<I> *req) { | |
7c673cae | 1221 | CephContext *cct = m_image_ctx.cct; |
224ce89b WB |
1222 | ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", " |
1223 | << "req=" << req << dendl; | |
7c673cae | 1224 | if (r < 0) { |
224ce89b | 1225 | fail_in_flight_io(r, req); |
7c673cae FG |
1226 | } else { |
1227 | // since IO was stalled for refresh -- original IO order is preserved | |
1228 | // if we requeue this op for work queue processing | |
81eedcae | 1229 | this->requeue_front(req); |
7c673cae FG |
1230 | } |
1231 | ||
11fdf7f2 | 1232 | ceph_assert(m_io_blockers.load() > 0); |
224ce89b WB |
1233 | --m_io_blockers; |
1234 | this->signal(); | |
7c673cae FG |
1235 | } |
1236 | ||
224ce89b WB |
1237 | template <typename I> |
1238 | void ImageRequestWQ<I>::handle_blocked_writes(int r) { | |
7c673cae FG |
1239 | Contexts contexts; |
1240 | { | |
9f95a23c | 1241 | std::unique_lock locker{m_lock}; |
7c673cae FG |
1242 | contexts.swap(m_write_blocker_contexts); |
1243 | } | |
1244 | ||
1245 | for (auto ctx : contexts) { | |
1246 | ctx->complete(0); | |
1247 | } | |
1248 | } | |
1249 | ||
224ce89b WB |
1250 | template class librbd::io::ImageRequestWQ<librbd::ImageCtx>; |
1251 | ||
7c673cae FG |
1252 | } // namespace io |
1253 | } // namespace librbd |