]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.h
3ea41dc5378f50abc88d9c7b26272b27ac974811
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
6
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ImageDeleter.h"
19 #include "ProgressContext.h"
20 #include "types.h"
21 #include "tools/rbd_mirror/image_replayer/Types.h"
22
23 #include <boost/noncopyable.hpp>
24 #include <boost/optional.hpp>
25
26 #include <set>
27 #include <map>
28 #include <atomic>
29 #include <string>
30 #include <vector>
31
32 class AdminSocketHook;
33
34 namespace journal {
35
36 class Journaler;
37 class ReplayHandler;
38
39 }
40
41 namespace librbd {
42
43 class ImageCtx;
44 namespace journal { template <typename> class Replay; }
45
46 }
47
48 namespace rbd {
49 namespace mirror {
50
51 template <typename> struct InstanceWatcher;
52 template <typename> struct Threads;
53
54 namespace image_replayer { template <typename> class BootstrapRequest; }
55 namespace image_replayer { template <typename> class EventPreprocessor; }
56 namespace image_replayer { template <typename> class ReplayStatusFormatter; }
57
58 /**
59 * Replays changes from a remote cluster for a single image.
60 */
61 template <typename ImageCtxT = librbd::ImageCtx>
62 class ImageReplayer {
63 public:
64 static ImageReplayer *create(
65 Threads<librbd::ImageCtx> *threads, ImageDeleter<ImageCtxT>* image_deleter,
66 InstanceWatcher<ImageCtxT> *instance_watcher,
67 RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
68 const std::string &global_image_id) {
69 return new ImageReplayer(threads, image_deleter, instance_watcher,
70 local, local_mirror_uuid, local_pool_id,
71 global_image_id);
72 }
73 void destroy() {
74 delete this;
75 }
76
77 ImageReplayer(Threads<librbd::ImageCtx> *threads,
78 ImageDeleter<ImageCtxT>* image_deleter,
79 InstanceWatcher<ImageCtxT> *instance_watcher,
80 RadosRef local, const std::string &local_mirror_uuid,
81 int64_t local_pool_id, const std::string &global_image_id);
82 virtual ~ImageReplayer();
83 ImageReplayer(const ImageReplayer&) = delete;
84 ImageReplayer& operator=(const ImageReplayer&) = delete;
85
86 bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
87 bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
88 bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); }
89
90 std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
91 void set_state_description(int r, const std::string &desc);
92
93 inline bool is_blacklisted() const {
94 Mutex::Locker locker(m_lock);
95 return (m_last_r == -EBLACKLISTED);
96 }
97
98 image_replayer::HealthState get_health_state() const;
99
100 void add_remote_image(const std::string &remote_mirror_uuid,
101 const std::string &remote_image_id,
102 librados::IoCtx &remote_io_ctx);
103 void remove_remote_image(const std::string &remote_mirror_uuid,
104 const std::string &remote_image_id,
105 bool schedule_delete);
106 bool remote_images_empty() const;
107
108 inline int64_t get_local_pool_id() const {
109 return m_local_pool_id;
110 }
111 inline const std::string& get_global_image_id() const {
112 return m_global_image_id;
113 }
114
115 void start(Context *on_finish = nullptr, bool manual = false);
116 void stop(Context *on_finish = nullptr, bool manual = false,
117 int r = 0, const std::string& desc = "");
118 void restart(Context *on_finish = nullptr);
119 void flush(Context *on_finish = nullptr);
120
121 void resync_image(Context *on_finish=nullptr);
122
123 void print_status(Formatter *f, stringstream *ss);
124
125 virtual void handle_replay_ready();
126 virtual void handle_replay_complete(int r, const std::string &error_desc);
127
128 protected:
/**
 * @verbatim
 *                   (error)
 * <uninitialized> <------------------------------------ FAIL
 *    |                                                   ^
 *    v                                                   *
 * <starting>                                             *
 *    |                                                   *
 *    v                                       (error)     *
 * PREPARE_LOCAL_IMAGE  * * * * * * * * * * * * * * * * * *
 *    |                                                   *
 *    v                                       (error)     *
 * BOOTSTRAP_IMAGE  * * * * * * * * * * * * * * * * * * * *
 *    |                                                   *
 *    v                                       (error)     *
 * INIT_REMOTE_JOURNALER  * * * * * * * * * * * * * * * * *
 *    |                                                   *
 *    v                                       (error)     *
 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
 *    |
 *    |  /--------------------------------------------\
 *    |  |                                            |
 *    v  v   (asok flush)                             |
 * REPLAYING -------------> LOCAL_REPLAY_FLUSH        |
 *    |       \                 |                     |
 *    |       |                 v                     |
 *    |       |             FLUSH_COMMIT_POSITION     |
 *    |       |                 |                     |
 *    |       |                 \--------------------/|
 *    |       |                                       |
 *    |       | (entries available)                   |
 *    |       \-----------> REPLAY_READY              |
 *    |                         |                     |
 *    |                         | (skip if not        |
 *    |                         v  needed)        (error)
 *    |                     REPLAY_FLUSH  * * * * * * * * *
 *    |                         |                     |   *
 *    |                         | (skip if not        |   *
 *    |                         v  needed)        (error) *
 *    |                     GET_REMOTE_TAG  * * * * * * * *
 *    |                         |                     |   *
 *    |                         | (skip if not        |   *
 *    |                         v  needed)        (error) *
 *    |                     ALLOCATE_LOCAL_TAG  * * * * * *
 *    |                         |                     |   *
 *    |                         v                 (error) *
 *    |                     PREPROCESS_ENTRY  * * * * * * *
 *    |                         |                     |   *
 *    |                         v                 (error) *
 *    |                     PROCESS_ENTRY * * * * * * * * *
 *    |                         |                     |   *
 *    |                         \---------------------/   *
 *    v                                                   *
 * REPLAY_COMPLETE  < * * * * * * * * * * * * * * * * * * *
 *    |
 *    v
 * JOURNAL_REPLAY_SHUT_DOWN
 *    |
 *    v
 * LOCAL_IMAGE_CLOSE
 *    |
 *    v
 * <stopped>
 *
 * @endverbatim
 */
195
196 virtual void on_start_fail(int r, const std::string &desc = "");
197 virtual bool on_start_interrupted();
198
199 virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
200
201 virtual void on_flush_local_replay_flush_start(Context *on_flush);
202 virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
203 virtual void on_flush_flush_commit_position_start(Context *on_flush);
204 virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
205
206 bool on_replay_interrupted();
207
208 private:
209 typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
210
/**
 * Coarse lifecycle state of the replayer, stored in m_state.
 * Enumerator values are implicit and start at zero.
 */
enum State {
  STATE_UNKNOWN,          // zero value; NOTE(review): meaning not shown in this header
  STATE_STARTING,         // NOTE(review): presumably set while start-up is in progress
  STATE_REPLAYING,        // counted as "replaying" by is_replaying_()
  STATE_REPLAY_FLUSHING,  // also counted as "replaying" by is_replaying_()
  STATE_STOPPING,         // is_running_() reports false in this state
  STATE_STOPPED,          // initial value of m_state; tested by is_stopped_()
};
219
220 struct RemoteImage {
221 std::string mirror_uuid;
222 std::string image_id;
223 librados::IoCtx io_ctx;
224
225 RemoteImage() {
226 }
227 RemoteImage(const std::string &mirror_uuid,
228 const std::string &image_id)
229 : mirror_uuid(mirror_uuid), image_id(image_id) {
230 }
231 RemoteImage(const std::string &mirror_uuid,
232 const std::string &image_id,
233 librados::IoCtx &io_ctx)
234 : mirror_uuid(mirror_uuid), image_id(image_id), io_ctx(io_ctx) {
235 }
236
237 inline bool operator<(const RemoteImage &rhs) const {
238 if (mirror_uuid != rhs.mirror_uuid) {
239 return mirror_uuid < rhs.mirror_uuid;
240 } else {
241 return image_id < rhs.image_id;
242 }
243 }
244 inline bool operator==(const RemoteImage &rhs) const {
245 return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
246 }
247 };
248
249 typedef std::set<RemoteImage> RemoteImages;
250
251 typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
252 typedef boost::optional<State> OptionalState;
253 typedef boost::optional<cls::rbd::MirrorImageStatusState>
254 OptionalMirrorImageStatusState;
255
256 struct JournalListener : public librbd::journal::Listener {
257 ImageReplayer *img_replayer;
258
259 JournalListener(ImageReplayer *img_replayer)
260 : img_replayer(img_replayer) {
261 }
262
263 void handle_close() override {
264 img_replayer->on_stop_journal_replay();
265 }
266
267 void handle_promoted() override {
268 img_replayer->on_stop_journal_replay(0, "force promoted");
269 }
270
271 void handle_resync() override {
272 img_replayer->resync_image();
273 }
274 };
275
276 class BootstrapProgressContext : public ProgressContext {
277 public:
278 BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
279 replayer(replayer) {
280 }
281
282 void update_progress(const std::string &description,
283 bool flush = true) override;
284 private:
285 ImageReplayer<ImageCtxT> *replayer;
286 };
287
288 Threads<librbd::ImageCtx> *m_threads;
289 ImageDeleter<ImageCtxT>* m_image_deleter;
290 InstanceWatcher<ImageCtxT> *m_instance_watcher;
291
292 RemoteImages m_remote_images;
293 RemoteImage m_remote_image;
294
295 RadosRef m_local;
296 std::string m_local_mirror_uuid;
297 int64_t m_local_pool_id;
298 std::string m_local_image_id;
299 std::string m_global_image_id;
300 std::string m_name;
301 mutable Mutex m_lock;
302 State m_state = STATE_STOPPED;
303 std::string m_state_desc;
304
305 OptionalMirrorImageStatusState m_mirror_image_status_state = boost::none;
306 int m_last_r = 0;
307
308 BootstrapProgressContext m_progress_cxt;
309 bool m_do_resync{false};
310 image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
311 image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
312 nullptr;
313 librados::IoCtx m_local_ioctx;
314 ImageCtxT *m_local_image_ctx = nullptr;
315 std::string m_local_image_tag_owner;
316
317 decltype(ImageCtxT::journal) m_local_journal = nullptr;
318 librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
319 Journaler* m_remote_journaler = nullptr;
320 ::journal::ReplayHandler *m_replay_handler = nullptr;
321 librbd::journal::Listener *m_journal_listener;
322 bool m_stopping_for_resync = false;
323
324 Context *m_on_start_finish = nullptr;
325 Context *m_on_stop_finish = nullptr;
326 Context *m_update_status_task = nullptr;
327 int m_update_status_interval = 0;
328 librados::AioCompletion *m_update_status_comp = nullptr;
329 bool m_stop_requested = false;
330 bool m_manual_stop = false;
331
332 AdminSocketHook *m_asok_hook = nullptr;
333
334 image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
335
336 uint32_t m_in_flight_status_updates = 0;
337 bool m_update_status_requested = false;
338 Context *m_on_update_status_finish = nullptr;
339
340 librbd::journal::MirrorPeerClientMeta m_client_meta;
341
342 ReplayEntry m_replay_entry;
343 bool m_replay_tag_valid = false;
344 uint64_t m_replay_tag_tid = 0;
345 cls::journal::Tag m_replay_tag;
346 librbd::journal::TagData m_replay_tag_data;
347 librbd::journal::EventEntry m_event_entry;
348 AsyncOpTracker m_event_replay_tracker;
349 Context *m_delayed_preprocess_task = nullptr;
350
351 struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
352 ImageReplayer *replayer;
353
354 RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
355
356 void handle_update(::journal::JournalMetadata *) override;
357 } m_remote_listener;
358
359 struct C_ReplayCommitted : public Context {
360 ImageReplayer *replayer;
361 ReplayEntry replay_entry;
362
363 C_ReplayCommitted(ImageReplayer *replayer,
364 ReplayEntry &&replay_entry)
365 : replayer(replayer), replay_entry(std::move(replay_entry)) {
366 }
367 void finish(int r) override {
368 replayer->handle_process_entry_safe(replay_entry, r);
369 }
370 };
371
372 static std::string to_string(const State state);
373
374 bool is_stopped_() const {
375 return m_state == STATE_STOPPED;
376 }
377 bool is_running_() const {
378 return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
379 }
380 bool is_replaying_() const {
381 return (m_state == STATE_REPLAYING ||
382 m_state == STATE_REPLAY_FLUSHING);
383 }
384
385 bool update_mirror_image_status(bool force, const OptionalState &state);
386 bool start_mirror_image_status_update(bool force, bool restarting);
387 void finish_mirror_image_status_update();
388 void queue_mirror_image_status_update(const OptionalState &state);
389 void send_mirror_status_update(const OptionalState &state);
390 void handle_mirror_status_update(int r);
391 void reschedule_update_status_task(int new_interval = 0);
392
393 void shut_down(int r);
394 void handle_shut_down(int r);
395 void handle_remote_journal_metadata_updated();
396
397 void prepare_local_image();
398 void handle_prepare_local_image(int r);
399
400 void bootstrap();
401 void handle_bootstrap(int r);
402
403 void init_remote_journaler();
404 void handle_init_remote_journaler(int r);
405
406 void start_replay();
407 void handle_start_replay(int r);
408
409 void replay_flush();
410 void handle_replay_flush(int r);
411
412 void get_remote_tag();
413 void handle_get_remote_tag(int r);
414
415 void allocate_local_tag();
416 void handle_allocate_local_tag(int r);
417
418 void preprocess_entry();
419 void handle_preprocess_entry_ready(int r);
420 void handle_preprocess_entry_safe(int r);
421
422 void process_entry();
423 void handle_process_entry_ready(int r);
424 void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);
425
426 };
427
428 } // namespace mirror
429 } // namespace rbd
430
431 extern template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
432
433 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H