]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/image_replayer/journal/Replayer.h
f5d59a07f8a4733d1f9e864dab4be2e815aea1fe
[ceph.git] / ceph / src / tools / rbd_mirror / image_replayer / journal / Replayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
5 #define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H
6
7 #include "tools/rbd_mirror/image_replayer/Replayer.h"
8 #include "include/utime.h"
9 #include "common/AsyncOpTracker.h"
10 #include "common/ceph_mutex.h"
11 #include "common/RefCountedObj.h"
12 #include "cls/journal/cls_journal_types.h"
13 #include "journal/ReplayEntry.h"
14 #include "librbd/ImageCtx.h"
15 #include "librbd/journal/Types.h"
16 #include "librbd/journal/TypeTraits.h"
17 #include <string>
18 #include <type_traits>
19
20 namespace journal { class Journaler; }
21 namespace librbd {
22
23 struct ImageCtx;
24 namespace journal { template <typename I> class Replay; }
25
26 } // namespace librbd
27
28 namespace rbd {
29 namespace mirror {
30
31 template <typename> struct Threads;
32
33 namespace image_replayer {
34
35 struct ReplayerListener;
36
37 namespace journal {
38
39 template <typename> class EventPreprocessor;
40 template <typename> class ReplayStatusFormatter;
41 template <typename> class StateBuilder;
42
43 template <typename ImageCtxT>
44 class Replayer : public image_replayer::Replayer {
45 public:
46 typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
47
48 static Replayer* create(
49 Threads<ImageCtxT>* threads,
50 const std::string& local_mirror_uuid,
51 StateBuilder<ImageCtxT>* state_builder,
52 ReplayerListener* replayer_listener) {
53 return new Replayer(threads, local_mirror_uuid, state_builder,
54 replayer_listener);
55 }
56
57 Replayer(
58 Threads<ImageCtxT>* threads,
59 const std::string& local_mirror_uuid,
60 StateBuilder<ImageCtxT>* state_builder,
61 ReplayerListener* replayer_listener);
62 ~Replayer();
63
64 void destroy() override {
65 delete this;
66 }
67
68 void init(Context* on_finish) override;
69 void shut_down(Context* on_finish) override;
70
71 void flush(Context* on_finish) override;
72
73 bool get_replay_status(std::string* description, Context* on_finish) override;
74
75 bool is_replaying() const override {
76 std::unique_lock locker{m_lock};
77 return (m_state == STATE_REPLAYING);
78 }
79
80 bool is_resync_requested() const override {
81 std::unique_lock locker(m_lock);
82 return m_resync_requested;
83 }
84
85 int get_error_code() const override {
86 std::unique_lock locker(m_lock);
87 return m_error_code;
88 }
89
90 std::string get_error_description() const override {
91 std::unique_lock locker(m_lock);
92 return m_error_description;
93 }
94
95 std::string get_image_spec() const {
96 std::unique_lock locker(m_lock);
97 return m_image_spec;
98 }
99
100 private:
101 /**
102 * @verbatim
103 *
104 * <init>
105 * |
106 * v (error)
107 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * *
108 * | *
109 * v (error) *
110 * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * *
111 * | *
112 * | /--------------------------------------------\ *
113 * | | | *
114 * v v (asok flush) | *
115 * REPLAYING -------------> LOCAL_REPLAY_FLUSH | *
116 * | \ | | *
117 * | | v | *
118 * | | FLUSH_COMMIT_POSITION | *
119 * | | | | *
120 * | | \--------------------/| *
121 * | | | *
122 * | | (entries available) | *
123 * | \-----------> REPLAY_READY | *
124 * | | | *
125 * | | (skip if not | *
126 * | v needed) (error) *
127 * | REPLAY_FLUSH * * * * * * * * * *
128 * | | | * *
129 * | | (skip if not | * *
130 * | v needed) (error) * *
131 * | GET_REMOTE_TAG * * * * * * * * *
132 * | | | * *
133 * | | (skip if not | * *
134 * | v needed) (error) * *
135 * | ALLOCATE_LOCAL_TAG * * * * * * *
136 * | | | * *
137 * | v (error) * *
138 * | PREPROCESS_ENTRY * * * * * * * *
139 * | | | * *
140 * | v (error) * *
141 * | PROCESS_ENTRY * * * * * * * * * *
142 * | | | * *
143 * | \---------------------/ * *
144 * v (shutdown) * *
145 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * *
146 * | *
147 * v *
148 * SHUT_DOWN_LOCAL_JOURNAL_REPLAY *
149 * | *
150 * v *
151 * WAIT_FOR_REPLAY *
152 * | *
153 * v *
154 * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * *
155 * |
156 * v (skip if not started)
157 * STOP_REMOTE_JOURNALER_REPLAY
158 * |
159 * v
160 * WAIT_FOR_IN_FLIGHT_OPS
161 * |
162 * v
163 * <shutdown>
164 *
165 * @endverbatim
166 */
167
168 typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
169
170 enum State {
171 STATE_INIT,
172 STATE_REPLAYING,
173 STATE_COMPLETE
174 };
175
176 struct C_ReplayCommitted;
177 struct RemoteJournalerListener;
178 struct RemoteReplayHandler;
179 struct LocalJournalListener;
180
181 Threads<ImageCtxT>* m_threads;
182 std::string m_local_mirror_uuid;
183 StateBuilder<ImageCtxT>* m_state_builder;
184 ReplayerListener* m_replayer_listener;
185
186 mutable ceph::mutex m_lock;
187
188 std::string m_image_spec;
189 Context* m_on_init_shutdown = nullptr;
190
191 State m_state = STATE_INIT;
192 int m_error_code = 0;
193 std::string m_error_description;
194 bool m_resync_requested = false;
195
196 ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type>
197 m_local_journal;
198 RemoteJournalerListener* m_remote_listener = nullptr;
199
200 librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr;
201 EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr;
202 ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr;
203 RemoteReplayHandler* m_remote_replay_handler = nullptr;
204 LocalJournalListener* m_local_journal_listener = nullptr;
205
206 PerfCounters *m_perf_counters = nullptr;
207
208 ReplayEntry m_replay_entry;
209 uint64_t m_replay_bytes = 0;
210 utime_t m_replay_start_time;
211 bool m_replay_tag_valid = false;
212 uint64_t m_replay_tag_tid = 0;
213 cls::journal::Tag m_replay_tag;
214 librbd::journal::TagData m_replay_tag_data;
215 librbd::journal::EventEntry m_event_entry;
216
217 AsyncOpTracker m_event_replay_tracker;
218 Context *m_delayed_preprocess_task = nullptr;
219
220 AsyncOpTracker m_in_flight_op_tracker;
221 Context *m_flush_local_replay_task = nullptr;
222
223 void handle_remote_journal_metadata_updated();
224
225 void schedule_flush_local_replay_task();
226 void cancel_flush_local_replay_task();
227 void handle_flush_local_replay_task(int r);
228
229 void flush_local_replay(Context* on_flush);
230 void handle_flush_local_replay(Context* on_flush, int r);
231
232 void flush_commit_position(Context* on_flush);
233 void handle_flush_commit_position(Context* on_flush, int r);
234
235 void init_remote_journaler();
236 void handle_init_remote_journaler(int r);
237
238 void start_external_replay();
239 void handle_start_external_replay(int r);
240
241 bool notify_init_complete(std::unique_lock<ceph::mutex>& locker);
242
243 void shut_down_local_journal_replay();
244 void handle_shut_down_local_journal_replay(int r);
245
246 void wait_for_event_replay();
247 void handle_wait_for_event_replay(int r);
248
249 void close_local_image();
250 void handle_close_local_image(int r);
251
252 void stop_remote_journaler_replay();
253 void handle_stop_remote_journaler_replay(int r);
254
255 void wait_for_in_flight_ops();
256 void handle_wait_for_in_flight_ops(int r);
257
258 void replay_flush();
259 void handle_replay_flush_shut_down(int r);
260 void handle_replay_flush(int r);
261
262 void get_remote_tag();
263 void handle_get_remote_tag(int r);
264
265 void allocate_local_tag();
266 void handle_allocate_local_tag(int r);
267
268 void handle_replay_error(int r, const std::string &error);
269
270 bool is_replay_complete() const;
271 bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const;
272
273 void handle_replay_complete(int r, const std::string &error_desc);
274 void handle_replay_complete(const std::unique_lock<ceph::mutex>&,
275 int r, const std::string &error_desc);
276 void handle_replay_ready();
277 void handle_replay_ready(std::unique_lock<ceph::mutex>& locker);
278
279 void preprocess_entry();
280 void handle_delayed_preprocess_task(int r);
281 void handle_preprocess_entry_ready(int r);
282 void handle_preprocess_entry_safe(int r);
283
284 void process_entry();
285 void handle_process_entry_ready(int r);
286 void handle_process_entry_safe(const ReplayEntry& replay_entry,
287 uint64_t relay_bytes,
288 const utime_t &replay_start_time, int r);
289
290 void handle_resync_image();
291
292 void notify_status_updated();
293
294 void cancel_delayed_preprocess_task();
295
296 int validate_remote_client_state(
297 const cls::journal::Client& remote_client,
298 librbd::journal::MirrorPeerClientMeta* remote_client_meta,
299 bool* resync_requested, std::string* error);
300
301 void register_perf_counters();
302 void unregister_perf_counters();
303
304 };
305
306 } // namespace journal
307 } // namespace image_replayer
308 } // namespace mirror
309 } // namespace rbd
310
311 extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>;
312
313 #endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H