]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "PoolReplayer.h" | |
9f95a23c | 5 | #include "common/Cond.h" |
7c673cae FG |
6 | #include "common/Formatter.h" |
7 | #include "common/admin_socket.h" | |
8 | #include "common/ceph_argparse.h" | |
9 | #include "common/code_environment.h" | |
10 | #include "common/common_init.h" | |
11 | #include "common/debug.h" | |
12 | #include "common/errno.h" | |
7c673cae FG |
13 | #include "cls/rbd/cls_rbd_client.h" |
14 | #include "global/global_context.h" | |
11fdf7f2 | 15 | #include "librbd/api/Config.h" |
9f95a23c TL |
16 | #include "librbd/api/Namespace.h" |
17 | #include "PoolMetaCache.h" | |
18 | #include "RemotePoolPoller.h" | |
c07f9fc5 | 19 | #include "ServiceDaemon.h" |
7c673cae FG |
20 | #include "Threads.h" |
21 | ||
22 | #define dout_context g_ceph_context | |
23 | #define dout_subsys ceph_subsys_rbd_mirror | |
24 | #undef dout_prefix | |
25 | #define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \ | |
26 | << this << " " << __func__ << ": " | |
27 | ||
7c673cae FG |
28 | namespace rbd { |
29 | namespace mirror { | |
30 | ||
11fdf7f2 TL |
31 | using ::operator<<; |
32 | ||
7c673cae FG |
33 | namespace { |
34 | ||
11fdf7f2 | 35 | const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id"); |
c07f9fc5 | 36 | const std::string SERVICE_DAEMON_LEADER_KEY("leader"); |
c07f9fc5 | 37 | |
3efd9988 FG |
38 | const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS { |
39 | {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}}; | |
40 | ||
11fdf7f2 | 41 | template <typename I> |
7c673cae FG |
42 | class PoolReplayerAdminSocketCommand { |
43 | public: | |
11fdf7f2 | 44 | PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer) |
7c673cae FG |
45 | : pool_replayer(pool_replayer) { |
46 | } | |
47 | virtual ~PoolReplayerAdminSocketCommand() {} | |
9f95a23c | 48 | virtual int call(Formatter *f) = 0; |
7c673cae | 49 | protected: |
11fdf7f2 | 50 | PoolReplayer<I> *pool_replayer; |
7c673cae FG |
51 | }; |
52 | ||
11fdf7f2 TL |
53 | template <typename I> |
54 | class StatusCommand : public PoolReplayerAdminSocketCommand<I> { | |
7c673cae | 55 | public: |
11fdf7f2 TL |
56 | explicit StatusCommand(PoolReplayer<I> *pool_replayer) |
57 | : PoolReplayerAdminSocketCommand<I>(pool_replayer) { | |
7c673cae FG |
58 | } |
59 | ||
9f95a23c TL |
60 | int call(Formatter *f) override { |
61 | this->pool_replayer->print_status(f); | |
62 | return 0; | |
7c673cae FG |
63 | } |
64 | }; | |
65 | ||
11fdf7f2 TL |
66 | template <typename I> |
67 | class StartCommand : public PoolReplayerAdminSocketCommand<I> { | |
7c673cae | 68 | public: |
11fdf7f2 TL |
69 | explicit StartCommand(PoolReplayer<I> *pool_replayer) |
70 | : PoolReplayerAdminSocketCommand<I>(pool_replayer) { | |
7c673cae FG |
71 | } |
72 | ||
9f95a23c | 73 | int call(Formatter *f) override { |
11fdf7f2 | 74 | this->pool_replayer->start(); |
9f95a23c | 75 | return 0; |
7c673cae FG |
76 | } |
77 | }; | |
78 | ||
11fdf7f2 TL |
79 | template <typename I> |
80 | class StopCommand : public PoolReplayerAdminSocketCommand<I> { | |
7c673cae | 81 | public: |
11fdf7f2 TL |
82 | explicit StopCommand(PoolReplayer<I> *pool_replayer) |
83 | : PoolReplayerAdminSocketCommand<I>(pool_replayer) { | |
7c673cae FG |
84 | } |
85 | ||
9f95a23c | 86 | int call(Formatter *f) override { |
11fdf7f2 | 87 | this->pool_replayer->stop(true); |
9f95a23c | 88 | return 0; |
7c673cae FG |
89 | } |
90 | }; | |
91 | ||
11fdf7f2 TL |
92 | template <typename I> |
93 | class RestartCommand : public PoolReplayerAdminSocketCommand<I> { | |
7c673cae | 94 | public: |
11fdf7f2 TL |
95 | explicit RestartCommand(PoolReplayer<I> *pool_replayer) |
96 | : PoolReplayerAdminSocketCommand<I>(pool_replayer) { | |
7c673cae FG |
97 | } |
98 | ||
9f95a23c | 99 | int call(Formatter *f) override { |
11fdf7f2 | 100 | this->pool_replayer->restart(); |
9f95a23c | 101 | return 0; |
7c673cae FG |
102 | } |
103 | }; | |
104 | ||
11fdf7f2 TL |
105 | template <typename I> |
106 | class FlushCommand : public PoolReplayerAdminSocketCommand<I> { | |
7c673cae | 107 | public: |
11fdf7f2 TL |
108 | explicit FlushCommand(PoolReplayer<I> *pool_replayer) |
109 | : PoolReplayerAdminSocketCommand<I>(pool_replayer) { | |
7c673cae FG |
110 | } |
111 | ||
9f95a23c | 112 | int call(Formatter *f) override { |
11fdf7f2 | 113 | this->pool_replayer->flush(); |
9f95a23c | 114 | return 0; |
7c673cae FG |
115 | } |
116 | }; | |
117 | ||
11fdf7f2 TL |
118 | template <typename I> |
119 | class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> { | |
7c673cae | 120 | public: |
11fdf7f2 TL |
121 | explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer) |
122 | : PoolReplayerAdminSocketCommand<I>(pool_replayer) { | |
7c673cae FG |
123 | } |
124 | ||
9f95a23c | 125 | int call(Formatter *f) override { |
11fdf7f2 | 126 | this->pool_replayer->release_leader(); |
9f95a23c | 127 | return 0; |
7c673cae FG |
128 | } |
129 | }; | |
130 | ||
11fdf7f2 | 131 | template <typename I> |
7c673cae FG |
132 | class PoolReplayerAdminSocketHook : public AdminSocketHook { |
133 | public: | |
134 | PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name, | |
11fdf7f2 | 135 | PoolReplayer<I> *pool_replayer) |
7c673cae FG |
136 | : admin_socket(cct->get_admin_socket()) { |
137 | std::string command; | |
138 | int r; | |
139 | ||
140 | command = "rbd mirror status " + name; | |
9f95a23c | 141 | r = admin_socket->register_command(command, this, |
7c673cae FG |
142 | "get status for rbd mirror " + name); |
143 | if (r == 0) { | |
11fdf7f2 | 144 | commands[command] = new StatusCommand<I>(pool_replayer); |
7c673cae FG |
145 | } |
146 | ||
147 | command = "rbd mirror start " + name; | |
9f95a23c | 148 | r = admin_socket->register_command(command, this, |
7c673cae FG |
149 | "start rbd mirror " + name); |
150 | if (r == 0) { | |
11fdf7f2 | 151 | commands[command] = new StartCommand<I>(pool_replayer); |
7c673cae FG |
152 | } |
153 | ||
154 | command = "rbd mirror stop " + name; | |
9f95a23c | 155 | r = admin_socket->register_command(command, this, |
7c673cae FG |
156 | "stop rbd mirror " + name); |
157 | if (r == 0) { | |
11fdf7f2 | 158 | commands[command] = new StopCommand<I>(pool_replayer); |
7c673cae FG |
159 | } |
160 | ||
161 | command = "rbd mirror restart " + name; | |
9f95a23c | 162 | r = admin_socket->register_command(command, this, |
7c673cae FG |
163 | "restart rbd mirror " + name); |
164 | if (r == 0) { | |
11fdf7f2 | 165 | commands[command] = new RestartCommand<I>(pool_replayer); |
7c673cae FG |
166 | } |
167 | ||
168 | command = "rbd mirror flush " + name; | |
9f95a23c | 169 | r = admin_socket->register_command(command, this, |
7c673cae FG |
170 | "flush rbd mirror " + name); |
171 | if (r == 0) { | |
11fdf7f2 | 172 | commands[command] = new FlushCommand<I>(pool_replayer); |
7c673cae FG |
173 | } |
174 | ||
175 | command = "rbd mirror leader release " + name; | |
9f95a23c | 176 | r = admin_socket->register_command(command, this, |
7c673cae FG |
177 | "release rbd mirror leader " + name); |
178 | if (r == 0) { | |
11fdf7f2 | 179 | commands[command] = new LeaderReleaseCommand<I>(pool_replayer); |
7c673cae FG |
180 | } |
181 | } | |
182 | ||
183 | ~PoolReplayerAdminSocketHook() override { | |
9f95a23c | 184 | (void)admin_socket->unregister_commands(this); |
11fdf7f2 | 185 | for (auto i = commands.begin(); i != commands.end(); ++i) { |
7c673cae FG |
186 | delete i->second; |
187 | } | |
188 | } | |
189 | ||
9f95a23c TL |
190 | int call(std::string_view command, const cmdmap_t& cmdmap, |
191 | Formatter *f, | |
192 | std::ostream& ss, | |
193 | bufferlist& out) override { | |
11fdf7f2 TL |
194 | auto i = commands.find(command); |
195 | ceph_assert(i != commands.end()); | |
9f95a23c | 196 | return i->second->call(f); |
7c673cae FG |
197 | } |
198 | ||
199 | private: | |
11fdf7f2 TL |
200 | typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*, |
201 | std::less<>> Commands; | |
7c673cae FG |
202 | |
203 | AdminSocket *admin_socket; | |
204 | Commands commands; | |
205 | }; | |
206 | ||
207 | } // anonymous namespace | |
208 | ||
11fdf7f2 | 209 | template <typename I> |
9f95a23c TL |
210 | struct PoolReplayer<I>::RemotePoolPollerListener |
211 | : public remote_pool_poller::Listener { | |
212 | ||
213 | PoolReplayer<I>* m_pool_replayer; | |
214 | ||
215 | RemotePoolPollerListener(PoolReplayer<I>* pool_replayer) | |
216 | : m_pool_replayer(pool_replayer) { | |
217 | } | |
218 | ||
219 | void handle_updated(const RemotePoolMeta& remote_pool_meta) override { | |
220 | m_pool_replayer->handle_remote_pool_meta_updated(remote_pool_meta); | |
221 | } | |
222 | }; | |
223 | ||
224 | template <typename I> | |
225 | PoolReplayer<I>::PoolReplayer( | |
226 | Threads<I> *threads, ServiceDaemon<I> *service_daemon, | |
227 | journal::CacheManagerHandler *cache_manager_handler, | |
228 | PoolMetaCache* pool_meta_cache, int64_t local_pool_id, | |
229 | const PeerSpec &peer, const std::vector<const char*> &args) : | |
7c673cae | 230 | m_threads(threads), |
c07f9fc5 | 231 | m_service_daemon(service_daemon), |
9f95a23c TL |
232 | m_cache_manager_handler(cache_manager_handler), |
233 | m_pool_meta_cache(pool_meta_cache), | |
c07f9fc5 | 234 | m_local_pool_id(local_pool_id), |
7c673cae FG |
235 | m_peer(peer), |
236 | m_args(args), | |
9f95a23c | 237 | m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))), |
7c673cae | 238 | m_pool_replayer_thread(this), |
9f95a23c | 239 | m_leader_listener(this) { |
7c673cae FG |
240 | } |
241 | ||
11fdf7f2 TL |
242 | template <typename I> |
243 | PoolReplayer<I>::~PoolReplayer() | |
7c673cae | 244 | { |
c07f9fc5 | 245 | shut_down(); |
9f95a23c TL |
246 | |
247 | ceph_assert(m_asok_hook == nullptr); | |
7c673cae FG |
248 | } |
249 | ||
11fdf7f2 | 250 | template <typename I> |
f67539c2 | 251 | bool PoolReplayer<I>::is_blocklisted() const { |
9f95a23c | 252 | std::lock_guard locker{m_lock}; |
f67539c2 | 253 | return m_blocklisted; |
7c673cae FG |
254 | } |
255 | ||
11fdf7f2 TL |
256 | template <typename I> |
257 | bool PoolReplayer<I>::is_leader() const { | |
9f95a23c | 258 | std::lock_guard locker{m_lock}; |
7c673cae FG |
259 | return m_leader_watcher && m_leader_watcher->is_leader(); |
260 | } | |
261 | ||
11fdf7f2 TL |
262 | template <typename I> |
263 | bool PoolReplayer<I>::is_running() const { | |
20effc67 | 264 | return m_pool_replayer_thread.is_started() && !m_stopping; |
c07f9fc5 FG |
265 | } |
266 | ||
11fdf7f2 | 267 | template <typename I> |
9f95a23c TL |
268 | void PoolReplayer<I>::init(const std::string& site_name) { |
269 | std::lock_guard locker{m_lock}; | |
92f5a8d4 | 270 | |
11fdf7f2 | 271 | ceph_assert(!m_pool_replayer_thread.is_started()); |
c07f9fc5 FG |
272 | |
273 | // reset state | |
274 | m_stopping = false; | |
f67539c2 | 275 | m_blocklisted = false; |
9f95a23c | 276 | m_site_name = site_name; |
7c673cae | 277 | |
11fdf7f2 | 278 | dout(10) << "replaying for " << m_peer << dendl; |
7c673cae FG |
279 | int r = init_rados(g_ceph_context->_conf->cluster, |
280 | g_ceph_context->_conf->name.to_str(), | |
11fdf7f2 | 281 | "", "", "local cluster", &m_local_rados, false); |
7c673cae | 282 | if (r < 0) { |
c07f9fc5 FG |
283 | m_callout_id = m_service_daemon->add_or_update_callout( |
284 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, | |
285 | "unable to connect to local cluster"); | |
286 | return; | |
7c673cae FG |
287 | } |
288 | ||
289 | r = init_rados(m_peer.cluster_name, m_peer.client_name, | |
11fdf7f2 | 290 | m_peer.mon_host, m_peer.key, |
7c673cae | 291 | std::string("remote peer ") + stringify(m_peer), |
3efd9988 | 292 | &m_remote_rados, true); |
7c673cae | 293 | if (r < 0) { |
c07f9fc5 FG |
294 | m_callout_id = m_service_daemon->add_or_update_callout( |
295 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, | |
296 | "unable to connect to remote cluster"); | |
297 | return; | |
7c673cae FG |
298 | } |
299 | ||
300 | r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx); | |
301 | if (r < 0) { | |
302 | derr << "error accessing local pool " << m_local_pool_id << ": " | |
303 | << cpp_strerror(r) << dendl; | |
c07f9fc5 | 304 | return; |
7c673cae FG |
305 | } |
306 | ||
11fdf7f2 TL |
307 | auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct()); |
308 | librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf); | |
309 | ||
7c673cae | 310 | r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx, |
9f95a23c | 311 | &m_local_mirror_uuid); |
7c673cae FG |
312 | if (r < 0) { |
313 | derr << "failed to retrieve local mirror uuid from pool " | |
314 | << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl; | |
c07f9fc5 FG |
315 | m_callout_id = m_service_daemon->add_or_update_callout( |
316 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, | |
317 | "unable to query local mirror uuid"); | |
318 | return; | |
7c673cae FG |
319 | } |
320 | ||
321 | r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(), | |
322 | m_remote_io_ctx); | |
323 | if (r < 0) { | |
324 | derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name() | |
325 | << ": " << cpp_strerror(r) << dendl; | |
c07f9fc5 FG |
326 | m_callout_id = m_service_daemon->add_or_update_callout( |
327 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING, | |
328 | "unable to access remote pool"); | |
329 | return; | |
7c673cae FG |
330 | } |
331 | ||
11fdf7f2 | 332 | dout(10) << "connected to " << m_peer << dendl; |
7c673cae | 333 | |
9f95a23c TL |
334 | m_image_sync_throttler.reset( |
335 | Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs")); | |
336 | ||
337 | m_image_deletion_throttler.reset( | |
338 | Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions")); | |
7c673cae | 339 | |
9f95a23c TL |
340 | m_remote_pool_poller_listener.reset(new RemotePoolPollerListener(this)); |
341 | m_remote_pool_poller.reset(RemotePoolPoller<I>::create( | |
342 | m_threads, m_remote_io_ctx, m_site_name, m_local_mirror_uuid, | |
343 | *m_remote_pool_poller_listener)); | |
344 | ||
345 | C_SaferCond on_pool_poller_init; | |
346 | m_remote_pool_poller->init(&on_pool_poller_init); | |
347 | r = on_pool_poller_init.wait(); | |
7c673cae | 348 | if (r < 0) { |
9f95a23c TL |
349 | derr << "failed to initialize remote pool poller: " << cpp_strerror(r) |
350 | << dendl; | |
c07f9fc5 FG |
351 | m_callout_id = m_service_daemon->add_or_update_callout( |
352 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, | |
9f95a23c TL |
353 | "unable to initialize remote pool poller"); |
354 | m_remote_pool_poller.reset(); | |
355 | return; | |
356 | } | |
357 | ceph_assert(!m_remote_pool_meta.mirror_uuid.empty()); | |
358 | m_pool_meta_cache->set_remote_pool_meta( | |
359 | m_remote_io_ctx.get_id(), m_remote_pool_meta); | |
360 | m_pool_meta_cache->set_local_pool_meta( | |
361 | m_local_io_ctx.get_id(), {m_local_mirror_uuid}); | |
362 | ||
363 | m_default_namespace_replayer.reset(NamespaceReplayer<I>::create( | |
364 | "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid, | |
365 | m_remote_pool_meta, m_threads, m_image_sync_throttler.get(), | |
366 | m_image_deletion_throttler.get(), m_service_daemon, | |
367 | m_cache_manager_handler, m_pool_meta_cache)); | |
368 | ||
369 | C_SaferCond on_init; | |
370 | m_default_namespace_replayer->init(&on_init); | |
371 | r = on_init.wait(); | |
372 | if (r < 0) { | |
373 | derr << "error initializing default namespace replayer: " << cpp_strerror(r) | |
374 | << dendl; | |
375 | m_callout_id = m_service_daemon->add_or_update_callout( | |
376 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, | |
377 | "unable to initialize default namespace replayer"); | |
378 | m_default_namespace_replayer.reset(); | |
c07f9fc5 | 379 | return; |
7c673cae FG |
380 | } |
381 | ||
11fdf7f2 TL |
382 | m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx, |
383 | &m_leader_listener)); | |
7c673cae FG |
384 | r = m_leader_watcher->init(); |
385 | if (r < 0) { | |
386 | derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl; | |
c07f9fc5 FG |
387 | m_callout_id = m_service_daemon->add_or_update_callout( |
388 | m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR, | |
389 | "unable to initialize leader messenger object"); | |
9f95a23c | 390 | m_leader_watcher.reset(); |
c07f9fc5 FG |
391 | return; |
392 | } | |
393 | ||
394 | if (m_callout_id != service_daemon::CALLOUT_ID_NONE) { | |
395 | m_service_daemon->remove_callout(m_local_pool_id, m_callout_id); | |
396 | m_callout_id = service_daemon::CALLOUT_ID_NONE; | |
7c673cae FG |
397 | } |
398 | ||
9f95a23c TL |
399 | m_service_daemon->add_or_update_attribute( |
400 | m_local_io_ctx.get_id(), SERVICE_DAEMON_INSTANCE_ID_KEY, | |
401 | stringify(m_local_io_ctx.get_instance_id())); | |
402 | ||
7c673cae | 403 | m_pool_replayer_thread.create("pool replayer"); |
c07f9fc5 | 404 | } |
7c673cae | 405 | |
11fdf7f2 TL |
406 | template <typename I> |
407 | void PoolReplayer<I>::shut_down() { | |
c07f9fc5 | 408 | { |
9f95a23c TL |
409 | std::lock_guard l{m_lock}; |
410 | m_stopping = true; | |
411 | m_cond.notify_all(); | |
c07f9fc5 FG |
412 | } |
413 | if (m_pool_replayer_thread.is_started()) { | |
414 | m_pool_replayer_thread.join(); | |
415 | } | |
9f95a23c | 416 | |
c07f9fc5 FG |
417 | if (m_leader_watcher) { |
418 | m_leader_watcher->shut_down(); | |
419 | } | |
9f95a23c TL |
420 | m_leader_watcher.reset(); |
421 | ||
422 | if (m_default_namespace_replayer) { | |
423 | C_SaferCond on_shut_down; | |
424 | m_default_namespace_replayer->shut_down(&on_shut_down); | |
425 | on_shut_down.wait(); | |
c07f9fc5 | 426 | } |
9f95a23c TL |
427 | m_default_namespace_replayer.reset(); |
428 | ||
429 | if (m_remote_pool_poller) { | |
430 | C_SaferCond ctx; | |
431 | m_remote_pool_poller->shut_down(&ctx); | |
432 | ctx.wait(); | |
433 | ||
434 | m_pool_meta_cache->remove_remote_pool_meta(m_remote_io_ctx.get_id()); | |
435 | m_pool_meta_cache->remove_local_pool_meta(m_local_io_ctx.get_id()); | |
c07f9fc5 | 436 | } |
9f95a23c TL |
437 | m_remote_pool_poller.reset(); |
438 | m_remote_pool_poller_listener.reset(); | |
c07f9fc5 | 439 | |
9f95a23c TL |
440 | m_image_sync_throttler.reset(); |
441 | m_image_deletion_throttler.reset(); | |
28e407b8 | 442 | |
c07f9fc5 FG |
443 | m_local_rados.reset(); |
444 | m_remote_rados.reset(); | |
7c673cae FG |
445 | } |
446 | ||
11fdf7f2 TL |
447 | template <typename I> |
448 | int PoolReplayer<I>::init_rados(const std::string &cluster_name, | |
449 | const std::string &client_name, | |
450 | const std::string &mon_host, | |
451 | const std::string &key, | |
452 | const std::string &description, | |
453 | RadosRef *rados_ref, | |
454 | bool strip_cluster_overrides) { | |
7c673cae FG |
455 | // NOTE: manually bootstrap a CephContext here instead of via |
456 | // the librados API to avoid mixing global singletons between | |
457 | // the librados shared library and the daemon | |
458 | // TODO: eliminate intermingling of global singletons within Ceph APIs | |
459 | CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT); | |
460 | if (client_name.empty() || !iparams.name.from_str(client_name)) { | |
461 | derr << "error initializing cluster handle for " << description << dendl; | |
462 | return -EINVAL; | |
463 | } | |
464 | ||
465 | CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY, | |
466 | CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS); | |
467 | cct->_conf->cluster = cluster_name; | |
468 | ||
469 | // librados::Rados::conf_read_file | |
11fdf7f2 TL |
470 | int r = cct->_conf.parse_config_files(nullptr, nullptr, 0); |
471 | if (r < 0 && r != -ENOENT) { | |
eafe8130 | 472 | // do not treat this as fatal, it might still be able to connect |
7c673cae FG |
473 | derr << "could not read ceph conf for " << description << ": " |
474 | << cpp_strerror(r) << dendl; | |
7c673cae | 475 | } |
3efd9988 FG |
476 | |
477 | // preserve cluster-specific config settings before applying environment/cli | |
478 | // overrides | |
479 | std::map<std::string, std::string> config_values; | |
480 | if (strip_cluster_overrides) { | |
481 | // remote peer connections shouldn't apply cluster-specific | |
482 | // configuration settings | |
483 | for (auto& key : UNIQUE_PEER_CONFIG_KEYS) { | |
11fdf7f2 | 484 | config_values[key] = cct->_conf.get_val<std::string>(key); |
3efd9988 FG |
485 | } |
486 | } | |
487 | ||
11fdf7f2 | 488 | cct->_conf.parse_env(cct->get_module_type()); |
7c673cae FG |
489 | |
490 | // librados::Rados::conf_parse_env | |
491 | std::vector<const char*> args; | |
11fdf7f2 | 492 | r = cct->_conf.parse_argv(args); |
7c673cae FG |
493 | if (r < 0) { |
494 | derr << "could not parse environment for " << description << ":" | |
495 | << cpp_strerror(r) << dendl; | |
496 | cct->put(); | |
497 | return r; | |
498 | } | |
11fdf7f2 | 499 | cct->_conf.parse_env(cct->get_module_type()); |
7c673cae FG |
500 | |
501 | if (!m_args.empty()) { | |
502 | // librados::Rados::conf_parse_argv | |
503 | args = m_args; | |
11fdf7f2 | 504 | r = cct->_conf.parse_argv(args); |
7c673cae FG |
505 | if (r < 0) { |
506 | derr << "could not parse command line args for " << description << ": " | |
507 | << cpp_strerror(r) << dendl; | |
508 | cct->put(); | |
509 | return r; | |
510 | } | |
511 | } | |
512 | ||
3efd9988 FG |
513 | if (strip_cluster_overrides) { |
514 | // remote peer connections shouldn't apply cluster-specific | |
515 | // configuration settings | |
516 | for (auto& pair : config_values) { | |
11fdf7f2 | 517 | auto value = cct->_conf.get_val<std::string>(pair.first); |
3efd9988 FG |
518 | if (pair.second != value) { |
519 | dout(0) << "reverting global config option override: " | |
520 | << pair.first << ": " << value << " -> " << pair.second | |
521 | << dendl; | |
11fdf7f2 | 522 | cct->_conf.set_val_or_die(pair.first, pair.second); |
3efd9988 FG |
523 | } |
524 | } | |
525 | } | |
526 | ||
7c673cae | 527 | if (!g_ceph_context->_conf->admin_socket.empty()) { |
11fdf7f2 | 528 | cct->_conf.set_val_or_die("admin_socket", |
7c673cae FG |
529 | "$run_dir/$name.$pid.$cluster.$cctid.asok"); |
530 | } | |
531 | ||
11fdf7f2 TL |
532 | if (!mon_host.empty()) { |
533 | r = cct->_conf.set_val("mon_host", mon_host); | |
534 | if (r < 0) { | |
535 | derr << "failed to set mon_host config for " << description << ": " | |
536 | << cpp_strerror(r) << dendl; | |
537 | cct->put(); | |
538 | return r; | |
539 | } | |
540 | } | |
541 | ||
542 | if (!key.empty()) { | |
543 | r = cct->_conf.set_val("key", key); | |
544 | if (r < 0) { | |
545 | derr << "failed to set key config for " << description << ": " | |
546 | << cpp_strerror(r) << dendl; | |
547 | cct->put(); | |
548 | return r; | |
549 | } | |
550 | } | |
551 | ||
7c673cae | 552 | // disable unnecessary librbd cache |
11fdf7f2 TL |
553 | cct->_conf.set_val_or_die("rbd_cache", "false"); |
554 | cct->_conf.apply_changes(nullptr); | |
9f95a23c | 555 | cct->_conf.complain_about_parse_error(cct); |
7c673cae | 556 | |
92f5a8d4 TL |
557 | rados_ref->reset(new librados::Rados()); |
558 | ||
7c673cae | 559 | r = (*rados_ref)->init_with_context(cct); |
11fdf7f2 | 560 | ceph_assert(r == 0); |
7c673cae FG |
561 | cct->put(); |
562 | ||
563 | r = (*rados_ref)->connect(); | |
564 | if (r < 0) { | |
565 | derr << "error connecting to " << description << ": " | |
566 | << cpp_strerror(r) << dendl; | |
567 | return r; | |
568 | } | |
569 | ||
570 | return 0; | |
571 | } | |
572 | ||
11fdf7f2 | 573 | template <typename I> |
9f95a23c TL |
574 | void PoolReplayer<I>::run() { |
575 | dout(20) << dendl; | |
7c673cae | 576 | |
9f95a23c | 577 | while (true) { |
7c673cae FG |
578 | std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " + |
579 | m_peer.cluster_name; | |
580 | if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) { | |
581 | m_asok_hook_name = asok_hook_name; | |
582 | delete m_asok_hook; | |
583 | ||
11fdf7f2 TL |
584 | m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context, |
585 | m_asok_hook_name, this); | |
7c673cae FG |
586 | } |
587 | ||
9f95a23c TL |
588 | with_namespace_replayers([this]() { update_namespace_replayers(); }); |
589 | ||
590 | std::unique_lock locker{m_lock}; | |
591 | ||
f67539c2 TL |
592 | if (m_leader_watcher->is_blocklisted() || |
593 | m_default_namespace_replayer->is_blocklisted()) { | |
594 | m_blocklisted = true; | |
7c673cae | 595 | m_stopping = true; |
9f95a23c TL |
596 | } |
597 | ||
598 | for (auto &it : m_namespace_replayers) { | |
f67539c2 TL |
599 | if (it.second->is_blocklisted()) { |
600 | m_blocklisted = true; | |
9f95a23c TL |
601 | m_stopping = true; |
602 | break; | |
603 | } | |
604 | } | |
605 | ||
606 | if (m_stopping) { | |
7c673cae FG |
607 | break; |
608 | } | |
609 | ||
9f95a23c TL |
610 | auto seconds = g_ceph_context->_conf.get_val<uint64_t>( |
611 | "rbd_mirror_pool_replayers_refresh_interval"); | |
612 | m_cond.wait_for(locker, ceph::make_timespan(seconds)); | |
613 | } | |
614 | ||
615 | // shut down namespace replayers | |
616 | with_namespace_replayers([this]() { update_namespace_replayers(); }); | |
617 | ||
618 | delete m_asok_hook; | |
619 | m_asok_hook = nullptr; | |
620 | } | |
621 | ||
622 | template <typename I> | |
623 | void PoolReplayer<I>::update_namespace_replayers() { | |
624 | dout(20) << dendl; | |
625 | ||
626 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
627 | ||
628 | std::set<std::string> mirroring_namespaces; | |
629 | if (!m_stopping) { | |
630 | int r = list_mirroring_namespaces(&mirroring_namespaces); | |
631 | if (r < 0) { | |
632 | return; | |
633 | } | |
634 | } | |
635 | ||
636 | auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct()); | |
637 | C_SaferCond cond; | |
638 | auto gather_ctx = new C_Gather(cct, &cond); | |
639 | for (auto it = m_namespace_replayers.begin(); | |
640 | it != m_namespace_replayers.end(); ) { | |
641 | auto iter = mirroring_namespaces.find(it->first); | |
642 | if (iter == mirroring_namespaces.end()) { | |
643 | auto namespace_replayer = it->second; | |
644 | auto on_shut_down = new LambdaContext( | |
645 | [namespace_replayer, ctx=gather_ctx->new_sub()](int r) { | |
646 | delete namespace_replayer; | |
647 | ctx->complete(r); | |
648 | }); | |
649 | m_service_daemon->remove_namespace(m_local_pool_id, it->first); | |
650 | namespace_replayer->shut_down(on_shut_down); | |
651 | it = m_namespace_replayers.erase(it); | |
652 | } else { | |
653 | mirroring_namespaces.erase(iter); | |
654 | it++; | |
655 | } | |
656 | } | |
657 | ||
658 | for (auto &name : mirroring_namespaces) { | |
659 | auto namespace_replayer = NamespaceReplayer<I>::create( | |
660 | name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid, | |
661 | m_remote_pool_meta, m_threads, m_image_sync_throttler.get(), | |
662 | m_image_deletion_throttler.get(), m_service_daemon, | |
663 | m_cache_manager_handler, m_pool_meta_cache); | |
664 | auto on_init = new LambdaContext( | |
665 | [this, namespace_replayer, name, &mirroring_namespaces, | |
666 | ctx=gather_ctx->new_sub()](int r) { | |
667 | std::lock_guard locker{m_lock}; | |
668 | if (r < 0) { | |
669 | derr << "failed to initialize namespace replayer for namespace " | |
670 | << name << ": " << cpp_strerror(r) << dendl; | |
671 | delete namespace_replayer; | |
672 | mirroring_namespaces.erase(name); | |
673 | } else { | |
674 | m_namespace_replayers[name] = namespace_replayer; | |
675 | m_service_daemon->add_namespace(m_local_pool_id, name); | |
676 | } | |
677 | ctx->complete(r); | |
678 | }); | |
679 | namespace_replayer->init(on_init); | |
680 | } | |
681 | ||
682 | gather_ctx->activate(); | |
683 | ||
684 | m_lock.unlock(); | |
685 | cond.wait(); | |
686 | m_lock.lock(); | |
687 | ||
688 | if (m_leader) { | |
689 | C_SaferCond acquire_cond; | |
690 | auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond); | |
691 | ||
692 | for (auto &name : mirroring_namespaces) { | |
693 | namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub()); | |
694 | } | |
695 | acquire_gather_ctx->activate(); | |
696 | ||
697 | m_lock.unlock(); | |
698 | acquire_cond.wait(); | |
699 | m_lock.lock(); | |
700 | ||
701 | std::vector<std::string> instance_ids; | |
702 | m_leader_watcher->list_instances(&instance_ids); | |
703 | ||
704 | for (auto &name : mirroring_namespaces) { | |
705 | auto it = m_namespace_replayers.find(name); | |
706 | if (it == m_namespace_replayers.end()) { | |
707 | // acuire leader for this namespace replayer failed | |
708 | continue; | |
709 | } | |
710 | it->second->handle_instances_added(instance_ids); | |
711 | } | |
712 | } else { | |
713 | std::string leader_instance_id; | |
714 | if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) { | |
715 | for (auto &name : mirroring_namespaces) { | |
716 | m_namespace_replayers[name]->handle_update_leader(leader_instance_id); | |
717 | } | |
718 | } | |
719 | } | |
720 | } | |
721 | ||
722 | template <typename I> | |
723 | int PoolReplayer<I>::list_mirroring_namespaces( | |
724 | std::set<std::string> *namespaces) { | |
725 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
726 | ||
727 | std::vector<std::string> names; | |
728 | ||
729 | int r = librbd::api::Namespace<I>::list(m_local_io_ctx, &names); | |
730 | if (r < 0) { | |
731 | derr << "failed to list namespaces: " << cpp_strerror(r) << dendl; | |
732 | return r; | |
733 | } | |
734 | ||
735 | for (auto &name : names) { | |
736 | cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED; | |
737 | int r = librbd::cls_client::mirror_mode_get(&m_local_io_ctx, &mirror_mode); | |
738 | if (r < 0 && r != -ENOENT) { | |
739 | derr << "failed to get namespace mirror mode: " << cpp_strerror(r) | |
740 | << dendl; | |
741 | if (m_namespace_replayers.count(name) == 0) { | |
742 | continue; | |
743 | } | |
744 | } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) { | |
745 | dout(10) << "mirroring is disabled for namespace " << name << dendl; | |
746 | continue; | |
d2e6a577 | 747 | } |
9f95a23c TL |
748 | |
749 | namespaces->insert(name); | |
7c673cae | 750 | } |
28e407b8 | 751 | |
9f95a23c | 752 | return 0; |
7c673cae FG |
753 | } |
754 | ||
92f5a8d4 TL |
755 | template <typename I> |
756 | void PoolReplayer<I>::reopen_logs() | |
757 | { | |
9f95a23c | 758 | std::lock_guard locker{m_lock}; |
92f5a8d4 TL |
759 | |
760 | if (m_local_rados) { | |
761 | reinterpret_cast<CephContext *>(m_local_rados->cct())->reopen_logs(); | |
762 | } | |
763 | if (m_remote_rados) { | |
764 | reinterpret_cast<CephContext *>(m_remote_rados->cct())->reopen_logs(); | |
765 | } | |
766 | } | |
767 | ||
11fdf7f2 | 768 | template <typename I> |
9f95a23c TL |
769 | void PoolReplayer<I>::namespace_replayer_acquire_leader(const std::string &name, |
770 | Context *on_finish) { | |
771 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
772 | ||
773 | auto it = m_namespace_replayers.find(name); | |
774 | ceph_assert(it != m_namespace_replayers.end()); | |
775 | ||
776 | on_finish = new LambdaContext( | |
777 | [this, name, on_finish](int r) { | |
778 | if (r < 0) { | |
779 | derr << "failed to handle acquire leader for namespace: " | |
780 | << name << ": " << cpp_strerror(r) << dendl; | |
781 | ||
782 | // remove the namespace replayer -- update_namespace_replayers will | |
783 | // retry to create it and acquire leader. | |
784 | ||
785 | std::lock_guard locker{m_lock}; | |
786 | ||
787 | auto namespace_replayer = m_namespace_replayers[name]; | |
788 | m_namespace_replayers.erase(name); | |
789 | auto on_shut_down = new LambdaContext( | |
790 | [namespace_replayer, on_finish](int r) { | |
791 | delete namespace_replayer; | |
792 | on_finish->complete(r); | |
793 | }); | |
794 | m_service_daemon->remove_namespace(m_local_pool_id, name); | |
795 | namespace_replayer->shut_down(on_shut_down); | |
796 | return; | |
797 | } | |
798 | on_finish->complete(0); | |
799 | }); | |
7c673cae | 800 | |
9f95a23c TL |
801 | it->second->handle_acquire_leader(on_finish); |
802 | } | |
803 | ||
804 | template <typename I> | |
805 | void PoolReplayer<I>::print_status(Formatter *f) { | |
806 | dout(20) << dendl; | |
7c673cae | 807 | |
9f95a23c TL |
808 | assert(f); |
809 | ||
810 | std::lock_guard l{m_lock}; | |
7c673cae FG |
811 | |
812 | f->open_object_section("pool_replayer_status"); | |
7c673cae | 813 | f->dump_stream("peer") << m_peer; |
9f95a23c TL |
814 | if (m_local_io_ctx.is_valid()) { |
815 | f->dump_string("pool", m_local_io_ctx.get_pool_name()); | |
816 | f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id(); | |
817 | } | |
7c673cae | 818 | |
11fdf7f2 TL |
819 | std::string state("running"); |
820 | if (m_manual_stop) { | |
821 | state = "stopped (manual)"; | |
822 | } else if (m_stopping) { | |
823 | state = "stopped"; | |
9f95a23c TL |
824 | } else if (!is_running()) { |
825 | state = "error"; | |
11fdf7f2 TL |
826 | } |
827 | f->dump_string("state", state); | |
828 | ||
9f95a23c TL |
829 | if (m_leader_watcher) { |
830 | std::string leader_instance_id; | |
831 | m_leader_watcher->get_leader_instance_id(&leader_instance_id); | |
832 | f->dump_string("leader_instance_id", leader_instance_id); | |
833 | ||
834 | bool leader = m_leader_watcher->is_leader(); | |
835 | f->dump_bool("leader", leader); | |
836 | if (leader) { | |
837 | std::vector<std::string> instance_ids; | |
838 | m_leader_watcher->list_instances(&instance_ids); | |
839 | f->open_array_section("instances"); | |
840 | for (auto instance_id : instance_ids) { | |
841 | f->dump_string("instance_id", instance_id); | |
842 | } | |
843 | f->close_section(); // instances | |
7c673cae | 844 | } |
7c673cae FG |
845 | } |
846 | ||
9f95a23c TL |
847 | if (m_local_rados) { |
848 | auto cct = reinterpret_cast<CephContext *>(m_local_rados->cct()); | |
849 | f->dump_string("local_cluster_admin_socket", | |
850 | cct->_conf.get_val<std::string>("admin_socket")); | |
851 | } | |
852 | if (m_remote_rados) { | |
853 | auto cct = reinterpret_cast<CephContext *>(m_remote_rados->cct()); | |
854 | f->dump_string("remote_cluster_admin_socket", | |
855 | cct->_conf.get_val<std::string>("admin_socket")); | |
856 | } | |
857 | ||
858 | if (m_image_sync_throttler) { | |
859 | f->open_object_section("sync_throttler"); | |
860 | m_image_sync_throttler->print_status(f); | |
861 | f->close_section(); // sync_throttler | |
862 | } | |
7c673cae | 863 | |
9f95a23c TL |
864 | if (m_image_deletion_throttler) { |
865 | f->open_object_section("deletion_throttler"); | |
866 | m_image_deletion_throttler->print_status(f); | |
867 | f->close_section(); // deletion_throttler | |
868 | } | |
31f18b77 | 869 | |
9f95a23c TL |
870 | if (m_default_namespace_replayer) { |
871 | m_default_namespace_replayer->print_status(f); | |
872 | } | |
7c673cae | 873 | |
9f95a23c TL |
874 | f->open_array_section("namespaces"); |
875 | for (auto &it : m_namespace_replayers) { | |
876 | f->open_object_section("namespace"); | |
877 | f->dump_string("name", it.first); | |
878 | it.second->print_status(f); | |
879 | f->close_section(); // namespace | |
11fdf7f2 | 880 | } |
9f95a23c | 881 | f->close_section(); // namespaces |
11fdf7f2 | 882 | |
9f95a23c | 883 | f->close_section(); // pool_replayer_status |
7c673cae FG |
884 | } |
885 | ||
11fdf7f2 | 886 | template <typename I> |
9f95a23c TL |
887 | void PoolReplayer<I>::start() { |
888 | dout(20) << dendl; | |
7c673cae | 889 | |
9f95a23c | 890 | std::lock_guard l{m_lock}; |
7c673cae FG |
891 | |
892 | if (m_stopping) { | |
893 | return; | |
894 | } | |
895 | ||
11fdf7f2 | 896 | m_manual_stop = false; |
9f95a23c TL |
897 | |
898 | if (m_default_namespace_replayer) { | |
899 | m_default_namespace_replayer->start(); | |
900 | } | |
901 | for (auto &it : m_namespace_replayers) { | |
902 | it.second->start(); | |
903 | } | |
7c673cae FG |
904 | } |
905 | ||
11fdf7f2 | 906 | template <typename I> |
9f95a23c | 907 | void PoolReplayer<I>::stop(bool manual) { |
7c673cae FG |
908 | dout(20) << "enter: manual=" << manual << dendl; |
909 | ||
9f95a23c | 910 | std::lock_guard l{m_lock}; |
7c673cae FG |
911 | if (!manual) { |
912 | m_stopping = true; | |
9f95a23c | 913 | m_cond.notify_all(); |
7c673cae FG |
914 | return; |
915 | } else if (m_stopping) { | |
916 | return; | |
917 | } | |
918 | ||
11fdf7f2 | 919 | m_manual_stop = true; |
9f95a23c TL |
920 | |
921 | if (m_default_namespace_replayer) { | |
922 | m_default_namespace_replayer->stop(); | |
923 | } | |
924 | for (auto &it : m_namespace_replayers) { | |
925 | it.second->stop(); | |
926 | } | |
7c673cae FG |
927 | } |
928 | ||
11fdf7f2 | 929 | template <typename I> |
9f95a23c TL |
930 | void PoolReplayer<I>::restart() { |
931 | dout(20) << dendl; | |
7c673cae | 932 | |
9f95a23c | 933 | std::lock_guard l{m_lock}; |
7c673cae FG |
934 | |
935 | if (m_stopping) { | |
936 | return; | |
937 | } | |
938 | ||
9f95a23c TL |
939 | if (m_default_namespace_replayer) { |
940 | m_default_namespace_replayer->restart(); | |
941 | } | |
942 | for (auto &it : m_namespace_replayers) { | |
943 | it.second->restart(); | |
944 | } | |
7c673cae FG |
945 | } |
946 | ||
11fdf7f2 | 947 | template <typename I> |
9f95a23c TL |
948 | void PoolReplayer<I>::flush() { |
949 | dout(20) << dendl; | |
7c673cae | 950 | |
9f95a23c | 951 | std::lock_guard l{m_lock}; |
7c673cae FG |
952 | |
953 | if (m_stopping || m_manual_stop) { | |
954 | return; | |
955 | } | |
956 | ||
9f95a23c TL |
957 | if (m_default_namespace_replayer) { |
958 | m_default_namespace_replayer->flush(); | |
959 | } | |
960 | for (auto &it : m_namespace_replayers) { | |
961 | it.second->flush(); | |
962 | } | |
7c673cae FG |
963 | } |
964 | ||
11fdf7f2 | 965 | template <typename I> |
9f95a23c TL |
966 | void PoolReplayer<I>::release_leader() { |
967 | dout(20) << dendl; | |
7c673cae | 968 | |
9f95a23c | 969 | std::lock_guard l{m_lock}; |
7c673cae FG |
970 | |
971 | if (m_stopping || !m_leader_watcher) { | |
972 | return; | |
973 | } | |
974 | ||
975 | m_leader_watcher->release_leader(); | |
976 | } | |
977 | ||
11fdf7f2 TL |
978 | template <typename I> |
979 | void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) { | |
9f95a23c TL |
980 | dout(20) << dendl; |
981 | ||
982 | with_namespace_replayers( | |
983 | [this](Context *on_finish) { | |
984 | dout(10) << "handle_post_acquire_leader" << dendl; | |
985 | ||
986 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
987 | ||
988 | m_service_daemon->add_or_update_attribute(m_local_pool_id, | |
989 | SERVICE_DAEMON_LEADER_KEY, | |
990 | true); | |
991 | auto ctx = new LambdaContext( | |
992 | [this, on_finish](int r) { | |
993 | if (r == 0) { | |
994 | std::lock_guard locker{m_lock}; | |
995 | m_leader = true; | |
996 | } | |
997 | on_finish->complete(r); | |
998 | }); | |
999 | ||
1000 | auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct()); | |
1001 | auto gather_ctx = new C_Gather(cct, ctx); | |
1002 | ||
1003 | m_default_namespace_replayer->handle_acquire_leader( | |
1004 | gather_ctx->new_sub()); | |
1005 | ||
1006 | for (auto &it : m_namespace_replayers) { | |
1007 | namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub()); | |
1008 | } | |
1009 | ||
1010 | gather_ctx->activate(); | |
1011 | }, on_finish); | |
7c673cae FG |
1012 | } |
1013 | ||
11fdf7f2 TL |
1014 | template <typename I> |
1015 | void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) { | |
9f95a23c | 1016 | dout(20) << dendl; |
11fdf7f2 | 1017 | |
9f95a23c TL |
1018 | with_namespace_replayers( |
1019 | [this](Context *on_finish) { | |
1020 | dout(10) << "handle_pre_release_leader" << dendl; | |
7c673cae | 1021 | |
9f95a23c | 1022 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 1023 | |
9f95a23c TL |
1024 | m_leader = false; |
1025 | m_service_daemon->remove_attribute(m_local_pool_id, | |
1026 | SERVICE_DAEMON_LEADER_KEY); | |
7c673cae | 1027 | |
9f95a23c TL |
1028 | auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct()); |
1029 | auto gather_ctx = new C_Gather(cct, on_finish); | |
11fdf7f2 | 1030 | |
9f95a23c TL |
1031 | m_default_namespace_replayer->handle_release_leader( |
1032 | gather_ctx->new_sub()); | |
11fdf7f2 | 1033 | |
9f95a23c TL |
1034 | for (auto &it : m_namespace_replayers) { |
1035 | it.second->handle_release_leader(gather_ctx->new_sub()); | |
1036 | } | |
11fdf7f2 | 1037 | |
9f95a23c TL |
1038 | gather_ctx->activate(); |
1039 | }, on_finish); | |
11fdf7f2 | 1040 | } |
7c673cae | 1041 | |
11fdf7f2 | 1042 | template <typename I> |
9f95a23c TL |
1043 | void PoolReplayer<I>::handle_update_leader( |
1044 | const std::string &leader_instance_id) { | |
1045 | dout(10) << "leader_instance_id=" << leader_instance_id << dendl; | |
7c673cae | 1046 | |
9f95a23c | 1047 | std::lock_guard locker{m_lock}; |
11fdf7f2 | 1048 | |
9f95a23c | 1049 | m_default_namespace_replayer->handle_update_leader(leader_instance_id); |
7c673cae | 1050 | |
9f95a23c TL |
1051 | for (auto &it : m_namespace_replayers) { |
1052 | it.second->handle_update_leader(leader_instance_id); | |
11fdf7f2 | 1053 | } |
11fdf7f2 TL |
1054 | } |
1055 | ||
1056 | template <typename I> | |
9f95a23c TL |
1057 | void PoolReplayer<I>::handle_instances_added( |
1058 | const std::vector<std::string> &instance_ids) { | |
1059 | dout(5) << "instance_ids=" << instance_ids << dendl; | |
11fdf7f2 | 1060 | |
9f95a23c TL |
1061 | std::lock_guard locker{m_lock}; |
1062 | if (!m_leader_watcher->is_leader()) { | |
1063 | return; | |
7c673cae FG |
1064 | } |
1065 | ||
9f95a23c | 1066 | m_default_namespace_replayer->handle_instances_added(instance_ids); |
7c673cae | 1067 | |
9f95a23c TL |
1068 | for (auto &it : m_namespace_replayers) { |
1069 | it.second->handle_instances_added(instance_ids); | |
7c673cae | 1070 | } |
11fdf7f2 TL |
1071 | } |
1072 | ||
1073 | template <typename I> | |
9f95a23c TL |
1074 | void PoolReplayer<I>::handle_instances_removed( |
1075 | const std::vector<std::string> &instance_ids) { | |
1076 | dout(5) << "instance_ids=" << instance_ids << dendl; | |
11fdf7f2 | 1077 | |
9f95a23c TL |
1078 | std::lock_guard locker{m_lock}; |
1079 | if (!m_leader_watcher->is_leader()) { | |
1080 | return; | |
11fdf7f2 TL |
1081 | } |
1082 | ||
9f95a23c | 1083 | m_default_namespace_replayer->handle_instances_removed(instance_ids); |
11fdf7f2 | 1084 | |
9f95a23c TL |
1085 | for (auto &it : m_namespace_replayers) { |
1086 | it.second->handle_instances_removed(instance_ids); | |
11fdf7f2 | 1087 | } |
11fdf7f2 TL |
1088 | } |
1089 | ||
1090 | template <typename I> | |
9f95a23c TL |
1091 | void PoolReplayer<I>::handle_remote_pool_meta_updated( |
1092 | const RemotePoolMeta& remote_pool_meta) { | |
1093 | dout(5) << "remote_pool_meta=" << remote_pool_meta << dendl; | |
11fdf7f2 | 1094 | |
9f95a23c TL |
1095 | if (!m_default_namespace_replayer) { |
1096 | m_remote_pool_meta = remote_pool_meta; | |
11fdf7f2 TL |
1097 | return; |
1098 | } | |
1099 | ||
9f95a23c TL |
1100 | derr << "remote pool metadata updated unexpectedly" << dendl; |
1101 | std::unique_lock locker{m_lock}; | |
1102 | m_stopping = true; | |
1103 | m_cond.notify_all(); | |
11fdf7f2 TL |
1104 | } |
1105 | ||
7c673cae FG |
1106 | } // namespace mirror |
1107 | } // namespace rbd | |
11fdf7f2 TL |
1108 | |
1109 | template class rbd::mirror::PoolReplayer<librbd::ImageCtx>; |