]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/image_replayer/journal/Replayer.h
import 15.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / journal / Replayer.h
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
5#define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
6
7#include "tools/rbd_mirror/image_replayer/Replayer.h"
8#include "include/utime.h"
9#include "common/AsyncOpTracker.h"
10#include "common/ceph_mutex.h"
11#include "common/RefCountedObj.h"
12#include "cls/journal/cls_journal_types.h"
13#include "journal/ReplayEntry.h"
14#include "librbd/ImageCtx.h"
15#include "librbd/journal/Types.h"
16#include "librbd/journal/TypeTraits.h"
17#include <string>
18#include <type_traits>
19
20namespace journal { class Journaler; }
21namespace librbd {
22
23struct ImageCtx;
24namespace journal { template <typename I> class Replay; }
25
26} // namespace librbd
27
28namespace rbd {
29namespace mirror {
30
31template <typename> struct Threads;
32
33namespace image_replayer {
34
35struct ReplayerListener;
36
37namespace journal {
38
39template <typename> class EventPreprocessor;
40template <typename> class ReplayStatusFormatter;
41template <typename> class StateBuilder;
42
43template <typename ImageCtxT>
44class Replayer : public image_replayer::Replayer {
45public:
46 typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
47
48 static Replayer* create(
49 Threads<ImageCtxT>* threads,
50 const std::string& local_mirror_uuid,
51 StateBuilder<ImageCtxT>* state_builder,
52 ReplayerListener* replayer_listener) {
53 return new Replayer(threads, local_mirror_uuid, state_builder,
54 replayer_listener);
55 }
56
57 Replayer(
58 Threads<ImageCtxT>* threads,
59 const std::string& local_mirror_uuid,
60 StateBuilder<ImageCtxT>* state_builder,
61 ReplayerListener* replayer_listener);
62 ~Replayer();
63
64 void destroy() override {
65 delete this;
66 }
67
68 void init(Context* on_finish) override;
69 void shut_down(Context* on_finish) override;
70
71 void flush(Context* on_finish) override;
72
73 bool get_replay_status(std::string* description, Context* on_finish) override;
74
75 bool is_replaying() const override {
76 std::unique_lock locker{m_lock};
77 return (m_state == STATE_REPLAYING);
78 }
79
80 bool is_resync_requested() const override {
81 std::unique_lock locker(m_lock);
82 return m_resync_requested;
83 }
84
85 int get_error_code() const override {
86 std::unique_lock locker(m_lock);
87 return m_error_code;
88 }
89
90 std::string get_error_description() const override {
91 std::unique_lock locker(m_lock);
92 return m_error_description;
93 }
94
95 std::string get_image_spec() const {
96 std::unique_lock locker(m_lock);
97 return m_image_spec;
98 }
99
100private:
101 /**
102 * @verbatim
103 *
104 * <init>
105 * |
106 * v (error)
107 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * *
108 * | *
109 * v (error) *
110 * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * *
111 * | *
112 * | /--------------------------------------------\ *
113 * | | | *
114 * v v (asok flush) | *
115 * REPLAYING -------------> LOCAL_REPLAY_FLUSH | *
116 * | \ | | *
117 * | | v | *
118 * | | FLUSH_COMMIT_POSITION | *
119 * | | | | *
120 * | | \--------------------/| *
121 * | | | *
122 * | | (entries available) | *
123 * | \-----------> REPLAY_READY | *
124 * | | | *
125 * | | (skip if not | *
126 * | v needed) (error) *
127 * | REPLAY_FLUSH * * * * * * * * * *
128 * | | | * *
129 * | | (skip if not | * *
130 * | v needed) (error) * *
131 * | GET_REMOTE_TAG * * * * * * * * *
132 * | | | * *
133 * | | (skip if not | * *
134 * | v needed) (error) * *
135 * | ALLOCATE_LOCAL_TAG * * * * * * *
136 * | | | * *
137 * | v (error) * *
138 * | PREPROCESS_ENTRY * * * * * * * *
139 * | | | * *
140 * | v (error) * *
141 * | PROCESS_ENTRY * * * * * * * * * *
142 * | | | * *
143 * | \---------------------/ * *
144 * v (shutdown) * *
145 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * *
146 * | *
147 * v *
e306af50
TL
148 * WAIT_FOR_FLUSH *
149 * | *
150 * v *
9f95a23c
TL
151 * SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
152 * | *
153 * v *
154 * WAIT_FOR_REPLAY *
155 * | *
156 * v *
157 * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * *
158 * |
159 * v (skip if not started)
160 * STOP_REMOTE_JOURNALER_REPLAY
161 * |
162 * v
163 * WAIT_FOR_IN_FLIGHT_OPS
164 * |
165 * v
166 * <shutdown>
167 *
168 * @endverbatim
169 */
170
171 typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
172
173 enum State {
174 STATE_INIT,
175 STATE_REPLAYING,
176 STATE_COMPLETE
177 };
178
179 struct C_ReplayCommitted;
9f95a23c
TL
180 struct RemoteJournalerListener;
181 struct RemoteReplayHandler;
182 struct LocalJournalListener;
183
184 Threads<ImageCtxT>* m_threads;
185 std::string m_local_mirror_uuid;
186 StateBuilder<ImageCtxT>* m_state_builder;
187 ReplayerListener* m_replayer_listener;
188
189 mutable ceph::mutex m_lock;
190
191 std::string m_image_spec;
192 Context* m_on_init_shutdown = nullptr;
193
194 State m_state = STATE_INIT;
195 int m_error_code = 0;
196 std::string m_error_description;
197 bool m_resync_requested = false;
198
199 ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type>
200 m_local_journal;
201 RemoteJournalerListener* m_remote_listener = nullptr;
202
203 librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr;
204 EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr;
205 ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr;
206 RemoteReplayHandler* m_remote_replay_handler = nullptr;
207 LocalJournalListener* m_local_journal_listener = nullptr;
208
209 PerfCounters *m_perf_counters = nullptr;
210
211 ReplayEntry m_replay_entry;
212 uint64_t m_replay_bytes = 0;
213 utime_t m_replay_start_time;
214 bool m_replay_tag_valid = false;
215 uint64_t m_replay_tag_tid = 0;
216 cls::journal::Tag m_replay_tag;
217 librbd::journal::TagData m_replay_tag_data;
218 librbd::journal::EventEntry m_event_entry;
219
e306af50
TL
220 AsyncOpTracker m_flush_tracker;
221
9f95a23c
TL
222 AsyncOpTracker m_event_replay_tracker;
223 Context *m_delayed_preprocess_task = nullptr;
224
225 AsyncOpTracker m_in_flight_op_tracker;
226 Context *m_flush_local_replay_task = nullptr;
227
228 void handle_remote_journal_metadata_updated();
229
230 void schedule_flush_local_replay_task();
231 void cancel_flush_local_replay_task();
232 void handle_flush_local_replay_task(int r);
233
234 void flush_local_replay(Context* on_flush);
235 void handle_flush_local_replay(Context* on_flush, int r);
236
237 void flush_commit_position(Context* on_flush);
238 void handle_flush_commit_position(Context* on_flush, int r);
239
240 void init_remote_journaler();
241 void handle_init_remote_journaler(int r);
242
e306af50 243 void start_external_replay(std::unique_lock<ceph::mutex>& locker);
9f95a23c
TL
244 void handle_start_external_replay(int r);
245
e306af50
TL
246 bool add_local_journal_listener(std::unique_lock<ceph::mutex>& locker);
247
9f95a23c
TL
248 bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);
249
e306af50
TL
250 void wait_for_flush();
251 void handle_wait_for_flush(int r);
252
9f95a23c
TL
253 void shut_down_local_journal_replay();
254 void handle_shut_down_local_journal_replay(int r);
255
256 void wait_for_event_replay();
257 void handle_wait_for_event_replay(int r);
258
259 void close_local_image();
260 void handle_close_local_image(int r);
261
262 void stop_remote_journaler_replay();
263 void handle_stop_remote_journaler_replay(int r);
264
265 void wait_for_in_flight_ops();
266 void handle_wait_for_in_flight_ops(int r);
267
268 void replay_flush();
269 void handle_replay_flush_shut_down(int r);
270 void handle_replay_flush(int r);
271
272 void get_remote_tag();
273 void handle_get_remote_tag(int r);
274
275 void allocate_local_tag();
276 void handle_allocate_local_tag(int r);
277
278 void handle_replay_error(int r, const std::string &error);
279
280 bool is_replay_complete() const;
281 bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const;
282
283 void handle_replay_complete(int r, const std::string &error_desc);
284 void handle_replay_complete(const std::unique_lock<ceph::mutex>&,
285 int r, const std::string &error_desc);
286 void handle_replay_ready();
287 void handle_replay_ready(std::unique_lock<ceph::mutex>& locker);
288
289 void preprocess_entry();
290 void handle_delayed_preprocess_task(int r);
291 void handle_preprocess_entry_ready(int r);
292 void handle_preprocess_entry_safe(int r);
293
294 void process_entry();
295 void handle_process_entry_ready(int r);
296 void handle_process_entry_safe(const ReplayEntry& replay_entry,
297 uint64_t relay_bytes,
298 const utime_t &replay_start_time, int r);
299
300 void handle_resync_image();
301
302 void notify_status_updated();
303
304 void cancel_delayed_preprocess_task();
305
306 int validate_remote_client_state(
307 const cls::journal::Client& remote_client,
308 librbd::journal::MirrorPeerClientMeta* remote_client_meta,
309 bool* resync_requested, std::string* error);
310
311 void register_perf_counters();
312 void unregister_perf_counters();
313
314};
315
316} // namespace journal
317} // namespace image_replayer
318} // namespace mirror
319} // namespace rbd
320
321extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;
322
323#endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H