// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/journal/Replay.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "

namespace librbd {
namespace journal {

namespace {

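// Replay throttling: once IN_FLIGHT_IO_LOW_WATER_MARK unsafe modify ops have
// accumulated an internal flush is scheduled, and once
// IN_FLIGHT_IO_HIGH_WATER_MARK modify ops are in flight replay pauses until
// the next flush completes.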
static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);

static NoOpProgressContext no_op_progress_callback;

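// Dispatches a journaled op event to the matching librbd::Operations call.
// finish() re-validates the exclusive lock under owner_lock before executing;
// if the lock was lost the op is cancelled with -ECANCELED.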
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;
  Context *on_op_complete;

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  void execute(const journal::SnapCreateEvent &_) {
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid, false);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    std::shared_lock owner_locker{image_ctx.owner_lock};

    if (image_ctx.exclusive_lock == nullptr ||
        !image_ctx.exclusive_lock->accept_ops()) {
      ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
      on_op_complete->complete(-ECANCELED);
      return;
    }

    execute(event);
  }
};

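// Refreshes the image, when required, before the wrapped callback runs: on
// error the callback is queued with the error code, otherwise it is handed to
// ImageState::refresh() or queued directly.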
template <typename I>
struct C_RefreshIfRequired : public Context {
  I &image_ctx;
  Context *on_finish;

  C_RefreshIfRequired(I &image_ctx, Context *on_finish)
    : image_ctx(image_ctx), on_finish(on_finish) {
  }
  ~C_RefreshIfRequired() override {
    delete on_finish;
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    Context *ctx = on_finish;
    on_finish = nullptr;

    if (r < 0) {
      lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
      image_ctx.op_work_queue->queue(ctx, r);
      return;
    }

    if (image_ctx.state->is_refresh_required()) {
      ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
                     << "refresh required" << dendl;
      image_ctx.state->refresh(ctx);
      return;
    }

    image_ctx.op_work_queue->queue(ctx, 0);
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
                           << __func__

template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx) {
}

template <typename I>
Replay<I>::~Replay() {
  std::lock_guard locker{m_lock};
  ceph_assert(m_in_flight_aio_flush == 0);
  ceph_assert(m_in_flight_aio_modify == 0);
  ceph_assert(m_aio_modify_unsafe_contexts.empty());
  ceph_assert(m_aio_modify_safe_contexts.empty());
  ceph_assert(m_op_events.empty());
  ceph_assert(m_in_flight_op_events == 0);
}

template <typename I>
int Replay<I>::decode(bufferlist::const_iterator *it, EventEntry *event_entry) {
  try {
    using ceph::decode;
    decode(*event_entry, *it);
  } catch (const buffer::error &err) {
    return -EBADMSG;
  }
  return 0;
}

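// Replays a single decoded journal event: on_ready fires once the next event
// can be accepted and on_safe once the event's side effects are committed.
// A minimal caller-side sketch (assumed usage, not part of this file):
//
//   librbd::journal::Replay<librbd::ImageCtx> replay(*image_ctx);
//   EventEntry event_entry;
//   auto it = entry_bl.cbegin();
//   if (librbd::journal::Replay<librbd::ImageCtx>::decode(&it, &event_entry) == 0) {
//     replay.process(event_entry, on_ready, on_safe);
//   }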
template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  std::shared_lock owner_lock{m_image_ctx.owner_lock};
  if (m_image_ctx.exclusive_lock == nullptr ||
      !m_image_ctx.exclusive_lock->accept_ops()) {
    ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
    on_ready->complete(0);
    return;
  }

  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}

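// Stops replay: any in-flight AIO modify ops are flushed, then pending op
// events are either cancelled with -ERESTART (cancel_ops=true) or started
// without waiting for their OpFinishEvent. on_finish is parked in m_flush_ctx
// while work remains in flight.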
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    std::lock_guard locker{m_lock};

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
      ceph_assert(flush_comp != nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready
        op_event_pair.second.finish_on_ready = true;
      }
    }

    ceph_assert(!m_shut_down);
    m_shut_down = true;

    ceph_assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    std::shared_lock owner_locker{m_image_ctx.owner_lock};
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                   io::FLUSH_SOURCE_INTERNAL, {});
  }
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}

template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    std::lock_guard locker{m_lock};
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
    if (aio_comp == nullptr) {
      return;
    }
  }

  std::shared_lock owner_locker{m_image_ctx.owner_lock};
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
                                 io::FLUSH_SOURCE_INTERNAL, {});
}

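// Invoked by a blocking op state machine (e.g. snap create, resize) once it
// reaches its ready point: event processing resumes, and the state machine is
// resumed when the matching OpFinishEvent arrives (or immediately on shut
// down or cancellation).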
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  std::lock_guard locker{m_lock};
  auto op_it = m_op_events.find(op_tid);
  ceph_assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  ceph_assert(op_event.op_in_progress &&
              op_event.on_op_finish_event == nullptr &&
              op_event.on_finish_ready == nullptr &&
              op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new LambdaContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}

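// The AIO modify handlers below (discard, write, writesame, compare-and-write)
// share one pattern: create a modify completion, no-op IO clipped beyond the
// current image size, and issue an internal flush when the low-water mark is
// hit.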
template <typename I>
void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO discard event" << dendl;

  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_DISCARD,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  if (!clipped_io(event.offset, aio_comp)) {
    io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp,
                                     {{event.offset, event.length}},
                                     event.discard_granularity_bytes, {});
  }

  if (flush_required) {
    m_lock.lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                     io::FLUSH_SOURCE_INTERNAL, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  if (!clipped_io(event.offset, aio_comp)) {
    io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                   {{event.offset, event.length}},
                                   std::move(data), 0, {});
  }

  if (flush_required) {
    m_lock.lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                     io::FLUSH_SOURCE_INTERNAL, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO flush event" << dendl;

  io::AioCompletion *aio_comp;
  {
    std::lock_guard locker{m_lock};
    aio_comp = create_aio_flush_completion(on_safe);
  }

  if (aio_comp != nullptr) {
    io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp,
                                   io::FLUSH_SOURCE_INTERNAL, {});
  }
  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO writesame event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITESAME,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  if (!clipped_io(event.offset, aio_comp)) {
    io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp,
                                       {{event.offset, event.length}},
                                       std::move(data), 0, {});
  }

  if (flush_required) {
    m_lock.lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                     io::FLUSH_SOURCE_INTERNAL, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;

  bufferlist cmp_data = event.cmp_data;
  bufferlist write_data = event.write_data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_COMPARE_AND_WRITE,
                                               &flush_required,
                                               {-EILSEQ});
  if (aio_comp == nullptr) {
    // replay was shut down -- matches the guard in the other AIO handlers
    return;
  }

  if (!clipped_io(event.offset, aio_comp)) {
    io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
                                               {{event.offset, event.length}},
                                               std::move(cmp_data),
                                               std::move(write_data),
                                               nullptr, 0, {});
  }

  if (flush_required) {
    m_lock.lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp,
                                     io::FLUSH_SOURCE_INTERNAL, {});
    }
  }
}

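// Matches an OpFinishEvent against the op event that started it: recorded
// success executes the deferred op, while recorded failure is either bubbled
// up to an in-progress op or completes the op event directly.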
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    std::lock_guard locker{m_lock};
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    ceph_assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors which indicate op never started but was recorded
    // as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up since
      // our image is now potentially in an inconsistent state
      // since simple errors should have been caught before
      // creating the op event
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}

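// Op event handlers: snap create, resize and update features execute
// immediately and block further event processing until their state machines
// are ready (they affect in-flight IO); the remaining ops are deferred until
// the matching OpFinishEvent confirms the op actually ran.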
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap remove event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Rename event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
                                                        on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Demote/Promote event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  op_event->ignore_error_codes = {-ERANGE};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  std::lock_guard locker{m_lock};
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                        on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": unknown event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

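// Completion for a replayed AIO modify op: failures (after filtering expected
// error codes) complete on_safe immediately, while successes are parked in
// m_aio_modify_safe_contexts until the next flush commits them.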
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r, std::set<int> &filters) {
  std::lock_guard locker{m_lock};
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  if (on_ready != nullptr) {
    on_ready->complete(0);
  }

  if (filters.find(r) != filters.end())
    r = 0;

  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, r);
    return;
  }

  // will be completed after next flush operation completes
  m_aio_modify_safe_contexts.insert(on_safe);
}

template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_in_flight_aio_flush > 0);
    ceph_assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}

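// Allocates the OpEvent tracking state for a new op tid and returns the
// completion context that routes into handle_op_complete(); returns nullptr,
// completing on_ready/on_safe with an error, after shut down or on a
// duplicate op tid.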
template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ceph_assert(ceph_mutex_is_locked(m_lock));
  if (m_op_events.count(op_tid) != 0) {
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}

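// Final accounting for an op event: fires any pending ready/safe callbacks,
// filters replay-induced errors, and queues m_flush_ctx if a shut down was
// waiting on the last in-flight event.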
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    std::lock_guard locker{m_lock};
    auto op_it = m_op_events.find(op_tid);
    ceph_assert(op_it != m_op_events.end());

    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    if (m_shut_down) {
      ceph_assert(m_flush_ctx != nullptr);
      shutting_down = true;
    }
  }

  ceph_assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    ceph_assert(op_event.on_finish_ready == nullptr &&
                op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    ceph_assert((op_event.on_finish_ready != nullptr &&
                 op_event.on_finish_safe != nullptr) || shutting_down);
  }

  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // shut down request might have occurred while lock was
  // dropped -- handle if pending
  Context *on_flush = nullptr;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}

template <typename I>
io::AioCompletion *
Replay<I>::create_aio_modify_completion(Context *on_ready,
                                        Context *on_safe,
                                        io::aio_type_t aio_type,
                                        bool *flush_required,
                                        std::set<int> &&filters) {
  std::lock_guard locker{m_lock};
  CephContext *cct = m_image_ctx.cct;
  ceph_assert(m_on_aio_ready == nullptr);

  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ++m_in_flight_aio_modify;
  m_aio_modify_unsafe_contexts.push_back(on_safe);

  // FLUSH if we hit the low-water mark -- on_safe contexts are
  // completed by flushes-only so that we don't move the journal
  // commit position until safely on-disk

  *flush_required = (m_aio_modify_unsafe_contexts.size() ==
                     IN_FLIGHT_IO_LOW_WATER_MARK);
  if (*flush_required) {
    ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
                   << dendl;
  }

  // READY for more events if:
  // * not at high-water mark for IO
  // * in-flight ops are at a consistent point (snap create has IO flushed,
  //   shrink has adjusted clip boundary, etc) -- should have already been
  //   flagged not-ready
  if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
    ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
                   << dendl;
    ceph_assert(m_on_aio_ready == nullptr);
    std::swap(m_on_aio_ready, on_ready);
  }

  // when the modification is ACKed by librbd, we can process the next
  // event. when flushed, the completion of the next flush will fire the
  // on_safe callback
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)),
    util::get_image_ctx(&m_image_ctx), aio_type);
  return aio_comp;
}

template <typename I>
io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    if (on_safe != nullptr) {
      m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    }
    return nullptr;
  }

  ++m_in_flight_aio_flush;

  // associate all prior write/discard ops to this flush request
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioFlushComplete(this, on_safe,
                           std::move(m_aio_modify_unsafe_contexts)),
    util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
  m_aio_modify_unsafe_contexts.clear();
  return aio_comp;
}

template <typename I>
bool Replay<I>::clipped_io(uint64_t image_offset, io::AioCompletion *aio_comp) {
  CephContext *cct = m_image_ctx.cct;

  m_image_ctx.image_lock.lock_shared();
  size_t image_size = m_image_ctx.size;
  m_image_ctx.image_lock.unlock_shared();

  if (image_offset >= image_size) {
    // rbd-mirror image sync might race an IO event w/ associated resize between
    // the point the peer is registered and the sync point is created, so no-op
    // IO events beyond the current image extents since under normal conditions
    // it wouldn't have been recorded in the journal
    ldout(cct, 5) << ": no-op IO event beyond image size" << dendl;
    aio_comp->get();
    aio_comp->set_request_count(0);
    aio_comp->put();
    return true;
  }

  return false;
}

} // namespace journal
} // namespace librbd

template class librbd::journal::Replay<librbd::ImageCtx>;