1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H
5 #define CEPH_RBD_MIRROR_POOL_REPLAYER_H
7 #include "common/AsyncOpTracker.h"
8 #include "common/Cond.h"
9 #include "common/Mutex.h"
10 #include "common/WorkQueue.h"
11 #include "include/rados/librados.hpp"
13 #include "ClusterWatcher.h"
14 #include "LeaderWatcher.h"
15 #include "PoolWatcher.h"
16 #include "ImageDeleter.h"
18 #include "tools/rbd_mirror/service_daemon/Types.h"
// Forward declaration only: this header names AdminSocketHook solely through
// the pointer member m_asok_hook, so the full definition is not required here.
26 class AdminSocketHook
;
// Forward declaration of librbd::ImageCtx. Within this header it appears only
// as a template argument (e.g. Threads<librbd::ImageCtx>), never by value.
28 namespace librbd
{ class ImageCtx
; }
// Forward declarations of templates taking a single type parameter. In this
// header each one is instantiated with librbd::ImageCtx and referenced only
// through a raw pointer (Threads, ServiceDaemon) or a std::unique_ptr
// (InstanceReplayer, InstanceWatcher).
// NOTE(review): std::unique_ptr needs the pointee to be a complete type at the
// point where it is destroyed, so the owning class's destructor is presumably
// defined in a translation unit that includes the full definitions -- confirm.
33 template <typename
> class InstanceReplayer
;
34 template <typename
> class InstanceWatcher
;
35 template <typename
> class ServiceDaemon
;
// Threads is declared with the `struct` class-key, unlike the three above.
36 template <typename
> struct Threads
;
39 * Controls mirroring for a single remote cluster.
// Constructor.
//
// threads, service_daemon and image_deleter are taken as raw pointers, and the
// class keeps raw-pointer members of the same types (m_threads,
// m_service_daemon, m_image_deleter).
// NOTE(review): presumably these are non-owning and must outlive this object;
// the definition is not in this header -- confirm.
//
// local_pool_id: presumably stored in m_local_pool_id (which defaults to -1).
// peer:          description of the remote peer (peer_t is declared elsewhere).
// args:          presumably copied into m_args; the elements are const char*,
//                so a copy of the vector would not copy the pointed-to
//                strings -- verify their lifetime against the caller.
43 PoolReplayer(Threads
<librbd::ImageCtx
> *threads
,
44 ServiceDaemon
<librbd::ImageCtx
>* service_daemon
,
45 ImageDeleter
<>* image_deleter
,
46 int64_t local_pool_id
, const peer_t
&peer
,
47 const std::vector
<const char*> &args
);
// Copy construction and copy assignment are explicitly deleted, so a
// PoolReplayer can be neither copy-constructed nor copy-assigned.
49 PoolReplayer(const PoolReplayer
&) = delete;
50 PoolReplayer
& operator=(const PoolReplayer
&) = delete;
// Read-only state queries; all three are const member functions returning
// bool. Their definitions are not part of this header.
// NOTE(review): m_lock is declared `mutable`, which would let these const
// methods lock it -- presumably that is why; confirm in the implementation.
// NOTE(review): is_blacklisted() presumably reports m_blacklisted -- confirm.
52 bool is_blacklisted() const;
53 bool is_leader() const;
54 bool is_running() const;
// Status reporting entry point taking a Formatter and a stringstream, both by
// pointer. What is written to each is defined in the implementation file.
// NOTE(review): `Formatter` and `stringstream` are spelled unqualified here,
// so this declaration only compiles if an earlier include makes those names
// visible in this scope (e.g. via a using-declaration/directive). Consider
// qualifying them so the header does not depend on that.
61 void print_status(Formatter
*f
, stringstream
*ss
);
// Stops the replayer.
// manual: NOTE(review) presumably recorded in m_manual_stop to distinguish an
// operator-requested stop from an internal one -- confirm in the definition.
63 void stop(bool manual
);
// NOTE(review): presumably asks m_leader_watcher to give up leadership; the
// definition is not visible here -- confirm.
66 void release_leader();
// Adapter implementing PoolWatcher<>::Listener that forwards image-set updates
// to the owning PoolReplayer. Two members of this type are declared further
// down: m_local_pool_watcher_listener and m_remote_pool_watcher_listener.
69 struct PoolWatcherListener
: public PoolWatcher
<>::Listener
{
// Back-pointer to the replayer that receives the forwarded updates.
70 PoolReplayer
*pool_replayer
;
// Stores the replayer pointer and the `local` flag.
// NOTE(review): the declaration of the `local` member is not visible in this
// excerpt; it is initialized here and read in handle_update() below.
73 PoolWatcherListener(PoolReplayer
*pool_replayer
, bool local
)
74 : pool_replayer(pool_replayer
), local(local
) {
// Listener callback: forwards to PoolReplayer::handle_update(). When `local`
// is true an empty string is passed in place of mirror_uuid; otherwise
// mirror_uuid is passed through. Both ImageIds arguments are moved on.
77 void handle_update(const std::string
&mirror_uuid
,
78 ImageIds
&&added_image_ids
,
79 ImageIds
&&removed_image_ids
) override
{
80 pool_replayer
->handle_update((local
? "" : mirror_uuid
),
81 std::move(added_image_ids
),
82 std::move(removed_image_ids
));
// Receives image-set changes. PoolWatcherListener::handle_update() forwards
// here, passing an empty mirror_uuid when the listener's `local` flag is set.
// added_image_ids / removed_image_ids are taken by rvalue reference.
// NOTE(review): m_update_op_tracker and wait_for_update_ops() suggest this
// work is tracked as an asynchronous op -- confirm in the implementation.
86 void handle_update(const std::string
&mirror_uuid
,
87 ImageIds
&&added_image_ids
,
88 ImageIds
&&removed_image_ids
);
// Sets up a rados connection described by cluster_name / client_name.
// description: NOTE(review) presumably a human-readable label used in
//              diagnostics -- confirm.
// rados_ref:   passed by pointer; presumably an output that receives the new
//              connection -- confirm.
// Returns int; NOTE(review) presumably 0 on success and a negative error code
// on failure, but the definition is not visible here.
90 int init_rados(const std::string
&cluster_name
,
91 const std::string
&client_name
,
92 const std::string
&description
, RadosRef
*rados_ref
);
// Leader-transition hooks. LeaderListener::post_acquire_handler() and
// LeaderListener::pre_release_handler() forward to these, handing over the
// on_finish Context they were given.
// NOTE(review): on_finish is presumably completed once the transition work is
// done -- confirm in the implementation.
94 void handle_post_acquire_leader(Context
*on_finish
);
95 void handle_pre_release_leader(Context
*on_finish
);
// Pool watcher start-up steps, each taking a completion Context.
// NOTE(review): by its name and (int r, Context*) signature,
// handle_init_local_pool_watcher appears to be the completion callback of
// init_local_pool_watcher, with r as the result code -- confirm. No matching
// handle_* declaration for the remote watcher is visible in this excerpt.
97 void init_local_pool_watcher(Context
*on_finish
);
98 void handle_init_local_pool_watcher(int r
, Context
*on_finish
);
100 void init_remote_pool_watcher(Context
*on_finish
);
// Pool watcher tear-down step and what appears to be its completion callback.
// NOTE(review): presumably applies to both m_local_pool_watcher and
// m_remote_pool_watcher, with r as the result code -- confirm.
102 void shut_down_pool_watchers(Context
*on_finish
);
103 void handle_shut_down_pool_watchers(int r
, Context
*on_finish
);
// Step that waits for outstanding update operations, plus what appears to be
// its completion callback.
// NOTE(review): presumably waits on m_update_op_tracker -- confirm.
105 void wait_for_update_ops(Context
*on_finish
);
106 void handle_wait_for_update_ops(int r
, Context
*on_finish
);
// Called by LeaderListener::update_leader_handler() with the instance id it
// received.
108 void handle_update_leader(const std::string
&leader_instance_id
);
// Collaborators and settings whose types mirror the constructor parameters.
// NOTE(review): the three raw pointers are presumably non-owning -- confirm.
110 Threads
<librbd::ImageCtx
> *m_threads
;
111 ServiceDaemon
<librbd::ImageCtx
>* m_service_daemon
;
112 ImageDeleter
<>* m_image_deleter
;
// Defaults to -1 until assigned.
113 int64_t m_local_pool_id
= -1;
// Vector of const char*: the vector does not own the pointed-to strings.
115 std::vector
<const char*> m_args
;
// Declared mutable so that const member functions are able to lock it.
// NOTE(review): which members it guards is not stated in this header.
117 mutable Mutex m_lock
;
// Atomic flag, initially false.
119 std::atomic
<bool> m_stopping
= { false };
// Plain (non-atomic) flags, both initially false.
// NOTE(review): presumably accessed under m_lock -- confirm.
120 bool m_manual_stop
= false;
121 bool m_blacklisted
= false;
// Cluster handles for the local and remote side.
// NOTE(review): presumably populated via init_rados() -- confirm.
123 RadosRef m_local_rados
;
124 RadosRef m_remote_rados
;
// librados I/O contexts for the local and remote side.
126 librados::IoCtx m_local_io_ctx
;
127 librados::IoCtx m_remote_io_ctx
;
// One listener/watcher pair per side. The listeners are held by value; the
// watchers are owned through std::unique_ptr.
// NOTE(review): the local listener is presumably constructed with local=true
// and the remote one with local=false -- confirm in the constructor.
129 PoolWatcherListener m_local_pool_watcher_listener
;
130 std::unique_ptr
<PoolWatcher
<> > m_local_pool_watcher
;
132 PoolWatcherListener m_remote_pool_watcher_listener
;
133 std::unique_ptr
<PoolWatcher
<> > m_remote_pool_watcher
;
// Owned instance replayer; InstanceReplayer is only forward-declared in this
// header.
135 std::unique_ptr
<InstanceReplayer
<librbd::ImageCtx
>> m_instance_replayer
;
// Admin socket hook and its name; the pointer starts out null.
// NOTE(review): ownership of m_asok_hook is not expressed by its raw-pointer
// type -- check the implementation for who deletes it.
137 std::string m_asok_hook_name
;
138 AdminSocketHook
*m_asok_hook
= nullptr;
// Image id sets keyed by a string.
// NOTE(review): the key is presumably the mirror uuid passed to
// handle_update() -- confirm.
140 std::map
<std::string
, ImageIds
> m_initial_mirror_image_ids
;
// Starts as CALLOUT_ID_NONE.
142 service_daemon::CalloutId m_callout_id
= service_daemon::CALLOUT_ID_NONE
;
// Thread subclass whose entry() calls run() on the PoolReplayer it was
// constructed with. The class is defined and a member of that type,
// m_pool_replayer_thread, is declared in the same statement.
144 class PoolReplayerThread
: public Thread
{
// Replayer whose run() is invoked from entry().
145 PoolReplayer
*m_pool_replayer
;
// NOTE(review): single-argument constructor is not marked explicit.
147 PoolReplayerThread(PoolReplayer
*pool_replayer
)
148 : m_pool_replayer(pool_replayer
) {
// Thread body: delegates to PoolReplayer::run(). The value returned from
// entry() is not visible in this excerpt.
150 void *entry() override
{
151 m_pool_replayer
->run();
154 } m_pool_replayer_thread
;
// Adapter implementing LeaderWatcher<>::Listener. Each override forwards to
// the corresponding handle_* method on the PoolReplayer given at construction.
156 class LeaderListener
: public LeaderWatcher
<>::Listener
{
// NOTE(review): single-argument constructor is not marked explicit.
158 LeaderListener(PoolReplayer
*pool_replayer
)
159 : m_pool_replayer(pool_replayer
) {
// Forwards to PoolReplayer::handle_post_acquire_leader(on_finish).
163 void post_acquire_handler(Context
*on_finish
) override
{
164 m_pool_replayer
->handle_post_acquire_leader(on_finish
);
// Forwards to PoolReplayer::handle_pre_release_leader(on_finish).
167 void pre_release_handler(Context
*on_finish
) override
{
168 m_pool_replayer
->handle_pre_release_leader(on_finish
);
// Forwards to PoolReplayer::handle_update_leader(leader_instance_id).
171 void update_leader_handler(
172 const std::string
&leader_instance_id
) override
{
173 m_pool_replayer
->handle_update_leader(leader_instance_id
);
// Replayer that receives the forwarded callbacks.
177 PoolReplayer
*m_pool_replayer
;
// Owned leader watcher and instance watcher. InstanceWatcher is only
// forward-declared in this header.
180 std::unique_ptr
<LeaderWatcher
<> > m_leader_watcher
;
181 std::unique_ptr
<InstanceWatcher
<librbd::ImageCtx
> > m_instance_watcher
;
// Tracker for asynchronous operations.
// NOTE(review): presumably what wait_for_update_ops() waits on -- confirm.
182 AsyncOpTracker m_update_op_tracker
;
185 } // namespace mirror
188 #endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H