]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/journal/Replay.cc
update sources to v12.1.0
[ceph.git] / ceph / src / librbd / journal / Replay.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/journal/Replay.h"
5#include "common/dout.h"
6#include "common/errno.h"
7#include "common/WorkQueue.h"
31f18b77 8#include "librbd/ExclusiveLock.h"
7c673cae
FG
9#include "librbd/ImageCtx.h"
10#include "librbd/ImageState.h"
11#include "librbd/internal.h"
12#include "librbd/Operations.h"
13#include "librbd/Utils.h"
14#include "librbd/io/AioCompletion.h"
15#include "librbd/io/ImageRequest.h"
16
17#define dout_subsys ceph_subsys_rbd
18#undef dout_prefix
19#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "
20
21namespace librbd {
22namespace journal {
23
24namespace {
25
26static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
27static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);
28
29static NoOpProgressContext no_op_progress_callback;
30
31template <typename I, typename E>
32struct ExecuteOp : public Context {
33 I &image_ctx;
34 E event;
35 Context *on_op_complete;
36
37 ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
38 : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
39 }
40
41 void execute(const journal::SnapCreateEvent &_) {
42 image_ctx.operations->execute_snap_create(event.snap_namespace,
43 event.snap_name,
44 on_op_complete,
45 event.op_tid, false);
46 }
47
48 void execute(const journal::SnapRemoveEvent &_) {
49 image_ctx.operations->execute_snap_remove(event.snap_namespace,
50 event.snap_name,
51 on_op_complete);
52 }
53
54 void execute(const journal::SnapRenameEvent &_) {
55 image_ctx.operations->execute_snap_rename(event.snap_id,
56 event.dst_snap_name,
57 on_op_complete);
58 }
59
60 void execute(const journal::SnapProtectEvent &_) {
61 image_ctx.operations->execute_snap_protect(event.snap_namespace,
62 event.snap_name,
63 on_op_complete);
64 }
65
66 void execute(const journal::SnapUnprotectEvent &_) {
67 image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
68 event.snap_name,
69 on_op_complete);
70 }
71
72 void execute(const journal::SnapRollbackEvent &_) {
73 image_ctx.operations->execute_snap_rollback(event.snap_namespace,
74 event.snap_name,
75 no_op_progress_callback,
76 on_op_complete);
77 }
78
79 void execute(const journal::RenameEvent &_) {
80 image_ctx.operations->execute_rename(event.image_name,
81 on_op_complete);
82 }
83
84 void execute(const journal::ResizeEvent &_) {
85 image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
86 on_op_complete, event.op_tid);
87 }
88
89 void execute(const journal::FlattenEvent &_) {
90 image_ctx.operations->execute_flatten(no_op_progress_callback,
91 on_op_complete);
92 }
93
94 void execute(const journal::SnapLimitEvent &_) {
95 image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
96 }
97
98 void execute(const journal::UpdateFeaturesEvent &_) {
99 image_ctx.operations->execute_update_features(event.features, event.enabled,
100 on_op_complete, event.op_tid);
101 }
102
103 void execute(const journal::MetadataSetEvent &_) {
104 image_ctx.operations->execute_metadata_set(event.key, event.value,
105 on_op_complete);
106 }
107
108 void execute(const journal::MetadataRemoveEvent &_) {
109 image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
110 }
111
112 void finish(int r) override {
113 CephContext *cct = image_ctx.cct;
114 if (r < 0) {
115 lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
116 on_op_complete->complete(r);
117 return;
118 }
119
120 ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
121 RWLock::RLocker owner_locker(image_ctx.owner_lock);
31f18b77
FG
122
123 if (image_ctx.exclusive_lock == nullptr ||
124 !image_ctx.exclusive_lock->accept_ops()) {
125 ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
126 on_op_complete->complete(-ECANCELED);
127 return;
128 }
129
7c673cae
FG
130 execute(event);
131 }
132};
133
134template <typename I>
135struct C_RefreshIfRequired : public Context {
136 I &image_ctx;
137 Context *on_finish;
138
139 C_RefreshIfRequired(I &image_ctx, Context *on_finish)
140 : image_ctx(image_ctx), on_finish(on_finish) {
141 }
142 ~C_RefreshIfRequired() override {
143 delete on_finish;
144 }
145
146 void finish(int r) override {
147 CephContext *cct = image_ctx.cct;
148 Context *ctx = on_finish;
149 on_finish = nullptr;
150
151 if (r < 0) {
152 lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
153 image_ctx.op_work_queue->queue(ctx, r);
154 return;
155 }
156
157 if (image_ctx.state->is_refresh_required()) {
158 ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
159 << "refresh required" << dendl;
160 image_ctx.state->refresh(ctx);
161 return;
162 }
163
164 image_ctx.op_work_queue->queue(ctx, 0);
165 }
166};
167
168} // anonymous namespace
169
170#undef dout_prefix
171#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
172 << __func__
173
174template <typename I>
175Replay<I>::Replay(I &image_ctx)
176 : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
177}
178
179template <typename I>
180Replay<I>::~Replay() {
181 assert(m_in_flight_aio_flush == 0);
182 assert(m_in_flight_aio_modify == 0);
183 assert(m_aio_modify_unsafe_contexts.empty());
184 assert(m_aio_modify_safe_contexts.empty());
185 assert(m_op_events.empty());
186 assert(m_in_flight_op_events == 0);
187}
188
189template <typename I>
190int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
191 try {
192 ::decode(*event_entry, *it);
193 } catch (const buffer::error &err) {
194 return -EBADMSG;
195 }
196 return 0;
197}
198
199template <typename I>
200void Replay<I>::process(const EventEntry &event_entry,
201 Context *on_ready, Context *on_safe) {
202 CephContext *cct = m_image_ctx.cct;
203 ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
204 << dendl;
205
206 on_ready = util::create_async_context_callback(m_image_ctx, on_ready);
207
208 RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
31f18b77
FG
209 if (m_image_ctx.exclusive_lock == nullptr ||
210 !m_image_ctx.exclusive_lock->accept_ops()) {
211 ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
212 m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
213 on_ready->complete(0);
214 return;
215 }
216
7c673cae
FG
217 boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
218 event_entry.event);
219}
220
221template <typename I>
222void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
223 CephContext *cct = m_image_ctx.cct;
224 ldout(cct, 20) << dendl;
225
226 io::AioCompletion *flush_comp = nullptr;
227 on_finish = util::create_async_context_callback(
228 m_image_ctx, on_finish);
229
230 {
231 Mutex::Locker locker(m_lock);
232
233 // safely commit any remaining AIO modify operations
234 if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
235 flush_comp = create_aio_flush_completion(nullptr);
31f18b77 236 assert(flush_comp != nullptr);
7c673cae
FG
237 }
238
239 for (auto &op_event_pair : m_op_events) {
240 OpEvent &op_event = op_event_pair.second;
241 if (cancel_ops) {
242 // cancel ops that are waiting to start (waiting for
243 // OpFinishEvent or waiting for ready)
244 if (op_event.on_start_ready == nullptr &&
245 op_event.on_op_finish_event != nullptr) {
246 Context *on_op_finish_event = nullptr;
247 std::swap(on_op_finish_event, op_event.on_op_finish_event);
248 m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
249 }
250 } else if (op_event.on_op_finish_event != nullptr) {
251 // start ops waiting for OpFinishEvent
252 Context *on_op_finish_event = nullptr;
253 std::swap(on_op_finish_event, op_event.on_op_finish_event);
254 m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
255 } else if (op_event.on_start_ready != nullptr) {
256 // waiting for op ready
257 op_event_pair.second.finish_on_ready = true;
258 }
259 }
260
31f18b77
FG
261 assert(!m_shut_down);
262 m_shut_down = true;
263
7c673cae
FG
264 assert(m_flush_ctx == nullptr);
265 if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
266 std::swap(m_flush_ctx, on_finish);
267 }
268 }
269
270 // execute the following outside of lock scope
271 if (flush_comp != nullptr) {
272 RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
31f18b77 273 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
7c673cae
FG
274 }
275 if (on_finish != nullptr) {
276 on_finish->complete(0);
277 }
278}
279
280template <typename I>
281void Replay<I>::flush(Context *on_finish) {
282 io::AioCompletion *aio_comp;
283 {
284 Mutex::Locker locker(m_lock);
285 aio_comp = create_aio_flush_completion(
286 util::create_async_context_callback(m_image_ctx, on_finish));
31f18b77
FG
287 if (aio_comp == nullptr) {
288 return;
289 }
7c673cae
FG
290 }
291
292 RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
31f18b77 293 io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
7c673cae
FG
294}
295
296template <typename I>
297void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
298 CephContext *cct = m_image_ctx.cct;
299 ldout(cct, 20) << ": op_tid=" << op_tid << dendl;
300
301 Mutex::Locker locker(m_lock);
302 auto op_it = m_op_events.find(op_tid);
303 assert(op_it != m_op_events.end());
304
305 OpEvent &op_event = op_it->second;
306 assert(op_event.op_in_progress &&
307 op_event.on_op_finish_event == nullptr &&
308 op_event.on_finish_ready == nullptr &&
309 op_event.on_finish_safe == nullptr);
310
311 // resume processing replay events
312 Context *on_start_ready = nullptr;
313 std::swap(on_start_ready, op_event.on_start_ready);
314 on_start_ready->complete(0);
315
316 // cancel has been requested -- send error to paused state machine
317 if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
318 m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
319 return;
320 }
321
322 // resume the op state machine once the associated OpFinishEvent
323 // is processed
324 op_event.on_op_finish_event = new FunctionContext(
325 [on_resume](int r) {
326 on_resume->complete(r);
327 });
328
329 // shut down request -- don't expect OpFinishEvent
330 if (op_event.finish_on_ready) {
331 m_image_ctx.op_work_queue->queue(on_resume, 0);
332 }
333}
334
335template <typename I>
336void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
337 Context *on_ready, Context *on_safe) {
338 CephContext *cct = m_image_ctx.cct;
339 ldout(cct, 20) << ": AIO discard event" << dendl;
340
341 bool flush_required;
342 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
343 io::AIO_TYPE_DISCARD,
344 &flush_required);
31f18b77
FG
345 if (aio_comp == nullptr) {
346 return;
347 }
348
7c673cae 349 io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
31f18b77
FG
350 event.length, event.skip_partial_discard,
351 {});
7c673cae
FG
352 if (flush_required) {
353 m_lock.Lock();
354 auto flush_comp = create_aio_flush_completion(nullptr);
355 m_lock.Unlock();
356
31f18b77
FG
357 if (flush_comp != nullptr) {
358 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
359 }
7c673cae
FG
360 }
361}
362
363template <typename I>
364void Replay<I>::handle_event(const journal::AioWriteEvent &event,
365 Context *on_ready, Context *on_safe) {
366 CephContext *cct = m_image_ctx.cct;
367 ldout(cct, 20) << ": AIO write event" << dendl;
368
369 bufferlist data = event.data;
370 bool flush_required;
371 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
372 io::AIO_TYPE_WRITE,
373 &flush_required);
31f18b77
FG
374 if (aio_comp == nullptr) {
375 return;
376 }
377
7c673cae
FG
378 io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
379 {{event.offset, event.length}},
31f18b77 380 std::move(data), 0, {});
7c673cae
FG
381 if (flush_required) {
382 m_lock.Lock();
383 auto flush_comp = create_aio_flush_completion(nullptr);
384 m_lock.Unlock();
385
31f18b77
FG
386 if (flush_comp != nullptr) {
387 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
388 }
7c673cae
FG
389 }
390}
391
392template <typename I>
393void Replay<I>::handle_event(const journal::AioFlushEvent &event,
394 Context *on_ready, Context *on_safe) {
395 CephContext *cct = m_image_ctx.cct;
396 ldout(cct, 20) << ": AIO flush event" << dendl;
397
398 io::AioCompletion *aio_comp;
399 {
400 Mutex::Locker locker(m_lock);
401 aio_comp = create_aio_flush_completion(on_safe);
402 }
7c673cae 403
31f18b77
FG
404 if (aio_comp != nullptr) {
405 io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
406 }
7c673cae
FG
407 on_ready->complete(0);
408}
409
410template <typename I>
411void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
412 Context *on_ready, Context *on_safe) {
413 CephContext *cct = m_image_ctx.cct;
414 ldout(cct, 20) << ": AIO writesame event" << dendl;
415
416 bufferlist data = event.data;
417 bool flush_required;
418 auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
419 io::AIO_TYPE_WRITESAME,
420 &flush_required);
31f18b77
FG
421 if (aio_comp == nullptr) {
422 return;
423 }
424
7c673cae 425 io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
31f18b77 426 event.length, std::move(data), 0, {});
7c673cae
FG
427 if (flush_required) {
428 m_lock.Lock();
429 auto flush_comp = create_aio_flush_completion(nullptr);
430 m_lock.Unlock();
431
31f18b77
FG
432 if (flush_comp != nullptr) {
433 io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
434 }
7c673cae
FG
435 }
436}
437
438template <typename I>
439void Replay<I>::handle_event(const journal::OpFinishEvent &event,
440 Context *on_ready, Context *on_safe) {
441 CephContext *cct = m_image_ctx.cct;
442 ldout(cct, 20) << ": Op finish event: "
443 << "op_tid=" << event.op_tid << dendl;
444
445 bool op_in_progress;
446 bool filter_ret_val;
447 Context *on_op_complete = nullptr;
448 Context *on_op_finish_event = nullptr;
449 {
450 Mutex::Locker locker(m_lock);
451 auto op_it = m_op_events.find(event.op_tid);
452 if (op_it == m_op_events.end()) {
453 ldout(cct, 10) << ": unable to locate associated op: assuming previously "
454 << "committed." << dendl;
455 on_ready->complete(0);
456 m_image_ctx.op_work_queue->queue(on_safe, 0);
457 return;
458 }
459
460 OpEvent &op_event = op_it->second;
461 assert(op_event.on_finish_safe == nullptr);
462 op_event.on_finish_ready = on_ready;
463 op_event.on_finish_safe = on_safe;
464 op_in_progress = op_event.op_in_progress;
465 std::swap(on_op_complete, op_event.on_op_complete);
466 std::swap(on_op_finish_event, op_event.on_op_finish_event);
467
468 // special errors which indicate op never started but was recorded
469 // as failed in the journal
470 filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
471 }
472
473 if (event.r < 0) {
474 if (op_in_progress) {
475 // bubble the error up to the in-progress op to cancel it
476 on_op_finish_event->complete(event.r);
477 } else {
478 // op hasn't been started -- bubble the error up since
479 // our image is now potentially in an inconsistent state
480 // since simple errors should have been caught before
481 // creating the op event
482 delete on_op_complete;
483 delete on_op_finish_event;
484 handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
485 }
486 return;
487 }
488
489 // journal recorded success -- apply the op now
490 on_op_finish_event->complete(0);
491}
492
493template <typename I>
494void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
495 Context *on_ready, Context *on_safe) {
496 CephContext *cct = m_image_ctx.cct;
497 ldout(cct, 20) << ": Snap create event" << dendl;
498
499 Mutex::Locker locker(m_lock);
500 OpEvent *op_event;
501 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
502 on_safe, &op_event);
503 if (on_op_complete == nullptr) {
504 return;
505 }
506
507 // ignore errors caused due to replay
508 op_event->ignore_error_codes = {-EEXIST};
509
510 // avoid lock cycles
511 m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
512 m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
513 on_op_complete)),
514 0);
515
516 // do not process more events until the state machine is ready
517 // since it will affect IO
518 op_event->op_in_progress = true;
519 op_event->on_start_ready = on_ready;
520}
521
522template <typename I>
523void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
524 Context *on_ready, Context *on_safe) {
525 CephContext *cct = m_image_ctx.cct;
526 ldout(cct, 20) << ": Snap remove event" << dendl;
527
528 Mutex::Locker locker(m_lock);
529 OpEvent *op_event;
530 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
531 on_safe, &op_event);
532 if (on_op_complete == nullptr) {
533 return;
534 }
535
536 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
537 m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
538 on_op_complete));
539
540 // ignore errors caused due to replay
541 op_event->ignore_error_codes = {-ENOENT};
542
543 on_ready->complete(0);
544}
545
546template <typename I>
547void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
548 Context *on_ready, Context *on_safe) {
549 CephContext *cct = m_image_ctx.cct;
550 ldout(cct, 20) << ": Snap rename event" << dendl;
551
552 Mutex::Locker locker(m_lock);
553 OpEvent *op_event;
554 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
555 on_safe, &op_event);
556 if (on_op_complete == nullptr) {
557 return;
558 }
559
560 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
561 m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
562 on_op_complete));
563
564 // ignore errors caused due to replay
565 op_event->ignore_error_codes = {-EEXIST};
566
567 on_ready->complete(0);
568}
569
570template <typename I>
571void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
572 Context *on_ready, Context *on_safe) {
573 CephContext *cct = m_image_ctx.cct;
574 ldout(cct, 20) << ": Snap protect event" << dendl;
575
576 Mutex::Locker locker(m_lock);
577 OpEvent *op_event;
578 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
579 on_safe, &op_event);
580 if (on_op_complete == nullptr) {
581 return;
582 }
583
584 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
585 m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
586 on_op_complete));
587
588 // ignore errors caused due to replay
589 op_event->ignore_error_codes = {-EBUSY};
590
591 on_ready->complete(0);
592}
593
594template <typename I>
595void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
596 Context *on_ready, Context *on_safe) {
597 CephContext *cct = m_image_ctx.cct;
598 ldout(cct, 20) << ": Snap unprotect event" << dendl;
599
600 Mutex::Locker locker(m_lock);
601 OpEvent *op_event;
602 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
603 on_safe, &op_event);
604 if (on_op_complete == nullptr) {
605 return;
606 }
607
608 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
609 m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
610 event,
611 on_op_complete));
612
613 // ignore errors recorded in the journal
614 op_event->op_finish_error_codes = {-EBUSY};
615
616 // ignore errors caused due to replay
617 op_event->ignore_error_codes = {-EINVAL};
618
619 on_ready->complete(0);
620}
621
622template <typename I>
623void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
624 Context *on_ready, Context *on_safe) {
625 CephContext *cct = m_image_ctx.cct;
626 ldout(cct, 20) << ": Snap rollback start event" << dendl;
627
628 Mutex::Locker locker(m_lock);
629 OpEvent *op_event;
630 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
631 on_safe, &op_event);
632 if (on_op_complete == nullptr) {
633 return;
634 }
635
636 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
637 m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
638 event,
639 on_op_complete));
640
641 on_ready->complete(0);
642}
643
644template <typename I>
645void Replay<I>::handle_event(const journal::RenameEvent &event,
646 Context *on_ready, Context *on_safe) {
647 CephContext *cct = m_image_ctx.cct;
648 ldout(cct, 20) << ": Rename event" << dendl;
649
650 Mutex::Locker locker(m_lock);
651 OpEvent *op_event;
652 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
653 on_safe, &op_event);
654 if (on_op_complete == nullptr) {
655 return;
656 }
657
658 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
659 m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
660 on_op_complete));
661
662 // ignore errors caused due to replay
663 op_event->ignore_error_codes = {-EEXIST};
664
665 on_ready->complete(0);
666}
667
668template <typename I>
669void Replay<I>::handle_event(const journal::ResizeEvent &event,
670 Context *on_ready, Context *on_safe) {
671 CephContext *cct = m_image_ctx.cct;
672 ldout(cct, 20) << ": Resize start event" << dendl;
673
674 Mutex::Locker locker(m_lock);
675 OpEvent *op_event;
676 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
677 on_safe, &op_event);
678 if (on_op_complete == nullptr) {
679 return;
680 }
681
682 // avoid lock cycles
683 m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
684 m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
685 on_op_complete)), 0);
686
687 // do not process more events until the state machine is ready
688 // since it will affect IO
689 op_event->op_in_progress = true;
690 op_event->on_start_ready = on_ready;
691}
692
693template <typename I>
694void Replay<I>::handle_event(const journal::FlattenEvent &event,
695 Context *on_ready, Context *on_safe) {
696 CephContext *cct = m_image_ctx.cct;
697 ldout(cct, 20) << ": Flatten start event" << dendl;
698
699 Mutex::Locker locker(m_lock);
700 OpEvent *op_event;
701 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
702 on_safe, &op_event);
703 if (on_op_complete == nullptr) {
704 return;
705 }
706
707 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
708 m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
709 on_op_complete));
710
711 // ignore errors caused due to replay
712 op_event->ignore_error_codes = {-EINVAL};
713
714 on_ready->complete(0);
715}
716
717template <typename I>
718void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
719 Context *on_ready, Context *on_safe) {
720 CephContext *cct = m_image_ctx.cct;
721 ldout(cct, 20) << ": Demote/Promote event" << dendl;
722 on_ready->complete(0);
723 on_safe->complete(0);
724}
725
726template <typename I>
727void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
728 Context *on_ready, Context *on_safe) {
729 CephContext *cct = m_image_ctx.cct;
730 ldout(cct, 20) << ": Snap limit event" << dendl;
731
732 Mutex::Locker locker(m_lock);
733 OpEvent *op_event;
734 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
735 on_safe, &op_event);
736 if (on_op_complete == nullptr) {
737 return;
738 }
739
740 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
741 m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
742 event,
743 on_op_complete));
744
745 on_ready->complete(0);
746}
747
748template <typename I>
749void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
750 Context *on_ready, Context *on_safe) {
751 CephContext *cct = m_image_ctx.cct;
752 ldout(cct, 20) << ": Update features event" << dendl;
753
754 Mutex::Locker locker(m_lock);
755 OpEvent *op_event;
756 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
757 on_safe, &op_event);
758 if (on_op_complete == nullptr) {
759 return;
760 }
761
762 // avoid lock cycles
763 m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
764 m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
765 m_image_ctx, event, on_op_complete)), 0);
766
767 // do not process more events until the state machine is ready
768 // since it will affect IO
769 op_event->op_in_progress = true;
770 op_event->on_start_ready = on_ready;
771}
772
773template <typename I>
774void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
775 Context *on_ready, Context *on_safe) {
776 CephContext *cct = m_image_ctx.cct;
777 ldout(cct, 20) << ": Metadata set event" << dendl;
778
779 Mutex::Locker locker(m_lock);
780 OpEvent *op_event;
781 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
782 on_safe, &op_event);
783 if (on_op_complete == nullptr) {
784 return;
785 }
786
787 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
788 m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
789 m_image_ctx, event, on_op_complete));
790
791 on_ready->complete(0);
792}
793
794template <typename I>
795void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
796 Context *on_ready, Context *on_safe) {
797 CephContext *cct = m_image_ctx.cct;
798 ldout(cct, 20) << ": Metadata remove event" << dendl;
799
800 Mutex::Locker locker(m_lock);
801 OpEvent *op_event;
802 Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
803 on_safe, &op_event);
804 if (on_op_complete == nullptr) {
805 return;
806 }
807
808 op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
809 m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
810 m_image_ctx, event, on_op_complete));
811
812 // ignore errors caused due to replay
813 op_event->ignore_error_codes = {-ENOENT};
814
815 on_ready->complete(0);
816}
817
818template <typename I>
819void Replay<I>::handle_event(const journal::UnknownEvent &event,
820 Context *on_ready, Context *on_safe) {
821 CephContext *cct = m_image_ctx.cct;
822 ldout(cct, 20) << ": unknown event" << dendl;
823 on_ready->complete(0);
824 on_safe->complete(0);
825}
826
827template <typename I>
828void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
829 int r) {
830 Mutex::Locker locker(m_lock);
831 CephContext *cct = m_image_ctx.cct;
832 ldout(cct, 20) << ": on_ready=" << on_ready << ", "
833 << "on_safe=" << on_safe << ", r=" << r << dendl;
834
835 if (on_ready != nullptr) {
836 on_ready->complete(0);
837 }
838 if (r < 0) {
839 lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
840 on_safe->complete(r);
841 return;
842 }
843
844 // will be completed after next flush operation completes
845 m_aio_modify_safe_contexts.insert(on_safe);
846}
847
848template <typename I>
849void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
850 Contexts &on_safe_ctxs, int r) {
851 CephContext *cct = m_image_ctx.cct;
852 ldout(cct, 20) << ": r=" << r << dendl;
853
854 if (r < 0) {
855 lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
856 }
857
858 Context *on_aio_ready = nullptr;
859 Context *on_flush = nullptr;
860 {
861 Mutex::Locker locker(m_lock);
862 assert(m_in_flight_aio_flush > 0);
863 assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
864 --m_in_flight_aio_flush;
865 m_in_flight_aio_modify -= on_safe_ctxs.size();
866
867 std::swap(on_aio_ready, m_on_aio_ready);
868 if (m_in_flight_op_events == 0 &&
869 (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
870 on_flush = m_flush_ctx;
871 }
872
873 // strip out previously failed on_safe contexts
874 for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
875 if (m_aio_modify_safe_contexts.erase(*it)) {
876 ++it;
877 } else {
878 it = on_safe_ctxs.erase(it);
879 }
880 }
881 }
882
883 if (on_aio_ready != nullptr) {
884 ldout(cct, 10) << ": resuming paused AIO" << dendl;
885 on_aio_ready->complete(0);
886 }
887
888 if (on_flush_safe != nullptr) {
889 on_safe_ctxs.push_back(on_flush_safe);
890 }
891 for (auto ctx : on_safe_ctxs) {
892 ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
893 ctx->complete(r);
894 }
895
896 if (on_flush != nullptr) {
897 ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
898 on_flush->complete(r);
899 }
900}
901
902template <typename I>
903Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
904 Context *on_ready,
905 Context *on_safe,
906 OpEvent **op_event) {
907 CephContext *cct = m_image_ctx.cct;
31f18b77
FG
908 if (m_shut_down) {
909 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
910 on_ready->complete(0);
911 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
912 return nullptr;
913 }
7c673cae
FG
914
915 assert(m_lock.is_locked());
916 if (m_op_events.count(op_tid) != 0) {
917 lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;
918
919 // on_ready is already async but on failure invoke on_safe async
920 // as well
921 on_ready->complete(0);
922 m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
923 return nullptr;
924 }
925
926 ++m_in_flight_op_events;
927 *op_event = &m_op_events[op_tid];
928 (*op_event)->on_start_safe = on_safe;
929
930 Context *on_op_complete = new C_OpOnComplete(this, op_tid);
931 (*op_event)->on_op_complete = on_op_complete;
932 return on_op_complete;
933}
934
935template <typename I>
936void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
937 CephContext *cct = m_image_ctx.cct;
938 ldout(cct, 20) << ": op_tid=" << op_tid << ", "
939 << "r=" << r << dendl;
940
941 OpEvent op_event;
942 bool shutting_down = false;
943 {
944 Mutex::Locker locker(m_lock);
945 auto op_it = m_op_events.find(op_tid);
946 assert(op_it != m_op_events.end());
947
948 op_event = std::move(op_it->second);
949 m_op_events.erase(op_it);
950
31f18b77
FG
951 if (m_shut_down) {
952 assert(m_flush_ctx != nullptr);
953 shutting_down = true;
954 }
7c673cae
FG
955 }
956
957 assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
958 if (op_event.on_start_ready != nullptr) {
959 // blocking op event failed before it became ready
960 assert(op_event.on_finish_ready == nullptr &&
961 op_event.on_finish_safe == nullptr);
962
963 op_event.on_start_ready->complete(0);
964 } else {
965 // event kicked off by OpFinishEvent
966 assert((op_event.on_finish_ready != nullptr &&
967 op_event.on_finish_safe != nullptr) || shutting_down);
968 }
969
970 if (op_event.on_op_finish_event != nullptr) {
971 op_event.on_op_finish_event->complete(r);
972 }
973
974 if (op_event.on_finish_ready != nullptr) {
975 op_event.on_finish_ready->complete(0);
976 }
977
978 // filter out errors caused by replay of the same op
979 if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
980 r = 0;
981 }
982
983 op_event.on_start_safe->complete(r);
984 if (op_event.on_finish_safe != nullptr) {
985 op_event.on_finish_safe->complete(r);
986 }
987
988 // shut down request might have occurred while lock was
989 // dropped -- handle if pending
990 Context *on_flush = nullptr;
991 {
992 Mutex::Locker locker(m_lock);
993 assert(m_in_flight_op_events > 0);
994 --m_in_flight_op_events;
995 if (m_in_flight_op_events == 0 &&
996 (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
997 on_flush = m_flush_ctx;
998 }
999 }
1000 if (on_flush != nullptr) {
1001 m_image_ctx.op_work_queue->queue(on_flush, 0);
1002 }
1003}
1004
1005template <typename I>
1006io::AioCompletion *
1007Replay<I>::create_aio_modify_completion(Context *on_ready, Context *on_safe,
1008 io::aio_type_t aio_type,
1009 bool *flush_required) {
1010 Mutex::Locker locker(m_lock);
1011 CephContext *cct = m_image_ctx.cct;
1012 assert(m_on_aio_ready == nullptr);
1013
31f18b77
FG
1014 if (m_shut_down) {
1015 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1016 on_ready->complete(0);
1017 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1018 return nullptr;
1019 }
1020
7c673cae
FG
1021 ++m_in_flight_aio_modify;
1022 m_aio_modify_unsafe_contexts.push_back(on_safe);
1023
1024 // FLUSH if we hit the low-water mark -- on_safe contexts are
1025 // completed by flushes-only so that we don't move the journal
1026 // commit position until safely on-disk
1027
1028 *flush_required = (m_aio_modify_unsafe_contexts.size() ==
1029 IN_FLIGHT_IO_LOW_WATER_MARK);
1030 if (*flush_required) {
1031 ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
1032 << dendl;
1033 }
1034
1035 // READY for more events if:
1036 // * not at high-water mark for IO
1037 // * in-flight ops are at a consistent point (snap create has IO flushed,
1038 // shrink has adjusted clip boundary, etc) -- should have already been
1039 // flagged not-ready
1040 if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
1041 ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
1042 << dendl;
1043 assert(m_on_aio_ready == nullptr);
1044 std::swap(m_on_aio_ready, on_ready);
1045 }
1046
1047 // when the modification is ACKed by librbd, we can process the next
1048 // event. when flushed, the completion of the next flush will fire the
1049 // on_safe callback
1050 auto aio_comp = io::AioCompletion::create_and_start<Context>(
1051 new C_AioModifyComplete(this, on_ready, on_safe),
1052 util::get_image_ctx(&m_image_ctx), aio_type);
1053 return aio_comp;
1054}
1055
1056template <typename I>
1057io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
1058 assert(m_lock.is_locked());
1059
31f18b77
FG
1060 CephContext *cct = m_image_ctx.cct;
1061 if (m_shut_down) {
1062 ldout(cct, 5) << ": ignoring event after shut down" << dendl;
1063 if (on_safe != nullptr) {
1064 m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
1065 }
1066 return nullptr;
1067 }
1068
7c673cae
FG
1069 ++m_in_flight_aio_flush;
1070
1071 // associate all prior write/discard ops to this flush request
1072 auto aio_comp = io::AioCompletion::create_and_start<Context>(
1073 new C_AioFlushComplete(this, on_safe,
1074 std::move(m_aio_modify_unsafe_contexts)),
1075 util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
1076 m_aio_modify_unsafe_contexts.clear();
1077 return aio_comp;
1078}
1079
1080} // namespace journal
1081} // namespace librbd
1082
1083template class librbd::journal::Replay<librbd::ImageCtx>;