]> git.proxmox.com Git - ceph.git/blob - ceph/src/librbd/journal/Replay.cc
update sources to v12.2.3
[ceph.git] / ceph / src / librbd / journal / Replay.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/journal/Replay.h"
5 #include "common/dout.h"
6 #include "common/errno.h"
7 #include "common/WorkQueue.h"
8 #include "librbd/ExclusiveLock.h"
9 #include "librbd/ImageCtx.h"
10 #include "librbd/ImageState.h"
11 #include "librbd/internal.h"
12 #include "librbd/Operations.h"
13 #include "librbd/Utils.h"
14 #include "librbd/io/AioCompletion.h"
15 #include "librbd/io/ImageRequest.h"
16
17 #define dout_subsys ceph_subsys_rbd
18 #undef dout_prefix
19 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
20
21 namespace librbd {
22 namespace journal {
23
24 namespace {
25
26 static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27 static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
28
29 static NoOpProgressContext no_op_progress_callback;
30
// Generic one-shot context that replays a single journaled maintenance
// operation by dispatching to the image's Operations<> API.  The event
// type E statically selects the matching execute() overload below.
// finish() is expected to run from the op work queue (typically via
// C_RefreshIfRequired) once any required image refresh has completed.
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;                  // copy of the decoded journal event
  Context *on_op_complete;  // completed by the operation state machine

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  void execute(const journal::SnapCreateEvent &_) {
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid, false);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  // Context entry point: on error, propagate r to on_op_complete without
  // touching the image; otherwise re-validate the exclusive lock under
  // owner_lock and dispatch to the event-specific execute() overload.
  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    RWLock::RLocker owner_locker(image_ctx.owner_lock);

    // the lock may have been lost while this context sat on the work
    // queue -- executing the op without it would be unsafe
    if (image_ctx.exclusive_lock == nullptr ||
        !image_ctx.exclusive_lock->accept_ops()) {
      ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
      on_op_complete->complete(-ECANCELED);
      return;
    }

    execute(event);
  }
};
133
// Context decorator: if the image requires a refresh, perform it before
// handing control to the wrapped context; otherwise queue the wrapped
// context directly.  Completion is always routed through the op work
// queue (or the refresh state machine) to avoid lock cycles.
template <typename I>
struct C_RefreshIfRequired : public Context {
  I &image_ctx;
  Context *on_finish;

  C_RefreshIfRequired(I &image_ctx, Context *on_finish)
    : image_ctx(image_ctx), on_finish(on_finish) {
  }
  ~C_RefreshIfRequired() override {
    // non-null only if finish() never ran (context destroyed unused)
    delete on_finish;
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    Context *ctx = on_finish;
    on_finish = nullptr;  // ownership transferred to ctx below

    if (r < 0) {
      // propagate the error to the wrapped context asynchronously
      lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
      image_ctx.op_work_queue->queue(ctx, r);
      return;
    }

    if (image_ctx.state->is_refresh_required()) {
      ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
                     << "refresh required" << dendl;
      image_ctx.state->refresh(ctx);
      return;
    }

    image_ctx.op_work_queue->queue(ctx, 0);
  }
};
167
168 } // anonymous namespace
169
170 #undef dout_prefix
171 #define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
172 << __func__
173
// Construct a replay state machine bound to the supplied image context.
template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
}
178
// All in-flight AIO and op events must have drained (via shut_down())
// before the replay object may be destroyed.
template <typename I>
Replay<I>::~Replay() {
  assert(m_in_flight_aio_flush == 0);
  assert(m_in_flight_aio_modify == 0);
  assert(m_aio_modify_unsafe_contexts.empty());
  assert(m_aio_modify_safe_contexts.empty());
  assert(m_op_events.empty());
  assert(m_in_flight_op_events == 0);
}
188
189 template <typename I>
190 int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
191 try {
192 ::decode(*event_entry, *it);
193 } catch (const buffer::error &err) {
194 return -EBADMSG;
195 }
196 return 0;
197 }
198
// Route a single replayed journal event to the matching handle_event()
// overload.
//   on_ready - completed when replay may deliver the next event
//   on_safe  - completed once the event's effects are safely applied
template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  // ensure on_ready always fires outside the caller's lock context
  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
  if (m_image_ctx.exclusive_lock == nullptr ||
      !m_image_ctx.exclusive_lock->accept_ops()) {
    // cannot mutate the image without the exclusive lock -- fail the
    // event asynchronously but keep the replay pipeline moving
    ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
    on_ready->complete(0);
    return;
  }

  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}
220
// Stop the replay state machine.  If cancel_ops is true, ops that have
// not started executing are failed with -ERESTART; otherwise ops still
// waiting for their OpFinishEvent are started so they can run to
// completion.  on_finish is deferred (via m_flush_ctx) until all
// in-flight AIO and op events have drained.
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    Mutex::Locker locker(m_lock);

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
      assert(flush_comp != nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready
        op_event_pair.second.finish_on_ready = true;
      }
    }

    assert(!m_shut_down);
    m_shut_down = true;

    // defer the shut-down notification until all in-flight work drains
    assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
  }
  if (on_finish != nullptr) {
    // nothing was in flight -- safe to notify immediately
    on_finish->complete(0);
  }
}
279
// Flush all in-flight replayed IO; on_finish fires (asynchronously)
// once every previously issued modify event has been retired.
template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    // returns nullptr if replay was already shut down (on_finish is
    // failed with -ESHUTDOWN internally)
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
    if (aio_comp == nullptr) {
      return;
    }
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
}
295
// Invoked by the journal when a blocking op event (snap create, resize,
// update features) has reached its "ready" point.  Unblocks replay
// event processing and arranges for on_resume to fire once the
// matching OpFinishEvent is replayed (or immediately on shut down, or
// with -ERESTART if a cancel was requested).
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  Mutex::Locker locker(m_lock);
  auto op_it = m_op_events.find(op_tid);
  assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  assert(op_event.op_in_progress &&
         op_event.on_op_finish_event == nullptr &&
         op_event.on_finish_ready == nullptr &&
         op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new FunctionContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}
334
335 template <typename I>
336 void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
337 Context *on_ready, Context *on_safe) {
338 CephContext *cct = m_image_ctx.cct;
339 ldout(cct, 20) << ": AIO discard event" << dendl;
340
341 bool flush_required;
342 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
343 io::AIO_TYPE_DISCARD,
344 &flush_required,
345 {});
346 if (aio_comp == nullptr) {
347 return;
348 }
349
350 io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
351 event.length, event.skip_partial_discard,
352 {});
353 if (flush_required) {
354 m_lock.Lock();
355 auto flush_comp = create_aio_flush_completion(nullptr);
356 m_lock.Unlock();
357
358 if (flush_comp != nullptr) {
359 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
360 }
361 }
362 }
363
// Replay a journaled write by issuing the equivalent AIO write against
// the image; may also schedule a flush once the unsafe-IO low-water
// mark has been reached.
template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required,
                                               {});
  // nullptr indicates the replay has been shut down
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                 {{event.offset, event.length}},
                                 std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    // nullptr if a shut down raced in after the modify completion
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}
393
394 template <typename I>
395 void Replay<I>::handle_event(const journal::AioFlushEvent &event,
396 Context *on_ready, Context *on_safe) {
397 CephContext *cct = m_image_ctx.cct;
398 ldout(cct, 20) << ": AIO flush event" << dendl;
399
400 io::AioCompletion *aio_comp;
401 {
402 Mutex::Locker locker(m_lock);
403 aio_comp = create_aio_flush_completion(on_safe);
404 }
405
406 if (aio_comp != nullptr) {
407 io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
408 }
409 on_ready->complete(0);
410 }
411
// Replay a journaled writesame by issuing the equivalent AIO writesame
// against the image; may also schedule a flush once the unsafe-IO
// low-water mark has been reached.
template <typename I>
void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO writesame event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITESAME,
                                               &flush_required,
                                               {});
  // nullptr indicates the replay has been shut down
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
                                     event.length, std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    // nullptr if a shut down raced in after the modify completion
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}
440
441 template <typename I>
442 void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
443 Context *on_ready, Context *on_safe) {
444 CephContext *cct = m_image_ctx.cct;
445 ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;
446
447 bufferlist cmp_data = event.cmp_data;
448 bufferlist write_data = event.write_data;
449 bool flush_required;
450 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
451 io::AIO_TYPE_COMPARE_AND_WRITE,
452 &flush_required,
453 {-EILSEQ});
454 io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
455 {{event.offset, event.length}},
456 std::move(cmp_data),
457 std::move(write_data),
458 nullptr, 0, {});
459 if (flush_required) {
460 m_lock.Lock();
461 auto flush_comp = create_aio_flush_completion(nullptr);
462 m_lock.Unlock();
463
464 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
465 }
466 }
467
// Replay an OpFinishEvent: ties off the op started by an earlier
// Op*Event with the same op_tid.  For ops that were deferred, this
// kicks off (or, on journaled failure, cancels) the pending execution.
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      // the start event was committed in a prior session -- nothing to do
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors which indicate op never started but was recorded
    // as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up since
      // our image is now potentially in an inconsistent state
      // since simple errors should have been caught before
      // creating the op event
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}
522
// Replay a snapshot-create start event.  The op executes immediately
// (queued on the work queue), and further event processing is blocked
// until the state machine invokes replay_op_ready().
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
551
// Replay a snapshot-remove start event.  Execution is deferred until
// the matching OpFinishEvent is replayed (via on_op_finish_event).
template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}
575
// Replay a snapshot-rename start event (keyed by snap_id, renaming to
// dst_snap_name).  Execution is deferred until the matching
// OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}
599
// Replay a snapshot-protect start event.  Execution is deferred until
// the matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}
623
// Replay a snapshot-unprotect start event.  Execution is deferred until
// the matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}
651
// Replay a snapshot-rollback start event.  Execution is deferred until
// the matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}
673
// Replay an image-rename start event.  Execution is deferred until the
// matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
                                                        on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}
697
// Replay a resize start event.  The op executes immediately (queued on
// the work queue), and further event processing is blocked until the
// state machine invokes replay_op_ready().
template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
722
// Replay a flatten start event.  Execution is deferred until the
// matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}
746
747 template <typename I>
748 void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
749 Context *on_ready, Context *on_safe) {
750 CephContext *cct = m_image_ctx.cct;
751 ldout(cct, 20) << ": Demote/Promote event" << dendl;
752 on_ready->complete(0);
753 on_safe->complete(0);
754 }
755
// Replay a snapshot-limit start event.  Execution is deferred until the
// matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  on_ready->complete(0);
}
777
// Replay an update-features start event.  The op executes immediately
// (queued on the work queue), and further event processing is blocked
// until the state machine invokes replay_op_ready().
template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}
802
// Replay a metadata-set start event.  Execution is deferred until the
// matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // refresh (if required) before completing the op, and run the execute
  // step asynchronously to avoid lock cycles
  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}
824
// Replay a metadata-remove start event.  Execution is deferred until
// the matching OpFinishEvent is replayed.
template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    // shut down or duplicate op tid -- callbacks already handled
    return;
  }

  // refresh (if required) before completing the op, and run the execute
  // step asynchronously to avoid lock cycles
  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}
849
850 template <typename I>
851 void Replay<I>::handle_event(const journal::UnknownEvent &event,
852 Context *on_ready, Context *on_safe) {
853 CephContext *cct = m_image_ctx.cct;
854 ldout(cct, 20) << ": unknown event" << dendl;
855 on_ready->complete(0);
856 on_safe->complete(0);
857 }
858
// Completion callback for a replayed AIO modify op.
//   on_ready - non-null only when this op was throttling replay at the
//              high-water mark; completed to resume event delivery
//   on_safe  - deferred until the next flush completes, unless the op
//              failed (completed immediately with the error)
//   filters  - error codes to treat as success (e.g. -EILSEQ for
//              replayed compare-and-write mismatches)
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r, std::set<int> &filters) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  if (on_ready != nullptr) {
    on_ready->complete(0);
  }

  if (filters.find(r) != filters.end())
    r = 0;

  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    on_safe->complete(r);
    return;
  }

  // will be completed after next flush operation completes
  m_aio_modify_safe_contexts.insert(on_safe);
}
883
// Completion callback for a replayed AIO flush.  Retires the on_safe
// contexts of every modify op the flush covered, resumes throttled
// replay if it was paused at the high-water mark, and fires the
// pending shut-down/flush context if this was the last in-flight work.
template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_aio_flush > 0);
    assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}
937
// Allocate the OpEvent tracking state for op_tid and return the
// C_OpOnComplete context the op state machine will fire.  Returns
// nullptr (after completing on_ready and failing on_safe async) if
// replay has shut down or the tid is a duplicate; *op_event is only
// valid on success.  Caller must hold m_lock.
template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  assert(m_lock.is_locked());
  if (m_op_events.count(op_tid) != 0) {
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}
970
// Invoked (via C_OpOnComplete) when an op state machine finishes.
// Completes the ready/safe callbacks recorded for the op, filters
// replay-induced errors, and fires the deferred shut-down context if
// this was the last in-flight event.
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(op_tid);
    assert(op_it != m_op_events.end());

    // take ownership of the event state before dropping the lock
    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    if (m_shut_down) {
      assert(m_flush_ctx != nullptr);
      shutting_down = true;
    }
  }

  assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    assert(op_event.on_finish_ready == nullptr &&
           op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    assert((op_event.on_finish_ready != nullptr &&
            op_event.on_finish_safe != nullptr) || shutting_down);
  }

  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // shut down request might have occurred while lock was
  // dropped -- handle if pending
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}
1040
// Create the AioCompletion used to track a replayed modify op.
// Returns nullptr (after completing on_ready and failing on_safe) if
// replay has shut down.  *flush_required is set when the unsafe-context
// backlog reaches the low-water mark; on_ready is stolen (m_on_aio_ready)
// when the in-flight count hits the high-water mark so replay pauses
// until the next flush completes.  Caller holds m_lock.
template <typename I>
io::AioCompletion *
Replay<I>::create_aio_modify_completion(Context *on_ready,
                                        Context *on_safe,
                                        io::aio_type_t aio_type,
                                        bool *flush_required,
                                        std::set<int> &&filters) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  assert(m_on_aio_ready == nullptr);

  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ++m_in_flight_aio_modify;
  m_aio_modify_unsafe_contexts.push_back(on_safe);

  // FLUSH if we hit the low-water mark -- on_safe contexts are
  // completed by flushes-only so that we don't move the journal
  // commit position until safely on-disk

  *flush_required = (m_aio_modify_unsafe_contexts.size() ==
                     IN_FLIGHT_IO_LOW_WATER_MARK);
  if (*flush_required) {
    ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
                   << dendl;
  }

  // READY for more events if:
  // * not at high-water mark for IO
  // * in-flight ops are at a consistent point (snap create has IO flushed,
  //   shrink has adjusted clip boundary, etc) -- should have already been
  //   flagged not-ready
  if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
    ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
                   << dendl;
    assert(m_on_aio_ready == nullptr);
    std::swap(m_on_aio_ready, on_ready);
  }

  // when the modification is ACKed by librbd, we can process the next
  // event. when flushed, the completion of the next flush will fire the
  // on_safe callback
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)),
    util::get_image_ctx(&m_image_ctx), aio_type);
  return aio_comp;
}
1093
// Create the AioCompletion used to track a replayed flush, adopting all
// currently unsafe modify contexts so they are retired when the flush
// completes.  Returns nullptr (after failing on_safe with -ESHUTDOWN,
// if provided) when replay has shut down.  Caller must hold m_lock.
template <typename I>
io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
  assert(m_lock.is_locked());

  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    if (on_safe != nullptr) {
      m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    }
    return nullptr;
  }

  ++m_in_flight_aio_flush;

  // associate all prior write/discard ops to this flush request
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioFlushComplete(this, on_safe,
                           std::move(m_aio_modify_unsafe_contexts)),
    util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
  m_aio_modify_unsafe_contexts.clear();
  return aio_comp;
}
1117
1118 } // namespace journal
1119 } // namespace librbd
1120
1121 template class librbd::journal::Replay<librbd::ImageCtx>;