// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/journal/Replay.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "

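// Replay applies journaled events against the local image. For each event,
// process() is handed two callbacks: on_ready fires once the next event may
// be delivered, and on_safe fires once the replayed event has been
// committed (after a flush for AIO modify events, or once the op state
// machine completes for maintenance ops).
//
// Illustrative usage sketch (the bufferlist 'bl' and the Context pointers
// are caller-supplied placeholders, not part of this file):
//
//   librbd::journal::Replay<librbd::ImageCtx> replay(*image_ctx);
//   EventEntry event_entry;
//   bufferlist::iterator it = bl.begin();
//   if (replay.decode(&it, &event_entry) == 0) {
//     replay.process(event_entry, on_ready, on_safe);
//   }
//   // ... after the final entry has been delivered:
//   replay.shut_down(false, on_shut_down);
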
namespace librbd {
namespace journal {

namespace {

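// replay throttling: once this many uncommitted AIO modify events are in
// flight a flush is injected (low-water mark), and replay is paused
// entirely at the high-water mark until the flush catches up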
static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);

static NoOpProgressContext no_op_progress_callback;

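// adapts a decoded journal event to the matching librbd Operations call;
// finish() acquires the owner lock for read and dispatches to the execute()
// overload for the event type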
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;
  Context *on_op_complete;

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  void execute(const journal::SnapCreateEvent &_) {
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid, false);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    RWLock::RLocker owner_locker(image_ctx.owner_lock);
    execute(event);
  }
};

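// defers a continuation until any pending image refresh has been applied:
// on error, or when no refresh is needed, the wrapped context is queued on
// the op work queue; otherwise it is completed by the refresh request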
template <typename I>
struct C_RefreshIfRequired : public Context {
  I &image_ctx;
  Context *on_finish;

  C_RefreshIfRequired(I &image_ctx, Context *on_finish)
    : image_ctx(image_ctx), on_finish(on_finish) {
  }
  ~C_RefreshIfRequired() override {
    delete on_finish;
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    Context *ctx = on_finish;
    on_finish = nullptr;

    if (r < 0) {
      lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
      image_ctx.op_work_queue->queue(ctx, r);
      return;
    }

    if (image_ctx.state->is_refresh_required()) {
      ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
                     << "refresh required" << dendl;
      image_ctx.state->refresh(ctx);
      return;
    }

    image_ctx.op_work_queue->queue(ctx, 0);
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
                           << __func__

template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
}

template <typename I>
Replay<I>::~Replay() {
  assert(m_in_flight_aio_flush == 0);
  assert(m_in_flight_aio_modify == 0);
  assert(m_aio_modify_unsafe_contexts.empty());
  assert(m_aio_modify_safe_contexts.empty());
  assert(m_op_events.empty());
  assert(m_in_flight_op_events == 0);
}

template <typename I>
int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
  try {
    ::decode(*event_entry, *it);
  } catch (const buffer::error &err) {
    return -EBADMSG;
  }
  return 0;
}

template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}

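// drains in-flight IO and op events. With cancel_ops, ops that have not yet
// started are failed with -ERESTART; otherwise ops still waiting on their
// OpFinishEvent are kicked off so they can run to completion first. The
// final flush/op completion fires m_flush_ctx (the shut down callback).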
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    Mutex::Locker locker(m_lock);

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready
        op_event_pair.second.finish_on_ready = true;
      }
    }

    assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
  }
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}

template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
}

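// invoked by a blocking op state machine once it reaches a quiescent point
// (e.g. snap create has flushed IO): event replay is resumed, and the state
// machine itself is resumed when the matching OpFinishEvent arrives -- or
// immediately during shut down (0 when completing ops, -ERESTART on cancel)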
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  Mutex::Locker locker(m_lock);
  auto op_it = m_op_events.find(op_tid);
  assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  assert(op_event.op_in_progress &&
         op_event.on_op_finish_event == nullptr &&
         op_event.on_finish_ready == nullptr &&
         op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new FunctionContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}

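// AIO modify events (write/discard/writesame) share a pattern: the replayed
// IO is issued immediately, on_ready fires once librbd ACKs the request
// (unless the high-water mark pauses replay), and on_safe is deferred until
// a subsequent flush commits the data. When enough unsafe contexts
// accumulate, a flush is injected here to bound uncommitted replayed IO.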
template <typename I>
void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO discard event" << dendl;

  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_DISCARD,
                                               &flush_required);
  io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                   event.length, event.skip_partial_discard);
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required);
  io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                 {{event.offset, event.length}},
                                 std::move(data), 0);
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO flush event" << dendl;

  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(on_safe);
  }
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO writesame event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITESAME,
                                               &flush_required);
  io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
                                     event.length, std::move(data), 0);
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
  }
}

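// maintenance ops are journaled as a start event (e.g. SnapCreateEvent)
// paired with an OpFinishEvent carrying the original return code. The start
// event registers the op; the OpFinishEvent decides whether to apply it
// (journal recorded success) or fail/cancel it (journal recorded an error).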
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors which indicate the op never started but was still
    // recorded as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up since our image
      // is now potentially in an inconsistent state; simple errors
      // should have been caught before the op event was created
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}

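// blocking ops (snap create, resize, update features) are started
// immediately and stall replay via on_start_ready until the state machine
// signals replay_op_ready(); non-blocking ops merely register an
// on_op_finish_event continuation and let replay proceed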
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
                                                        on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Demote/Promote event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": unknown event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  if (on_ready != nullptr) {
    on_ready->complete(0);
  }
  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    on_safe->complete(r);
    return;
  }

  // will be completed after the next flush operation completes
  m_aio_modify_safe_contexts.insert(on_safe);
}

template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_aio_flush > 0);
    assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}

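// registers a new op event for the given tid and returns the completion the
// op state machine will fire (C_OpOnComplete -> handle_op_complete); a
// duplicate tid completes on_ready and fails on_safe with -EINVAL instead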
template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;

  assert(m_lock.is_locked());
  if (m_op_events.count(op_tid) != 0) {
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}

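// final accounting for an op event: filters errors that merely indicate the
// op was already applied, completes the start/finish safe contexts, and
// unblocks a pending shut down once the last in-flight event drains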
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(op_tid);
    assert(op_it != m_op_events.end());

    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    shutting_down = (m_flush_ctx != nullptr);
  }

  assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    assert(op_event.on_finish_ready == nullptr &&
           op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    assert((op_event.on_finish_ready != nullptr &&
            op_event.on_finish_safe != nullptr) || shutting_down);
  }

  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // a shut down request might have occurred while the lock was
  // dropped -- handle it if pending
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}

template <typename I>
io::AioCompletion *
Replay<I>::create_aio_modify_completion(Context *on_ready, Context *on_safe,
                                        io::aio_type_t aio_type,
                                        bool *flush_required) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  assert(m_on_aio_ready == nullptr);

  ++m_in_flight_aio_modify;
  m_aio_modify_unsafe_contexts.push_back(on_safe);

  // FLUSH if we hit the low-water mark -- on_safe contexts are completed
  // only by flushes so that the journal commit position is not advanced
  // until the data is safely on disk
  *flush_required = (m_aio_modify_unsafe_contexts.size() ==
                     IN_FLIGHT_IO_LOW_WATER_MARK);
  if (*flush_required) {
    ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
                   << dendl;
  }

  // READY for more events if:
  // * not at high-water mark for IO
  // * in-flight ops are at a consistent point (snap create has IO flushed,
  //   shrink has adjusted clip boundary, etc) -- should have already been
  //   flagged not-ready
  if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
    ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
                   << dendl;
    assert(m_on_aio_ready == nullptr);
    std::swap(m_on_aio_ready, on_ready);
  }

  // when the modification is ACKed by librbd, we can process the next
  // event; when flushed, the completion of the next flush will fire the
  // on_safe callback
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioModifyComplete(this, on_ready, on_safe),
    util::get_image_ctx(&m_image_ctx), aio_type);
  return aio_comp;
}

template <typename I>
io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
  assert(m_lock.is_locked());

  ++m_in_flight_aio_flush;

  // associate all prior write/discard ops with this flush request
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioFlushComplete(this, on_safe,
                           std::move(m_aio_modify_unsafe_contexts)),
    util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
  m_aio_modify_unsafe_contexts.clear();
  return aio_comp;
}

} // namespace journal
} // namespace librbd

template class librbd::journal::Replay<librbd::ImageCtx>;