]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "librbd/io/ImageRequestWQ.h" | |
5 | #include "common/errno.h" | |
31f18b77 | 6 | #include "common/zipkin_trace.h" |
7c673cae FG |
7 | #include "librbd/ExclusiveLock.h" |
8 | #include "librbd/ImageCtx.h" | |
9 | #include "librbd/ImageState.h" | |
10 | #include "librbd/internal.h" | |
11 | #include "librbd/Utils.h" | |
12 | #include "librbd/exclusive_lock/Policy.h" | |
13 | #include "librbd/io/AioCompletion.h" | |
14 | #include "librbd/io/ImageRequest.h" | |
15 | ||
16 | #define dout_subsys ceph_subsys_rbd | |
17 | #undef dout_prefix | |
18 | #define dout_prefix *_dout << "librbd::io::ImageRequestWQ: " << this \ | |
19 | << " " << __func__ << ": " | |
20 | ||
21 | namespace librbd { | |
22 | namespace io { | |
23 | ||
24 | ImageRequestWQ::ImageRequestWQ(ImageCtx *image_ctx, const string &name, | |
25 | time_t ti, ThreadPool *tp) | |
26 | : ThreadPool::PointerWQ<ImageRequest<> >(name, ti, 0, tp), | |
27 | m_image_ctx(*image_ctx), | |
28 | m_lock(util::unique_lock_name("ImageRequestWQ::m_lock", this)), | |
29 | m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0), | |
30 | m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false), | |
31 | m_shutdown(false), m_on_shutdown(nullptr) { | |
32 | CephContext *cct = m_image_ctx.cct; | |
33 | ldout(cct, 5) << "ictx=" << image_ctx << dendl; | |
34 | tp->add_work_queue(this); | |
35 | } | |
36 | ||
37 | ssize_t ImageRequestWQ::read(uint64_t off, uint64_t len, | |
38 | ReadResult &&read_result, int op_flags) { | |
39 | CephContext *cct = m_image_ctx.cct; | |
40 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
41 | << "len = " << len << dendl; | |
42 | ||
43 | C_SaferCond cond; | |
44 | AioCompletion *c = AioCompletion::create(&cond); | |
45 | aio_read(c, off, len, std::move(read_result), op_flags, false); | |
46 | return cond.wait(); | |
47 | } | |
48 | ||
49 | ssize_t ImageRequestWQ::write(uint64_t off, uint64_t len, | |
50 | bufferlist &&bl, int op_flags) { | |
51 | CephContext *cct = m_image_ctx.cct; | |
52 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
53 | << "len = " << len << dendl; | |
54 | ||
55 | m_image_ctx.snap_lock.get_read(); | |
56 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
57 | m_image_ctx.snap_lock.put_read(); | |
58 | if (r < 0) { | |
59 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
60 | return r; | |
61 | } | |
62 | ||
63 | C_SaferCond cond; | |
64 | AioCompletion *c = AioCompletion::create(&cond); | |
65 | aio_write(c, off, len, std::move(bl), op_flags, false); | |
66 | ||
67 | r = cond.wait(); | |
68 | if (r < 0) { | |
69 | return r; | |
70 | } | |
71 | return len; | |
72 | } | |
73 | ||
74 | ssize_t ImageRequestWQ::discard(uint64_t off, uint64_t len, bool skip_partial_discard) { | |
75 | CephContext *cct = m_image_ctx.cct; | |
76 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
77 | << "len = " << len << dendl; | |
78 | ||
79 | m_image_ctx.snap_lock.get_read(); | |
80 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
81 | m_image_ctx.snap_lock.put_read(); | |
82 | if (r < 0) { | |
83 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
84 | return r; | |
85 | } | |
86 | ||
87 | C_SaferCond cond; | |
88 | AioCompletion *c = AioCompletion::create(&cond); | |
89 | aio_discard(c, off, len, skip_partial_discard, false); | |
90 | ||
91 | r = cond.wait(); | |
92 | if (r < 0) { | |
93 | return r; | |
94 | } | |
95 | return len; | |
96 | } | |
97 | ||
98 | ssize_t ImageRequestWQ::writesame(uint64_t off, uint64_t len, bufferlist &&bl, | |
99 | int op_flags) { | |
100 | CephContext *cct = m_image_ctx.cct; | |
101 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", off=" << off << ", " | |
102 | << "len = " << len << ", data_len " << bl.length() << dendl; | |
103 | ||
104 | m_image_ctx.snap_lock.get_read(); | |
105 | int r = clip_io(util::get_image_ctx(&m_image_ctx), off, &len); | |
106 | m_image_ctx.snap_lock.put_read(); | |
107 | if (r < 0) { | |
108 | lderr(cct) << "invalid IO request: " << cpp_strerror(r) << dendl; | |
109 | return r; | |
110 | } | |
111 | ||
112 | C_SaferCond cond; | |
113 | AioCompletion *c = AioCompletion::create(&cond); | |
114 | aio_writesame(c, off, len, std::move(bl), op_flags, false); | |
115 | ||
116 | r = cond.wait(); | |
117 | if (r < 0) { | |
118 | return r; | |
119 | } | |
120 | return len; | |
121 | } | |
122 | ||
123 | void ImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, uint64_t len, | |
124 | ReadResult &&read_result, int op_flags, | |
125 | bool native_async) { | |
7c673cae | 126 | CephContext *cct = m_image_ctx.cct; |
31f18b77 FG |
127 | ZTracer::Trace trace; |
128 | if (cct->_conf->rbd_blkin_trace_all) { | |
129 | trace.init("wq: read", &m_image_ctx.trace_endpoint); | |
130 | trace.event("start"); | |
131 | } | |
132 | ||
133 | c->init_time(&m_image_ctx, AIO_TYPE_READ); | |
7c673cae FG |
134 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
135 | << "completion=" << c << ", off=" << off << ", " | |
136 | << "len=" << len << ", " << "flags=" << op_flags << dendl; | |
137 | ||
138 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
139 | c->set_event_notify(true); | |
140 | } | |
141 | ||
142 | if (!start_in_flight_op(c)) { | |
143 | return; | |
144 | } | |
145 | ||
146 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
147 | ||
148 | // if journaling is enabled -- we need to replay the journal because | |
149 | // it might contain an uncommitted write | |
150 | bool lock_required; | |
151 | { | |
152 | RWLock::RLocker locker(m_lock); | |
153 | lock_required = m_require_lock_on_read; | |
154 | } | |
155 | ||
156 | if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() || | |
157 | lock_required) { | |
158 | queue(new ImageReadRequest<>(m_image_ctx, c, {{off, len}}, | |
31f18b77 | 159 | std::move(read_result), op_flags, trace)); |
7c673cae FG |
160 | } else { |
161 | c->start_op(); | |
162 | ImageRequest<>::aio_read(&m_image_ctx, c, {{off, len}}, | |
31f18b77 | 163 | std::move(read_result), op_flags, trace); |
7c673cae FG |
164 | finish_in_flight_op(); |
165 | } | |
31f18b77 | 166 | trace.event("finish"); |
7c673cae FG |
167 | } |
168 | ||
169 | void ImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, uint64_t len, | |
170 | bufferlist &&bl, int op_flags, | |
171 | bool native_async) { | |
7c673cae | 172 | CephContext *cct = m_image_ctx.cct; |
31f18b77 FG |
173 | ZTracer::Trace trace; |
174 | if (cct->_conf->rbd_blkin_trace_all) { | |
175 | trace.init("wq: write", &m_image_ctx.trace_endpoint); | |
176 | trace.event("init"); | |
177 | } | |
178 | ||
179 | c->init_time(&m_image_ctx, AIO_TYPE_WRITE); | |
7c673cae FG |
180 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
181 | << "completion=" << c << ", off=" << off << ", " | |
182 | << "len=" << len << ", flags=" << op_flags << dendl; | |
183 | ||
184 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
185 | c->set_event_notify(true); | |
186 | } | |
187 | ||
188 | if (!start_in_flight_op(c)) { | |
189 | return; | |
190 | } | |
191 | ||
192 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
193 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
194 | queue(new ImageWriteRequest<>(m_image_ctx, c, {{off, len}}, | |
31f18b77 | 195 | std::move(bl), op_flags, trace)); |
7c673cae FG |
196 | } else { |
197 | c->start_op(); | |
198 | ImageRequest<>::aio_write(&m_image_ctx, c, {{off, len}}, | |
31f18b77 | 199 | std::move(bl), op_flags, trace); |
7c673cae FG |
200 | finish_in_flight_op(); |
201 | } | |
31f18b77 | 202 | trace.event("finish"); |
7c673cae FG |
203 | } |
204 | ||
205 | void ImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off, | |
206 | uint64_t len, bool skip_partial_discard, | |
207 | bool native_async) { | |
7c673cae | 208 | CephContext *cct = m_image_ctx.cct; |
31f18b77 FG |
209 | ZTracer::Trace trace; |
210 | if (cct->_conf->rbd_blkin_trace_all) { | |
211 | trace.init("wq: discard", &m_image_ctx.trace_endpoint); | |
212 | trace.event("init"); | |
213 | } | |
214 | ||
215 | c->init_time(&m_image_ctx, AIO_TYPE_DISCARD); | |
7c673cae FG |
216 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
217 | << "completion=" << c << ", off=" << off << ", len=" << len | |
218 | << dendl; | |
219 | ||
220 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
221 | c->set_event_notify(true); | |
222 | } | |
223 | ||
224 | if (!start_in_flight_op(c)) { | |
225 | return; | |
226 | } | |
227 | ||
228 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
229 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
31f18b77 FG |
230 | queue(new ImageDiscardRequest<>(m_image_ctx, c, off, len, |
231 | skip_partial_discard, trace)); | |
7c673cae FG |
232 | } else { |
233 | c->start_op(); | |
31f18b77 FG |
234 | ImageRequest<>::aio_discard(&m_image_ctx, c, off, len, |
235 | skip_partial_discard, trace); | |
7c673cae FG |
236 | finish_in_flight_op(); |
237 | } | |
31f18b77 | 238 | trace.event("finish"); |
7c673cae FG |
239 | } |
240 | ||
241 | void ImageRequestWQ::aio_flush(AioCompletion *c, bool native_async) { | |
7c673cae | 242 | CephContext *cct = m_image_ctx.cct; |
31f18b77 FG |
243 | ZTracer::Trace trace; |
244 | if (cct->_conf->rbd_blkin_trace_all) { | |
245 | trace.init("wq: flush", &m_image_ctx.trace_endpoint); | |
246 | trace.event("init"); | |
247 | } | |
248 | ||
249 | c->init_time(&m_image_ctx, AIO_TYPE_FLUSH); | |
7c673cae FG |
250 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
251 | << "completion=" << c << dendl; | |
252 | ||
253 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
254 | c->set_event_notify(true); | |
255 | } | |
256 | ||
257 | if (!start_in_flight_op(c)) { | |
258 | return; | |
259 | } | |
260 | ||
261 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
262 | if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) { | |
31f18b77 | 263 | queue(new ImageFlushRequest<>(m_image_ctx, c, trace)); |
7c673cae | 264 | } else { |
31f18b77 | 265 | ImageRequest<>::aio_flush(&m_image_ctx, c, trace); |
7c673cae FG |
266 | finish_in_flight_op(); |
267 | } | |
31f18b77 | 268 | trace.event("finish"); |
7c673cae FG |
269 | } |
270 | ||
271 | void ImageRequestWQ::aio_writesame(AioCompletion *c, uint64_t off, uint64_t len, | |
272 | bufferlist &&bl, int op_flags, | |
273 | bool native_async) { | |
7c673cae | 274 | CephContext *cct = m_image_ctx.cct; |
31f18b77 FG |
275 | ZTracer::Trace trace; |
276 | if (cct->_conf->rbd_blkin_trace_all) { | |
277 | trace.init("wq: writesame", &m_image_ctx.trace_endpoint); | |
278 | trace.event("init"); | |
279 | } | |
280 | ||
281 | c->init_time(&m_image_ctx, AIO_TYPE_WRITESAME); | |
7c673cae FG |
282 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " |
283 | << "completion=" << c << ", off=" << off << ", " | |
284 | << "len=" << len << ", data_len = " << bl.length() << ", " | |
285 | << "flags=" << op_flags << dendl; | |
286 | ||
287 | if (native_async && m_image_ctx.event_socket.is_valid()) { | |
288 | c->set_event_notify(true); | |
289 | } | |
290 | ||
291 | if (!start_in_flight_op(c)) { | |
292 | return; | |
293 | } | |
294 | ||
295 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
296 | if (m_image_ctx.non_blocking_aio || writes_blocked()) { | |
297 | queue(new ImageWriteSameRequest<>(m_image_ctx, c, off, len, std::move(bl), | |
31f18b77 | 298 | op_flags, trace)); |
7c673cae FG |
299 | } else { |
300 | c->start_op(); | |
301 | ImageRequest<>::aio_writesame(&m_image_ctx, c, off, len, std::move(bl), | |
31f18b77 | 302 | op_flags, trace); |
7c673cae FG |
303 | finish_in_flight_op(); |
304 | } | |
31f18b77 | 305 | trace.event("finish"); |
7c673cae FG |
306 | } |
307 | ||
308 | void ImageRequestWQ::shut_down(Context *on_shutdown) { | |
309 | assert(m_image_ctx.owner_lock.is_locked()); | |
310 | ||
311 | { | |
312 | RWLock::WLocker locker(m_lock); | |
313 | assert(!m_shutdown); | |
314 | m_shutdown = true; | |
315 | ||
316 | CephContext *cct = m_image_ctx.cct; | |
317 | ldout(cct, 5) << __func__ << ": in_flight=" << m_in_flight_ops.load() | |
318 | << dendl; | |
319 | if (m_in_flight_ops > 0) { | |
320 | m_on_shutdown = on_shutdown; | |
321 | return; | |
322 | } | |
323 | } | |
324 | ||
325 | // ensure that all in-flight IO is flushed | |
326 | m_image_ctx.flush(on_shutdown); | |
327 | } | |
328 | ||
329 | bool ImageRequestWQ::is_lock_request_needed() const { | |
330 | RWLock::RLocker locker(m_lock); | |
331 | return (m_queued_writes > 0 || | |
332 | (m_require_lock_on_read && m_queued_reads > 0)); | |
333 | } | |
334 | ||
335 | int ImageRequestWQ::block_writes() { | |
336 | C_SaferCond cond_ctx; | |
337 | block_writes(&cond_ctx); | |
338 | return cond_ctx.wait(); | |
339 | } | |
340 | ||
341 | void ImageRequestWQ::block_writes(Context *on_blocked) { | |
342 | assert(m_image_ctx.owner_lock.is_locked()); | |
343 | CephContext *cct = m_image_ctx.cct; | |
344 | ||
345 | { | |
346 | RWLock::WLocker locker(m_lock); | |
347 | ++m_write_blockers; | |
348 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
349 | << m_write_blockers << dendl; | |
350 | if (!m_write_blocker_contexts.empty() || m_in_progress_writes > 0) { | |
351 | m_write_blocker_contexts.push_back(on_blocked); | |
352 | return; | |
353 | } | |
354 | } | |
355 | ||
356 | // ensure that all in-flight IO is flushed | |
357 | m_image_ctx.flush(on_blocked); | |
358 | } | |
359 | ||
360 | void ImageRequestWQ::unblock_writes() { | |
361 | CephContext *cct = m_image_ctx.cct; | |
362 | ||
363 | bool wake_up = false; | |
364 | { | |
365 | RWLock::WLocker locker(m_lock); | |
366 | assert(m_write_blockers > 0); | |
367 | --m_write_blockers; | |
368 | ||
369 | ldout(cct, 5) << &m_image_ctx << ", " << "num=" | |
370 | << m_write_blockers << dendl; | |
371 | if (m_write_blockers == 0) { | |
372 | wake_up = true; | |
373 | } | |
374 | } | |
375 | ||
376 | if (wake_up) { | |
377 | signal(); | |
378 | } | |
379 | } | |
380 | ||
381 | void ImageRequestWQ::set_require_lock_on_read() { | |
382 | CephContext *cct = m_image_ctx.cct; | |
383 | ldout(cct, 20) << dendl; | |
384 | ||
385 | RWLock::WLocker locker(m_lock); | |
386 | m_require_lock_on_read = true; | |
387 | } | |
388 | ||
389 | void ImageRequestWQ::clear_require_lock_on_read() { | |
390 | CephContext *cct = m_image_ctx.cct; | |
391 | ldout(cct, 20) << dendl; | |
392 | ||
393 | { | |
394 | RWLock::WLocker locker(m_lock); | |
395 | if (!m_require_lock_on_read) { | |
396 | return; | |
397 | } | |
398 | ||
399 | m_require_lock_on_read = false; | |
400 | } | |
401 | signal(); | |
402 | } | |
403 | ||
404 | void *ImageRequestWQ::_void_dequeue() { | |
405 | ImageRequest<> *peek_item = front(); | |
406 | ||
407 | // no IO ops available or refresh in-progress (IO stalled) | |
408 | if (peek_item == nullptr || m_refresh_in_progress) { | |
409 | return nullptr; | |
410 | } | |
411 | ||
412 | bool refresh_required = m_image_ctx.state->is_refresh_required(); | |
413 | { | |
414 | RWLock::RLocker locker(m_lock); | |
415 | if (peek_item->is_write_op()) { | |
416 | if (m_write_blockers > 0) { | |
417 | return nullptr; | |
418 | } | |
419 | ||
420 | // refresh will requeue the op -- don't count it as in-progress | |
421 | if (!refresh_required) { | |
422 | m_in_progress_writes++; | |
423 | } | |
424 | } else if (m_require_lock_on_read) { | |
425 | return nullptr; | |
426 | } | |
427 | } | |
428 | ||
429 | ImageRequest<> *item = reinterpret_cast<ImageRequest<> *>( | |
430 | ThreadPool::PointerWQ<ImageRequest<> >::_void_dequeue()); | |
431 | assert(peek_item == item); | |
432 | ||
433 | if (refresh_required) { | |
434 | ldout(m_image_ctx.cct, 15) << "image refresh required: delaying IO " << item | |
435 | << dendl; | |
436 | ||
437 | // stall IO until the refresh completes | |
438 | m_refresh_in_progress = true; | |
439 | ||
440 | get_pool_lock().Unlock(); | |
441 | m_image_ctx.state->refresh(new C_RefreshFinish(this, item)); | |
442 | get_pool_lock().Lock(); | |
443 | return nullptr; | |
444 | } | |
445 | ||
446 | item->start_op(); | |
447 | return item; | |
448 | } | |
449 | ||
450 | void ImageRequestWQ::process(ImageRequest<> *req) { | |
451 | CephContext *cct = m_image_ctx.cct; | |
452 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
453 | << "req=" << req << dendl; | |
454 | ||
455 | req->send(); | |
456 | ||
457 | finish_queued_op(req); | |
458 | if (req->is_write_op()) { | |
459 | finish_in_progress_write(); | |
460 | } | |
461 | delete req; | |
462 | ||
463 | finish_in_flight_op(); | |
464 | } | |
465 | ||
466 | void ImageRequestWQ::finish_queued_op(ImageRequest<> *req) { | |
467 | RWLock::RLocker locker(m_lock); | |
468 | if (req->is_write_op()) { | |
469 | assert(m_queued_writes > 0); | |
470 | m_queued_writes--; | |
471 | } else { | |
472 | assert(m_queued_reads > 0); | |
473 | m_queued_reads--; | |
474 | } | |
475 | } | |
476 | ||
477 | void ImageRequestWQ::finish_in_progress_write() { | |
478 | bool writes_blocked = false; | |
479 | { | |
480 | RWLock::RLocker locker(m_lock); | |
481 | assert(m_in_progress_writes > 0); | |
482 | if (--m_in_progress_writes == 0 && | |
483 | !m_write_blocker_contexts.empty()) { | |
484 | writes_blocked = true; | |
485 | } | |
486 | } | |
487 | ||
488 | if (writes_blocked) { | |
489 | m_image_ctx.flush(new C_BlockedWrites(this)); | |
490 | } | |
491 | } | |
492 | ||
493 | int ImageRequestWQ::start_in_flight_op(AioCompletion *c) { | |
494 | RWLock::RLocker locker(m_lock); | |
495 | ||
496 | if (m_shutdown) { | |
497 | CephContext *cct = m_image_ctx.cct; | |
498 | lderr(cct) << "IO received on closed image" << dendl; | |
499 | ||
500 | c->fail(-ESHUTDOWN); | |
501 | return false; | |
502 | } | |
503 | ||
504 | m_in_flight_ops++; | |
505 | return true; | |
506 | } | |
507 | ||
508 | void ImageRequestWQ::finish_in_flight_op() { | |
509 | Context *on_shutdown; | |
510 | { | |
511 | RWLock::RLocker locker(m_lock); | |
512 | if (--m_in_flight_ops > 0 || !m_shutdown) { | |
513 | return; | |
514 | } | |
515 | on_shutdown = m_on_shutdown; | |
516 | } | |
517 | ||
518 | CephContext *cct = m_image_ctx.cct; | |
519 | ldout(cct, 5) << "completing shut down" << dendl; | |
520 | ||
521 | assert(on_shutdown != nullptr); | |
522 | m_image_ctx.flush(on_shutdown); | |
523 | } | |
524 | ||
525 | bool ImageRequestWQ::is_lock_required() const { | |
526 | assert(m_image_ctx.owner_lock.is_locked()); | |
527 | if (m_image_ctx.exclusive_lock == NULL) { | |
528 | return false; | |
529 | } | |
530 | ||
531 | return (!m_image_ctx.exclusive_lock->is_lock_owner()); | |
532 | } | |
533 | ||
534 | void ImageRequestWQ::queue(ImageRequest<> *req) { | |
535 | CephContext *cct = m_image_ctx.cct; | |
536 | ldout(cct, 20) << "ictx=" << &m_image_ctx << ", " | |
537 | << "req=" << req << dendl; | |
538 | ||
539 | assert(m_image_ctx.owner_lock.is_locked()); | |
540 | bool write_op = req->is_write_op(); | |
541 | bool lock_required = (m_image_ctx.exclusive_lock != nullptr && | |
542 | ((write_op && is_lock_required()) || | |
543 | (!write_op && m_require_lock_on_read))); | |
544 | ||
545 | if (lock_required && !m_image_ctx.get_exclusive_lock_policy()->may_auto_request_lock()) { | |
546 | lderr(cct) << "op requires exclusive lock" << dendl; | |
547 | req->fail(-EROFS); | |
548 | delete req; | |
549 | finish_in_flight_op(); | |
550 | return; | |
551 | } | |
552 | ||
553 | if (write_op) { | |
554 | m_queued_writes++; | |
555 | } else { | |
556 | m_queued_reads++; | |
557 | } | |
558 | ||
559 | ThreadPool::PointerWQ<ImageRequest<> >::queue(req); | |
560 | ||
561 | if (lock_required) { | |
562 | m_image_ctx.exclusive_lock->acquire_lock(nullptr); | |
563 | } | |
564 | } | |
565 | ||
566 | void ImageRequestWQ::handle_refreshed(int r, ImageRequest<> *req) { | |
567 | CephContext *cct = m_image_ctx.cct; | |
568 | ldout(cct, 15) << "resuming IO after image refresh: r=" << r << ", " | |
569 | << "req=" << req << dendl; | |
570 | if (r < 0) { | |
571 | process_finish(); | |
572 | req->fail(r); | |
573 | finish_queued_op(req); | |
574 | delete req; | |
575 | finish_in_flight_op(); | |
576 | } else { | |
577 | // since IO was stalled for refresh -- original IO order is preserved | |
578 | // if we requeue this op for work queue processing | |
579 | requeue(req); | |
580 | } | |
581 | ||
582 | m_refresh_in_progress = false; | |
583 | signal(); | |
584 | ||
585 | // refresh might have enabled exclusive lock -- IO stalled until | |
586 | // we acquire the lock | |
587 | RWLock::RLocker owner_locker(m_image_ctx.owner_lock); | |
588 | if (is_lock_required() && is_lock_request_needed()) { | |
589 | m_image_ctx.exclusive_lock->acquire_lock(nullptr); | |
590 | } | |
591 | } | |
592 | ||
593 | void ImageRequestWQ::handle_blocked_writes(int r) { | |
594 | Contexts contexts; | |
595 | { | |
596 | RWLock::WLocker locker(m_lock); | |
597 | contexts.swap(m_write_blocker_contexts); | |
598 | } | |
599 | ||
600 | for (auto ctx : contexts) { | |
601 | ctx->complete(0); | |
602 | } | |
603 | } | |
604 | ||
605 | } // namespace io | |
606 | } // namespace librbd |