]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/io/ImageRequestWQ.h" | |
5 | #include "common/errno.h" | |
31f18b77 | 6 | #include "common/zipkin_trace.h" |
7c673cae FG |
7 | #include "librbd/ExclusiveLock.h" |
8 | #include "librbd/ImageCtx.h" | |
9 | #include "librbd/ImageState.h" | |
10 | #include "librbd/internal.h" | |
11 | #include "librbd/Utils.h" | |
12 | #include "librbd/exclusive_lock/Policy.h" | |
13 | #include "librbd/io/AioCompletion.h" | |
14 | #include "librbd/io/ImageRequest.h" | |
15 | ||
16 | #define dout_subsys ceph_subsys_rbd | |
17 | #undef dout_prefix | |
18 | #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ | |
19 | << " " << __func__ << ": " | |
20 | ||
21 | namespace librbd { | |
22 | namespace io { | |
23 | ||
224ce89b WB |
// Completion context fired when an exclusive-lock acquisition attempt
// finishes; forwards the result (and the stalled request) back to the
// owning work queue via handle_acquire_lock().
template <typename I>
struct ImageRequestWQ<I>::C_AcquireLock : public Context {
  ImageRequestWQ *work_queue;      // non-owning back-pointer to the queue
  ImageRequest<I> *image_request;  // IO request stalled on the lock

  C_AcquireLock(ImageRequestWQ *work_queue, ImageRequest<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }

  // r: result of the exclusive lock acquisition
  void finish(int r) override {
    work_queue->handle_acquire_lock(r, image_request);
  }
};
37 | ||
// Completion context fired once all in-flight IO has been flushed after a
// block_writes() request; notifies the work queue so it can complete the
// pending write-blocker contexts.
template <typename I>
struct ImageRequestWQ<I>::C_BlockedWrites : public Context {
  ImageRequestWQ *work_queue;  // non-owning back-pointer to the queue
  C_BlockedWrites(ImageRequestWQ *_work_queue)
    : work_queue(_work_queue) {
  }

  void finish(int r) override {
    work_queue->handle_blocked_writes(r);
  }
};
49 | ||
// Completion context fired when an image refresh completes; hands the
// result and the stalled request back to the work queue so the IO can be
// requeued (or failed) via handle_refreshed().
template <typename I>
struct ImageRequestWQ<I>::C_RefreshFinish : public Context {
  ImageRequestWQ *work_queue;      // non-owning back-pointer to the queue
  ImageRequest<I> *image_request;  // IO request stalled on the refresh

  C_RefreshFinish(ImageRequestWQ *work_queue,
                  ImageRequest<I> *image_request)
    : work_queue(work_queue), image_request(image_request) {
  }
  void finish(int r) override {
    work_queue->handle_refreshed(r, image_request);
  }
};
63 | ||
// Construct the image IO work queue and register it with the thread pool.
// @param image_ctx image context this queue dispatches IO for (not owned)
// @param name      work queue name (debug/identification)
// @param ti        thread-pool timeout interval for this queue
// @param tp        thread pool that services queued requests
template <typename I>
ImageRequestWQ<I>::ImageRequestWQ(I *image_ctx, const string &name,
                                  time_t ti, ThreadPool *tp)
  : ThreadPool::PointerWQ<ImageRequest<I> >(name, ti, 0, tp),
    m_image_ctx(*image_ctx),
    m_lock(util::unique_lock_name("ImageRequestWQ<I>::m_lock", this)) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "ictx=" << image_ctx << dendl;
  this->register_work_queue();
}
74 | ||
224ce89b WB |
75 | template <typename I> |
76 | ssize_t ImageRequestWQ<I>::read(uint64_t off, uint64_t len, | |
77 | ReadResult &&read_result, int op_flags) { | |
7c673cae FG |
78 | CephContext *cct = m_image_ctx.cct; |
79 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
80 | << "len = " << len << dendl; | |
81 | ||
82 | C_SaferCond cond; | |
83 | AioCompletion *c = AioCompletion::create(&cond); | |
84 | aio_read(c, off, len, std::move(read_result), op_flags, false); | |
85 | return cond.wait(); | |
86 | } | |
87 | ||
224ce89b WB |
// Synchronous write of `len` bytes from `bl` at image offset `off`.
// @return the (possibly clipped) number of bytes written, or a negative
//         error code
template <typename I>
ssize_t ImageRequestWQ<I>::write(uint64_t off, uint64_t len,
                                 bufferlist &&bl, int op_flags) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", "
                 << "len = " << len << dendl;

  // clip the extent to the image bounds; snap_lock held (read) for a
  // stable view of the image size while clipping
  m_image_ctx.snap_lock.get_read();
  int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len);
  m_image_ctx.snap_lock.put_read();
  if (r < 0) {
    lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl;
    return r;
  }

  C_SaferCond cond;
  AioCompletion *c = AioCompletion::create(&cond);
  aio_write(c, off, len, std::move(bl), op_flags, false);

  r = cond.wait();
  if (r < 0) {
    return r;
  }
  // success: report the clipped length actually written
  return len;
}
113 | ||
224ce89b WB |
114 | template <typename I> |
115 | ssize_t ImageRequestWQ<I>::discard(uint64_t off, uint64_t len, | |
116 | bool skip_partial_discard) { | |
7c673cae FG |
117 | CephContext *cct = m_image_ctx.cct; |
118 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
119 | << "len = " << len << dendl; | |
120 | ||
121 | m_image_ctx.snap_lock.get_read(); | |
122 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
123 | m_image_ctx.snap_lock.put_read(); | |
124 | if (r < 0) { | |
125 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
126 | return r; | |
127 | } | |
128 | ||
129 | C_SaferCond cond; | |
130 | AioCompletion *c = AioCompletion::create(&cond); | |
131 | aio_discard(c, off, len, skip_partial_discard, false); | |
132 | ||
133 | r = cond.wait(); | |
134 | if (r < 0) { | |
135 | return r; | |
136 | } | |
137 | return len; | |
138 | } | |
139 | ||
224ce89b WB |
140 | template <typename I> |
141 | ssize_t ImageRequestWQ<I>::writesame(uint64_t off, uint64_t len, | |
142 | bufferlist &&bl, int op_flags) { | |
7c673cae FG |
143 | CephContext *cct = m_image_ctx.cct; |
144 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
145 | << "len = " << len << ", data_len " << bl.length() << dendl; | |
146 | ||
147 | m_image_ctx.snap_lock.get_read(); | |
148 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
149 | m_image_ctx.snap_lock.put_read(); | |
150 | if (r < 0) { | |
151 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
152 | return r; | |
153 | } | |
154 | ||
155 | C_SaferCond cond; | |
156 | AioCompletion *c = AioCompletion::create(&cond); | |
157 | aio_writesame(c, off, len, std::move(bl), op_flags, false); | |
158 | ||
159 | r = cond.wait(); | |
160 | if (r < 0) { | |
161 | return r; | |
162 | } | |
163 | return len; | |
164 | } | |
165 | ||
c07f9fc5 FG |
166 | template <typename I> |
167 | ssize_t ImageRequestWQ<I>::compare_and_write(uint64_t off, uint64_t len, | |
168 | bufferlist &&cmp_bl, | |
169 | bufferlist &&bl, | |
170 | uint64_t *mismatch_off, | |
171 | int op_flags){ | |
172 | CephContext *cct = m_image_ctx.cct; | |
173 | ldout(cct, 20) << "compare_and_write ictx=" << &m_image_ctx << ", off=" | |
174 | << off << ", " << "len = " << len << dendl; | |
175 | ||
176 | m_image_ctx.snap_lock.get_read(); | |
177 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
178 | m_image_ctx.snap_lock.put_read(); | |
179 | if (r < 0) { | |
180 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
181 | return r; | |
182 | } | |
183 | ||
184 | C_SaferCond cond; | |
185 | AioCompletion *c = AioCompletion::create(&cond); | |
186 | aio_compare_and_write(c, off, len, std::move(cmp_bl), std::move(bl), | |
187 | mismatch_off, op_flags, false); | |
188 | ||
189 | r = cond.wait(); | |
190 | if (r < 0) { | |
191 | return r; | |
192 | } | |
193 | ||
194 | return len; | |
195 | } | |
196 | ||
224ce89b WB |
// Asynchronous read. The request is either queued for work-queue
// processing or executed inline, depending on image state.
// @param c            completion to fire when the read finishes
// @param native_async when set and the event socket is valid, completion
//                     is delivered via the event socket
template <typename I>
void ImageRequestWQ<I>::aio_read(AioCompletion *c, uint64_t off, uint64_t len,
                                 ReadResult &&read_result, int op_flags,
                                 bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: read", &m_image_ctx.trace_endpoint);
    trace.event("start");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_READ);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", " << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closed
  if (!start_in_flight_io(c)) {
    return;
  }

  // if journaling is enabled -- we need to replay the journal because
  // it might contain an uncommitted write
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
      require_lock_on_read()) {
    // defer to the work queue (keeps ordering behind pending writes)
    queue(ImageRequest<I>::create_read_request(
      m_image_ctx, c, {{off, len}}, std::move(read_result), op_flags,
      trace));
  } else {
    // fast path: execute inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_read(&m_image_ctx, c, {{off, len}},
                              std::move(read_result), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
237 | ||
224ce89b WB |
// Asynchronous write. The request is either queued for work-queue
// processing or executed inline, depending on image state.
// @param c            completion to fire when the write finishes
// @param native_async when set and the event socket is valid, completion
//                     is delivered via the event socket
template <typename I>
void ImageRequestWQ<I>::aio_write(AioCompletion *c, uint64_t off, uint64_t len,
                                  bufferlist &&bl, int op_flags,
                                  bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closed
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // defer to the work queue
    queue(ImageRequest<I>::create_write_request(
      m_image_ctx, c, {{off, len}}, std::move(bl), op_flags, trace));
  } else {
    // fast path: execute inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_write(&m_image_ctx, c, {{off, len}},
                               std::move(bl), op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
274 | ||
224ce89b WB |
// Asynchronous discard (trim). The request is either queued for work-queue
// processing or executed inline, depending on image state.
// @param c            completion to fire when the discard finishes
// @param native_async when set and the event socket is valid, completion
//                     is delivered via the event socket
template <typename I>
void ImageRequestWQ<I>::aio_discard(AioCompletion *c, uint64_t off,
                                    uint64_t len, bool skip_partial_discard,
                                    bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: discard", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_DISCARD);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", len=" << len
                 << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closed
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // defer to the work queue
    queue(ImageRequest<I>::create_discard_request(
      m_image_ctx, c, off, len, skip_partial_discard, trace));
  } else {
    // fast path: execute inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_discard(&m_image_ctx, c, off, len,
                                 skip_partial_discard, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
311 | ||
224ce89b WB |
// Asynchronous flush. Queued if IO must be ordered behind pending writes
// (non-blocking AIO, blocked writes, or queued writes); otherwise executed
// inline. Note: unlike the other inline paths, no start_op() is invoked
// here.
// @param c            completion to fire when the flush finishes
// @param native_async when set and the event socket is valid, completion
//                     is delivered via the event socket
template <typename I>
void ImageRequestWQ<I>::aio_flush(AioCompletion *c, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: flush", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_FLUSH);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closed
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
    queue(ImageRequest<I>::create_flush_request(m_image_ctx, c, trace));
  } else {
    ImageRequest<I>::aio_flush(&m_image_ctx, c, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
342 | ||
224ce89b WB |
// Asynchronous write-same. The request is either queued for work-queue
// processing or executed inline, depending on image state.
// @param c            completion to fire when the write-same finishes
// @param bl           pattern buffer replicated across [off, off+len)
// @param native_async when set and the event socket is valid, completion
//                     is delivered via the event socket
template <typename I>
void ImageRequestWQ<I>::aio_writesame(AioCompletion *c, uint64_t off,
                                      uint64_t len, bufferlist &&bl,
                                      int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: writesame", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_WRITESAME);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << ", data_len = " << bl.length() << ", "
                 << "flags=" << op_flags << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closed
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // defer to the work queue
    queue(ImageRequest<I>::create_writesame_request(
      m_image_ctx, c, off, len, std::move(bl), op_flags, trace));
  } else {
    // fast path: execute inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl),
                                   op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
380 | ||
c07f9fc5 FG |
// Asynchronous compare-and-write. The request is either queued for
// work-queue processing or executed inline, depending on image state.
// @param c            completion to fire when the op finishes
// @param cmp_bl       expected contents of [off, off+len)
// @param bl           replacement contents written on a successful compare
// @param mismatch_off out: offset of the first mismatching byte on failure
// @param native_async when set and the event socket is valid, completion
//                     is delivered via the event socket
template <typename I>
void ImageRequestWQ<I>::aio_compare_and_write(AioCompletion *c,
                                              uint64_t off, uint64_t len,
                                              bufferlist &&cmp_bl,
                                              bufferlist &&bl,
                                              uint64_t *mismatch_off,
                                              int op_flags, bool native_async) {
  CephContext *cct = m_image_ctx.cct;
  ZTracer::Trace trace;
  if (m_image_ctx.blkin_trace_all) {
    trace.init("wq: compare_and_write", &m_image_ctx.trace_endpoint);
    trace.event("init");
  }

  c->init_time(util::get_image_ctx(&m_image_ctx), AIO_TYPE_COMPARE_AND_WRITE);
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "completion=" << c << ", off=" << off << ", "
                 << "len=" << len << dendl;

  if (native_async && m_image_ctx.event_socket.is_valid()) {
    c->set_event_notify(true);
  }

  // fails the completion with -ESHUTDOWN if the image is closed
  if (!start_in_flight_io(c)) {
    return;
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  if (m_image_ctx.non_blocking_aio || writes_blocked()) {
    // defer to the work queue
    queue(ImageRequest<I>::create_compare_and_write_request(
      m_image_ctx, c, {{off, len}}, std::move(cmp_bl), std::move(bl),
      mismatch_off, op_flags, trace));
  } else {
    // fast path: execute inline on the caller's thread
    c->start_op();
    ImageRequest<I>::aio_compare_and_write(&m_image_ctx, c, {{off, len}},
                                           std::move(cmp_bl), std::move(bl),
                                           mismatch_off, op_flags, trace);
    finish_in_flight_io();
  }
  trace.event("finish");
}
422 | ||
224ce89b WB |
// Begin shutting the queue down. If IOs are still in flight, the shutdown
// context is stashed and completed later by finish_in_flight_io();
// otherwise all outstanding IO is flushed immediately.
// Preconditions: owner_lock held; shut_down not previously called.
template <typename I>
void ImageRequestWQ<I>::shut_down(Context *on_shutdown) {
  assert(m_image_ctx.owner_lock.is_locked());

  {
    RWLock::WLocker locker(m_lock);
    assert(!m_shutdown);
    m_shutdown = true;

    CephContext *cct = m_image_ctx.cct;
    ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ios.load()
                  << dendl;
    if (m_in_flight_ios > 0) {
      // deferred: the final finish_in_flight_io() completes this context
      m_on_shutdown = on_shutdown;
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_shutdown);
}
444 | ||
224ce89b WB |
445 | template <typename I> |
446 | int ImageRequestWQ<I>::block_writes() { | |
7c673cae FG |
447 | C_SaferCond cond_ctx; |
448 | block_writes(&cond_ctx); | |
449 | return cond_ctx.wait(); | |
450 | } | |
451 | ||
224ce89b WB |
// Asynchronously block writes. Increments the blocker count; if writes are
// still in flight (or earlier blockers are still waiting), the context is
// queued and completed by handle_blocked_writes() once writes drain.
// Precondition: owner_lock held.
// @param on_blocked fired once no writes are in flight and IO is flushed
template <typename I>
void ImageRequestWQ<I>::block_writes(Context *on_blocked) {
  assert(m_image_ctx.owner_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  {
    RWLock::WLocker locker(m_lock);
    ++m_write_blockers;
    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (!m_write_blocker_contexts.empty() || m_in_flight_writes > 0) {
      // deferred until the last in-flight write completes
      m_write_blocker_contexts.push_back(on_blocked);
      return;
    }
  }

  // ensure that all in-flight IO is flushed
  m_image_ctx.flush(on_blocked);
}
471 | ||
224ce89b WB |
// Drop one write blocker; when the count reaches zero, wake the thread
// pool so queued writes can be dispatched again.
template <typename I>
void ImageRequestWQ<I>::unblock_writes() {
  CephContext *cct = m_image_ctx.cct;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    assert(m_write_blockers > 0);
    --m_write_blockers;

    ldout(cct, 5) << &m_image_ctx << ", " << "num="
                  << m_write_blockers << dendl;
    if (m_write_blockers == 0) {
      wake_up = true;
    }
  }

  // signal outside the lock
  if (wake_up) {
    this->signal();
  }
}
493 | ||
224ce89b WB |
// Enable/disable the "exclusive lock required" flag for the given IO
// direction(s), waking the thread pool on any state change so stalled
// requests can re-evaluate whether a lock acquisition is needed.
// @param direction read, write, or both
// @param enabled   new value for the require-lock flag(s)
template <typename I>
void ImageRequestWQ<I>::set_require_lock(Direction direction, bool enabled) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  bool wake_up = false;
  {
    RWLock::WLocker locker(m_lock);
    switch (direction) {
    case DIRECTION_READ:
      wake_up = (enabled != m_require_lock_on_read);
      m_require_lock_on_read = enabled;
      break;
    case DIRECTION_WRITE:
      wake_up = (enabled != m_require_lock_on_write);
      m_require_lock_on_write = enabled;
      break;
    case DIRECTION_BOTH:
      // wake if either flag changes
      wake_up = (enabled != m_require_lock_on_read ||
                 enabled != m_require_lock_on_write);
      m_require_lock_on_read = enabled;
      m_require_lock_on_write = enabled;
      break;
    }
  }

  // wake up the thread pool whenever the state changes so that
  // we can re-request the lock if required
  if (wake_up) {
    this->signal();
  }
}
526 | ||
224ce89b WB |
// Thread-pool dequeue hook. Returns the next runnable request, or nullptr
// when IO must stall (empty queue, active IO blockers, pending exclusive
// lock acquisition, or pending image refresh). Called with the pool lock
// held; the lock is temporarily dropped around the exclusive-lock and
// refresh calls below.
template <typename I>
void *ImageRequestWQ<I>::_void_dequeue() {
  CephContext *cct = m_image_ctx.cct;
  ImageRequest<I> *peek_item = this->front();

  // no queued IO requests or all IO is blocked/stalled
  if (peek_item == nullptr || m_io_blockers.load() > 0) {
    return nullptr;
  }

  bool lock_required;
  bool refresh_required = m_image_ctx.state->is_refresh_required();
  {
    RWLock::RLocker locker(m_lock);
    bool write_op = peek_item->is_write_op();
    lock_required = is_lock_required(write_op);
    if (write_op) {
      if (!lock_required && m_write_blockers > 0) {
        // missing lock is not the write blocker
        return nullptr;
      }

      if (!lock_required && !refresh_required) {
        // completed ops will requeue the IO -- don't count it as in-progress
        m_in_flight_writes++;
      }
    }
  }

  // actually remove the item we peeked at above
  ImageRequest<I> *item = reinterpret_cast<ImageRequest<I> *>(
    ThreadPool::PointerWQ<ImageRequest<I> >::_void_dequeue());
  assert(peek_item == item);

  if (lock_required) {
    // drop the pool lock before taking owner_lock (lock ordering)
    this->get_pool_lock().Unlock();
    m_image_ctx.owner_lock.get_read();
    if (m_image_ctx.exclusive_lock != nullptr) {
      ldout(cct, 5) << "exclusive lock required: delaying IO " << item << dendl;
      if (!m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) {
        // policy forbids auto-acquisition: the op cannot proceed
        lderr(cct) << "op requires exclusive lock" << dendl;
        fail_in_flight_io(-EROFS, item);

        // wake up the IO since we won't be returning a request to process
        this->signal();
      } else {
        // stall IO until the acquire completes
        ++m_io_blockers;
        m_image_ctx.exclusive_lock->acquire_lock(new C_AcquireLock(this, item));
      }
    } else {
      // raced with the exclusive lock being disabled
      lock_required = false;
    }
    m_image_ctx.owner_lock.put_read();
    this->get_pool_lock().Lock();

    if (lock_required) {
      return nullptr;
    }
  }

  if (refresh_required) {
    ldout(cct, 5) << "image refresh required: delaying IO " << item << dendl;

    // stall IO until the refresh completes
    ++m_io_blockers;

    this->get_pool_lock().Unlock();
    m_image_ctx.state->refresh(new C_RefreshFinish(this, item));
    this->get_pool_lock().Lock();
    return nullptr;
  }

  item->start_op();
  return item;
}
603 | ||
224ce89b WB |
// Thread-pool worker entry point: executes a dequeued request, then
// updates the queued/in-flight accounting and frees the request.
template <typename I>
void ImageRequestWQ<I>::process(ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  req->send();

  finish_queued_io(req);
  if (req->is_write_op()) {
    // may trigger the blocked-writes flush once the last write drains
    finish_in_flight_write();
  }
  delete req;

  finish_in_flight_io();
}
620 | ||
224ce89b WB |
// Decrement the queued read/write counter for a request that has left the
// queue (processed or failed).
template <typename I>
void ImageRequestWQ<I>::finish_queued_io(ImageRequest<I> *req) {
  RWLock::RLocker locker(m_lock);
  if (req->is_write_op()) {
    assert(m_queued_writes > 0);
    m_queued_writes--;
  } else {
    assert(m_queued_reads > 0);
    m_queued_reads--;
  }
}
632 | ||
224ce89b WB |
// Decrement the in-flight write counter; if this was the last in-flight
// write and block_writes() waiters are queued, flush the image so
// handle_blocked_writes() can complete them.
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_write() {
  bool writes_blocked = false;
  {
    RWLock::RLocker locker(m_lock);
    assert(m_in_flight_writes > 0);
    if (--m_in_flight_writes == 0 &&
        !m_write_blocker_contexts.empty()) {
      writes_blocked = true;
    }
  }

  // flush outside the lock
  if (writes_blocked) {
    m_image_ctx.flush(new C_BlockedWrites(this));
  }
}
649 | ||
224ce89b WB |
// Register a new in-flight IO. Returns true on success; returns false and
// fails the completion with -ESHUTDOWN if the image is shutting down.
// NOTE(review): declared `int` but used as a boolean by all callers --
// presumably `bool` in the header; signature left as-is to match it.
template <typename I>
int ImageRequestWQ<I>::start_in_flight_io(AioCompletion *c) {
  RWLock::RLocker locker(m_lock);

  if (m_shutdown) {
    CephContext *cct = m_image_ctx.cct;
    lderr(cct) << "IO received on closed image" << dendl;

    // extra ref since fail() releases one
    c->get();
    c->fail(-ESHUTDOWN);
    return false;
  }

  m_in_flight_ios++;
  return true;
}
666 | ||
224ce89b WB |
// Retire one in-flight IO. If this was the last IO and shut_down() is
// pending, complete the deferred shutdown by flushing the image.
template <typename I>
void ImageRequestWQ<I>::finish_in_flight_io() {
  Context *on_shutdown;
  {
    RWLock::RLocker locker(m_lock);
    // fast path: more IO outstanding or no shutdown requested
    if (--m_in_flight_ios > 0 || !m_shutdown) {
      return;
    }
    on_shutdown = m_on_shutdown;
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "completing shut down" << dendl;

  assert(on_shutdown != nullptr);
  m_image_ctx.flush(on_shutdown);
}
684 | ||
224ce89b WB |
// Fail a request that was already dequeued/accounted: notify the pool the
// item is done, fail the request with `r`, fix up the queued counters,
// free it, and retire the in-flight IO slot.
template <typename I>
void ImageRequestWQ<I>::fail_in_flight_io(int r, ImageRequest<I> *req) {
  this->process_finish();
  req->fail(r);
  finish_queued_io(req);
  delete req;
  finish_in_flight_io();
}
7c673cae | 693 | |
224ce89b WB |
694 | template <typename I> |
695 | bool ImageRequestWQ<I>::is_lock_required(bool write_op) const { | |
696 | assert(m_lock.is_locked()); | |
697 | return ((write_op && m_require_lock_on_write) || | |
698 | (!write_op && m_require_lock_on_read)); | |
7c673cae FG |
699 | } |
700 | ||
224ce89b WB |
// Enqueue a request for deferred work-queue processing, bumping the
// queued read/write counter first. Precondition: owner_lock held.
template <typename I>
void ImageRequestWQ<I>::queue(ImageRequest<I> *req) {
  assert(m_image_ctx.owner_lock.is_locked());

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "ictx=" << &m_image_ctx << ", "
                 << "req=" << req << dendl;

  if (req->is_write_op()) {
    m_queued_writes++;
  } else {
    m_queued_reads++;
  }

  ThreadPool::PointerWQ<ImageRequest<I> >::queue(req);
}
7c673cae | 717 | |
224ce89b WB |
// Callback from C_AcquireLock: the exclusive-lock acquisition finished.
// On failure the stalled request is failed; on success it is requeued
// (order preserved since all IO was stalled). The IO blocker taken in
// _void_dequeue() is released either way.
template <typename I>
void ImageRequestWQ<I>::handle_acquire_lock(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "r=" << r << ", " << "req=" << req << dendl;

  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for acquire -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}
735 | ||
224ce89b WB |
// Callback from C_RefreshFinish: the image refresh finished. On failure
// the stalled request is failed; on success it is requeued (order
// preserved since all IO was stalled). The IO blocker taken in
// _void_dequeue() is released either way.
template <typename I>
void ImageRequestWQ<I>::handle_refreshed(int r, ImageRequest<I> *req) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << "resuming IO after image refresh: r=" << r << ", "
                << "req=" << req << dendl;
  if (r < 0) {
    fail_in_flight_io(r, req);
  } else {
    // since IO was stalled for refresh -- original IO order is preserved
    // if we requeue this op for work queue processing
    this->requeue(req);
  }

  assert(m_io_blockers.load() > 0);
  --m_io_blockers;
  this->signal();
}
753 | ||
224ce89b WB |
754 | template <typename I> |
755 | void ImageRequestWQ<I>::handle_blocked_writes(int r) { | |
7c673cae FG |
756 | Contexts contexts; |
757 | { | |
758 | RWLock::WLocker locker(m_lock); | |
759 | contexts.swap(m_write_blocker_contexts); | |
760 | } | |
761 | ||
762 | for (auto ctx : contexts) { | |
763 | ctx->complete(0); | |
764 | } | |
765 | } | |
766 | ||
224ce89b WB |
767 | template class librbd::io::ImageRequestWQ<librbd::ImageCtx>; |
768 | ||
7c673cae FG |
769 | } // namespace io |
770 | } // namespace librbd |