]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/journal/Replay.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / librbd / journal / Replay.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/internal.h"
12 #include "librbd/Operations.h"
13 #include "librbd/Utils.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
16
17 #define dout_subsys ceph_subsys_rbd
18 #undef dout_prefix
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
20
21 namespace librbd {
22 namespace journal {
23
24 namespace {
25
26 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
28
29 static NoOpProgressContext no_op_progress_callback;
30
// Context that replays a single journal op event against the image's
// Operations state machine.  finish() re-validates the exclusive lock
// before dispatching to the event-type-specific execute() overload.
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;                   // decoded journal event (copied at construction)
  Context *on_op_complete;   // fired when the operation state machine finishes

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  // One overload per replayable op event type; each forwards the event's
  // payload to the matching Operations entry point.

  void execute(const journal::SnapCreateEvent &_) {
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid, false);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  // Invoked via the op work queue; r carries the result of any prerequisite
  // step (e.g. a C_RefreshIfRequired image refresh).
  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      // prerequisite failed -- propagate without executing the op
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    RWLock::RLocker owner_locker(image_ctx.owner_lock);

    // the lock may have been lost while this context sat on the work queue
    if (image_ctx.exclusive_lock == nullptr ||
        !image_ctx.exclusive_lock->accept_ops()) {
      ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
      on_op_complete->complete(-ECANCELED);
      return;
    }

    execute(event);
  }
};
133
134 template <typename I>
135 struct C_RefreshIfRequired : public Context {
136 I &image_ctx;
137 Context *on_finish;
138
139 C_RefreshIfRequired(I &image_ctx, Context *on_finish)
140 : image_ctx(image_ctx), on_finish(on_finish) {
141 }
142 ~C_RefreshIfRequired() override {
143 delete on_finish;
144 }
145
146 void finish(int r) override {
147 CephContext *cct = image_ctx.cct;
148 Context *ctx = on_finish;
149 on_finish = nullptr;
150
151 if (r < 0) {
152 lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
153 image_ctx.op_work_queue->queue(ctx, r);
154 return;
155 }
156
157 if (image_ctx.state->is_refresh_required()) {
158 ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
159 << "refresh required" << dendl;
160 image_ctx.state->refresh(ctx);
161 return;
162 }
163
164 image_ctx.op_work_queue->queue(ctx, 0);
165 }
166 };
167
168 } // anonymous namespace
169
170 #undef dout_prefix
171 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
172 << __func__
173
// Construct a replay state machine bound to a single open image.
template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
}
178
// Destruction is only legal once shut_down() has fully drained the replay:
// no in-flight AIO, no buffered safe-contexts, and no pending op events.
template <typename I>
Replay<I>::~Replay() {
  ceph_assert(m_in_flight_aio_flush == 0);
  ceph_assert(m_in_flight_aio_modify == 0);
  ceph_assert(m_aio_modify_unsafe_contexts.empty());
  ceph_assert(m_aio_modify_safe_contexts.empty());
  ceph_assert(m_op_events.empty());
  ceph_assert(m_in_flight_op_events == 0);
}
188
189 template <typename I>
190 int Replay<I>::decode(bufferlist::const_iterator *it, EventEntry *event_entry) {
191 try {
192 using ceph::decode;
193 decode(*event_entry, *it);
194 } catch (const buffer::error &err) {
195 return -EBADMSG;
196 }
197 return 0;
198 }
199
// Replay a single decoded journal event.
// on_ready: fired when the replay can accept the next event.
// on_safe:  fired when this event's effects are safely applied (or skipped).
template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  // completing on_ready asynchronously avoids re-entrant lock cycles with
  // the journal caller
  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
  if (m_image_ctx.exclusive_lock == nullptr ||
      !m_image_ctx.exclusive_lock->accept_ops()) {
    // without the exclusive lock the event cannot be applied; report it
    // as canceled but keep the replay stream moving
    ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
    on_ready->complete(0);
    return;
  }

  // dispatch to the handle_event() overload matching the variant's type
  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}
221
// Stop the replay.  cancel_ops selects whether ops still waiting for their
// OpFinishEvent are canceled (-ERESTART) or allowed to run to completion.
// on_finish fires once all in-flight AIO and op events have drained.
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    Mutex::Locker locker(m_lock);

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
      ceph_assert(flush_comp != nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready -- replay_op_ready() will finish it
        op_event_pair.second.finish_on_ready = true;
      }
    }

    ceph_assert(!m_shut_down);
    m_shut_down = true;

    // defer on_finish until the last op/AIO completes (handle_op_complete
    // or handle_aio_flush_complete fires m_flush_ctx)
    ceph_assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                   io::FLUSH_SOURCE_INTERNAL, {});
  }
  if (on_finish != nullptr) {
    // nothing in flight -- complete immediately
    on_finish->complete(0);
  }
}
281
// Flush all in-flight replayed AIO; on_finish fires once the flush (and any
// tracked modify operations) are safely on disk.
template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
    if (aio_comp == nullptr) {
      // already shut down -- create_aio_flush_completion completed
      // on_finish with -ESHUTDOWN
      return;
    }
  }

  // issue the flush outside m_lock to avoid lock cycles
  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
                                 io::FLUSH_SOURCE_INTERNAL, {});
}
298
// Invoked by the op state machine once a blocking op (snap create, resize,
// update features) has reached its ready point.  Resumes event replay and
// parks the state machine until the matching OpFinishEvent arrives.
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  Mutex::Locker locker(m_lock);
  auto op_it = m_op_events.find(op_tid);
  ceph_assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  // the op must be in progress and must not have seen its finish event yet
  ceph_assert(op_event.op_in_progress &&
              op_event.on_op_finish_event == nullptr &&
              op_event.on_finish_ready == nullptr &&
              op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new FunctionContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}
337
338 template <typename I>
339 void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
340 Context *on_ready, Context *on_safe) {
341 CephContext *cct = m_image_ctx.cct;
342 ldout(cct, 20) << ": AIO discard event" << dendl;
343
344 bool flush_required;
345 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
346 io::AIO_TYPE_DISCARD,
347 &flush_required,
348 {});
349 if (aio_comp == nullptr) {
350 return;
351 }
352
353 if (!clipped_io(event.offset, aio_comp)) {
354 io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp,
355 {{event.offset, event.length}},
356 event.discard_granularity_bytes, {});
357 }
358
359 if (flush_required) {
360 m_lock.Lock();
361 auto flush_comp = create_aio_flush_completion(nullptr);
362 m_lock.Unlock();
363
364 if (flush_comp != nullptr) {
365 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
366 io::FLUSH_SOURCE_INTERNAL, {});
367 }
368 }
369 }
370
371 template <typename I>
372 void Replay<I>::handle_event(const journal::AioWriteEvent &event,
373 Context *on_ready, Context *on_safe) {
374 CephContext *cct = m_image_ctx.cct;
375 ldout(cct, 20) << ": AIO write event" << dendl;
376
377 bufferlist data = event.data;
378 bool flush_required;
379 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
380 io::AIO_TYPE_WRITE,
381 &flush_required,
382 {});
383 if (aio_comp == nullptr) {
384 return;
385 }
386
387 if (!clipped_io(event.offset, aio_comp)) {
388 io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
389 {{event.offset, event.length}},
390 std::move(data), 0, {});
391 }
392
393 if (flush_required) {
394 m_lock.Lock();
395 auto flush_comp = create_aio_flush_completion(nullptr);
396 m_lock.Unlock();
397
398 if (flush_comp != nullptr) {
399 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
400 io::FLUSH_SOURCE_INTERNAL, {});
401 }
402 }
403 }
404
// Replay an AIO flush event: all previously replayed modify events become
// safe when this flush completes (on_safe is chained to the flush).
template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO flush event" << dendl;

  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(on_safe);
  }

  // nullptr indicates replay shut down (on_safe already failed)
  if (aio_comp != nullptr) {
    io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
                                   io::FLUSH_SOURCE_INTERNAL, {});
  }
  // a flush never blocks further replay
  on_ready->complete(0);
}
423
424 template <typename I>
425 void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
426 Context *on_ready, Context *on_safe) {
427 CephContext *cct = m_image_ctx.cct;
428 ldout(cct, 20) << ": AIO writesame event" << dendl;
429
430 bufferlist data = event.data;
431 bool flush_required;
432 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
433 io::AIO_TYPE_WRITESAME,
434 &flush_required,
435 {});
436 if (aio_comp == nullptr) {
437 return;
438 }
439
440 if (!clipped_io(event.offset, aio_comp)) {
441 io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp,
442 {{event.offset, event.length}},
443 std::move(data), 0, {});
444 }
445
446 if (flush_required) {
447 m_lock.Lock();
448 auto flush_comp = create_aio_flush_completion(nullptr);
449 m_lock.Unlock();
450
451 if (flush_comp != nullptr) {
452 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
453 io::FLUSH_SOURCE_INTERNAL, {});
454 }
455 }
456 }
457
458 template <typename I>
459 void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
460 Context *on_ready, Context *on_safe) {
461 CephContext *cct = m_image_ctx.cct;
462 ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;
463
464 bufferlist cmp_data = event.cmp_data;
465 bufferlist write_data = event.write_data;
466 bool flush_required;
467 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
468 io::AIO_TYPE_COMPARE_AND_WRITE,
469 &flush_required,
470 {-EILSEQ});
471
472 if (!clipped_io(event.offset, aio_comp)) {
473 io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
474 {{event.offset, event.length}},
475 std::move(cmp_data),
476 std::move(write_data),
477 nullptr, 0, {});
478 }
479
480 if (flush_required) {
481 m_lock.Lock();
482 auto flush_comp = create_aio_flush_completion(nullptr);
483 m_lock.Unlock();
484
485 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
486 io::FLUSH_SOURCE_INTERNAL, {});
487 }
488 }
489
// Replay an op finish event: locates the matching op-start event (by op_tid)
// and either kicks off / resumes the deferred op, or propagates the journaled
// failure.
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      // no matching start event -- it was committed in a prior replay pass
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    ceph_assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    // take ownership of the callbacks out of the op event record
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors which indicate op never started but was recorded
    // as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up since
      // our image is now potentially in an inconsistent state
      // since simple errors should have been caught before
      // creating the op event
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}
544
// Replay a snapshot create event.  This is a blocking op: replay pauses
// (on_ready held) until the snap create state machine signals
// replay_op_ready().
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
573
574 template <typename I>
575 void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
576 Context *on_ready, Context *on_safe) {
577 CephContext *cct = m_image_ctx.cct;
578 ldout(cct, 20) << ": Snap remove event" << dendl;
579
580 Mutex::Locker locker(m_lock);
581 OpEvent *op_event;
582 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
583 on_safe, &op_event);
584 if (on_op_complete == nullptr) {
585 return;
586 }
587
588 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
589 m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
590 on_op_complete));
591
592 // ignore errors caused due to replay
593 op_event->ignore_error_codes = {-ENOENT};
594
595 on_ready->complete(0);
596 }
597
// Replay a snapshot rename event; execution is deferred until the matching
// OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}
621
// Replay a snapshot protect event; execution is deferred until the matching
// OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused due to replay (snap already protected)
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}
645
// Replay a snapshot unprotect event; execution is deferred until the matching
// OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal (op failed with -EBUSY before
  // ever starting -- see OpFinishEvent handling)
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused due to replay (snap already unprotected)
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}
673
// Replay a snapshot rollback event; execution is deferred until the matching
// OpFinishEvent arrives.  No replay-induced errors are filtered.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}
695
696 template <typename I>
697 void Replay<I>::handle_event(const journal::RenameEvent &event,
698 Context *on_ready, Context *on_safe) {
699 CephContext *cct = m_image_ctx.cct;
700 ldout(cct, 20) << ": Rename event" << dendl;
701
702 Mutex::Locker locker(m_lock);
703 OpEvent *op_event;
704 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
705 on_safe, &op_event);
706 if (on_op_complete == nullptr) {
707 return;
708 }
709
710 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
711 m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
712 on_op_complete));
713
714 // ignore errors caused due to replay
715 op_event->ignore_error_codes = {-EEXIST};
716
717 on_ready->complete(0);
718 }
719
// Replay a resize event.  This is a blocking op: replay pauses (on_ready
// held) until the resize state machine signals replay_op_ready().
template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
744
// Replay a flatten event; execution is deferred until the matching
// OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused due to replay (image already flattened)
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}
768
// Demote/promote events carry no replayable image mutation -- acknowledge
// immediately as both ready and safe.
template <typename I>
void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Demote/Promote event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}
777
// Replay a snapshot-limit change event; execution is deferred until the
// matching OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  // ignore replay-induced errors (limit below current snap count)
  op_event->ignore_error_codes = {-ERANGE};

  on_ready->complete(0);
}
801
// Replay an update-features event.  This is a blocking op: replay pauses
// (on_ready held) until the state machine signals replay_op_ready().
template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
826
// Replay a metadata set event; execution is deferred until the matching
// OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  // refresh runs after the op completes here (the refresh wraps the
  // completion, not the execution)
  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}
848
// Replay a metadata remove event; execution is deferred until the matching
// OpFinishEvent arrives.
template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate tid -- already handled
    return;
  }

  // refresh runs after the op completes here (the refresh wraps the
  // completion, not the execution)
  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused due to replay (key already removed)
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}
873
// Unknown (future/unsupported) event types are skipped: acknowledge
// immediately as both ready and safe.
template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": unknown event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}
882
// Completion callback for a replayed AIO modify op.
// filters: return codes that should be treated as success for this op type
//   (e.g. -EILSEQ for compare-and-write).
// writeback_cache_enabled: when true the write is only in cache, so on_safe
//   must wait for a flush; otherwise it is durable now.
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r, std::set<int> &filters,
                                           bool writeback_cache_enabled) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  // on_ready is only still set if this completion was throttling replay
  if (on_ready != nullptr) {
    on_ready->complete(0);
  }

  // filtered return codes count as success
  if (filters.find(r) != filters.end())
    r = 0;

  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, r);
    return;
  }

  if (writeback_cache_enabled) {
    // will be completed after next flush operation completes
    m_aio_modify_safe_contexts.insert(on_safe);
  } else {
    // IO is safely stored on disk
    ceph_assert(m_in_flight_aio_modify > 0);
    --m_in_flight_aio_modify;

    // replay was paused at the high-water mark -- resume it
    if (m_on_aio_ready != nullptr) {
      ldout(cct, 10) << ": resuming paused AIO" << dendl;
      m_on_aio_ready->complete(0);
      m_on_aio_ready = nullptr;
    }

    ldout(cct, 20) << ": completing safe context: " << on_safe << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, 0);
  }
}
923
// Completion callback for a replayed AIO flush.  Retires the modify ops the
// flush covered (on_safe_ctxs), resumes paused replay, and fires the pending
// shut-down/flush context once everything has drained.
template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_in_flight_aio_flush > 0);
    ceph_assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    // last in-flight work item -- trigger the shut_down()/flush() waiter
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts (already completed by
    // handle_aio_modify_complete)
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  // all covered events are now safe (or share the flush's error)
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}
977
// Register a new op event record for op_tid and return the completion
// context the op state machine should fire.  Returns nullptr (after acking
// on_ready/on_safe) when replay is shut down or the tid is a duplicate.
// Caller must hold m_lock.
template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ceph_assert(m_lock.is_locked());
  if (m_op_events.count(op_tid) != 0) {
    // duplicate start event without an intervening finish -- corrupt journal
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  // C_OpOnComplete routes the op's result back to handle_op_complete()
  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}
1010
// Final accounting for a replayed op: retires the op event record, completes
// all of its ready/safe contexts (with replay-error filtering), and fires the
// pending shut-down context once everything has drained.
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(op_tid);
    ceph_assert(op_it != m_op_events.end());

    // take ownership of the record; remaining contexts are completed below
    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    if (m_shut_down) {
      ceph_assert(m_flush_ctx != nullptr);
      shutting_down = true;
    }
  }

  // on_start_ready can only still be set if a blocking op failed before
  // reaching its ready point (and not via cancellation)
  ceph_assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    ceph_assert(op_event.on_finish_ready == nullptr &&
                op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    ceph_assert((op_event.on_finish_ready != nullptr &&
                 op_event.on_finish_safe != nullptr) || shutting_down);
  }

  // wake any state machine still parked waiting for its finish event
  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // shut down request might have occurred while lock was
  // dropped -- handle if pending
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}
1080
1081 template <typename I>
1082 io::AioCompletion *
1083 Replay<I>::create_aio_modify_completion(Context *on_ready,
1084 Context *on_safe,
1085 io::aio_type_t aio_type,
1086 bool *flush_required,
1087 std::set<int> &&filters) {
1088 Mutex::Locker locker(m_lock);
1089 CephContext *cct = m_image_ctx.cct;
1090 ceph_assert(m_on_aio_ready == nullptr);
1091
1092 if (m_shut_down) {
1093 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1094 on_ready->complete(0);
1095 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1096 return nullptr;
1097 }
1098
1099 ++m_in_flight_aio_modify;
1100
1101 bool writeback_cache_enabled = m_image_ctx.is_writeback_cache_enabled();
1102 if (writeback_cache_enabled) {
1103 m_aio_modify_unsafe_contexts.push_back(on_safe);
1104 }
1105
1106 // FLUSH if we hit the low-water mark -- on_safe contexts are
1107 // completed by flushes-only so that we don't move the journal
1108 // commit position until safely on-disk
1109
1110 *flush_required = (writeback_cache_enabled &&
1111 m_aio_modify_unsafe_contexts.size() ==
1112 IN_FLIGHT_IO_LOW_WATER_MARK);
1113 if (*flush_required) {
1114 ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
1115 << dendl;
1116 }
1117
1118 // READY for more events if:
1119 // * not at high-water mark for IO
1120 // * in-flight ops are at a consistent point (snap create has IO flushed,
1121 // shrink has adjusted clip boundary, etc) -- should have already been
1122 // flagged not-ready
1123 if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
1124 ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
1125 << dendl;
1126 ceph_assert(m_on_aio_ready == nullptr);
1127 std::swap(m_on_aio_ready, on_ready);
1128 }
1129
1130 // when the modification is ACKed by librbd, we can process the next
1131 // event. when flushed, the completion of the next flush will fire the
1132 // on_safe callback
1133 auto aio_comp = io::AioCompletion::create_and_start<Context>(
1134 new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters),
1135 writeback_cache_enabled),
1136 util::get_image_ctx(&m_image_ctx), aio_type);
1137 return aio_comp;
1138 }
1139
1140 template <typename I>
1141 io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
1142 ceph_assert(m_lock.is_locked());
1143
1144 CephContext *cct = m_image_ctx.cct;
1145 if (m_shut_down) {
1146 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1147 if (on_safe != nullptr) {
1148 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1149 }
1150 return nullptr;
1151 }
1152
1153 ++m_in_flight_aio_flush;
1154
1155 // associate all prior write/discard ops to this flush request
1156 auto aio_comp = io::AioCompletion::create_and_start<Context>(
1157 new C_AioFlushComplete(this, on_safe,
1158 std::move(m_aio_modify_unsafe_contexts)),
1159 util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
1160 m_aio_modify_unsafe_contexts.clear();
1161 return aio_comp;
1162 }
1163
1164 template <typename I>
1165 bool Replay<I>::clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp) {
1166 CephContext *cct = m_image_ctx.cct;
1167
1168 m_image_ctx.snap_lock.get_read();
1169 size_t image_size = m_image_ctx.size;
1170 m_image_ctx.snap_lock.put_read();
1171
1172 if (image_offset >= image_size) {
1173 // rbd-mirror image sync might race an IO event w/ associated resize between
1174 // the point the peer is registered and the sync point is created, so no-op
1175 // IO events beyond the current image extents since under normal conditions
1176 // it wouldn't have been recorded in the journal
1177 ldout(cct, 5) << ": no-op IO event beyond image size" << dendl;
1178 aio_comp->get();
1179 aio_comp->unblock();
1180 aio_comp->put();
1181 return true;
1182 }
1183
1184 return false;
1185 }
1186
1187 } // namespace journal
1188 } // namespace librbd
1189
1190 template class librbd::journal::Replay<librbd::ImageCtx>;