1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H
5 #define CEPH_RBD_MIRROR_POOL_REPLAYER_H
7 #include "common/Cond.h"
8 #include "common/WorkQueue.h"
9 #include "common/ceph_mutex.h"
10 #include "include/rados/librados.hpp"
11 #include "librbd/Utils.h"
13 #include "tools/rbd_mirror/LeaderWatcher.h"
14 #include "tools/rbd_mirror/NamespaceReplayer.h"
15 #include "tools/rbd_mirror/Throttler.h"
16 #include "tools/rbd_mirror/Types.h"
17 #include "tools/rbd_mirror/leader_watcher/Types.h"
18 #include "tools/rbd_mirror/service_daemon/Types.h"
25 class AdminSocketHook
;
27 namespace journal
{ struct CacheManagerHandler
; }
29 namespace librbd
{ class ImageCtx
; }
34 template <typename
> class RemotePoolPoller
;
35 namespace remote_pool_poller
{ struct Listener
; }
38 template <typename
> class ServiceDaemon
;
39 template <typename
> struct Threads
;
43 * Controls mirroring for a single remote cluster.
45 template <typename ImageCtxT
= librbd::ImageCtx
>
48 PoolReplayer(Threads
<ImageCtxT
> *threads
,
49 ServiceDaemon
<ImageCtxT
> *service_daemon
,
50 journal::CacheManagerHandler
*cache_manager_handler
,
51 PoolMetaCache
* pool_meta_cache
,
52 int64_t local_pool_id
, const PeerSpec
&peer
,
53 const std::vector
<const char*> &args
);
55 PoolReplayer(const PoolReplayer
&) = delete;
56 PoolReplayer
& operator=(const PoolReplayer
&) = delete;
58 bool is_blacklisted() const;
59 bool is_leader() const;
60 bool is_running() const;
62 void init(const std::string
& site_name
);
67 void print_status(Formatter
*f
);
69 void stop(bool manual
);
72 void release_leader();
85 * <follower> <---------------------\
87 * . (leader acquired) |
89 * NOTIFY_NAMESPACE_WATCHERS NOTIFY_NAMESPACE_WATCHERS
94 * . (leader lost / shut down) .
95 * . . . . . . . . . . . . . . . .
100 struct RemotePoolPollerListener
;
102 int init_rados(const std::string
&cluster_name
,
103 const std::string
&client_name
,
104 const std::string
&mon_host
,
105 const std::string
&key
,
106 const std::string
&description
, RadosRef
*rados_ref
,
107 bool strip_cluster_overrides
);
109 void update_namespace_replayers();
110 int list_mirroring_namespaces(std::set
<std::string
> *namespaces
);
112 void namespace_replayer_acquire_leader(const std::string
&name
,
115 void handle_post_acquire_leader(Context
*on_finish
);
116 void handle_pre_release_leader(Context
*on_finish
);
118 void handle_update_leader(const std::string
&leader_instance_id
);
120 void handle_instances_added(const std::vector
<std::string
> &instance_ids
);
121 void handle_instances_removed(const std::vector
<std::string
> &instance_ids
);
123 // sync version, executed in the caller thread
124 template <typename L
>
125 void with_namespace_replayers(L
&&callback
) {
126 std::lock_guard locker
{m_lock
};
128 if (m_namespace_replayers_locked
) {
129 ceph_assert(m_on_namespace_replayers_unlocked
== nullptr);
131 m_on_namespace_replayers_unlocked
= &cond
;
136 m_namespace_replayers_locked
= true;
139 ceph_assert(m_namespace_replayers_locked
);
140 callback(); // may temporary release the lock
141 ceph_assert(m_namespace_replayers_locked
);
143 if (m_on_namespace_replayers_unlocked
== nullptr) {
144 m_namespace_replayers_locked
= false;
148 m_threads
->work_queue
->queue(m_on_namespace_replayers_unlocked
);
149 m_on_namespace_replayers_unlocked
= nullptr;
153 template <typename L
>
154 void with_namespace_replayers(L
&&callback
, Context
*on_finish
) {
155 std::lock_guard locker
{m_lock
};
157 on_finish
= librbd::util::create_async_context_callback(
158 m_threads
->work_queue
, new LambdaContext(
159 [this, on_finish
](int r
) {
161 std::lock_guard locker
{m_lock
};
162 ceph_assert(m_namespace_replayers_locked
);
164 m_namespace_replayers_locked
= false;
166 if (m_on_namespace_replayers_unlocked
!= nullptr) {
167 m_namespace_replayers_locked
= true;
168 m_threads
->work_queue
->queue(m_on_namespace_replayers_unlocked
);
169 m_on_namespace_replayers_unlocked
= nullptr;
172 on_finish
->complete(r
);
175 auto on_lock
= new LambdaContext(
176 [this, callback
, on_finish
](int) {
177 std::lock_guard locker
{m_lock
};
178 ceph_assert(m_namespace_replayers_locked
);
183 if (m_namespace_replayers_locked
) {
184 ceph_assert(m_on_namespace_replayers_unlocked
== nullptr);
185 m_on_namespace_replayers_unlocked
= on_lock
;
189 m_namespace_replayers_locked
= true;
190 m_threads
->work_queue
->queue(on_lock
);
193 void handle_remote_pool_meta_updated(const RemotePoolMeta
& remote_pool_meta
);
195 Threads
<ImageCtxT
> *m_threads
;
196 ServiceDaemon
<ImageCtxT
> *m_service_daemon
;
197 journal::CacheManagerHandler
*m_cache_manager_handler
;
198 PoolMetaCache
* m_pool_meta_cache
;
199 int64_t m_local_pool_id
= -1;
201 std::vector
<const char*> m_args
;
203 mutable ceph::mutex m_lock
;
204 ceph::condition_variable m_cond
;
205 std::string m_site_name
;
206 bool m_stopping
= false;
207 bool m_manual_stop
= false;
208 bool m_blacklisted
= false;
210 RadosRef m_local_rados
;
211 RadosRef m_remote_rados
;
213 librados::IoCtx m_local_io_ctx
;
214 librados::IoCtx m_remote_io_ctx
;
216 std::string m_local_mirror_uuid
;
218 RemotePoolMeta m_remote_pool_meta
;
219 std::unique_ptr
<remote_pool_poller::Listener
> m_remote_pool_poller_listener
;
220 std::unique_ptr
<RemotePoolPoller
<ImageCtxT
>> m_remote_pool_poller
;
222 std::unique_ptr
<NamespaceReplayer
<ImageCtxT
>> m_default_namespace_replayer
;
223 std::map
<std::string
, NamespaceReplayer
<ImageCtxT
> *> m_namespace_replayers
;
225 std::string m_asok_hook_name
;
226 AdminSocketHook
*m_asok_hook
= nullptr;
228 service_daemon::CalloutId m_callout_id
= service_daemon::CALLOUT_ID_NONE
;
230 bool m_leader
= false;
231 bool m_namespace_replayers_locked
= false;
232 Context
*m_on_namespace_replayers_unlocked
= nullptr;
234 class PoolReplayerThread
: public Thread
{
235 PoolReplayer
*m_pool_replayer
;
237 PoolReplayerThread(PoolReplayer
*pool_replayer
)
238 : m_pool_replayer(pool_replayer
) {
240 void *entry() override
{
241 m_pool_replayer
->run();
244 } m_pool_replayer_thread
;
246 class LeaderListener
: public leader_watcher::Listener
{
248 LeaderListener(PoolReplayer
*pool_replayer
)
249 : m_pool_replayer(pool_replayer
) {
253 void post_acquire_handler(Context
*on_finish
) override
{
254 m_pool_replayer
->handle_post_acquire_leader(on_finish
);
257 void pre_release_handler(Context
*on_finish
) override
{
258 m_pool_replayer
->handle_pre_release_leader(on_finish
);
261 void update_leader_handler(
262 const std::string
&leader_instance_id
) override
{
263 m_pool_replayer
->handle_update_leader(leader_instance_id
);
266 void handle_instances_added(const InstanceIds
& instance_ids
) override
{
267 m_pool_replayer
->handle_instances_added(instance_ids
);
270 void handle_instances_removed(const InstanceIds
& instance_ids
) override
{
271 m_pool_replayer
->handle_instances_removed(instance_ids
);
275 PoolReplayer
*m_pool_replayer
;
278 std::unique_ptr
<LeaderWatcher
<ImageCtxT
>> m_leader_watcher
;
279 std::unique_ptr
<Throttler
<ImageCtxT
>> m_image_sync_throttler
;
280 std::unique_ptr
<Throttler
<ImageCtxT
>> m_image_deletion_throttler
;
283 } // namespace mirror
286 extern template class rbd::mirror::PoolReplayer
<librbd::ImageCtx
>;
288 #endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H