]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd_mirror/PoolReplayer.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / PoolReplayer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RBD_MIRROR_POOL_REPLAYER_H
5 #define CEPH_RBD_MIRROR_POOL_REPLAYER_H
6
#include "common/Cond.h"
#include "common/WorkQueue.h"
#include "common/ceph_mutex.h"
#include "include/rados/librados.hpp"
#include "librbd/Utils.h"

#include "tools/rbd_mirror/LeaderWatcher.h"
#include "tools/rbd_mirror/NamespaceReplayer.h"
#include "tools/rbd_mirror/Throttler.h"
#include "tools/rbd_mirror/Types.h"
#include "tools/rbd_mirror/leader_watcher/Types.h"
#include "tools/rbd_mirror/service_daemon/Types.h"

#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <vector>
24
25 class AdminSocketHook;
26
27 namespace journal { struct CacheManagerHandler; }
28
29 namespace librbd { class ImageCtx; }
30
31 namespace rbd {
32 namespace mirror {
33
34 template <typename> class RemotePoolPoller;
35 namespace remote_pool_poller { struct Listener; }
36
37 struct PoolMetaCache;
38 template <typename> class ServiceDaemon;
39 template <typename> struct Threads;
40
41
42 /**
43 * Controls mirroring for a single remote cluster.
44 */
45 template <typename ImageCtxT = librbd::ImageCtx>
46 class PoolReplayer {
47 public:
48 PoolReplayer(Threads<ImageCtxT> *threads,
49 ServiceDaemon<ImageCtxT> *service_daemon,
50 journal::CacheManagerHandler *cache_manager_handler,
51 PoolMetaCache* pool_meta_cache,
52 int64_t local_pool_id, const PeerSpec &peer,
53 const std::vector<const char*> &args);
54 ~PoolReplayer();
55 PoolReplayer(const PoolReplayer&) = delete;
56 PoolReplayer& operator=(const PoolReplayer&) = delete;
57
58 bool is_blacklisted() const;
59 bool is_leader() const;
60 bool is_running() const;
61
62 void init(const std::string& site_name);
63 void shut_down();
64
65 void run();
66
67 void print_status(Formatter *f);
68 void start();
69 void stop(bool manual);
70 void restart();
71 void flush();
72 void release_leader();
73 void reopen_logs();
74
75 private:
76 /**
77 * @verbatim
78 *
79 * <start>
80 * |
81 * v
82 * INIT
83 * |
84 * v
85 * <follower> <---------------------\
86 * . |
87 * . (leader acquired) |
88 * v |
89 * NOTIFY_NAMESPACE_WATCHERS NOTIFY_NAMESPACE_WATCHERS
90 * | ^
91 * v .
92 * <leader> .
93 * . .
94 * . (leader lost / shut down) .
95 * . . . . . . . . . . . . . . . .
96 *
97 * @endverbatim
98 */
99
100 struct RemotePoolPollerListener;
101
102 int init_rados(const std::string &cluster_name,
103 const std::string &client_name,
104 const std::string &mon_host,
105 const std::string &key,
106 const std::string &description, RadosRef *rados_ref,
107 bool strip_cluster_overrides);
108
109 void update_namespace_replayers();
110 int list_mirroring_namespaces(std::set<std::string> *namespaces);
111
112 void namespace_replayer_acquire_leader(const std::string &name,
113 Context *on_finish);
114
115 void handle_post_acquire_leader(Context *on_finish);
116 void handle_pre_release_leader(Context *on_finish);
117
118 void handle_update_leader(const std::string &leader_instance_id);
119
120 void handle_instances_added(const std::vector<std::string> &instance_ids);
121 void handle_instances_removed(const std::vector<std::string> &instance_ids);
122
123 // sync version, executed in the caller thread
124 template <typename L>
125 void with_namespace_replayers(L &&callback) {
126 std::lock_guard locker{m_lock};
127
128 if (m_namespace_replayers_locked) {
129 ceph_assert(m_on_namespace_replayers_unlocked == nullptr);
130 C_SaferCond cond;
131 m_on_namespace_replayers_unlocked = &cond;
132 m_lock.unlock();
133 cond.wait();
134 m_lock.lock();
135 } else {
136 m_namespace_replayers_locked = true;
137 }
138
139 ceph_assert(m_namespace_replayers_locked);
140 callback(); // may temporary release the lock
141 ceph_assert(m_namespace_replayers_locked);
142
143 if (m_on_namespace_replayers_unlocked == nullptr) {
144 m_namespace_replayers_locked = false;
145 return;
146 }
147
148 m_threads->work_queue->queue(m_on_namespace_replayers_unlocked);
149 m_on_namespace_replayers_unlocked = nullptr;
150 }
151
152 // async version
153 template <typename L>
154 void with_namespace_replayers(L &&callback, Context *on_finish) {
155 std::lock_guard locker{m_lock};
156
157 on_finish = librbd::util::create_async_context_callback(
158 m_threads->work_queue, new LambdaContext(
159 [this, on_finish](int r) {
160 {
161 std::lock_guard locker{m_lock};
162 ceph_assert(m_namespace_replayers_locked);
163
164 m_namespace_replayers_locked = false;
165
166 if (m_on_namespace_replayers_unlocked != nullptr) {
167 m_namespace_replayers_locked = true;
168 m_threads->work_queue->queue(m_on_namespace_replayers_unlocked);
169 m_on_namespace_replayers_unlocked = nullptr;
170 }
171 }
172 on_finish->complete(r);
173 }));
174
175 auto on_lock = new LambdaContext(
176 [this, callback, on_finish](int) {
177 std::lock_guard locker{m_lock};
178 ceph_assert(m_namespace_replayers_locked);
179
180 callback(on_finish);
181 });
182
183 if (m_namespace_replayers_locked) {
184 ceph_assert(m_on_namespace_replayers_unlocked == nullptr);
185 m_on_namespace_replayers_unlocked = on_lock;
186 return;
187 }
188
189 m_namespace_replayers_locked = true;
190 m_threads->work_queue->queue(on_lock);
191 }
192
193 void handle_remote_pool_meta_updated(const RemotePoolMeta& remote_pool_meta);
194
195 Threads<ImageCtxT> *m_threads;
196 ServiceDaemon<ImageCtxT> *m_service_daemon;
197 journal::CacheManagerHandler *m_cache_manager_handler;
198 PoolMetaCache* m_pool_meta_cache;
199 int64_t m_local_pool_id = -1;
200 PeerSpec m_peer;
201 std::vector<const char*> m_args;
202
203 mutable ceph::mutex m_lock;
204 ceph::condition_variable m_cond;
205 std::string m_site_name;
206 bool m_stopping = false;
207 bool m_manual_stop = false;
208 bool m_blacklisted = false;
209
210 RadosRef m_local_rados;
211 RadosRef m_remote_rados;
212
213 librados::IoCtx m_local_io_ctx;
214 librados::IoCtx m_remote_io_ctx;
215
216 std::string m_local_mirror_uuid;
217
218 RemotePoolMeta m_remote_pool_meta;
219 std::unique_ptr<remote_pool_poller::Listener> m_remote_pool_poller_listener;
220 std::unique_ptr<RemotePoolPoller<ImageCtxT>> m_remote_pool_poller;
221
222 std::unique_ptr<NamespaceReplayer<ImageCtxT>> m_default_namespace_replayer;
223 std::map<std::string, NamespaceReplayer<ImageCtxT> *> m_namespace_replayers;
224
225 std::string m_asok_hook_name;
226 AdminSocketHook *m_asok_hook = nullptr;
227
228 service_daemon::CalloutId m_callout_id = service_daemon::CALLOUT_ID_NONE;
229
230 bool m_leader = false;
231 bool m_namespace_replayers_locked = false;
232 Context *m_on_namespace_replayers_unlocked = nullptr;
233
234 class PoolReplayerThread : public Thread {
235 PoolReplayer *m_pool_replayer;
236 public:
237 PoolReplayerThread(PoolReplayer *pool_replayer)
238 : m_pool_replayer(pool_replayer) {
239 }
240 void *entry() override {
241 m_pool_replayer->run();
242 return 0;
243 }
244 } m_pool_replayer_thread;
245
246 class LeaderListener : public leader_watcher::Listener {
247 public:
248 LeaderListener(PoolReplayer *pool_replayer)
249 : m_pool_replayer(pool_replayer) {
250 }
251
252 protected:
253 void post_acquire_handler(Context *on_finish) override {
254 m_pool_replayer->handle_post_acquire_leader(on_finish);
255 }
256
257 void pre_release_handler(Context *on_finish) override {
258 m_pool_replayer->handle_pre_release_leader(on_finish);
259 }
260
261 void update_leader_handler(
262 const std::string &leader_instance_id) override {
263 m_pool_replayer->handle_update_leader(leader_instance_id);
264 }
265
266 void handle_instances_added(const InstanceIds& instance_ids) override {
267 m_pool_replayer->handle_instances_added(instance_ids);
268 }
269
270 void handle_instances_removed(const InstanceIds& instance_ids) override {
271 m_pool_replayer->handle_instances_removed(instance_ids);
272 }
273
274 private:
275 PoolReplayer *m_pool_replayer;
276 } m_leader_listener;
277
278 std::unique_ptr<LeaderWatcher<ImageCtxT>> m_leader_watcher;
279 std::unique_ptr<Throttler<ImageCtxT>> m_image_sync_throttler;
280 std::unique_ptr<Throttler<ImageCtxT>> m_image_deletion_throttler;
281 };
282
283 } // namespace mirror
284 } // namespace rbd
285
286 extern template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;
287
288 #endif // CEPH_RBD_MIRROR_POOL_REPLAYER_H