]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/ImageReplayer.h
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / tools / rbd_mirror / ImageReplayer.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#ifndef CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
5#define CEPH_RBD_MIRROR_IMAGE_REPLAYER_H
6
7#include "common/AsyncOpTracker.h"
8#include "common/Mutex.h"
9#include "common/WorkQueue.h"
10#include "include/rados/librados.hpp"
11#include "cls/journal/cls_journal_types.h"
12#include "cls/rbd/cls_rbd_types.h"
13#include "journal/JournalMetadataListener.h"
14#include "journal/ReplayEntry.h"
15#include "librbd/ImageCtx.h"
16#include "librbd/journal/Types.h"
17#include "librbd/journal/TypeTraits.h"
18#include "ImageDeleter.h"
19#include "ProgressContext.h"
20#include "types.h"
21
22#include <boost/noncopyable.hpp>
23#include <boost/optional.hpp>
24
25#include <set>
26#include <map>
27#include <atomic>
28#include <string>
29#include <vector>
30
31class AdminSocketHook;
32
33namespace journal {
34
35class Journaler;
36class ReplayHandler;
37
38}
39
40namespace librbd {
41
42class ImageCtx;
43namespace journal { template <typename> class Replay; }
44
45}
46
47namespace rbd {
48namespace mirror {
49
50template <typename> struct Threads;
51
52namespace image_replayer { template <typename> class BootstrapRequest; }
53namespace image_replayer { template <typename> class EventPreprocessor; }
54namespace image_replayer { template <typename> class ReplayStatusFormatter; }
55
56/**
57 * Replays changes from a remote cluster for a single image.
58 */
59template <typename ImageCtxT = librbd::ImageCtx>
60class ImageReplayer {
61public:
62 typedef typename librbd::journal::TypeTraits<ImageCtxT>::ReplayEntry ReplayEntry;
63
64 enum State {
65 STATE_UNKNOWN,
66 STATE_STARTING,
67 STATE_REPLAYING,
68 STATE_REPLAY_FLUSHING,
69 STATE_STOPPING,
70 STATE_STOPPED,
71 };
72
73 static ImageReplayer *create(
74 Threads<librbd::ImageCtx> *threads,
75 std::shared_ptr<ImageDeleter> image_deleter,
76 ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
77 RadosRef local, const std::string &local_mirror_uuid, int64_t local_pool_id,
78 const std::string &global_image_id) {
79 return new ImageReplayer(threads, image_deleter, image_sync_throttler,
80 local, local_mirror_uuid, local_pool_id,
81 global_image_id);
82 }
83 void destroy() {
84 delete this;
85 }
86
87 ImageReplayer(Threads<librbd::ImageCtx> *threads,
88 std::shared_ptr<ImageDeleter> image_deleter,
89 ImageSyncThrottlerRef<ImageCtxT> image_sync_throttler,
90 RadosRef local, const std::string &local_mirror_uuid,
91 int64_t local_pool_id, const std::string &global_image_id);
92 virtual ~ImageReplayer();
93 ImageReplayer(const ImageReplayer&) = delete;
94 ImageReplayer& operator=(const ImageReplayer&) = delete;
95
96 State get_state() { Mutex::Locker l(m_lock); return get_state_(); }
97 bool is_stopped() { Mutex::Locker l(m_lock); return is_stopped_(); }
98 bool is_running() { Mutex::Locker l(m_lock); return is_running_(); }
99 bool is_replaying() { Mutex::Locker l(m_lock); return is_replaying_(); }
100
101 std::string get_name() { Mutex::Locker l(m_lock); return m_name; };
102 void set_state_description(int r, const std::string &desc);
103
104 inline bool is_blacklisted() const {
105 Mutex::Locker locker(m_lock);
106 return (m_last_r == -EBLACKLISTED);
107 }
108
109 void add_remote_image(const std::string &remote_mirror_uuid,
110 const std::string &remote_image_id,
111 librados::IoCtx &remote_io_ctx);
112 void remove_remote_image(const std::string &remote_mirror_uuid,
113 const std::string &remote_image_id,
114 bool schedule_delete);
115 bool remote_images_empty() const;
116
117 inline int64_t get_local_pool_id() const {
118 return m_local_pool_id;
119 }
120 inline const std::string& get_global_image_id() const {
121 return m_global_image_id;
122 }
123
124 void start(Context *on_finish = nullptr, bool manual = false);
125 void stop(Context *on_finish = nullptr, bool manual = false,
126 int r = 0, const std::string& desc = "");
127 void restart(Context *on_finish = nullptr);
128 void flush(Context *on_finish = nullptr);
129
130 void resync_image(Context *on_finish=nullptr);
131
132 void print_status(Formatter *f, stringstream *ss);
133
134 virtual void handle_replay_ready();
135 virtual void handle_replay_complete(int r, const std::string &error_desc);
136
137protected:
138 /**
139 * @verbatim
140 * (error)
141 * <uninitialized> <------------------------------------ FAIL
142 * | ^
143 * v *
144 * <starting> *
145 * | *
146 * v (error) *
147 * PREPARE_LOCAL_IMAGE * * * * * * * * * * * * * * * * * *
148 * | *
149 * v (error) *
150 * BOOTSTRAP_IMAGE * * * * * * * * * * * * * * * * * * * *
151 * | *
152 * v (error) *
153 * INIT_REMOTE_JOURNALER * * * * * * * * * * * * * * * * *
154 * | *
155 * v (error) *
156 * START_REPLAY * * * * * * * * * * * * * * * * * * * * * *
157 * |
158 * | /--------------------------------------------\
159 * | | |
160 * v v (asok flush) |
161 * REPLAYING -------------> LOCAL_REPLAY_FLUSH |
162 * | \ | |
163 * | | v |
164 * | | FLUSH_COMMIT_POSITION |
165 * | | | |
166 * | | \--------------------/|
167 * | | |
168 * | | (entries available) |
169 * | \-----------> REPLAY_READY |
170 * | | |
171 * | | (skip if not |
172 * | v needed) (error)
173 * | REPLAY_FLUSH * * * * * * * * *
174 * | | | *
175 * | | (skip if not | *
176 * | v needed) (error) *
177 * | GET_REMOTE_TAG * * * * * * * *
178 * | | | *
179 * | | (skip if not | *
180 * | v needed) (error) *
181 * | ALLOCATE_LOCAL_TAG * * * * * *
182 * | | | *
183 * | v (error) *
184 * | PREPROCESS_ENTRY * * * * * * *
185 * | | | *
186 * | v (error) *
187 * | PROCESS_ENTRY * * * * * * * * *
188 * | | | *
189 * | \---------------------/ *
190 * v *
191 * REPLAY_COMPLETE < * * * * * * * * * * * * * * * * * * *
192 * |
193 * v
194 * JOURNAL_REPLAY_SHUT_DOWN
195 * |
196 * v
197 * LOCAL_IMAGE_CLOSE
198 * |
199 * v
200 * <stopped>
201 *
202 * @endverbatim
203 */
204
205 virtual void on_start_fail(int r, const std::string &desc = "");
206 virtual bool on_start_interrupted();
207
208 virtual void on_stop_journal_replay(int r = 0, const std::string &desc = "");
209
210 virtual void on_flush_local_replay_flush_start(Context *on_flush);
211 virtual void on_flush_local_replay_flush_finish(Context *on_flush, int r);
212 virtual void on_flush_flush_commit_position_start(Context *on_flush);
213 virtual void on_flush_flush_commit_position_finish(Context *on_flush, int r);
214
215 bool on_replay_interrupted();
216
217private:
218 struct RemoteImage {
219 std::string mirror_uuid;
220 std::string image_id;
221 librados::IoCtx io_ctx;
222
223 RemoteImage() {
224 }
225 RemoteImage(const std::string &mirror_uuid,
226 const std::string &image_id)
227 : mirror_uuid(mirror_uuid), image_id(image_id) {
228 }
229 RemoteImage(const std::string &mirror_uuid,
230 const std::string &image_id,
231 librados::IoCtx &io_ctx)
232 : mirror_uuid(mirror_uuid), image_id(image_id), io_ctx(io_ctx) {
233 }
234
235 inline bool operator<(const RemoteImage &rhs) const {
236 if (mirror_uuid != rhs.mirror_uuid) {
237 return mirror_uuid < rhs.mirror_uuid;
238 } else {
239 return image_id < rhs.image_id;
240 }
241 }
242 inline bool operator==(const RemoteImage &rhs) const {
243 return (mirror_uuid == rhs.mirror_uuid && image_id == rhs.image_id);
244 }
245 };
246
247 typedef std::set<RemoteImage> RemoteImages;
248
249 typedef typename librbd::journal::TypeTraits<ImageCtxT>::Journaler Journaler;
250 typedef boost::optional<State> OptionalState;
251
252 struct JournalListener : public librbd::journal::Listener {
253 ImageReplayer *img_replayer;
254
255 JournalListener(ImageReplayer *img_replayer)
256 : img_replayer(img_replayer) {
257 }
258
259 void handle_close() override {
260 img_replayer->on_stop_journal_replay();
261 }
262
263 void handle_promoted() override {
264 img_replayer->on_stop_journal_replay(0, "force promoted");
265 }
266
267 void handle_resync() override {
268 img_replayer->resync_image();
269 }
270 };
271
272 class BootstrapProgressContext : public ProgressContext {
273 public:
274 BootstrapProgressContext(ImageReplayer<ImageCtxT> *replayer) :
275 replayer(replayer) {
276 }
277
278 void update_progress(const std::string &description,
279 bool flush = true) override;
280 private:
281 ImageReplayer<ImageCtxT> *replayer;
282 };
283
284 Threads<librbd::ImageCtx> *m_threads;
285 std::shared_ptr<ImageDeleter> m_image_deleter;
286 ImageSyncThrottlerRef<ImageCtxT> m_image_sync_throttler;
287
288 RemoteImages m_remote_images;
289 RemoteImage m_remote_image;
290
291 RadosRef m_local;
292 std::string m_local_mirror_uuid;
293 int64_t m_local_pool_id;
294 std::string m_local_image_id;
295 std::string m_global_image_id;
296 std::string m_name;
297 mutable Mutex m_lock;
298 State m_state = STATE_STOPPED;
299 int m_last_r = 0;
300 std::string m_state_desc;
301 BootstrapProgressContext m_progress_cxt;
302 bool m_do_resync;
303 image_replayer::EventPreprocessor<ImageCtxT> *m_event_preprocessor = nullptr;
304 image_replayer::ReplayStatusFormatter<ImageCtxT> *m_replay_status_formatter =
305 nullptr;
306 librados::IoCtx m_local_ioctx;
307 ImageCtxT *m_local_image_ctx = nullptr;
308 std::string m_local_image_tag_owner;
309
310 decltype(ImageCtxT::journal) m_local_journal = nullptr;
311 librbd::journal::Replay<ImageCtxT> *m_local_replay = nullptr;
312 Journaler* m_remote_journaler = nullptr;
313 ::journal::ReplayHandler *m_replay_handler = nullptr;
314 librbd::journal::Listener *m_journal_listener;
315 bool m_stopping_for_resync = false;
316
317 Context *m_on_start_finish = nullptr;
318 Context *m_on_stop_finish = nullptr;
319 Context *m_update_status_task = nullptr;
320 int m_update_status_interval = 0;
321 librados::AioCompletion *m_update_status_comp = nullptr;
322 bool m_stop_requested = false;
323 bool m_manual_stop = false;
324
325 AdminSocketHook *m_asok_hook = nullptr;
326
327 image_replayer::BootstrapRequest<ImageCtxT> *m_bootstrap_request = nullptr;
328
329 uint32_t m_in_flight_status_updates = 0;
330 bool m_update_status_requested = false;
331 Context *m_on_update_status_finish = nullptr;
332
333 librbd::journal::MirrorPeerClientMeta m_client_meta;
334
335 ReplayEntry m_replay_entry;
336 bool m_replay_tag_valid = false;
337 uint64_t m_replay_tag_tid = 0;
338 cls::journal::Tag m_replay_tag;
339 librbd::journal::TagData m_replay_tag_data;
340 librbd::journal::EventEntry m_event_entry;
341 AsyncOpTracker m_event_replay_tracker;
342 Context *m_delayed_preprocess_task = nullptr;
343
344 struct RemoteJournalerListener : public ::journal::JournalMetadataListener {
345 ImageReplayer *replayer;
346
347 RemoteJournalerListener(ImageReplayer *replayer) : replayer(replayer) { }
348
349 void handle_update(::journal::JournalMetadata *) override;
350 } m_remote_listener;
351
352 struct C_ReplayCommitted : public Context {
353 ImageReplayer *replayer;
354 ReplayEntry replay_entry;
355
356 C_ReplayCommitted(ImageReplayer *replayer,
357 ReplayEntry &&replay_entry)
358 : replayer(replayer), replay_entry(std::move(replay_entry)) {
359 }
360 void finish(int r) override {
361 replayer->handle_process_entry_safe(replay_entry, r);
362 }
363 };
364
365 static std::string to_string(const State state);
366
367 State get_state_() const {
368 return m_state;
369 }
370 bool is_stopped_() const {
371 return m_state == STATE_STOPPED;
372 }
373 bool is_running_() const {
374 return !is_stopped_() && m_state != STATE_STOPPING && !m_stop_requested;
375 }
376 bool is_replaying_() const {
377 return (m_state == STATE_REPLAYING ||
378 m_state == STATE_REPLAY_FLUSHING);
379 }
380
381 bool update_mirror_image_status(bool force, const OptionalState &state);
382 bool start_mirror_image_status_update(bool force, bool restarting);
383 void finish_mirror_image_status_update();
384 void queue_mirror_image_status_update(const OptionalState &state);
385 void send_mirror_status_update(const OptionalState &state);
386 void handle_mirror_status_update(int r);
387 void reschedule_update_status_task(int new_interval = 0);
388
389 void shut_down(int r);
390 void handle_shut_down(int r);
391 void handle_remote_journal_metadata_updated();
392
393 void prepare_local_image();
394 void handle_prepare_local_image(int r);
395
396 void bootstrap();
397 void handle_bootstrap(int r);
398
399 void init_remote_journaler();
400 void handle_init_remote_journaler(int r);
401
402 void start_replay();
403 void handle_start_replay(int r);
404
405 void replay_flush();
406 void handle_replay_flush(int r);
407
408 void get_remote_tag();
409 void handle_get_remote_tag(int r);
410
411 void allocate_local_tag();
412 void handle_allocate_local_tag(int r);
413
414 void preprocess_entry();
415 void handle_preprocess_entry_ready(int r);
416 void handle_preprocess_entry_safe(int r);
417
418 void process_entry();
419 void handle_process_entry_ready(int r);
420 void handle_process_entry_safe(const ReplayEntry& replay_entry, int r);
421
422};
423
424} // namespace mirror
425} // namespace rbd
426
427extern template class rbd::mirror::ImageReplayer<librbd::ImageCtx>;
428
429#endif // CEPH_RBD_MIRROR_IMAGE_REPLAYER_H