]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #ifndef RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H | |
5 | #define RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H | |
6 | ||
7 | #include "tools/rbd_mirror/image_replayer/Replayer.h" | |
8 | #include "include/utime.h" | |
9 | #include "common/AsyncOpTracker.h" | |
10 | #include "common/ceph_mutex.h" | |
11 | #include "common/RefCountedObj.h" | |
12 | #include "cls/journal/cls_journal_types.h" | |
13 | #include "journal/ReplayEntry.h" | |
14 | #include "librbd/ImageCtx.h" | |
15 | #include "librbd/journal/Types.h" | |
16 | #include "librbd/journal/TypeTraits.h" | |
17 | #include <string> | |
18 | #include <type_traits> | |
19 | ||
20 | namespace journal { class Journaler; } | |
21 | namespace librbd { | |
22 | ||
23 | struct ImageCtx; | |
24 | namespace journal { template <typename I> class Replay; } | |
25 | ||
26 | } // namespace librbd | |
27 | ||
28 | namespace rbd { | |
29 | namespace mirror { | |
30 | ||
31 | template <typename> struct Threads; | |
32 | ||
33 | namespace image_replayer { | |
34 | ||
35 | struct ReplayerListener; | |
36 | ||
37 | namespace journal { | |
38 | ||
39 | template <typename> class EventPreprocessor; | |
40 | template <typename> class ReplayStatusFormatter; | |
41 | template <typename> class StateBuilder; | |
42 | ||
43 | template <typename ImageCtxT> | |
44 | class Replayer : public image_replayer::Replayer { | |
45 | public: | |
46 | typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler; | |
47 | ||
48 | static Replayer* create( | |
49 | Threads<ImageCtxT>* threads, | |
50 | const std::string& local_mirror_uuid, | |
51 | StateBuilder<ImageCtxT>* state_builder, | |
52 | ReplayerListener* replayer_listener) { | |
53 | return new Replayer(threads, local_mirror_uuid, state_builder, | |
54 | replayer_listener); | |
55 | } | |
56 | ||
57 | Replayer( | |
58 | Threads<ImageCtxT>* threads, | |
59 | const std::string& local_mirror_uuid, | |
60 | StateBuilder<ImageCtxT>* state_builder, | |
61 | ReplayerListener* replayer_listener); | |
62 | ~Replayer(); | |
63 | ||
64 | void destroy() override { | |
65 | delete this; | |
66 | } | |
67 | ||
68 | void init(Context* on_finish) override; | |
69 | void shut_down(Context* on_finish) override; | |
70 | ||
71 | void flush(Context* on_finish) override; | |
72 | ||
73 | bool get_replay_status(std::string* description, Context* on_finish) override; | |
74 | ||
75 | bool is_replaying() const override { | |
76 | std::unique_lock locker{m_lock}; | |
77 | return (m_state == STATE_REPLAYING); | |
78 | } | |
79 | ||
80 | bool is_resync_requested() const override { | |
81 | std::unique_lock locker(m_lock); | |
82 | return m_resync_requested; | |
83 | } | |
84 | ||
85 | int get_error_code() const override { | |
86 | std::unique_lock locker(m_lock); | |
87 | return m_error_code; | |
88 | } | |
89 | ||
90 | std::string get_error_description() const override { | |
91 | std::unique_lock locker(m_lock); | |
92 | return m_error_description; | |
93 | } | |
94 | ||
95 | std::string get_image_spec() const { | |
96 | std::unique_lock locker(m_lock); | |
97 | return m_image_spec; | |
98 | } | |
99 | ||
100 | private: | |
101 | /** | |
102 | * @verbatim | |
103 | * | |
104 | * <init> | |
105 | * | | |
106 | * v (error) | |
107 | * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * * * * | |
108 | * | * | |
109 | * v (error) * | |
110 | * START_EXTERNAL_REPLAY * * * * * * * * * * * * * * * * * * * | |
111 | * | * | |
112 | * | /--------------------------------------------\ * | |
113 | * | | | * | |
114 | * v v (asok flush) | * | |
115 | * REPLAYING -------------> LOCAL_REPLAY_FLUSH | * | |
116 | * | \ | | * | |
117 | * | | v | * | |
118 | * | | FLUSH_COMMIT_POSITION | * | |
119 | * | | | | * | |
120 | * | | \--------------------/| * | |
121 | * | | | * | |
122 | * | | (entries available) | * | |
123 | * | \-----------> REPLAY_READY | * | |
124 | * | | | * | |
125 | * | | (skip if not | * | |
126 | * | v needed) (error) * | |
127 | * | REPLAY_FLUSH * * * * * * * * * * | |
128 | * | | | * * | |
129 | * | | (skip if not | * * | |
130 | * | v needed) (error) * * | |
131 | * | GET_REMOTE_TAG * * * * * * * * * | |
132 | * | | | * * | |
133 | * | | (skip if not | * * | |
134 | * | v needed) (error) * * | |
135 | * | ALLOCATE_LOCAL_TAG * * * * * * * | |
136 | * | | | * * | |
137 | * | v (error) * * | |
138 | * | PREPROCESS_ENTRY * * * * * * * * | |
139 | * | | | * * | |
140 | * | v (error) * * | |
141 | * | PROCESS_ENTRY * * * * * * * * * * | |
142 | * | | | * * | |
143 | * | \---------------------/ * * | |
144 | * v (shutdown) * * | |
145 | * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * * * | |
146 | * | * | |
147 | * v * | |
148 | * SHUT_DOWN_LOCAL_JOURNAL_REPLAY * | |
149 | * | * | |
150 | * v * | |
151 | * WAIT_FOR_REPLAY * | |
152 | * | * | |
153 | * v * | |
154 | * CLOSE_LOCAL_IMAGE < * * * * * * * * * * * * * * * * * * * * | |
155 | * | | |
156 | * v (skip if not started) | |
157 | * STOP_REMOTE_JOURNALER_REPLAY | |
158 | * | | |
159 | * v | |
160 | * WAIT_FOR_IN_FLIGHT_OPS | |
161 | * | | |
162 | * v | |
163 | * <shutdown> | |
164 | * | |
165 | * @endverbatim | |
166 | */ | |
167 | ||
168 | typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry; | |
169 | ||
170 | enum State { | |
171 | STATE_INIT, | |
172 | STATE_REPLAYING, | |
173 | STATE_COMPLETE | |
174 | }; | |
175 | ||
176 | struct C_ReplayCommitted; | |
9f95a23c TL |
177 | struct RemoteJournalerListener; |
178 | struct RemoteReplayHandler; | |
179 | struct LocalJournalListener; | |
180 | ||
181 | Threads<ImageCtxT>* m_threads; | |
182 | std::string m_local_mirror_uuid; | |
183 | StateBuilder<ImageCtxT>* m_state_builder; | |
184 | ReplayerListener* m_replayer_listener; | |
185 | ||
186 | mutable ceph::mutex m_lock; | |
187 | ||
188 | std::string m_image_spec; | |
189 | Context* m_on_init_shutdown = nullptr; | |
190 | ||
191 | State m_state = STATE_INIT; | |
192 | int m_error_code = 0; | |
193 | std::string m_error_description; | |
194 | bool m_resync_requested = false; | |
195 | ||
196 | ceph::ref_t<typename std::remove_pointer<decltype(ImageCtxT::journal)>::type> | |
197 | m_local_journal; | |
198 | RemoteJournalerListener* m_remote_listener = nullptr; | |
199 | ||
200 | librbd::journal::Replay<ImageCtxT>* m_local_journal_replay = nullptr; | |
201 | EventPreprocessor<ImageCtxT>* m_event_preprocessor = nullptr; | |
202 | ReplayStatusFormatter<ImageCtxT>* m_replay_status_formatter = nullptr; | |
203 | RemoteReplayHandler* m_remote_replay_handler = nullptr; | |
204 | LocalJournalListener* m_local_journal_listener = nullptr; | |
205 | ||
206 | PerfCounters *m_perf_counters = nullptr; | |
207 | ||
208 | ReplayEntry m_replay_entry; | |
209 | uint64_t m_replay_bytes = 0; | |
210 | utime_t m_replay_start_time; | |
211 | bool m_replay_tag_valid = false; | |
212 | uint64_t m_replay_tag_tid = 0; | |
213 | cls::journal::Tag m_replay_tag; | |
214 | librbd::journal::TagData m_replay_tag_data; | |
215 | librbd::journal::EventEntry m_event_entry; | |
216 | ||
217 | AsyncOpTracker m_event_replay_tracker; | |
218 | Context *m_delayed_preprocess_task = nullptr; | |
219 | ||
220 | AsyncOpTracker m_in_flight_op_tracker; | |
221 | Context *m_flush_local_replay_task = nullptr; | |
222 | ||
223 | void handle_remote_journal_metadata_updated(); | |
224 | ||
225 | void schedule_flush_local_replay_task(); | |
226 | void cancel_flush_local_replay_task(); | |
227 | void handle_flush_local_replay_task(int r); | |
228 | ||
229 | void flush_local_replay(Context* on_flush); | |
230 | void handle_flush_local_replay(Context* on_flush, int r); | |
231 | ||
232 | void flush_commit_position(Context* on_flush); | |
233 | void handle_flush_commit_position(Context* on_flush, int r); | |
234 | ||
235 | void init_remote_journaler(); | |
236 | void handle_init_remote_journaler(int r); | |
237 | ||
238 | void start_external_replay(); | |
239 | void handle_start_external_replay(int r); | |
240 | ||
241 | bool notify_init_complete(std::unique_lock<ceph::mutex>& locker); | |
242 | ||
243 | void shut_down_local_journal_replay(); | |
244 | void handle_shut_down_local_journal_replay(int r); | |
245 | ||
246 | void wait_for_event_replay(); | |
247 | void handle_wait_for_event_replay(int r); | |
248 | ||
249 | void close_local_image(); | |
250 | void handle_close_local_image(int r); | |
251 | ||
252 | void stop_remote_journaler_replay(); | |
253 | void handle_stop_remote_journaler_replay(int r); | |
254 | ||
255 | void wait_for_in_flight_ops(); | |
256 | void handle_wait_for_in_flight_ops(int r); | |
257 | ||
258 | void replay_flush(); | |
259 | void handle_replay_flush_shut_down(int r); | |
260 | void handle_replay_flush(int r); | |
261 | ||
262 | void get_remote_tag(); | |
263 | void handle_get_remote_tag(int r); | |
264 | ||
265 | void allocate_local_tag(); | |
266 | void handle_allocate_local_tag(int r); | |
267 | ||
268 | void handle_replay_error(int r, const std::string &error); | |
269 | ||
270 | bool is_replay_complete() const; | |
271 | bool is_replay_complete(const std::unique_lock<ceph::mutex>& locker) const; | |
272 | ||
273 | void handle_replay_complete(int r, const std::string &error_desc); | |
274 | void handle_replay_complete(const std::unique_lock<ceph::mutex>&, | |
275 | int r, const std::string &error_desc); | |
276 | void handle_replay_ready(); | |
277 | void handle_replay_ready(std::unique_lock<ceph::mutex>& locker); | |
278 | ||
279 | void preprocess_entry(); | |
280 | void handle_delayed_preprocess_task(int r); | |
281 | void handle_preprocess_entry_ready(int r); | |
282 | void handle_preprocess_entry_safe(int r); | |
283 | ||
284 | void process_entry(); | |
285 | void handle_process_entry_ready(int r); | |
286 | void handle_process_entry_safe(const ReplayEntry& replay_entry, | |
287 | uint64_t relay_bytes, | |
288 | const utime_t &replay_start_time, int r); | |
289 | ||
290 | void handle_resync_image(); | |
291 | ||
292 | void notify_status_updated(); | |
293 | ||
294 | void cancel_delayed_preprocess_task(); | |
295 | ||
296 | int validate_remote_client_state( | |
297 | const cls::journal::Client& remote_client, | |
298 | librbd::journal::MirrorPeerClientMeta* remote_client_meta, | |
299 | bool* resync_requested, std::string* error); | |
300 | ||
301 | void register_perf_counters(); | |
302 | void unregister_perf_counters(); | |
303 | ||
304 | }; | |
305 | ||
306 | } // namespace journal | |
307 | } // namespace image_replayer | |
308 | } // namespace mirror | |
309 | } // namespace rbd | |
310 | ||
311 | extern template class rbd::mirror::image_replayer::journal::Replayer<librbd::ImageCtx>; | |
312 | ||
313 | #endif // RBD_MIRROR_IMAGE_REPLAYER_JOURNAL_REPLAYER_H |