1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef RBD_MIRROR_IMAGE_REPLAYER_SNAPSHOT_REPLAYER_H
5 #define RBD_MIRROR_IMAGE_REPLAYER_SNAPSHOT_REPLAYER_H
7 #include "tools/rbd_mirror/image_replayer/Replayer.h"
8 #include "common/ceph_mutex.h"
9 #include "common/AsyncOpTracker.h"
10 #include "cls/rbd/cls_rbd_types.h"
11 #include "librbd/mirror/snapshot/Types.h"
12 #include "tools/rbd_mirror/image_replayer/TimeRollingMean.h"
13 #include <boost/accumulators/accumulators.hpp>
14 #include <boost/accumulators/statistics/stats.hpp>
15 #include <boost/accumulators/statistics/rolling_mean.hpp>
17 #include <type_traits>
22 namespace snapshot
{ template <typename I
> class Replay
; }
29 template <typename
> struct InstanceWatcher
;
31 template <typename
> struct Threads
;
33 namespace image_replayer
{
35 struct ReplayerListener
;
39 template <typename
> class EventPreprocessor
;
40 template <typename
> class ReplayStatusFormatter
;
41 template <typename
> class StateBuilder
;
43 template <typename ImageCtxT
>
44 class Replayer
: public image_replayer::Replayer
{
46 static Replayer
* create(
47 Threads
<ImageCtxT
>* threads
,
48 InstanceWatcher
<ImageCtxT
>* instance_watcher
,
49 const std::string
& local_mirror_uuid
,
50 PoolMetaCache
* pool_meta_cache
,
51 StateBuilder
<ImageCtxT
>* state_builder
,
52 ReplayerListener
* replayer_listener
) {
53 return new Replayer(threads
, instance_watcher
, local_mirror_uuid
,
54 pool_meta_cache
, state_builder
, replayer_listener
);
58 Threads
<ImageCtxT
>* threads
,
59 InstanceWatcher
<ImageCtxT
>* instance_watcher
,
60 const std::string
& local_mirror_uuid
,
61 PoolMetaCache
* pool_meta_cache
,
62 StateBuilder
<ImageCtxT
>* state_builder
,
63 ReplayerListener
* replayer_listener
);
66 void destroy() override
{
70 void init(Context
* on_finish
) override
;
71 void shut_down(Context
* on_finish
) override
;
73 void flush(Context
* on_finish
) override
;
75 bool get_replay_status(std::string
* description
, Context
* on_finish
) override
;
77 bool is_replaying() const override
{
78 std::unique_lock locker
{m_lock
};
79 return (m_state
== STATE_REPLAYING
|| m_state
== STATE_IDLE
);
82 bool is_resync_requested() const override
{
83 std::unique_lock locker
{m_lock
};
84 return m_resync_requested
;
87 int get_error_code() const override
{
88 std::unique_lock
locker(m_lock
);
92 std::string
get_error_description() const override
{
93 std::unique_lock
locker(m_lock
);
94 return m_error_description
;
104 * REGISTER_LOCAL_UPDATE_WATCHER
107 * REGISTER_REMOTE_UPDATE_WATCHER
110 * LOAD_LOCAL_IMAGE_META <----------------------------\
112 * v (skip if not needed) |
113 * REFRESH_LOCAL_IMAGE |
115 * v (skip if not needed) |
116 * REFRESH_REMOTE_IMAGE |
118 * | (unused non-primary snapshot) |
119 * |\--------------> PRUNE_NON_PRIMARY_SNAPSHOT---/|
121 * | (interrupted sync) |
122 * |\--------------> GET_LOCAL_IMAGE_STATE ------\ |
124 * | (new snapshot) | |
125 * |\--------------> COPY_SNAPSHOTS | |
128 * | GET_REMOTE_IMAGE_STATE | |
131 * | CREATE_NON_PRIMARY_SNAPSHOT | |
133 * | v (skip if not needed)| |
134 * | UPDATE_MIRROR_IMAGE_STATE | |
136 * | |/--------------------/ |
145 * | APPLY_IMAGE_STATE |
148 * | UPDATE_NON_PRIMARY_SNAPSHOT |
151 * | NOTIFY_IMAGE_UPDATE |
153 * | (interrupted unlink) v |
154 * |\--------------> UNLINK_PEER |
157 * | NOTIFY_LISTENER |
159 * | \----------------------/|
161 * | (remote demoted) |
162 * \---------------> NOTIFY_LISTENER |
164 * |/--------------------/ |
166 * | (update notification) |
167 * <idle> --------------------------------------------/
173 * UNREGISTER_REMOTE_UPDATE_WATCHER
176 * UNREGISTER_LOCAL_UPDATE_WATCHER
179 * WAIT_FOR_IN_FLIGHT_OPS
194 struct C_UpdateWatchCtx
;
195 struct DeepCopyHandler
;
197 Threads
<ImageCtxT
>* m_threads
;
198 InstanceWatcher
<ImageCtxT
>* m_instance_watcher
;
199 std::string m_local_mirror_uuid
;
200 PoolMetaCache
* m_pool_meta_cache
;
201 StateBuilder
<ImageCtxT
>* m_state_builder
;
202 ReplayerListener
* m_replayer_listener
;
204 mutable ceph::mutex m_lock
;
206 State m_state
= STATE_INIT
;
208 Context
* m_on_init_shutdown
= nullptr;
210 bool m_resync_requested
= false;
211 int m_error_code
= 0;
212 std::string m_error_description
;
214 C_UpdateWatchCtx
* m_update_watch_ctx
= nullptr;
215 uint64_t m_local_update_watcher_handle
= 0;
216 uint64_t m_remote_update_watcher_handle
= 0;
217 bool m_image_updated
= false;
219 AsyncOpTracker m_in_flight_op_tracker
;
221 uint64_t m_local_snap_id_start
= 0;
222 uint64_t m_local_snap_id_end
= CEPH_NOSNAP
;
223 cls::rbd::MirrorSnapshotNamespace m_local_mirror_snap_ns
;
224 uint64_t m_local_object_count
= 0;
226 std::string m_remote_mirror_peer_uuid
;
227 uint64_t m_remote_snap_id_start
= 0;
228 uint64_t m_remote_snap_id_end
= CEPH_NOSNAP
;
229 cls::rbd::MirrorSnapshotNamespace m_remote_mirror_snap_ns
;
231 librbd::mirror::snapshot::ImageState m_image_state
;
232 DeepCopyHandler
* m_deep_copy_handler
= nullptr;
234 TimeRollingMean m_bytes_per_second
;
236 uint64_t m_snapshot_bytes
= 0;
237 boost::accumulators::accumulator_set
<
238 uint64_t, boost::accumulators::stats
<
239 boost::accumulators::tag::rolling_mean
>> m_bytes_per_snapshot
{
240 boost::accumulators::tag::rolling_window::window_size
= 2};
242 uint32_t m_pending_snapshots
= 0;
244 bool m_remote_image_updated
= false;
245 bool m_updating_sync_point
= false;
246 bool m_sync_in_progress
= false;
248 void load_local_image_meta();
249 void handle_load_local_image_meta(int r
);
251 void refresh_local_image();
252 void handle_refresh_local_image(int r
);
254 void refresh_remote_image();
255 void handle_refresh_remote_image(int r
);
257 void scan_local_mirror_snapshots(std::unique_lock
<ceph::mutex
>* locker
);
258 void scan_remote_mirror_snapshots(std::unique_lock
<ceph::mutex
>* locker
);
260 void prune_non_primary_snapshot(uint64_t snap_id
);
261 void handle_prune_non_primary_snapshot(int r
);
263 void copy_snapshots();
264 void handle_copy_snapshots(int r
);
266 void get_remote_image_state();
267 void handle_get_remote_image_state(int r
);
269 void get_local_image_state();
270 void handle_get_local_image_state(int r
);
272 void create_non_primary_snapshot();
273 void handle_create_non_primary_snapshot(int r
);
275 void update_mirror_image_state();
276 void handle_update_mirror_image_state(int r
);
279 void handle_request_sync(int r
);
282 void handle_copy_image(int r
);
283 void handle_copy_image_progress(uint64_t object_number
,
284 uint64_t object_count
);
285 void handle_copy_image_read(uint64_t bytes_read
);
287 void apply_image_state();
288 void handle_apply_image_state(int r
);
290 void update_non_primary_snapshot(bool complete
);
291 void handle_update_non_primary_snapshot(bool complete
, int r
);
293 void notify_image_update();
294 void handle_notify_image_update(int r
);
296 void unlink_peer(uint64_t remote_snap_id
);
297 void handle_unlink_peer(int r
);
301 void register_local_update_watcher();
302 void handle_register_local_update_watcher(int r
);
304 void register_remote_update_watcher();
305 void handle_register_remote_update_watcher(int r
);
307 void unregister_remote_update_watcher();
308 void handle_unregister_remote_update_watcher(int r
);
310 void unregister_local_update_watcher();
311 void handle_unregister_local_update_watcher(int r
);
313 void wait_for_in_flight_ops();
314 void handle_wait_for_in_flight_ops(int r
);
316 void handle_image_update_notify();
318 void handle_replay_complete(int r
, const std::string
& description
);
319 void handle_replay_complete(std::unique_lock
<ceph::mutex
>* locker
,
320 int r
, const std::string
& description
);
321 void notify_status_updated();
323 bool is_replay_interrupted();
324 bool is_replay_interrupted(std::unique_lock
<ceph::mutex
>* lock
);
328 } // namespace snapshot
329 } // namespace image_replayer
330 } // namespace mirror
333 extern template class rbd::mirror::image_replayer::snapshot::Replayer
<librbd::ImageCtx
>;
335 #endif // RBD_MIRROR_IMAGE_REPLAYER_SNAPSHOT_REPLAYER_H