]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/journal/Replay.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / librbd / journal / Replay.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "librbd/ExclusiveLock.h"
8 #include "librbd/ImageCtx.h"
9 #include "librbd/ImageState.h"
10 #include "librbd/internal.h"
11 #include "librbd/Operations.h"
12 #include "librbd/Utils.h"
13 #include "librbd/asio/ContextWQ.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
16
17 #define dout_subsys ceph_subsys_rbd
18 #undef dout_prefix
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
20
21 namespace librbd {
22 namespace journal {
23
24 namespace {
25
// In-flight AIO throttling thresholds -- presumably used by
// create_aio_modify_completion() (body extends past this chunk) to pause
// event processing at the high-water mark and resume at the low-water mark;
// TODO confirm against the full file.
static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);

// Replayed maintenance ops have no interactive client, so progress
// callbacks are discarded.
static NoOpProgressContext no_op_progress_callback;
30
// Context that executes one replayed maintenance op.  The decoded journal
// event is copied in; finish() re-validates the exclusive lock and then
// dispatches to the execute() overload matching the event type.  If the op
// cannot run, on_op_complete is completed directly with the error instead.
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;                  // copy of the decoded journal event payload
  Context *on_op_complete;  // fired when the op finishes (or fails to start)

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  void execute(const journal::SnapCreateEvent &_) {
    // skip the quiesce notification: the primary already quiesced when the
    // snapshot was originally taken
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid,
                                              SNAP_CREATE_FLAG_SKIP_NOTIFY_QUIESCE,
                                              no_op_progress_callback);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  // Context entry point (typically queued behind C_RefreshIfRequired).
  // r < 0 indicates an upstream failure (e.g. refresh error) which is
  // propagated to the op without attempting execution.
  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    std::shared_lock owner_locker{image_ctx.owner_lock};

    // maintenance ops are only valid while this client still holds the
    // exclusive lock and it accepts ops
    if (image_ctx.exclusive_lock == nullptr ||
        !image_ctx.exclusive_lock->accept_ops()) {
      ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
      on_op_complete->complete(-ECANCELED);
      return;
    }

    execute(event);
  }
};
135
// Context wrapper that refreshes the image (if a refresh is pending) before
// invoking on_finish.  On upstream error, on_finish is queued with that
// error instead.  The wrapper owns on_finish until finish() hands it off;
// the destructor frees it if finish() never ran.
template <typename I>
struct C_RefreshIfRequired : public Context {
  I &image_ctx;
  Context *on_finish;

  C_RefreshIfRequired(I &image_ctx, Context *on_finish)
    : image_ctx(image_ctx), on_finish(on_finish) {
  }
  ~C_RefreshIfRequired() override {
    // non-null only if finish() was never invoked
    delete on_finish;
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    // transfer ownership out so the destructor does not double-free
    Context *ctx = on_finish;
    on_finish = nullptr;

    if (r < 0) {
      lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
      image_ctx.op_work_queue->queue(ctx, r);
      return;
    }

    if (image_ctx.state->is_refresh_required()) {
      ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
                     << "refresh required" << dendl;
      // refresh completes ctx with the refresh result
      image_ctx.state->refresh(ctx);
      return;
    }

    // no refresh needed -- queue ctx for async execution
    image_ctx.op_work_queue->queue(ctx, 0);
  }
};
169
170 } // anonymous namespace
171
172 #undef dout_prefix
173 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
174 << __func__
175
// Construct a replay state machine bound to the given image context.
template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx) {
}
180
// The replay must be fully quiesced (flushed / shut down) before
// destruction: no in-flight AIO, no pending safe contexts, no op events.
template <typename I>
Replay<I>::~Replay() {
  std::lock_guard locker{m_lock};
  ceph_assert(m_in_flight_aio_flush == 0);
  ceph_assert(m_in_flight_aio_modify == 0);
  ceph_assert(m_aio_modify_unsafe_contexts.empty());
  ceph_assert(m_aio_modify_safe_contexts.empty());
  ceph_assert(m_op_events.empty());
  ceph_assert(m_in_flight_op_events == 0);
}
191
192 template <typename I>
193 int Replay<I>::decode(bufferlist::const_iterator *it, EventEntry *event_entry) {
194 try {
195 using ceph::decode;
196 decode(*event_entry, *it);
197 } catch (const buffer::error &err) {
198 return -EBADMSG;
199 }
200 return 0;
201 }
202
// Dispatch one replayed journal event.  on_ready fires when the caller may
// submit the next event; on_safe fires once the event's effects are safely
// applied (or it is cancelled).
template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  // ensure on_ready fires asynchronously (clean call stack)
  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  std::shared_lock owner_lock{m_image_ctx.owner_lock};
  if (m_image_ctx.exclusive_lock == nullptr ||
      !m_image_ctx.exclusive_lock->accept_ops()) {
    // cannot mutate the image without the exclusive lock -- cancel the event
    ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
    on_ready->complete(0);
    return;
  }

  // dispatch to the handle_event() overload matching the variant's type
  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}
224
// Shut down the replay state machine.  If cancel_ops is set, ops still
// waiting to start are failed with -ERESTART; otherwise they are kicked
// off so they can run to completion.  on_finish fires once all in-flight
// AIO and op events have drained.
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    std::lock_guard locker{m_lock};

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
      ceph_assert(flush_comp != nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready -- defer handling to replay_op_ready()
        op_event_pair.second.finish_on_ready = true;
      }
    }

    ceph_assert(!m_shut_down);
    m_shut_down = true;

    // if anything is still in flight, defer on_finish until the final
    // flush / op completion fires it via m_flush_ctx
    ceph_assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    std::shared_lock owner_locker{m_image_ctx.owner_lock};
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                   io::FLUSH_SOURCE_INTERNAL, {});
  }
  if (on_finish != nullptr) {
    // nothing in flight -- complete immediately
    on_finish->complete(0);
  }
}
284
// Flush all in-flight replayed AIO; on_finish fires once the flush is safe.
template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    std::lock_guard locker{m_lock};
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
    if (aio_comp == nullptr) {
      // no completion created -- presumably create_aio_flush_completion
      // disposes of on_finish in this case (body not visible in this
      // chunk); TODO confirm
      return;
    }
  }

  // issue the flush outside m_lock, holding the owner lock as required
  // by the image request path
  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
                                 io::FLUSH_SOURCE_INTERNAL, {});
}
301
// Invoked by an in-progress op state machine once it has reached its
// "ready" point: resume event processing and arrange for the state machine
// to continue (via on_resume) when the matching OpFinishEvent arrives.
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  std::lock_guard locker{m_lock};
  auto op_it = m_op_events.find(op_tid);
  ceph_assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  // only an in-progress op that hasn't yet seen its OpFinishEvent can
  // signal ready
  ceph_assert(op_event.op_in_progress &&
              op_event.on_op_finish_event == nullptr &&
              op_event.on_finish_ready == nullptr &&
              op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new LambdaContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}
340
341 template <typename I>
342 void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
343 Context *on_ready, Context *on_safe) {
344 CephContext *cct = m_image_ctx.cct;
345 ldout(cct, 20) << ": AIO discard event" << dendl;
346
347 bool flush_required;
348 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
349 io::AIO_TYPE_DISCARD,
350 &flush_required,
351 {});
352 if (aio_comp == nullptr) {
353 return;
354 }
355
356 if (!clipped_io(event.offset, aio_comp)) {
357 io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp,
358 {{event.offset, event.length}},
359 io::ImageArea::DATA,
360 event.discard_granularity_bytes, {});
361 }
362
363 if (flush_required) {
364 m_lock.lock();
365 auto flush_comp = create_aio_flush_completion(nullptr);
366 m_lock.unlock();
367
368 if (flush_comp != nullptr) {
369 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
370 io::FLUSH_SOURCE_INTERNAL, {});
371 }
372 }
373 }
374
375 template <typename I>
376 void Replay<I>::handle_event(const journal::AioWriteEvent &event,
377 Context *on_ready, Context *on_safe) {
378 CephContext *cct = m_image_ctx.cct;
379 ldout(cct, 20) << ": AIO write event" << dendl;
380
381 bufferlist data = event.data;
382 bool flush_required;
383 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
384 io::AIO_TYPE_WRITE,
385 &flush_required,
386 {});
387 if (aio_comp == nullptr) {
388 return;
389 }
390
391 if (!clipped_io(event.offset, aio_comp)) {
392 io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
393 {{event.offset, event.length}},
394 io::ImageArea::DATA, std::move(data),
395 0, {});
396 }
397
398 if (flush_required) {
399 m_lock.lock();
400 auto flush_comp = create_aio_flush_completion(nullptr);
401 m_lock.unlock();
402
403 if (flush_comp != nullptr) {
404 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
405 io::FLUSH_SOURCE_INTERNAL, {});
406 }
407 }
408 }
409
// Replay an AIO flush event.  The flush completion (if created) will fire
// on_safe along with all on_safe contexts of previously replayed modifies.
template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO flush event" << dendl;

  io::AioCompletion *aio_comp;
  {
    std::lock_guard locker{m_lock};
    aio_comp = create_aio_flush_completion(on_safe);
  }

  if (aio_comp != nullptr) {
    io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
                                   io::FLUSH_SOURCE_INTERNAL, {});
  }
  // flushes never block further event processing
  on_ready->complete(0);
}
428
429 template <typename I>
430 void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
431 Context *on_ready, Context *on_safe) {
432 CephContext *cct = m_image_ctx.cct;
433 ldout(cct, 20) << ": AIO writesame event" << dendl;
434
435 bufferlist data = event.data;
436 bool flush_required;
437 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
438 io::AIO_TYPE_WRITESAME,
439 &flush_required,
440 {});
441 if (aio_comp == nullptr) {
442 return;
443 }
444
445 if (!clipped_io(event.offset, aio_comp)) {
446 io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp,
447 {{event.offset, event.length}},
448 io::ImageArea::DATA, std::move(data),
449 0, {});
450 }
451
452 if (flush_required) {
453 m_lock.lock();
454 auto flush_comp = create_aio_flush_completion(nullptr);
455 m_lock.unlock();
456
457 if (flush_comp != nullptr) {
458 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
459 io::FLUSH_SOURCE_INTERNAL, {});
460 }
461 }
462 }
463
464 template <typename I>
465 void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
466 Context *on_ready, Context *on_safe) {
467 CephContext *cct = m_image_ctx.cct;
468 ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;
469
470 bufferlist cmp_data = event.cmp_data;
471 bufferlist write_data = event.write_data;
472 bool flush_required;
473 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
474 io::AIO_TYPE_COMPARE_AND_WRITE,
475 &flush_required,
476 {-EILSEQ});
477
478 if (!clipped_io(event.offset, aio_comp)) {
479 io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
480 {{event.offset, event.length}},
481 io::ImageArea::DATA,
482 std::move(cmp_data),
483 std::move(write_data),
484 nullptr, 0, {});
485 }
486
487 if (flush_required) {
488 m_lock.lock();
489 auto flush_comp = create_aio_flush_completion(nullptr);
490 m_lock.unlock();
491
492 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
493 io::FLUSH_SOURCE_INTERNAL, {});
494 }
495 }
496
// Replay an op-finish event: locate the matching op (by tid) started by an
// earlier op event and either kick off / resume its state machine (journal
// recorded success) or propagate the recorded error.
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    std::lock_guard locker{m_lock};
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      // no matching start event tracked -- treat as already committed
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    ceph_assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors which indicate op never started but was recorded
    // as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up since
      // our image is now potentially in an inconsistent state
      // since simple errors should have been caught before
      // creating the op event
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}
551
// Replay a snap-create op event.  The op is started immediately (queued
// via the work queue) and event processing is paused until the op state
// machine signals ready, since snapshot creation affects in-flight IO.
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
580
// Replay a snap-remove op event.  The op is deferred until the matching
// OpFinishEvent arrives (on_op_finish_event); processing continues.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap remove event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}
604
// Replay a snap-rename op event; execution is deferred until the matching
// OpFinishEvent.  -EEXIST indicates the rename was already applied.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}
628
// Replay a snap-protect op event; execution is deferred until the matching
// OpFinishEvent.  -EBUSY indicates the snapshot is already protected.
template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}
652
// Replay a snap-unprotect op event; execution is deferred until the
// matching OpFinishEvent.  A journal-recorded -EBUSY means the original op
// never ran; a replay-side -EINVAL means it was already unprotected.
template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}
680
// Replay a snap-rollback op event; execution is deferred until the
// matching OpFinishEvent.  No replay errors are filtered for rollback.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}
702
// Replay an image-rename op event; execution is deferred until the
// matching OpFinishEvent.  -EEXIST indicates the rename already happened.
template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Rename event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
                                                        on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}
726
// Replay a resize op event.  Like snap-create, the op starts immediately
// and event processing pauses until the state machine is ready, since a
// resize affects in-flight IO.
template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
751
// Replay a flatten op event; execution is deferred until the matching
// OpFinishEvent.  -EINVAL indicates the image is already flattened.
template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}
775
// Demote/promote events carry no replayable state change here -- simply
// acknowledge the event as processed and safe.
template <typename I>
void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Demote/Promote event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}
784
// Replay a snap-limit op event; execution is deferred until the matching
// OpFinishEvent.  -ERANGE replay errors are ignored.
template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  op_event->ignore_error_codes = {-ERANGE};

  on_ready->complete(0);
}
808
// Replay an update-features op event.  The op starts immediately and event
// processing pauses until the state machine is ready, since feature
// changes affect in-flight IO.
template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
833
// Replay a metadata-set op event; execution is deferred until the matching
// OpFinishEvent.  Unlike other deferred ops, the refresh check wraps the
// completion (not the execution), and the execute step is made async.
template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}
855
// Replay a metadata-remove op event; execution is deferred until the
// matching OpFinishEvent.  -ENOENT indicates the key was already removed.
template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}
880
// Unknown event types are skipped: acknowledge as processed and safe so
// replay can continue past entries from newer clients.
template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": unknown event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}
889
// Completion handler for a replayed AIO modify.  on_ready (if still held)
// is released; on_safe is either failed immediately (unfiltered error) or
// parked until the next flush commits the data.
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r, std::set<int> &filters) {
  std::lock_guard locker{m_lock};
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  if (on_ready != nullptr) {
    on_ready->complete(0);
  }

  // expected per-event errors (e.g. -EILSEQ for compare-and-write) are
  // treated as success
  if (filters.find(r) != filters.end())
    r = 0;

  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, r);
    return;
  }

  // will be completed after next flush operation completes
  m_aio_modify_safe_contexts.insert(on_safe);
}
914
// Completion handler for a replayed AIO flush: accounts for the flush and
// its covered modifies, resumes AIO paused at the high-water mark, fires
// the covered on_safe contexts, and fires the shut-down/flush context once
// everything has drained.
template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_in_flight_aio_flush > 0);
    ceph_assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      // everything drained -- a pending shut_down()/flush ctx may fire
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        // already completed with an error by handle_aio_modify_complete
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}
968
// Register tracking state for a new op event (caller holds m_lock) and
// return the completion context the op will fire when done.  Returns
// nullptr -- with on_ready/on_safe already handled -- if replay is shut
// down or the op tid is a duplicate.
template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ceph_assert(ceph_mutex_is_locked(m_lock));
  if (m_op_events.count(op_tid) != 0) {
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  // C_OpOnComplete routes the op's result into handle_op_complete(op_tid)
  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}
1001
// Final completion path for an op event: remove it from tracking, release
// any pending ready/safe callbacks (filtering expected replay errors), and
// fire the deferred shut-down context once all op events have drained.
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    std::lock_guard locker{m_lock};
    auto op_it = m_op_events.find(op_tid);
    ceph_assert(op_it != m_op_events.end());

    // take ownership of the tracked state before dropping the lock
    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    if (m_shut_down) {
      ceph_assert(m_flush_ctx != nullptr);
      shutting_down = true;
    }
  }

  // an op that never signalled ready can only complete with a
  // non-restart error
  ceph_assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    ceph_assert(op_event.on_finish_ready == nullptr &&
                op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    ceph_assert((op_event.on_finish_ready != nullptr &&
                 op_event.on_finish_safe != nullptr) || shutting_down);
  }

  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // shut down request might have occurred while lock was
  // dropped -- handle if pending
  Context *on_flush = nullptr;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}
1071
1072 template <typename I>
1073 io::AioCompletion *
1074 Replay<I>::create_aio_modify_completion(Context *on_ready,
1075 Context *on_safe,
1076 io::aio_type_t aio_type,
1077 bool *flush_required,
1078 std::set<int> &&filters) {
1079 std::lock_guard locker{m_lock};
1080 CephContext *cct = m_image_ctx.cct;
1081 ceph_assert(m_on_aio_ready == nullptr);
1082
1083 if (m_shut_down) {
1084 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1085 on_ready->complete(0);
1086 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1087 return nullptr;
1088 }
1089
1090 ++m_in_flight_aio_modify;
1091 m_aio_modify_unsafe_contexts.push_back(on_safe);
1092
1093 // FLUSH if we hit the low-water mark -- on_safe contexts are
1094 // completed by flushes-only so that we don't move the journal
1095 // commit position until safely on-disk
1096
1097 *flush_required = (m_aio_modify_unsafe_contexts.size() ==
1098 IN_FLIGHT_IO_LOW_WATER_MARK);
1099 if (*flush_required) {
1100 ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
1101 << dendl;
1102 }
1103
1104 // READY for more events if:
1105 // * not at high-water mark for IO
1106 // * in-flight ops are at a consistent point (snap create has IO flushed,
1107 // shrink has adjusted clip boundary, etc) -- should have already been
1108 // flagged not-ready
1109 if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
1110 ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
1111 << dendl;
1112 ceph_assert(m_on_aio_ready == nullptr);
1113 std::swap(m_on_aio_ready, on_ready);
1114 }
1115
1116 // when the modification is ACKed by librbd, we can process the next
1117 // event. when flushed, the completion of the next flush will fire the
1118 // on_safe callback
1119 auto aio_comp = io::AioCompletion::create_and_start<Context>(
1120 new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)),
1121 util::get_image_ctx(&m_image_ctx), aio_type);
1122 return aio_comp;
1123 }
1124
1125 template <typename I>
1126 io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
1127 ceph_assert(ceph_mutex_is_locked(m_lock));
1128
1129 CephContext *cct = m_image_ctx.cct;
1130 if (m_shut_down) {
1131 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1132 if (on_safe != nullptr) {
1133 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1134 }
1135 return nullptr;
1136 }
1137
1138 ++m_in_flight_aio_flush;
1139
1140 // associate all prior write/discard ops to this flush request
1141 auto aio_comp = io::AioCompletion::create_and_start<Context>(
1142 new C_AioFlushComplete(this, on_safe,
1143 std::move(m_aio_modify_unsafe_contexts)),
1144 util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
1145 m_aio_modify_unsafe_contexts.clear();
1146 return aio_comp;
1147 }
1148
1149 template <typename I>
1150 bool Replay<I>::clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp) {
1151 CephContext *cct = m_image_ctx.cct;
1152
1153 m_image_ctx.image_lock.lock_shared();
1154 size_t image_size = m_image_ctx.size;
1155 m_image_ctx.image_lock.unlock_shared();
1156
1157 if (image_offset >= image_size) {
1158 // rbd-mirror image sync might race an IO event w/ associated resize between
1159 // the point the peer is registered and the sync point is created, so no-op
1160 // IO events beyond the current image extents since under normal conditions
1161 // it wouldn't have been recorded in the journal
1162 ldout(cct, 5) << ": no-op IO event beyond image size" << dendl;
1163 aio_comp->get();
1164 aio_comp->set_request_count(0);
1165 aio_comp->put();
1166 return true;
1167 }
1168
1169 return false;
1170 }
1171
} // namespace journal
} // namespace librbd

// explicitly instantiate Replay for the concrete ImageCtx used by librbd
template class librbd::journal::Replay<librbd::ImageCtx>;