}
};
-template <typename I>
-struct Replayer<I>::C_TrackedOp : public Context {
- Replayer *replayer;
- Context* ctx;
-
- C_TrackedOp(Replayer* replayer, Context* ctx)
- : replayer(replayer), ctx(ctx) {
- replayer->m_in_flight_op_tracker.start_op();
- }
-
- void finish(int r) override {
- ctx->complete(r);
- replayer->m_in_flight_op_tracker.finish_op();
- }
-};
-
template <typename I>
struct Replayer<I>::RemoteJournalerListener
: public ::journal::JournalMetadataListener {
RemoteJournalerListener(Replayer* replayer) : replayer(replayer) {}
void handle_update(::journal::JournalMetadata*) override {
- auto ctx = new C_TrackedOp(replayer, new LambdaContext([this](int r) {
- replayer->handle_remote_journal_metadata_updated();
- }));
+ auto ctx = new C_TrackedOp(
+ replayer->m_in_flight_op_tracker,
+ new LambdaContext([this](int r) {
+ replayer->handle_remote_journal_metadata_updated();
+ }));
replayer->m_threads->work_queue->queue(ctx, 0);
}
};
void Replayer<I>::flush(Context* on_finish) {
dout(10) << dendl;
- flush_local_replay(new C_TrackedOp(this, on_finish));
+ flush_local_replay(new C_TrackedOp(m_in_flight_op_tracker, on_finish));
}
template <typename I>
return false;
}
- on_finish = new C_TrackedOp(this, on_finish);
+ on_finish = new C_TrackedOp(m_in_flight_op_tracker, on_finish);
return m_replay_status_formatter->get_or_send_update(description,
on_finish);
}
}
}
+ m_replay_status_formatter->handle_entry_processed(m_replay_bytes);
+
if (update_status) {
unregister_perf_counters();
register_perf_counters();
dout(10) << dendl;
- auto ctx = new C_TrackedOp(this, new LambdaContext(
+ auto ctx = new C_TrackedOp(m_in_flight_op_tracker, new LambdaContext(
[this](int) {
m_replayer_listener->handle_notification();
}));