// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/journal/Replay.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "

namespace librbd {
namespace journal {

namespace {

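// Replay throttles in-flight AIO modify ops between these two marks: once
// IN_FLIGHT_IO_LOW_WATER_MARK unsafe writes have accumulated a flush is
// scheduled to commit them, and at IN_FLIGHT_IO_HIGH_WATER_MARK replay is
// paused until in-flight IO drains.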
static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);

static NoOpProgressContext no_op_progress_callback;

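// Context that re-executes a replayed op event against the local image. The
// overloaded execute() methods dispatch each journal event type to the
// matching librbd::Operations call; finish() performs the dispatch once any
// prerequisite (e.g. image refresh) has completed, skipping the op if the
// exclusive lock was lost in the meantime.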
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;
  Context *on_op_complete;

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  void execute(const journal::SnapCreateEvent &_) {
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid, false);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    RWLock::RLocker owner_locker(image_ctx.owner_lock);

    if (image_ctx.exclusive_lock == nullptr ||
        !image_ctx.exclusive_lock->accept_ops()) {
      ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
      on_op_complete->complete(-ECANCELED);
      return;
    }

    execute(event);
  }
};

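// Context that refreshes the image (if a refresh is pending) before invoking
// on_finish, so replayed ops always run against current image metadata.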
template <typename I>
struct C_RefreshIfRequired : public Context {
  I &image_ctx;
  Context *on_finish;

  C_RefreshIfRequired(I &image_ctx, Context *on_finish)
    : image_ctx(image_ctx), on_finish(on_finish) {
  }
  ~C_RefreshIfRequired() override {
    delete on_finish;
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    Context *ctx = on_finish;
    on_finish = nullptr;

    if (r < 0) {
      lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
      image_ctx.op_work_queue->queue(ctx, r);
      return;
    }

    if (image_ctx.state->is_refresh_required()) {
      ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
                     << "refresh required" << dendl;
      image_ctx.state->refresh(ctx);
      return;
    }

    image_ctx.op_work_queue->queue(ctx, 0);
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
                           << __func__

template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
}

template <typename I>
Replay<I>::~Replay() {
  assert(m_in_flight_aio_flush == 0);
  assert(m_in_flight_aio_modify == 0);
  assert(m_aio_modify_unsafe_contexts.empty());
  assert(m_aio_modify_safe_contexts.empty());
  assert(m_op_events.empty());
  assert(m_in_flight_op_events == 0);
}

template <typename I>
int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
  try {
    ::decode(*event_entry, *it);
  } catch (const buffer::error &err) {
    return -EBADMSG;
  }
  return 0;
}

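// Entry point for a decoded journal event: on_ready is invoked once replay
// can accept the next event, on_safe once this event's effects are safely
// applied. Events are skipped with -ECANCELED if the exclusive lock was lost.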
template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
  if (m_image_ctx.exclusive_lock == nullptr ||
      !m_image_ctx.exclusive_lock->accept_ops()) {
    ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
    on_ready->complete(0);
    return;
  }

  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}

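// Stops replay: flushes outstanding AIO, and either cancels (-ERESTART) or
// kicks off any op events still waiting for their OpFinishEvent. on_finish
// fires once all in-flight events and IO have drained.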
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    Mutex::Locker locker(m_lock);

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
      assert(flush_comp != nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready
        op_event_pair.second.finish_on_ready = true;
      }
    }

    assert(!m_shut_down);
    m_shut_down = true;

    assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
  }
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}

template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
    if (aio_comp == nullptr) {
      return;
    }
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
}

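// Invoked once a blocking op (e.g. snap create, resize) reaches its ready
// point: resumes event processing and arranges for the paused op state
// machine to resume when the matching OpFinishEvent is replayed.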
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  Mutex::Locker locker(m_lock);
  auto op_it = m_op_events.find(op_tid);
  assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  assert(op_event.op_in_progress &&
         op_event.on_op_finish_event == nullptr &&
         op_event.on_finish_ready == nullptr &&
         op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new FunctionContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}

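// The AIO modify event handlers (discard, write, writesame, compare-and-write)
// share one pattern: replay the IO via ImageRequest and, if the low-water
// mark was hit, schedule a flush so the journal commit position can advance.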
template <typename I>
void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO discard event" << dendl;

  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_DISCARD,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                   event.length, event.skip_partial_discard,
                                   {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                 {{event.offset, event.length}},
                                 std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO flush event" << dendl;

  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(on_safe);
  }

  if (aio_comp != nullptr) {
    io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
  }

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO writesame event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITESAME,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
                                     event.length, std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;

  bufferlist cmp_data = event.cmp_data;
  bufferlist write_data = event.write_data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_COMPARE_AND_WRITE,
                                               &flush_required,
                                               {-EILSEQ});
  if (aio_comp == nullptr) {
    // completion creation fails after shut down -- guard against a null
    // dereference, matching the other AIO event handlers
    return;
  }

  io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
                                             {{event.offset, event.length}},
                                             std::move(cmp_data),
                                             std::move(write_data),
                                             nullptr, 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

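// An OpFinishEvent records the result of an earlier op event in the journal.
// On success the deferred op is applied; on failure the error is propagated
// to (or cancels) the corresponding op state machine.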
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors which indicate op never started but was recorded
    // as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up since
      // our image is now potentially in an inconsistent state
      // since simple errors should have been caught before
      // creating the op event
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}

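// The op event handlers below register an OpEvent keyed by op_tid. Blocking
// ops (snap create, resize, update features) start immediately and hold back
// further events until ready; the rest wait for their OpFinishEvent. Errors
// that stem from re-applying an already-applied op are ignored per handler.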
template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
                                                        on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Demote/Promote event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  on_op_complete = new C_RefreshIfRequired<I>(m_image_ctx, on_op_complete);
  op_event->on_op_finish_event = util::create_async_context_callback(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused due to replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": unknown event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

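// Completion callback for replayed AIO modify ops. With the writeback cache
// enabled, on_safe is deferred until a subsequent flush commits the write;
// otherwise the write is already stable and on_safe completes immediately.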
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r, std::set<int> &filters,
                                           bool writeback_cache_enabled) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  if (on_ready != nullptr) {
    on_ready->complete(0);
  }

  if (filters.find(r) != filters.end()) {
    r = 0;
  }

  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    on_safe->complete(r);
    return;
  }

  if (writeback_cache_enabled) {
    // will be completed after next flush operation completes
    m_aio_modify_safe_contexts.insert(on_safe);
  } else {
    // IO is safely stored on disk
    assert(m_in_flight_aio_modify > 0);
    --m_in_flight_aio_modify;

    if (m_on_aio_ready != nullptr) {
      ldout(cct, 10) << ": resuming paused AIO" << dendl;
      m_on_aio_ready->complete(0);
      m_on_aio_ready = nullptr;
    }

    ldout(cct, 20) << ": completing safe context: " << on_safe << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, 0);
  }
}

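// Completion callback for replayed flushes: the modify ops associated with
// the flush are now stable, so their on_safe contexts fire, paused replay is
// resumed, and a pending shut-down flush context is triggered once all
// in-flight IO and op events have drained.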
template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_aio_flush > 0);
    assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}

template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  assert(m_lock.is_locked());
  if (m_op_events.count(op_tid) != 0) {
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}

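// Invoked when a replayed op state machine finishes: unblocks any paused
// event processing, filters replay-induced errors, completes the op's safe
// contexts, and fires the shut-down flush context once nothing is in flight.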
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(op_tid);
    assert(op_it != m_op_events.end());

    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    if (m_shut_down) {
      assert(m_flush_ctx != nullptr);
      shutting_down = true;
    }
  }

  assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    assert(op_event.on_finish_ready == nullptr &&
           op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    assert((op_event.on_finish_ready != nullptr &&
            op_event.on_finish_safe != nullptr) || shutting_down);
  }

  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // shut down request might have occurred while lock was
  // dropped -- handle if pending
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}

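// Builds the AioCompletion for a replayed modify op, tracking it against the
// low/high water marks: requests a flush at the low mark and withholds
// on_ready (pausing replay) at the high mark.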
template <typename I>
io::AioCompletion *
Replay<I>::create_aio_modify_completion(Context *on_ready,
                                        Context *on_safe,
                                        io::aio_type_t aio_type,
                                        bool *flush_required,
                                        std::set<int> &&filters) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  assert(m_on_aio_ready == nullptr);

  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ++m_in_flight_aio_modify;

  bool writeback_cache_enabled = m_image_ctx.is_writeback_cache_enabled();
  if (writeback_cache_enabled) {
    m_aio_modify_unsafe_contexts.push_back(on_safe);
  }

  // FLUSH if we hit the low-water mark -- on_safe contexts are
  // completed by flushes-only so that we don't move the journal
  // commit position until safely on-disk

  *flush_required = (writeback_cache_enabled &&
                     m_aio_modify_unsafe_contexts.size() ==
                       IN_FLIGHT_IO_LOW_WATER_MARK);
  if (*flush_required) {
    ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
                   << dendl;
  }

  // READY for more events if:
  // * not at high-water mark for IO
  // * in-flight ops are at a consistent point (snap create has IO flushed,
  //   shrink has adjusted clip boundary, etc) -- should have already been
  //   flagged not-ready
  if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
    ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
                   << dendl;
    assert(m_on_aio_ready == nullptr);
    std::swap(m_on_aio_ready, on_ready);
  }

  // when the modification is ACKed by librbd, we can process the next
  // event. when flushed, the completion of the next flush will fire the
  // on_safe callback
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters),
                            writeback_cache_enabled),
    util::get_image_ctx(&m_image_ctx), aio_type);
  return aio_comp;
}

template <typename I>
io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
  assert(m_lock.is_locked());

  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    if (on_safe != nullptr) {
      m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    }
    return nullptr;
  }

  ++m_in_flight_aio_flush;

  // associate all prior write/discard ops to this flush request
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioFlushComplete(this, on_safe,
                           std::move(m_aio_modify_unsafe_contexts)),
    util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
  m_aio_modify_unsafe_contexts.clear();
  return aio_comp;
}

} // namespace journal
} // namespace librbd

template class librbd::journal::Replay<librbd::ImageCtx>;