// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/journal/Replay.h"
#include "common/dout.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Operations.h"
#include "librbd/Utils.h"
#include "librbd/io/AioCompletion.h"
#include "librbd/io/ImageRequest.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " "

namespace librbd {
namespace journal {

namespace {

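// Replay throttles itself against these marks: once the number of unsafe
// (uncommitted) AIO modify ops reaches the low-water mark a flush is
// scheduled, and at the high-water mark replay of new events is paused
// (see create_aio_modify_completion() below).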
static const uint64_t IN_FLIGHT_IO_LOW_WATER_MARK(32);
static const uint64_t IN_FLIGHT_IO_HIGH_WATER_MARK(64);

static NoOpProgressContext no_op_progress_callback;

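// Context that dispatches a journaled op event to the matching
// librbd::Operations call. finish() runs under the owner lock and aborts
// with -ECANCELED if the exclusive lock is no longer held.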
template <typename I, typename E>
struct ExecuteOp : public Context {
  I &image_ctx;
  E event;
  Context *on_op_complete;

  ExecuteOp(I &image_ctx, const E &event, Context *on_op_complete)
    : image_ctx(image_ctx), event(event), on_op_complete(on_op_complete) {
  }

  void execute(const journal::SnapCreateEvent &_) {
    image_ctx.operations->execute_snap_create(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete,
                                              event.op_tid, false);
  }

  void execute(const journal::SnapRemoveEvent &_) {
    image_ctx.operations->execute_snap_remove(event.snap_namespace,
                                              event.snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapRenameEvent &_) {
    image_ctx.operations->execute_snap_rename(event.snap_id,
                                              event.dst_snap_name,
                                              on_op_complete);
  }

  void execute(const journal::SnapProtectEvent &_) {
    image_ctx.operations->execute_snap_protect(event.snap_namespace,
                                               event.snap_name,
                                               on_op_complete);
  }

  void execute(const journal::SnapUnprotectEvent &_) {
    image_ctx.operations->execute_snap_unprotect(event.snap_namespace,
                                                 event.snap_name,
                                                 on_op_complete);
  }

  void execute(const journal::SnapRollbackEvent &_) {
    image_ctx.operations->execute_snap_rollback(event.snap_namespace,
                                                event.snap_name,
                                                no_op_progress_callback,
                                                on_op_complete);
  }

  void execute(const journal::RenameEvent &_) {
    image_ctx.operations->execute_rename(event.image_name,
                                         on_op_complete);
  }

  void execute(const journal::ResizeEvent &_) {
    image_ctx.operations->execute_resize(event.size, true, no_op_progress_callback,
                                         on_op_complete, event.op_tid);
  }

  void execute(const journal::FlattenEvent &_) {
    image_ctx.operations->execute_flatten(no_op_progress_callback,
                                          on_op_complete);
  }

  void execute(const journal::SnapLimitEvent &_) {
    image_ctx.operations->execute_snap_set_limit(event.limit, on_op_complete);
  }

  void execute(const journal::UpdateFeaturesEvent &_) {
    image_ctx.operations->execute_update_features(event.features, event.enabled,
                                                  on_op_complete, event.op_tid);
  }

  void execute(const journal::MetadataSetEvent &_) {
    image_ctx.operations->execute_metadata_set(event.key, event.value,
                                               on_op_complete);
  }

  void execute(const journal::MetadataRemoveEvent &_) {
    image_ctx.operations->execute_metadata_remove(event.key, on_op_complete);
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    if (r < 0) {
      lderr(cct) << ": ExecuteOp::" << __func__ << ": r=" << r << dendl;
      on_op_complete->complete(r);
      return;
    }

    ldout(cct, 20) << ": ExecuteOp::" << __func__ << dendl;
    RWLock::RLocker owner_locker(image_ctx.owner_lock);

    if (image_ctx.exclusive_lock == nullptr ||
        !image_ctx.exclusive_lock->accept_ops()) {
      ldout(cct, 5) << ": lost exclusive lock -- skipping op" << dendl;
      on_op_complete->complete(-ECANCELED);
      return;
    }

    execute(event);
  }
};

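// Context wrapper that refreshes the image first, if a refresh is
// pending, before queueing the wrapped context on the op work queue.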
template <typename I>
struct C_RefreshIfRequired : public Context {
  I &image_ctx;
  Context *on_finish;

  C_RefreshIfRequired(I &image_ctx, Context *on_finish)
    : image_ctx(image_ctx), on_finish(on_finish) {
  }
  ~C_RefreshIfRequired() override {
    delete on_finish;
  }

  void finish(int r) override {
    CephContext *cct = image_ctx.cct;
    Context *ctx = on_finish;
    on_finish = nullptr;

    if (r < 0) {
      lderr(cct) << ": C_RefreshIfRequired::" << __func__ << ": r=" << r << dendl;
      image_ctx.op_work_queue->queue(ctx, r);
      return;
    }

    if (image_ctx.state->is_refresh_required()) {
      ldout(cct, 20) << ": C_RefreshIfRequired::" << __func__ << ": "
                     << "refresh required" << dendl;
      image_ctx.state->refresh(ctx);
      return;
    }

    image_ctx.op_work_queue->queue(ctx, 0);
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "librbd::journal::Replay: " << this << " " \
                           << __func__

template <typename I>
Replay<I>::Replay(I &image_ctx)
  : m_image_ctx(image_ctx), m_lock("Replay<I>::m_lock") {
}

template <typename I>
Replay<I>::~Replay() {
  assert(m_in_flight_aio_flush == 0);
  assert(m_in_flight_aio_modify == 0);
  assert(m_aio_modify_unsafe_contexts.empty());
  assert(m_aio_modify_safe_contexts.empty());
  assert(m_op_events.empty());
  assert(m_in_flight_op_events == 0);
}

template <typename I>
int Replay<I>::decode(bufferlist::iterator *it, EventEntry *event_entry) {
  try {
    ::decode(*event_entry, *it);
  } catch (const buffer::error &err) {
    return -EBADMSG;
  }
  return 0;
}

template <typename I>
void Replay<I>::process(const EventEntry &event_entry,
                        Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", on_safe=" << on_safe
                 << dendl;

  on_ready = util::create_async_context_callback(m_image_ctx, on_ready);

  RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
  if (m_image_ctx.exclusive_lock == nullptr ||
      !m_image_ctx.exclusive_lock->accept_ops()) {
    ldout(cct, 5) << ": lost exclusive lock -- skipping event" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, -ECANCELED);
    on_ready->complete(0);
    return;
  }

  boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
                       event_entry.event);
}

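// Shut down replay: flush any outstanding AIO modify ops and either cancel
// ops still waiting to start (cancel_ops == true) or kick them off and let
// them drain; on_finish fires once nothing remains in flight.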
template <typename I>
void Replay<I>::shut_down(bool cancel_ops, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  io::AioCompletion *flush_comp = nullptr;
  on_finish = util::create_async_context_callback(
    m_image_ctx, on_finish);

  {
    Mutex::Locker locker(m_lock);

    // safely commit any remaining AIO modify operations
    if ((m_in_flight_aio_flush + m_in_flight_aio_modify) != 0) {
      flush_comp = create_aio_flush_completion(nullptr);
      assert(flush_comp != nullptr);
    }

    for (auto &op_event_pair : m_op_events) {
      OpEvent &op_event = op_event_pair.second;
      if (cancel_ops) {
        // cancel ops that are waiting to start (waiting for
        // OpFinishEvent or waiting for ready)
        if (op_event.on_start_ready == nullptr &&
            op_event.on_op_finish_event != nullptr) {
          Context *on_op_finish_event = nullptr;
          std::swap(on_op_finish_event, op_event.on_op_finish_event);
          m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
        }
      } else if (op_event.on_op_finish_event != nullptr) {
        // start ops waiting for OpFinishEvent
        Context *on_op_finish_event = nullptr;
        std::swap(on_op_finish_event, op_event.on_op_finish_event);
        m_image_ctx.op_work_queue->queue(on_op_finish_event, 0);
      } else if (op_event.on_start_ready != nullptr) {
        // waiting for op ready
        op_event_pair.second.finish_on_ready = true;
      }
    }

    assert(!m_shut_down);
    m_shut_down = true;

    assert(m_flush_ctx == nullptr);
    if (m_in_flight_op_events > 0 || flush_comp != nullptr) {
      std::swap(m_flush_ctx, on_finish);
    }
  }

  // execute the following outside of lock scope
  if (flush_comp != nullptr) {
    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
    io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
  }
  if (on_finish != nullptr) {
    on_finish->complete(0);
  }
}

template <typename I>
void Replay<I>::flush(Context *on_finish) {
  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(
      util::create_async_context_callback(m_image_ctx, on_finish));
    if (aio_comp == nullptr) {
      return;
    }
  }

  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
  io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
}

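// Called back by a blocking op (e.g. snap create or resize) once replay can
// safely resume. The op state machine is parked until the matching
// OpFinishEvent arrives, or failed with -ERESTART if a flush/shut down is
// pending.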
template <typename I>
void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << dendl;

  Mutex::Locker locker(m_lock);
  auto op_it = m_op_events.find(op_tid);
  assert(op_it != m_op_events.end());

  OpEvent &op_event = op_it->second;
  assert(op_event.op_in_progress &&
         op_event.on_op_finish_event == nullptr &&
         op_event.on_finish_ready == nullptr &&
         op_event.on_finish_safe == nullptr);

  // resume processing replay events
  Context *on_start_ready = nullptr;
  std::swap(on_start_ready, op_event.on_start_ready);
  on_start_ready->complete(0);

  // cancel has been requested -- send error to paused state machine
  if (!op_event.finish_on_ready && m_flush_ctx != nullptr) {
    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);
    return;
  }

  // resume the op state machine once the associated OpFinishEvent
  // is processed
  op_event.on_op_finish_event = new FunctionContext(
    [on_resume](int r) {
      on_resume->complete(r);
    });

  // shut down request -- don't expect OpFinishEvent
  if (op_event.finish_on_ready) {
    m_image_ctx.op_work_queue->queue(on_resume, 0);
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO discard event" << dendl;

  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_DISCARD,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
                                   event.length, event.skip_partial_discard,
                                   {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO write event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITE,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_write(&m_image_ctx, aio_comp,
                                 {{event.offset, event.length}},
                                 std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioFlushEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO flush event" << dendl;

  io::AioCompletion *aio_comp;
  {
    Mutex::Locker locker(m_lock);
    aio_comp = create_aio_flush_completion(on_safe);
  }

  if (aio_comp != nullptr) {
    io::ImageRequest<I>::aio_flush(&m_image_ctx, aio_comp, {});
  }

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::AioWriteSameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO writesame event" << dendl;

  bufferlist data = event.data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_WRITESAME,
                                               &flush_required,
                                               {});
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_writesame(&m_image_ctx, aio_comp, event.offset,
                                     event.length, std::move(data), 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

template <typename I>
void Replay<I>::handle_event(const journal::AioCompareAndWriteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": AIO CompareAndWrite event" << dendl;

  bufferlist cmp_data = event.cmp_data;
  bufferlist write_data = event.write_data;
  bool flush_required;
  auto aio_comp = create_aio_modify_completion(on_ready, on_safe,
                                               io::AIO_TYPE_COMPARE_AND_WRITE,
                                               &flush_required,
                                               {-EILSEQ});
  // unlike the other AIO handlers, this one was missing the nullptr
  // guards for the shut-down case -- add them to match its siblings
  if (aio_comp == nullptr) {
    return;
  }

  io::ImageRequest<I>::aio_compare_and_write(&m_image_ctx, aio_comp,
                                             {{event.offset, event.length}},
                                             std::move(cmp_data),
                                             std::move(write_data),
                                             nullptr, 0, {});
  if (flush_required) {
    m_lock.Lock();
    auto flush_comp = create_aio_flush_completion(nullptr);
    m_lock.Unlock();

    if (flush_comp != nullptr) {
      io::ImageRequest<I>::aio_flush(&m_image_ctx, flush_comp, {});
    }
  }
}

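// An OpFinishEvent records the outcome of a previously journaled op event:
// on success the deferred op is started (or resumed), while on failure the
// recorded error is propagated to the op's completion contexts.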
template <typename I>
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Op finish event: "
                 << "op_tid=" << event.op_tid << dendl;

  bool op_in_progress;
  bool filter_ret_val;
  Context *on_op_complete = nullptr;
  Context *on_op_finish_event = nullptr;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(event.op_tid);
    if (op_it == m_op_events.end()) {
      ldout(cct, 10) << ": unable to locate associated op: assuming previously "
                     << "committed." << dendl;
      on_ready->complete(0);
      m_image_ctx.op_work_queue->queue(on_safe, 0);
      return;
    }

    OpEvent &op_event = op_it->second;
    assert(op_event.on_finish_safe == nullptr);
    op_event.on_finish_ready = on_ready;
    op_event.on_finish_safe = on_safe;
    op_in_progress = op_event.op_in_progress;
    std::swap(on_op_complete, op_event.on_op_complete);
    std::swap(on_op_finish_event, op_event.on_op_finish_event);

    // special errors that indicate the op never started but was
    // recorded as failed in the journal
    filter_ret_val = (op_event.op_finish_error_codes.count(event.r) != 0);
  }

  if (event.r < 0) {
    if (op_in_progress) {
      // bubble the error up to the in-progress op to cancel it
      on_op_finish_event->complete(event.r);
    } else {
      // op hasn't been started -- bubble the error up, since the image
      // is now potentially in an inconsistent state (simple errors should
      // have been caught before the op event was created)
      delete on_op_complete;
      delete on_op_finish_event;
      handle_op_complete(event.op_tid, filter_ret_val ? 0 : event.r);
    }
    return;
  }

  // journal recorded success -- apply the op now
  on_op_finish_event->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapCreateEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap create event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EEXIST};

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapCreateEvent>(m_image_ctx, event,
                                                            on_op_complete)),
    0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRemoveEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRenameEvent>(m_image_ctx, event,
                                                            on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapProtectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap protect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapProtectEvent>(m_image_ctx, event,
                                                             on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EBUSY};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapUnprotectEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap unprotect event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapUnprotectEvent>(m_image_ctx,
                                                               event,
                                                               on_op_complete));

  // ignore errors recorded in the journal
  op_event->op_finish_error_codes = {-EBUSY};

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapRollbackEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap rollback start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapRollbackEvent>(m_image_ctx,
                                                              event,
                                                              on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::RenameEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Rename event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::RenameEvent>(m_image_ctx, event,
                                                        on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EEXIST};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::ResizeEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Resize start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::ResizeEvent>(m_image_ctx, event,
                                                        on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::FlattenEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Flatten start event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::FlattenEvent>(m_image_ctx, event,
                                                         on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-EINVAL};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::DemotePromoteEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Demote/Promote event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::SnapLimitEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Snap limit event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::SnapLimitEvent>(m_image_ctx,
                                                           event,
                                                           on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UpdateFeaturesEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Update features event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  // avoid lock cycles
  m_image_ctx.op_work_queue->queue(new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::UpdateFeaturesEvent>(
      m_image_ctx, event, on_op_complete)), 0);

  // do not process more events until the state machine is ready
  // since it will affect IO
  op_event->op_in_progress = true;
  op_event->on_start_ready = on_ready;
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataSetEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata set event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::MetadataSetEvent>(
      m_image_ctx, event, on_op_complete));

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::MetadataRemoveEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": Metadata remove event" << dendl;

  Mutex::Locker locker(m_lock);
  OpEvent *op_event;
  Context *on_op_complete = create_op_context_callback(event.op_tid, on_ready,
                                                       on_safe, &op_event);
  if (on_op_complete == nullptr) {
    return;
  }

  op_event->on_op_finish_event = new C_RefreshIfRequired<I>(
    m_image_ctx, new ExecuteOp<I, journal::MetadataRemoveEvent>(
      m_image_ctx, event, on_op_complete));

  // ignore errors caused by replay
  op_event->ignore_error_codes = {-ENOENT};

  on_ready->complete(0);
}

template <typename I>
void Replay<I>::handle_event(const journal::UnknownEvent &event,
                             Context *on_ready, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": unknown event" << dendl;
  on_ready->complete(0);
  on_safe->complete(0);
}

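// Completion callback for replayed AIO modify ops: on_safe is deferred to
// the next flush so the journal commit position only advances once the
// modification is actually on disk.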
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_ready, Context *on_safe,
                                           int r, std::set<int> &filters) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": on_ready=" << on_ready << ", "
                 << "on_safe=" << on_safe << ", r=" << r << dendl;

  if (on_ready != nullptr) {
    on_ready->complete(0);
  }

  if (filters.find(r) != filters.end())
    r = 0;

  if (r < 0) {
    lderr(cct) << ": AIO modify op failed: " << cpp_strerror(r) << dendl;
    on_safe->complete(r);
    return;
  }

  // will be completed after next flush operation completes
  m_aio_modify_safe_contexts.insert(on_safe);
}

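// Completion callback for replayed AIO flush ops: completes all on_safe
// contexts gathered since the previous flush and, once nothing remains in
// flight, fires any pending shut down / flush context.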
template <typename I>
void Replay<I>::handle_aio_flush_complete(Context *on_flush_safe,
                                          Contexts &on_safe_ctxs, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << ": AIO flush failed: " << cpp_strerror(r) << dendl;
  }

  Context *on_aio_ready = nullptr;
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_aio_flush > 0);
    assert(m_in_flight_aio_modify >= on_safe_ctxs.size());
    --m_in_flight_aio_flush;
    m_in_flight_aio_modify -= on_safe_ctxs.size();

    std::swap(on_aio_ready, m_on_aio_ready);
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }

    // strip out previously failed on_safe contexts
    for (auto it = on_safe_ctxs.begin(); it != on_safe_ctxs.end(); ) {
      if (m_aio_modify_safe_contexts.erase(*it)) {
        ++it;
      } else {
        it = on_safe_ctxs.erase(it);
      }
    }
  }

  if (on_aio_ready != nullptr) {
    ldout(cct, 10) << ": resuming paused AIO" << dendl;
    on_aio_ready->complete(0);
  }

  if (on_flush_safe != nullptr) {
    on_safe_ctxs.push_back(on_flush_safe);
  }
  for (auto ctx : on_safe_ctxs) {
    ldout(cct, 20) << ": completing safe context: " << ctx << dendl;
    ctx->complete(r);
  }

  if (on_flush != nullptr) {
    ldout(cct, 20) << ": completing flush context: " << on_flush << dendl;
    on_flush->complete(r);
  }
}

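// Allocates the OpEvent tracking state for a journaled op and returns the
// completion context fired when the op finishes. Returns nullptr (failing
// on_safe) after shut down or when a duplicate op tid is detected.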
template <typename I>
Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
                                               Context *on_ready,
                                               Context *on_safe,
                                               OpEvent **op_event) {
  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  assert(m_lock.is_locked());
  if (m_op_events.count(op_tid) != 0) {
    lderr(cct) << ": duplicate op tid detected: " << op_tid << dendl;

    // on_ready is already async but on failure invoke on_safe async
    // as well
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -EINVAL);
    return nullptr;
  }

  ++m_in_flight_op_events;
  *op_event = &m_op_events[op_tid];
  (*op_event)->on_start_safe = on_safe;

  Context *on_op_complete = new C_OpOnComplete(this, op_tid);
  (*op_event)->on_op_complete = on_op_complete;
  return on_op_complete;
}

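// Final bookkeeping for a completed op: filter replay-induced errors, fire
// the remaining ready/safe contexts, and kick the pending flush context if
// this was the last in-flight event.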
template <typename I>
void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  OpEvent op_event;
  bool shutting_down = false;
  {
    Mutex::Locker locker(m_lock);
    auto op_it = m_op_events.find(op_tid);
    assert(op_it != m_op_events.end());

    op_event = std::move(op_it->second);
    m_op_events.erase(op_it);

    if (m_shut_down) {
      assert(m_flush_ctx != nullptr);
      shutting_down = true;
    }
  }

  assert(op_event.on_start_ready == nullptr || (r < 0 && r != -ERESTART));
  if (op_event.on_start_ready != nullptr) {
    // blocking op event failed before it became ready
    assert(op_event.on_finish_ready == nullptr &&
           op_event.on_finish_safe == nullptr);

    op_event.on_start_ready->complete(0);
  } else {
    // event kicked off by OpFinishEvent
    assert((op_event.on_finish_ready != nullptr &&
            op_event.on_finish_safe != nullptr) || shutting_down);
  }

  if (op_event.on_op_finish_event != nullptr) {
    op_event.on_op_finish_event->complete(r);
  }

  if (op_event.on_finish_ready != nullptr) {
    op_event.on_finish_ready->complete(0);
  }

  // filter out errors caused by replay of the same op
  if (r < 0 && op_event.ignore_error_codes.count(r) != 0) {
    r = 0;
  }

  op_event.on_start_safe->complete(r);
  if (op_event.on_finish_safe != nullptr) {
    op_event.on_finish_safe->complete(r);
  }

  // shut down request might have occurred while lock was
  // dropped -- handle if pending
  Context *on_flush = nullptr;
  {
    Mutex::Locker locker(m_lock);
    assert(m_in_flight_op_events > 0);
    --m_in_flight_op_events;
    if (m_in_flight_op_events == 0 &&
        (m_in_flight_aio_flush + m_in_flight_aio_modify) == 0) {
      on_flush = m_flush_ctx;
    }
  }
  if (on_flush != nullptr) {
    m_image_ctx.op_work_queue->queue(on_flush, 0);
  }
}

template <typename I>
io::AioCompletion *
Replay<I>::create_aio_modify_completion(Context *on_ready,
                                        Context *on_safe,
                                        io::aio_type_t aio_type,
                                        bool *flush_required,
                                        std::set<int> &&filters) {
  Mutex::Locker locker(m_lock);
  CephContext *cct = m_image_ctx.cct;
  assert(m_on_aio_ready == nullptr);

  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    on_ready->complete(0);
    m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    return nullptr;
  }

  ++m_in_flight_aio_modify;
  m_aio_modify_unsafe_contexts.push_back(on_safe);

  // FLUSH if we hit the low-water mark -- on_safe contexts are
  // completed only by flushes so that we don't move the journal
  // commit position until safely on-disk
  *flush_required = (m_aio_modify_unsafe_contexts.size() ==
                     IN_FLIGHT_IO_LOW_WATER_MARK);
  if (*flush_required) {
    ldout(cct, 10) << ": hit AIO replay low-water mark: scheduling flush"
                   << dendl;
  }

  // READY for more events if:
  // * not at high-water mark for IO
  // * in-flight ops are at a consistent point (snap create has IO flushed,
  //   shrink has adjusted clip boundary, etc) -- should have already been
  //   flagged not-ready
  if (m_in_flight_aio_modify == IN_FLIGHT_IO_HIGH_WATER_MARK) {
    ldout(cct, 10) << ": hit AIO replay high-water mark: pausing replay"
                   << dendl;
    assert(m_on_aio_ready == nullptr);
    std::swap(m_on_aio_ready, on_ready);
  }

  // when the modification is ACKed by librbd, we can process the next
  // event. when flushed, the completion of the next flush will fire the
  // on_safe callback
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioModifyComplete(this, on_ready, on_safe, std::move(filters)),
    util::get_image_ctx(&m_image_ctx), aio_type);
  return aio_comp;
}

template <typename I>
io::AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_safe) {
  assert(m_lock.is_locked());

  CephContext *cct = m_image_ctx.cct;
  if (m_shut_down) {
    ldout(cct, 5) << ": ignoring event after shut down" << dendl;
    if (on_safe != nullptr) {
      m_image_ctx.op_work_queue->queue(on_safe, -ESHUTDOWN);
    }
    return nullptr;
  }

  ++m_in_flight_aio_flush;

  // associate all prior write/discard ops to this flush request
  auto aio_comp = io::AioCompletion::create_and_start<Context>(
    new C_AioFlushComplete(this, on_safe,
                           std::move(m_aio_modify_unsafe_contexts)),
    util::get_image_ctx(&m_image_ctx), io::AIO_TYPE_FLUSH);
  m_aio_modify_unsafe_contexts.clear();
  return aio_comp;
}

} // namespace journal
} // namespace librbd

template class librbd::journal::Replay<librbd::ImageCtx>;