]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/PoolReplayer.cc
import ceph quincy 17.2.4
[ceph.git] / ceph / src / tools / rbd_mirror / PoolReplayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "PoolReplayer.h"
9f95a23c 5#include "common/Cond.h"
7c673cae
FG
6#include "common/Formatter.h"
7#include "common/admin_socket.h"
8#include "common/ceph_argparse.h"
9#include "common/code_environment.h"
10#include "common/common_init.h"
11#include "common/debug.h"
12#include "common/errno.h"
7c673cae
FG
13#include "cls/rbd/cls_rbd_client.h"
14#include "global/global_context.h"
11fdf7f2 15#include "librbd/api/Config.h"
9f95a23c
TL
16#include "librbd/api/Namespace.h"
17#include "PoolMetaCache.h"
18#include "RemotePoolPoller.h"
c07f9fc5 19#include "ServiceDaemon.h"
7c673cae
FG
20#include "Threads.h"
21
22#define dout_context g_ceph_context
23#define dout_subsys ceph_subsys_rbd_mirror
24#undef dout_prefix
25#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
26 << this << " " << __func__ << ": "
27
7c673cae
FG
28namespace rbd {
29namespace mirror {
30
11fdf7f2
TL
31using ::operator<<;
32
7c673cae
FG
namespace {

// service-daemon attribute keys published by this pool replayer
const std::string SERVICE_DAEMON_INSTANCE_ID_KEY("instance_id");
const std::string SERVICE_DAEMON_LEADER_KEY("leader");

// config keys that must never be inherited from the local cluster's config
// when building a remote peer connection (see init_rados's
// strip_cluster_overrides handling)
const std::vector<std::string> UNIQUE_PEER_CONFIG_KEYS {
  {"monmap", "mon_host", "mon_dns_srv_name", "key", "keyfile", "keyring"}};

// Base class for admin-socket commands bound to a specific PoolReplayer
// instance.  Each concrete command forwards to one PoolReplayer method.
template <typename I>
class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer<I> *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  // execute the command, dumping any output via the supplied formatter
  virtual int call(Formatter *f) = 0;
protected:
  PoolReplayer<I> *pool_replayer;
};

// "rbd mirror status <pool/peer>": dump replayer status
template <typename I>
class StatusCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StatusCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->print_status(f);
    return 0;
  }
};

// "rbd mirror start <pool/peer>": clear a manual stop and resume replay
template <typename I>
class StartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->start();
    return 0;
  }
};

// "rbd mirror stop <pool/peer>": request a manual stop (stop(true))
template <typename I>
class StopCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit StopCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->stop(true);
    return 0;
  }
};

// "rbd mirror restart <pool/peer>": restart all namespace replayers
template <typename I>
class RestartCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit RestartCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->restart();
    return 0;
  }
};

// "rbd mirror flush <pool/peer>": flush all namespace replayers
template <typename I>
class FlushCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit FlushCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->flush();
    return 0;
  }
};

// "rbd mirror leader release <pool/peer>": voluntarily release leadership
template <typename I>
class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand<I> {
public:
  explicit LeaderReleaseCommand(PoolReplayer<I> *pool_replayer)
    : PoolReplayerAdminSocketCommand<I>(pool_replayer) {
  }

  int call(Formatter *f) override {
    this->pool_replayer->release_leader();
    return 0;
  }
};

// Registers the per-pool-replayer admin-socket commands and dispatches
// incoming asok calls to the matching command object.  A command is only
// added to the map when registration succeeded (r == 0), so the dtor/call
// paths never touch commands that were rejected (e.g. duplicate names).
template <typename I>
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer<I> *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand<I>(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand<I>(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand<I>(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand<I>(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand<I>(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand<I>(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    // unregister everything registered against this hook, then free the
    // command objects we own
    (void)admin_socket->unregister_commands(this);
    for (auto i = commands.begin(); i != commands.end(); ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& ss,
           bufferlist& out) override {
    // the admin socket only dispatches commands we registered, so a miss
    // here indicates an internal inconsistency
    auto i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  // transparent comparator allows lookup by string_view without building a
  // temporary std::string
  typedef std::map<std::string, PoolReplayerAdminSocketCommand<I>*,
                   std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace
208
// Forwards remote pool metadata updates from the RemotePoolPoller back to
// the owning PoolReplayer (non-owning back-pointer; the PoolReplayer owns
// both the poller and this listener and outlives them).
template <typename I>
struct PoolReplayer<I>::RemotePoolPollerListener
  : public remote_pool_poller::Listener {

  PoolReplayer<I>* m_pool_replayer;

  RemotePoolPollerListener(PoolReplayer<I>* pool_replayer)
    : m_pool_replayer(pool_replayer) {
  }

  void handle_updated(const RemotePoolMeta& remote_pool_meta) override {
    m_pool_replayer->handle_remote_pool_meta_updated(remote_pool_meta);
  }
};
223
// Construct a pool replayer for a single (local pool, remote peer) pair.
// Only records configuration and wires up helper objects -- no cluster
// connections are made until init() runs.
template <typename I>
PoolReplayer<I>::PoolReplayer(
    Threads<I> *threads, ServiceDaemon<I> *service_daemon,
    journal::CacheManagerHandler *cache_manager_handler,
    PoolMetaCache* pool_meta_cache, int64_t local_pool_id,
    const PeerSpec &peer, const std::vector<const char*> &args) :
  m_threads(threads),
  m_service_daemon(service_daemon),
  m_cache_manager_handler(cache_manager_handler),
  m_pool_meta_cache(pool_meta_cache),
  m_local_pool_id(local_pool_id),
  m_peer(peer),
  m_args(args),
  m_lock(ceph::make_mutex("rbd::mirror::PoolReplayer " + stringify(peer))),
  m_pool_replayer_thread(this),
  m_leader_listener(this) {
}
241
11fdf7f2
TL
template <typename I>
PoolReplayer<I>::~PoolReplayer()
{
  // stop the replayer thread and tear down watchers/replayers/connections
  shut_down();

  // the asok hook is created and destroyed inside run(); by the time the
  // replayer thread has been joined it must already be gone
  ceph_assert(m_asok_hook == nullptr);
}
249
11fdf7f2 250template <typename I>
f67539c2 251bool PoolReplayer<I>::is_blocklisted() const {
9f95a23c 252 std::lock_guard locker{m_lock};
f67539c2 253 return m_blocklisted;
7c673cae
FG
254}
255
11fdf7f2
TL
256template <typename I>
257bool PoolReplayer<I>::is_leader() const {
9f95a23c 258 std::lock_guard locker{m_lock};
7c673cae
FG
259 return m_leader_watcher && m_leader_watcher->is_leader();
260}
261
11fdf7f2
TL
262template <typename I>
263bool PoolReplayer<I>::is_running() const {
20effc67 264 return m_pool_replayer_thread.is_started() && !m_stopping;
c07f9fc5
FG
265}
266
// Bring up the pool replayer: connect to the local and remote clusters,
// resolve pool/mirror metadata, start the remote pool poller, the default
// namespace replayer and the leader watcher, and finally spawn the run()
// thread.  Any failure records a service-daemon callout and returns early,
// leaving the object in a state where shut_down() is still safe.
template <typename I>
void PoolReplayer<I>::init(const std::string& site_name) {
  std::lock_guard locker{m_lock};

  ceph_assert(!m_pool_replayer_thread.is_started());

  // reset state
  m_stopping = false;
  m_blocklisted = false;
  m_site_name = site_name;

  dout(10) << "replaying for " << m_peer << dendl;
  // local connection reuses the daemon's own cluster/name; no mon_host/key
  // overrides and no cluster-override stripping
  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "", "", "local cluster", &m_local_rados, false);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to local cluster");
    return;
  }

  // remote connection uses the peer spec and strips local-cluster-specific
  // config keys (monmap, keyring, ...)
  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 m_peer.mon_host, m_peer.key,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados, true);
  if (r < 0) {
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to connect to remote cluster");
    return;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return;
  }

  // apply any pool-level config overrides stored in the local pool
  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  librbd::api::Config<I>::apply_pool_overrides(m_local_io_ctx, &cct->_conf);

  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &m_local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to query local mirror uuid");
    return;
  }

  // the remote pool is resolved by (local) pool name, not by id
  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_WARNING,
      "unable to access remote pool");
    return;
  }

  dout(10) << "connected to " << m_peer << dendl;

  // shared throttlers handed to every namespace replayer
  m_image_sync_throttler.reset(
      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_syncs"));

  m_image_deletion_throttler.reset(
      Throttler<I>::create(cct, "rbd_mirror_concurrent_image_deletions"));

  // poll the remote pool for mirror uuid / peer metadata changes
  m_remote_pool_poller_listener.reset(new RemotePoolPollerListener(this));
  m_remote_pool_poller.reset(RemotePoolPoller<I>::create(
    m_threads, m_remote_io_ctx, m_site_name, m_local_mirror_uuid,
    *m_remote_pool_poller_listener));

  C_SaferCond on_pool_poller_init;
  m_remote_pool_poller->init(&on_pool_poller_init);
  r = on_pool_poller_init.wait();
  if (r < 0) {
    derr << "failed to initialize remote pool poller: " << cpp_strerror(r)
         << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize remote pool poller");
    m_remote_pool_poller.reset();
    return;
  }
  // the poller's init callback populates m_remote_pool_meta before waking us
  ceph_assert(!m_remote_pool_meta.mirror_uuid.empty());
  m_pool_meta_cache->set_remote_pool_meta(
    m_remote_io_ctx.get_id(), m_remote_pool_meta);
  m_pool_meta_cache->set_local_pool_meta(
    m_local_io_ctx.get_id(), {m_local_mirror_uuid});

  // the default (empty-name) namespace always gets a replayer; additional
  // namespaces are managed dynamically by update_namespace_replayers()
  m_default_namespace_replayer.reset(NamespaceReplayer<I>::create(
      "", m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
      m_remote_pool_meta, m_threads, m_image_sync_throttler.get(),
      m_image_deletion_throttler.get(), m_service_daemon,
      m_cache_manager_handler, m_pool_meta_cache));

  C_SaferCond on_init;
  m_default_namespace_replayer->init(&on_init);
  r = on_init.wait();
  if (r < 0) {
    derr << "error initializing default namespace replayer: " << cpp_strerror(r)
         << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize default namespace replayer");
    m_default_namespace_replayer.reset();
    return;
  }

  m_leader_watcher.reset(LeaderWatcher<I>::create(m_threads, m_local_io_ctx,
                                                  &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    m_callout_id = m_service_daemon->add_or_update_callout(
      m_local_pool_id, m_callout_id, service_daemon::CALLOUT_LEVEL_ERROR,
      "unable to initialize leader messenger object");
    m_leader_watcher.reset();
    return;
  }

  // everything initialized -- clear any stale callout from a prior attempt
  if (m_callout_id != service_daemon::CALLOUT_ID_NONE) {
    m_service_daemon->remove_callout(m_local_pool_id, m_callout_id);
    m_callout_id = service_daemon::CALLOUT_ID_NONE;
  }

  m_service_daemon->add_or_update_attribute(
    m_local_io_ctx.get_id(), SERVICE_DAEMON_INSTANCE_ID_KEY,
    stringify(m_local_io_ctx.get_instance_id()));

  m_pool_replayer_thread.create("pool replayer");
}
7c673cae 405
11fdf7f2
TL
// Tear down in reverse order of init(): signal and join the run() thread,
// then shut down the leader watcher, the default namespace replayer and the
// remote pool poller (removing its cache entries), and finally drop the
// cluster connections.  Safe to call after a partial init() -- every step
// guards on the corresponding object existing.
template <typename I>
void PoolReplayer<I>::shut_down() {
  {
    std::lock_guard l{m_lock};
    m_stopping = true;
    m_cond.notify_all();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }

  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  m_leader_watcher.reset();

  if (m_default_namespace_replayer) {
    C_SaferCond on_shut_down;
    m_default_namespace_replayer->shut_down(&on_shut_down);
    on_shut_down.wait();
  }
  m_default_namespace_replayer.reset();

  if (m_remote_pool_poller) {
    C_SaferCond ctx;
    m_remote_pool_poller->shut_down(&ctx);
    ctx.wait();

    // cache entries were added by init() only after the poller initialized
    m_pool_meta_cache->remove_remote_pool_meta(m_remote_io_ctx.get_id());
    m_pool_meta_cache->remove_local_pool_meta(m_local_io_ctx.get_id());
  }
  m_remote_pool_poller.reset();
  m_remote_pool_poller_listener.reset();

  m_image_sync_throttler.reset();
  m_image_deletion_throttler.reset();

  m_local_rados.reset();
  m_remote_rados.reset();
}
446
11fdf7f2
TL
// Build a librados connection with a hand-bootstrapped CephContext.
//
// cluster_name/client_name identify the target cluster and entity;
// mon_host/key (optional) override the corresponding config values;
// description is used only for log messages; *rados_ref receives the
// connected handle.  When strip_cluster_overrides is set (remote peers),
// cluster-unique keys (UNIQUE_PEER_CONFIG_KEYS) captured from the config
// files are restored after environment/CLI parsing so local-cluster
// settings never leak into the peer connection.
// Returns 0 on success or a negative errno.
template <typename I>
int PoolReplayer<I>::init_rados(const std::string &cluster_name,
                                const std::string &client_name,
                                const std::string &mon_host,
                                const std::string &key,
                                const std::string &description,
                                RadosRef *rados_ref,
                                bool strip_cluster_overrides) {
  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf.parse_config_files(nullptr, nullptr, 0);
  if (r < 0 && r != -ENOENT) {
    // do not treat this as fatal, it might still be able to connect
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
  }

  // preserve cluster-specific config settings before applying environment/cli
  // overrides
  std::map<std::string, std::string> config_values;
  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& key : UNIQUE_PEER_CONFIG_KEYS) {
      config_values[key] = cct->_conf.get_val<std::string>(key);
    }
  }

  cct->_conf.parse_env(cct->get_module_type());

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  r = cct->_conf.parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ":"
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf.parse_env(cct->get_module_type());

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf.parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (strip_cluster_overrides) {
    // remote peer connections shouldn't apply cluster-specific
    // configuration settings
    for (auto& pair : config_values) {
      auto value = cct->_conf.get_val<std::string>(pair.first);
      if (pair.second != value) {
        dout(0) << "reverting global config option override: "
                << pair.first << ": " << value << " -> " << pair.second
                << dendl;
        cct->_conf.set_val_or_die(pair.first, pair.second);
      }
    }
  }

  // give each cluster handle its own admin socket if the daemon has one
  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf.set_val_or_die("admin_socket",
                              "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  if (!mon_host.empty()) {
    r = cct->_conf.set_val("mon_host", mon_host);
    if (r < 0) {
      derr << "failed to set mon_host config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!key.empty()) {
    r = cct->_conf.set_val("key", key);
    if (r < 0) {
      derr << "failed to set key config for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  // disable unnecessary librbd cache
  cct->_conf.set_val_or_die("rbd_cache", "false");
  cct->_conf.apply_changes(nullptr);
  cct->_conf.complain_about_parse_error(cct);

  rados_ref->reset(new librados::Rados());

  // Rados takes its own reference on cct; drop ours afterwards
  r = (*rados_ref)->init_with_context(cct);
  ceph_assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}
572
// Pool replayer thread body: periodically refresh the set of namespace
// replayers, watch for blocklisting, and maintain the admin-socket hook
// (re-created if the pool name ever changes).  Exits when m_stopping is set
// (externally via stop()/shut_down(), or internally on blocklist).
template <typename I>
void PoolReplayer<I>::run() {
  dout(20) << dendl;

  while (true) {
    // (re)create the asok hook if the pool/peer derived name changed
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook<I>(g_ceph_context,
                                                       m_asok_hook_name, this);
    }

    // reconcile namespace replayers with the current set of mirroring
    // namespaces (runs with leader-aware synchronization)
    with_namespace_replayers([this]() { update_namespace_replayers(); });

    std::unique_lock locker{m_lock};

    if (m_leader_watcher->is_blocklisted() ||
        m_default_namespace_replayer->is_blocklisted()) {
      m_blocklisted = true;
      m_stopping = true;
    }

    for (auto &it : m_namespace_replayers) {
      if (it.second->is_blocklisted()) {
        m_blocklisted = true;
        m_stopping = true;
        break;
      }
    }

    if (m_stopping) {
      break;
    }

    // sleep until the next refresh interval or an early wake-up from stop()
    auto seconds = g_ceph_context->_conf.get_val<uint64_t>(
        "rbd_mirror_pool_replayers_refresh_interval");
    m_cond.wait_for(locker, ceph::make_timespan(seconds));
  }

  // shut down namespace replayers (m_stopping makes the update treat the
  // mirroring namespace set as empty)
  with_namespace_replayers([this]() { update_namespace_replayers(); });

  delete m_asok_hook;
  m_asok_hook = nullptr;
}
621
// Reconcile m_namespace_replayers against the namespaces currently enabled
// for mirroring: shut down replayers for namespaces that stopped mirroring
// and create/init replayers for new ones.  While stopping, the target set
// is empty so every namespace replayer gets torn down.  Called with m_lock
// held; the lock is temporarily dropped while waiting for async init/shutdown
// and leader-acquire operations to complete.
template <typename I>
void PoolReplayer<I>::update_namespace_replayers() {
  dout(20) << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  std::set<std::string> mirroring_namespaces;
  if (!m_stopping) {
    int r = list_mirroring_namespaces(&mirroring_namespaces);
    if (r < 0) {
      return;
    }
  }

  auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
  C_SaferCond cond;
  auto gather_ctx = new C_Gather(cct, &cond);
  // pass 1: remove replayers whose namespace is no longer mirroring;
  // surviving namespaces are erased from the set so that afterwards it
  // contains only namespaces needing a brand new replayer
  for (auto it = m_namespace_replayers.begin();
       it != m_namespace_replayers.end(); ) {
    auto iter = mirroring_namespaces.find(it->first);
    if (iter == mirroring_namespaces.end()) {
      auto namespace_replayer = it->second;
      auto on_shut_down = new LambdaContext(
        [namespace_replayer, ctx=gather_ctx->new_sub()](int r) {
          delete namespace_replayer;
          ctx->complete(r);
        });
      m_service_daemon->remove_namespace(m_local_pool_id, it->first);
      namespace_replayer->shut_down(on_shut_down);
      it = m_namespace_replayers.erase(it);
    } else {
      mirroring_namespaces.erase(iter);
      it++;
    }
  }

  // pass 2: create and init replayers for newly mirroring namespaces
  for (auto &name : mirroring_namespaces) {
    auto namespace_replayer = NamespaceReplayer<I>::create(
        name, m_local_io_ctx, m_remote_io_ctx, m_local_mirror_uuid, m_peer.uuid,
        m_remote_pool_meta, m_threads, m_image_sync_throttler.get(),
        m_image_deletion_throttler.get(), m_service_daemon,
        m_cache_manager_handler, m_pool_meta_cache);
    auto on_init = new LambdaContext(
        [this, namespace_replayer, name, &mirroring_namespaces,
         ctx=gather_ctx->new_sub()](int r) {
          std::lock_guard locker{m_lock};
          if (r < 0) {
            derr << "failed to initialize namespace replayer for namespace "
                 << name << ": " << cpp_strerror(r) << dendl;
            delete namespace_replayer;
            // drop it from the set so the leader/follower pass below skips it
            mirroring_namespaces.erase(name);
          } else {
            m_namespace_replayers[name] = namespace_replayer;
            m_service_daemon->add_namespace(m_local_pool_id, name);
          }
          ctx->complete(r);
        });
    namespace_replayer->init(on_init);
  }

  gather_ctx->activate();

  // wait for all shutdowns/inits without holding the lock
  m_lock.unlock();
  cond.wait();
  m_lock.lock();

  if (m_leader) {
    // as leader, acquire leadership in every newly added namespace replayer
    // and then seed them with the current instance list
    C_SaferCond acquire_cond;
    auto acquire_gather_ctx = new C_Gather(cct, &acquire_cond);

    for (auto &name : mirroring_namespaces) {
      namespace_replayer_acquire_leader(name, acquire_gather_ctx->new_sub());
    }
    acquire_gather_ctx->activate();

    m_lock.unlock();
    acquire_cond.wait();
    m_lock.lock();

    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);

    for (auto &name : mirroring_namespaces) {
      auto it = m_namespace_replayers.find(name);
      if (it == m_namespace_replayers.end()) {
        // acquire leader for this namespace replayer failed
        continue;
      }
      it->second->handle_instances_added(instance_ids);
    }
  } else {
    // as follower, just inform the new replayers who the current leader is
    std::string leader_instance_id;
    if (m_leader_watcher->get_leader_instance_id(&leader_instance_id)) {
      for (auto &name : mirroring_namespaces) {
        m_namespace_replayers[name]->handle_update_leader(leader_instance_id);
      }
    }
  }
}
721
722template <typename I>
723int PoolReplayer<I>::list_mirroring_namespaces(
724 std::set<std::string> *namespaces) {
725 ceph_assert(ceph_mutex_is_locked(m_lock));
726
727 std::vector<std::string> names;
728
729 int r = librbd::api::Namespace<I>::list(m_local_io_ctx, &names);
730 if (r < 0) {
731 derr << "failed to list namespaces: " << cpp_strerror(r) << dendl;
732 return r;
733 }
734
735 for (auto &name : names) {
736 cls::rbd::MirrorMode mirror_mode = cls::rbd::MIRROR_MODE_DISABLED;
737 int r = librbd::cls_client::mirror_mode_get(&m_local_io_ctx, &mirror_mode);
738 if (r < 0 && r != -ENOENT) {
739 derr << "failed to get namespace mirror mode: " << cpp_strerror(r)
740 << dendl;
741 if (m_namespace_replayers.count(name) == 0) {
742 continue;
743 }
744 } else if (mirror_mode == cls::rbd::MIRROR_MODE_DISABLED) {
745 dout(10) << "mirroring is disabled for namespace " << name << dendl;
746 continue;
d2e6a577 747 }
9f95a23c
TL
748
749 namespaces->insert(name);
7c673cae 750 }
28e407b8 751
9f95a23c 752 return 0;
7c673cae
FG
753}
754
92f5a8d4
TL
755template <typename I>
756void PoolReplayer<I>::reopen_logs()
757{
9f95a23c 758 std::lock_guard locker{m_lock};
92f5a8d4
TL
759
760 if (m_local_rados) {
761 reinterpret_cast<CephContext *>(m_local_rados->cct())->reopen_logs();
762 }
763 if (m_remote_rados) {
764 reinterpret_cast<CephContext *>(m_remote_rados->cct())->reopen_logs();
765 }
766}
767
// Ask the named namespace replayer to take over leadership.  On failure the
// replayer is removed and destroyed so the next update_namespace_replayers()
// pass can recreate it and retry; on_finish always fires with the acquire
// result.  Called with m_lock held.
template <typename I>
void PoolReplayer<I>::namespace_replayer_acquire_leader(const std::string &name,
                                                        Context *on_finish) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  auto it = m_namespace_replayers.find(name);
  ceph_assert(it != m_namespace_replayers.end());

  // wrap the caller's context with error cleanup
  on_finish = new LambdaContext(
      [this, name, on_finish](int r) {
        if (r < 0) {
          derr << "failed to handle acquire leader for namespace: "
               << name << ": " << cpp_strerror(r) << dendl;

          // remove the namespace replayer -- update_namespace_replayers will
          // retry to create it and acquire leader.

          std::lock_guard locker{m_lock};

          auto namespace_replayer = m_namespace_replayers[name];
          m_namespace_replayers.erase(name);
          auto on_shut_down = new LambdaContext(
              [namespace_replayer, on_finish](int r) {
                delete namespace_replayer;
                on_finish->complete(r);
              });
          m_service_daemon->remove_namespace(m_local_pool_id, name);
          namespace_replayer->shut_down(on_shut_down);
          return;
        }
        on_finish->complete(0);
      });

  it->second->handle_acquire_leader(on_finish);
}
803
804template <typename I>
805void PoolReplayer<I>::print_status(Formatter *f) {
806 dout(20) << dendl;
7c673cae 807
9f95a23c
TL
808 assert(f);
809
810 std::lock_guard l{m_lock};
7c673cae
FG
811
812 f->open_object_section("pool_replayer_status");
7c673cae 813 f->dump_stream("peer") << m_peer;
9f95a23c
TL
814 if (m_local_io_ctx.is_valid()) {
815 f->dump_string("pool", m_local_io_ctx.get_pool_name());
816 f->dump_stream("instance_id") << m_local_io_ctx.get_instance_id();
817 }
7c673cae 818
11fdf7f2
TL
819 std::string state("running");
820 if (m_manual_stop) {
821 state = "stopped (manual)";
822 } else if (m_stopping) {
823 state = "stopped";
9f95a23c
TL
824 } else if (!is_running()) {
825 state = "error";
11fdf7f2
TL
826 }
827 f->dump_string("state", state);
828
9f95a23c
TL
829 if (m_leader_watcher) {
830 std::string leader_instance_id;
831 m_leader_watcher->get_leader_instance_id(&leader_instance_id);
832 f->dump_string("leader_instance_id", leader_instance_id);
833
834 bool leader = m_leader_watcher->is_leader();
835 f->dump_bool("leader", leader);
836 if (leader) {
837 std::vector<std::string> instance_ids;
838 m_leader_watcher->list_instances(&instance_ids);
839 f->open_array_section("instances");
840 for (auto instance_id : instance_ids) {
841 f->dump_string("instance_id", instance_id);
842 }
843 f->close_section(); // instances
7c673cae 844 }
7c673cae
FG
845 }
846
9f95a23c
TL
847 if (m_local_rados) {
848 auto cct = reinterpret_cast<CephContext *>(m_local_rados->cct());
849 f->dump_string("local_cluster_admin_socket",
850 cct->_conf.get_val<std::string>("admin_socket"));
851 }
852 if (m_remote_rados) {
853 auto cct = reinterpret_cast<CephContext *>(m_remote_rados->cct());
854 f->dump_string("remote_cluster_admin_socket",
855 cct->_conf.get_val<std::string>("admin_socket"));
856 }
857
858 if (m_image_sync_throttler) {
859 f->open_object_section("sync_throttler");
860 m_image_sync_throttler->print_status(f);
861 f->close_section(); // sync_throttler
862 }
7c673cae 863
9f95a23c
TL
864 if (m_image_deletion_throttler) {
865 f->open_object_section("deletion_throttler");
866 m_image_deletion_throttler->print_status(f);
867 f->close_section(); // deletion_throttler
868 }
31f18b77 869
9f95a23c
TL
870 if (m_default_namespace_replayer) {
871 m_default_namespace_replayer->print_status(f);
872 }
7c673cae 873
9f95a23c
TL
874 f->open_array_section("namespaces");
875 for (auto &it : m_namespace_replayers) {
876 f->open_object_section("namespace");
877 f->dump_string("name", it.first);
878 it.second->print_status(f);
879 f->close_section(); // namespace
11fdf7f2 880 }
9f95a23c 881 f->close_section(); // namespaces
11fdf7f2 882
9f95a23c 883 f->close_section(); // pool_replayer_status
7c673cae
FG
884}
885
11fdf7f2 886template <typename I>
9f95a23c
TL
887void PoolReplayer<I>::start() {
888 dout(20) << dendl;
7c673cae 889
9f95a23c 890 std::lock_guard l{m_lock};
7c673cae
FG
891
892 if (m_stopping) {
893 return;
894 }
895
11fdf7f2 896 m_manual_stop = false;
9f95a23c
TL
897
898 if (m_default_namespace_replayer) {
899 m_default_namespace_replayer->start();
900 }
901 for (auto &it : m_namespace_replayers) {
902 it.second->start();
903 }
7c673cae
FG
904}
905
11fdf7f2 906template <typename I>
9f95a23c 907void PoolReplayer<I>::stop(bool manual) {
7c673cae
FG
908 dout(20) << "enter: manual=" << manual << dendl;
909
9f95a23c 910 std::lock_guard l{m_lock};
7c673cae
FG
911 if (!manual) {
912 m_stopping = true;
9f95a23c 913 m_cond.notify_all();
7c673cae
FG
914 return;
915 } else if (m_stopping) {
916 return;
917 }
918
11fdf7f2 919 m_manual_stop = true;
9f95a23c
TL
920
921 if (m_default_namespace_replayer) {
922 m_default_namespace_replayer->stop();
923 }
924 for (auto &it : m_namespace_replayers) {
925 it.second->stop();
926 }
7c673cae
FG
927}
928
11fdf7f2 929template <typename I>
9f95a23c
TL
930void PoolReplayer<I>::restart() {
931 dout(20) << dendl;
7c673cae 932
9f95a23c 933 std::lock_guard l{m_lock};
7c673cae
FG
934
935 if (m_stopping) {
936 return;
937 }
938
9f95a23c
TL
939 if (m_default_namespace_replayer) {
940 m_default_namespace_replayer->restart();
941 }
942 for (auto &it : m_namespace_replayers) {
943 it.second->restart();
944 }
7c673cae
FG
945}
946
11fdf7f2 947template <typename I>
9f95a23c
TL
948void PoolReplayer<I>::flush() {
949 dout(20) << dendl;
7c673cae 950
9f95a23c 951 std::lock_guard l{m_lock};
7c673cae
FG
952
953 if (m_stopping || m_manual_stop) {
954 return;
955 }
956
9f95a23c
TL
957 if (m_default_namespace_replayer) {
958 m_default_namespace_replayer->flush();
959 }
960 for (auto &it : m_namespace_replayers) {
961 it.second->flush();
962 }
7c673cae
FG
963}
964
11fdf7f2 965template <typename I>
9f95a23c
TL
966void PoolReplayer<I>::release_leader() {
967 dout(20) << dendl;
7c673cae 968
9f95a23c 969 std::lock_guard l{m_lock};
7c673cae
FG
970
971 if (m_stopping || !m_leader_watcher) {
972 return;
973 }
974
975 m_leader_watcher->release_leader();
976}
977
11fdf7f2
TL
978template <typename I>
979void PoolReplayer<I>::handle_post_acquire_leader(Context *on_finish) {
9f95a23c
TL
980 dout(20) << dendl;
981
982 with_namespace_replayers(
983 [this](Context *on_finish) {
984 dout(10) << "handle_post_acquire_leader" << dendl;
985
986 ceph_assert(ceph_mutex_is_locked(m_lock));
987
988 m_service_daemon->add_or_update_attribute(m_local_pool_id,
989 SERVICE_DAEMON_LEADER_KEY,
990 true);
991 auto ctx = new LambdaContext(
992 [this, on_finish](int r) {
993 if (r == 0) {
994 std::lock_guard locker{m_lock};
995 m_leader = true;
996 }
997 on_finish->complete(r);
998 });
999
1000 auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
1001 auto gather_ctx = new C_Gather(cct, ctx);
1002
1003 m_default_namespace_replayer->handle_acquire_leader(
1004 gather_ctx->new_sub());
1005
1006 for (auto &it : m_namespace_replayers) {
1007 namespace_replayer_acquire_leader(it.first, gather_ctx->new_sub());
1008 }
1009
1010 gather_ctx->activate();
1011 }, on_finish);
7c673cae
FG
1012}
1013
11fdf7f2
TL
1014template <typename I>
1015void PoolReplayer<I>::handle_pre_release_leader(Context *on_finish) {
9f95a23c 1016 dout(20) << dendl;
11fdf7f2 1017
9f95a23c
TL
1018 with_namespace_replayers(
1019 [this](Context *on_finish) {
1020 dout(10) << "handle_pre_release_leader" << dendl;
7c673cae 1021
9f95a23c 1022 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae 1023
9f95a23c
TL
1024 m_leader = false;
1025 m_service_daemon->remove_attribute(m_local_pool_id,
1026 SERVICE_DAEMON_LEADER_KEY);
7c673cae 1027
9f95a23c
TL
1028 auto cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
1029 auto gather_ctx = new C_Gather(cct, on_finish);
11fdf7f2 1030
9f95a23c
TL
1031 m_default_namespace_replayer->handle_release_leader(
1032 gather_ctx->new_sub());
11fdf7f2 1033
9f95a23c
TL
1034 for (auto &it : m_namespace_replayers) {
1035 it.second->handle_release_leader(gather_ctx->new_sub());
1036 }
11fdf7f2 1037
9f95a23c
TL
1038 gather_ctx->activate();
1039 }, on_finish);
11fdf7f2 1040}
7c673cae 1041
11fdf7f2 1042template <typename I>
9f95a23c
TL
1043void PoolReplayer<I>::handle_update_leader(
1044 const std::string &leader_instance_id) {
1045 dout(10) << "leader_instance_id=" << leader_instance_id << dendl;
7c673cae 1046
9f95a23c 1047 std::lock_guard locker{m_lock};
11fdf7f2 1048
9f95a23c 1049 m_default_namespace_replayer->handle_update_leader(leader_instance_id);
7c673cae 1050
9f95a23c
TL
1051 for (auto &it : m_namespace_replayers) {
1052 it.second->handle_update_leader(leader_instance_id);
11fdf7f2 1053 }
11fdf7f2
TL
1054}
1055
1056template <typename I>
9f95a23c
TL
1057void PoolReplayer<I>::handle_instances_added(
1058 const std::vector<std::string> &instance_ids) {
1059 dout(5) << "instance_ids=" << instance_ids << dendl;
11fdf7f2 1060
9f95a23c
TL
1061 std::lock_guard locker{m_lock};
1062 if (!m_leader_watcher->is_leader()) {
1063 return;
7c673cae
FG
1064 }
1065
9f95a23c 1066 m_default_namespace_replayer->handle_instances_added(instance_ids);
7c673cae 1067
9f95a23c
TL
1068 for (auto &it : m_namespace_replayers) {
1069 it.second->handle_instances_added(instance_ids);
7c673cae 1070 }
11fdf7f2
TL
1071}
1072
1073template <typename I>
9f95a23c
TL
1074void PoolReplayer<I>::handle_instances_removed(
1075 const std::vector<std::string> &instance_ids) {
1076 dout(5) << "instance_ids=" << instance_ids << dendl;
11fdf7f2 1077
9f95a23c
TL
1078 std::lock_guard locker{m_lock};
1079 if (!m_leader_watcher->is_leader()) {
1080 return;
11fdf7f2
TL
1081 }
1082
9f95a23c 1083 m_default_namespace_replayer->handle_instances_removed(instance_ids);
11fdf7f2 1084
9f95a23c
TL
1085 for (auto &it : m_namespace_replayers) {
1086 it.second->handle_instances_removed(instance_ids);
11fdf7f2 1087 }
11fdf7f2
TL
1088}
1089
1090template <typename I>
9f95a23c
TL
1091void PoolReplayer<I>::handle_remote_pool_meta_updated(
1092 const RemotePoolMeta& remote_pool_meta) {
1093 dout(5) << "remote_pool_meta=" << remote_pool_meta << dendl;
11fdf7f2 1094
9f95a23c
TL
1095 if (!m_default_namespace_replayer) {
1096 m_remote_pool_meta = remote_pool_meta;
11fdf7f2
TL
1097 return;
1098 }
1099
9f95a23c
TL
1100 derr << "remote pool metadata updated unexpectedly" << dendl;
1101 std::unique_lock locker{m_lock};
1102 m_stopping = true;
1103 m_cond.notify_all();
11fdf7f2
TL
1104}
1105
7c673cae
FG
1106} // namespace mirror
1107} // namespace rbd
11fdf7f2
TL
1108
1109template class rbd::mirror::PoolReplayer<librbd::ImageCtx>;