]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/ImageReplayer.h
update sources to 12.2.7
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5 #define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
6
7 #include "common/AsyncOpTracker.h"
8 #include "common/Mutex.h"
9 #include "common/WorkQueue.h"
10 #include "include/rados/librados.hpp"
11 #include "cls/journal/cls_journal_types.h"
12 #include "cls/rbd/cls_rbd_types.h"
13 #include "journal/JournalMetadataListener.h"
14 #include "journal/ReplayEntry.h"
15 #include "librbd/ImageCtx.h"
16 #include "librbd/journal/Types.h"
17 #include "librbd/journal/TypeTraits.h"
18 #include "ProgressContext.h"
19 #include "types.h"
20 #include "tools/rbd_mirror/image_replayer/Types.h"
21
22 #include <boost/noncopyable.hpp>
23 #include <boost/optional.hpp>
24
25 #include <set>
26 #include <map>
27 #include <atomic>
28 #include <string>
29 #include <vector>
30
// Forward declarations for types this header only names in pointer,
// reference or template-argument position.
class AdminSocketHook;

namespace journal {
class Journaler;
class ReplayHandler;
} // namespace journal

namespace librbd {
class ImageCtx;

namespace journal {
template <typename ImageCtxT> class Replay;
} // namespace journal
} // namespace librbd
46
47 namespace rbd {
48 namespace mirror {
49
// Collaborators of ImageReplayer, forward-declared so this header does not
// need their full definitions.
template <typename ImageCtxT> struct ImageDeleter;
template <typename ImageCtxT> struct InstanceWatcher;
template <typename ImageCtxT> struct Threads;

namespace image_replayer {
template <typename ImageCtxT> class BootstrapRequest;
template <typename ImageCtxT> class EventPreprocessor;
template <typename ImageCtxT> class ReplayStatusFormatter;
} // namespace image_replayer
57
58 /**
59 * Replays changes from a remote cluster for a single image.
60 */
61 template <typename ImageCtxT = librbd::ImageCtx>
62 class ImageReplayer {
63 public:
64 static ImageReplayer *create(
65 Threads<ImageCtxT> *threads, ImageDeleter<ImageCtxT>* image_deleter,
66 InstanceWatcher<ImageCtxT> *instance_watcher,
67 RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
68 const std::string &global_image_id) {
69 return new ImageReplayer(threads, image_deleter, instance_watcher,
70 local, local_mirror_uuid, local_pool_id,
71 global_image_id);
72 }
73 void destroy() {
74 delete this;
75 }
76
77 ImageReplayer(Threads<ImageCtxT> *threads,
78 ImageDeleter<ImageCtxT>* image_deleter,
79 InstanceWatcher<ImageCtxT> *instance_watcher,
80 RadosRef local, const std::string &local_mirror_uuid,
81 int64_t local_pool_id, const std::string &global_image_id);
82 virtual ~ImageReplayer();
83 ImageReplayer(const ImageReplayer&) = delete;
84 ImageReplayer& operator=(const ImageReplayer&) = delete;
85
86 bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
87 bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
88 bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); }
89
90 std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
91 void set_state_description(int r, const std::string &desc);
92
93 // TODO temporary until policy handles release of image replayers
94 inline bool is_finished() const {
95 Mutex::Locker locker(m_lock);
96 return m_finished;
97 }
98 inline void set_finished(bool finished) {
99 Mutex::Locker locker(m_lock);
100 m_finished = finished;
101 }
102
103 inline bool is_blacklisted() const {
104 Mutex::Locker locker(m_lock);
105 return (m_last_r == -EBLACKLISTED);
106 }
107
108 image_replayer::HealthState get_health_state() const;
109
110 void add_peer(const std::string &peer_uuid, librados::IoCtx &remote_io_ctx);
111
112 inline int64_t get_local_pool_id() const {
113 return m_local_pool_id;
114 }
115 inline const std::string& get_global_image_id() const {
116 return m_global_image_id;
117 }
118
119 void start(Context *on_finish = nullptr, bool manual = false);
120 void stop(Context *on_finish = nullptr, bool manual = false,
121 int r = 0, const std::string& desc = "");
122 void restart(Context *on_finish = nullptr);
123 void flush(Context *on_finish = nullptr);
124
125 void resync_image(Context *on_finish=nullptr);
126
127 void print_status(Formatter *f, stringstream *ss);
128
129 virtual void handle_replay_ready();
130 virtual void handle_replay_complete(int r, const std::string &error_desc);
131
132 protected:
133 /**
134 * @verbatim
135 * (error)
136 * <uninitialized> <------------------------------------ FAIL
137 * | ^
138 * v *
139 * <starting> *
140 * | *
141 * v *
142 * WAIT_FOR_DELETION *
143 * | *
144 * v (error) *
145 * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * *
146 * | *
147 * v (error) *
148 * PREPARE_REMOTE_IMAGE * * * * * * * * * * * * * * * * * *
149 * | *
150 * v (error) *
151 * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
152 * | *
153 * v (error) *
154 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
155 * | *
156 * v (error) *
157 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
158 * |
159 * | /--------------------------------------------\
160 * | | |
161 * v v (asok flush) |
162 * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
163 * | \ | |
164 * | | v |
165 * | | FLUSH_COMMIT_POSITION |
166 * | | | |
167 * | | \--------------------/|
168 * | | |
169 * | | (entries available) |
170 * | \-----------> REPLAY_READY |
171 * | | |
172 * | | (skip if not |
173 * | v needed) (error)
174 * | REPLAY_FLUSH * * * * * * * * *
175 * | | | *
176 * | | (skip if not | *
177 * | v needed) (error) *
178 * | GET_REMOTE_TAG * * * * * * * *
179 * | | | *
180 * | | (skip if not | *
181 * | v needed) (error) *
182 * | ALLOCATE_LOCAL_TAG * * * * * *
183 * | | | *
184 * | v (error) *
185 * | PREPROCESS_ENTRY * * * * * * *
186 * | | | *
187 * | v (error) *
188 * | PROCESS_ENTRY * * * * * * * * *
189 * | | | *
190 * | \---------------------/ *
191 * v *
192 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
193 * |
194 * v
195 * JOURNAL_REPLAY_SHUT_DOWN
196 * |
197 * v
198 * LOCAL_IMAGE_CLOSE
199 * |
200 * v
201 * <stopped>
202 *
203 * @endverbatim
204 */
205
206 virtual void on_start_fail(int r, const std::string &desc = "");
207 virtual bool on_start_interrupted();
208
209 virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
210
211 virtual void on_flush_local_replay_flush_start(Context *on_flush);
212 virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
213 virtual void on_flush_flush_commit_position_start(Context *on_flush);
214 virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
215
216 bool on_replay_interrupted();
217
218 private:
219 typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
220
221 enum State {
222 STATE_UNKNOWN,
223 STATE_STARTING,
224 STATE_REPLAYING,
225 STATE_REPLAY_FLUSHING,
226 STATE_STOPPING,
227 STATE_STOPPED,
228 };
229
230 struct RemoteImage {
231 std::string mirror_uuid;
232 std::string image_id;
233 librados::IoCtx io_ctx;
234
235 RemoteImage() {
236 }
237 RemoteImage(const Peer& peer) : io_ctx(peer.io_ctx) {
238 }
239 };
240
241 typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
242 typedef boost::optional<State> OptionalState;
243 typedef boost::optional<cls::rbd::MirrorImageStatusState>
244 OptionalMirrorImageStatusState;
245
246 struct JournalListener : public librbd::journal::Listener {
247 ImageReplayer *img_replayer;
248
249 JournalListener(ImageReplayer *img_replayer)
250 : img_replayer(img_replayer) {
251 }
252
253 void handle_close() override {
254 img_replayer->on_stop_journal_replay();
255 }
256
257 void handle_promoted() override {
258 img_replayer->on_stop_journal_replay(0, "force promoted");
259 }
260
261 void handle_resync() override {
262 img_replayer->resync_image();
263 }
264 };
265
266 class BootstrapProgressContext : public ProgressContext {
267 public:
268 BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
269 replayer(replayer) {
270 }
271
272 void update_progress(const std::string &description,
273 bool flush = true) override;
274 private:
275 ImageReplayer<ImageCtxT> *replayer;
276 };
277
278 Threads<ImageCtxT> *m_threads;
279 ImageDeleter<ImageCtxT>* m_image_deleter;
280 InstanceWatcher<ImageCtxT> *m_instance_watcher;
281
282 Peers m_peers;
283 RemoteImage m_remote_image;
284
285 RadosRef m_local;
286 std::string m_local_mirror_uuid;
287 int64_t m_local_pool_id;
288 std::string m_local_image_id;
289 std::string m_global_image_id;
290 std::string m_local_image_name;
291 std::string m_name;
292
293 mutable Mutex m_lock;
294 State m_state = STATE_STOPPED;
295 std::string m_state_desc;
296
297 OptionalMirrorImageStatusState m_mirror_image_status_state = boost::none;
298 int m_last_r = 0;
299
300 BootstrapProgressContext m_progress_cxt;
301
302 bool m_finished = false;
303 bool m_delete_requested = false;
304 bool m_resync_requested = false;
305
306 image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
307 image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
308 nullptr;
309 librados::IoCtx m_local_ioctx;
310 ImageCtxT *m_local_image_ctx = nullptr;
311 std::string m_local_image_tag_owner;
312
313 decltype(ImageCtxT::journal) m_local_journal = nullptr;
314 librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
315 Journaler* m_remote_journaler = nullptr;
316 ::journal::ReplayHandler *m_replay_handler = nullptr;
317 librbd::journal::Listener *m_journal_listener;
318
319 Context *m_on_start_finish = nullptr;
320 Context *m_on_stop_finish = nullptr;
321 Context *m_update_status_task = nullptr;
322 int m_update_status_interval = 0;
323 librados::AioCompletion *m_update_status_comp = nullptr;
324 bool m_stop_requested = false;
325 bool m_manual_stop = false;
326
327 AdminSocketHook *m_asok_hook = nullptr;
328
329 image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
330
331 uint32_t m_in_flight_status_updates = 0;
332 bool m_update_status_requested = false;
333 Context *m_on_update_status_finish = nullptr;
334
335 cls::journal::ClientState m_client_state =
336 cls::journal::CLIENT_STATE_DISCONNECTED;
337 librbd::journal::MirrorPeerClientMeta m_client_meta;
338
339 ReplayEntry m_replay_entry;
340 bool m_replay_tag_valid = false;
341 uint64_t m_replay_tag_tid = 0;
342 cls::journal::Tag m_replay_tag;
343 librbd::journal::TagData m_replay_tag_data;
344 librbd::journal::EventEntry m_event_entry;
345 AsyncOpTracker m_event_replay_tracker;
346 Context *m_delayed_preprocess_task = nullptr;
347
348 struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
349 ImageReplayer *replayer;
350
351 RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
352
353 void handle_update(::journal::JournalMetadata *) override;
354 } m_remote_listener;
355
356 struct C_ReplayCommitted : public Context {
357 ImageReplayer *replayer;
358 ReplayEntry replay_entry;
359
360 C_ReplayCommitted(ImageReplayer *replayer,
361 ReplayEntry &&replay_entry)
362 : replayer(replayer), replay_entry(std::move(replay_entry)) {
363 }
364 void finish(int r) override {
365 replayer->handle_process_entry_safe(replay_entry, r);
366 }
367 };
368
369 static std::string to_string(const State state);
370
371 bool is_stopped_() const {
372 return m_state == STATE_STOPPED;
373 }
374 bool is_running_() const {
375 return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
376 }
377 bool is_replaying_() const {
378 return (m_state == STATE_REPLAYING ||
379 m_state == STATE_REPLAY_FLUSHING);
380 }
381
382 bool update_mirror_image_status(bool force, const OptionalState &state);
383 bool start_mirror_image_status_update(bool force, bool restarting);
384 void finish_mirror_image_status_update();
385 void queue_mirror_image_status_update(const OptionalState &state);
386 void send_mirror_status_update(const OptionalState &state);
387 void handle_mirror_status_update(int r);
388 void reschedule_update_status_task(int new_interval = 0);
389
390 void shut_down(int r);
391 void handle_shut_down(int r);
392 void handle_remote_journal_metadata_updated();
393
394 void wait_for_deletion();
395 void handle_wait_for_deletion(int r);
396
397 void prepare_local_image();
398 void handle_prepare_local_image(int r);
399
400 void prepare_remote_image();
401 void handle_prepare_remote_image(int r);
402
403 void bootstrap();
404 void handle_bootstrap(int r);
405
406 void init_remote_journaler();
407 void handle_init_remote_journaler(int r);
408
409 void start_replay();
410 void handle_start_replay(int r);
411
412 void replay_flush();
413 void handle_replay_flush(int r);
414
415 void get_remote_tag();
416 void handle_get_remote_tag(int r);
417
418 void allocate_local_tag();
419 void handle_allocate_local_tag(int r);
420
421 void preprocess_entry();
422 void handle_preprocess_entry_ready(int r);
423 void handle_preprocess_entry_safe(int r);
424
425 void process_entry();
426 void handle_process_entry_ready(int r);
427 void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);
428
429 void register_admin_socket_hook();
430 void unregister_admin_socket_hook();
431 void reregister_admin_socket_hook();
432 };
433
434 } // namespace mirror
435 } // namespace rbd
436
// Explicit instantiation declaration: translation units that include this
// header do not implicitly instantiate ImageReplayer<librbd::ImageCtx>; the
// matching explicit instantiation definition has to be provided elsewhere
// (presumably ImageReplayer.cc -- confirm).
437 extern template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
438
439 #endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H