// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "PoolReplayer.h"
#include <boost/bind.hpp>
#include "common/Formatter.h"
#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/code_environment.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "global/global_context.h"
#include "librbd/internal.h"
#include "librbd/Utils.h"
#include "librbd/Watcher.h"
#include "librbd/api/Mirror.h"
#include "InstanceReplayer.h"
#include "InstanceWatcher.h"
#include "LeaderWatcher.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror
#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PoolReplayer: " \
                           << this << " " << __func__ << ": "

using std::chrono::seconds;
using std::map;
using std::string;
using std::unique_ptr;
using std::vector;

using librbd::cls_client::dir_get_name;
using librbd::util::create_async_context_callback;

namespace rbd {
namespace mirror {

namespace {

class PoolReplayerAdminSocketCommand {
public:
  PoolReplayerAdminSocketCommand(PoolReplayer *pool_replayer)
    : pool_replayer(pool_replayer) {
  }
  virtual ~PoolReplayerAdminSocketCommand() {}
  virtual bool call(Formatter *f, stringstream *ss) = 0;
protected:
  PoolReplayer *pool_replayer;
};

class StatusCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StatusCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->print_status(f, ss);
    return true;
  }
};

class StartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->start();
    return true;
  }
};

class StopCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit StopCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->stop(true);
    return true;
  }
};

class RestartCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit RestartCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->restart();
    return true;
  }
};

class FlushCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit FlushCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->flush();
    return true;
  }
};

class LeaderReleaseCommand : public PoolReplayerAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(PoolReplayer *pool_replayer)
    : PoolReplayerAdminSocketCommand(pool_replayer) {
  }

  bool call(Formatter *f, stringstream *ss) override {
    pool_replayer->release_leader();
    return true;
  }
};

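// Registers per-pool "rbd mirror <action> <pool> <peer cluster>" commands
// with the daemon's admin socket and dispatches each invocation to the
// matching command object above. Illustrative invocation (socket path is
// only an example, not a fixed location):
//   ceph --admin-daemon /var/run/ceph/ceph-client.rbd-mirror.a.asok \
//     rbd mirror status <pool> <peer cluster>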
class PoolReplayerAdminSocketHook : public AdminSocketHook {
public:
  PoolReplayerAdminSocketHook(CephContext *cct, const std::string &name,
                              PoolReplayer *pool_replayer)
    : admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status " + name;
    r = admin_socket->register_command(command, command, this,
                                       "get status for rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StatusCommand(pool_replayer);
    }

    command = "rbd mirror start " + name;
    r = admin_socket->register_command(command, command, this,
                                       "start rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StartCommand(pool_replayer);
    }

    command = "rbd mirror stop " + name;
    r = admin_socket->register_command(command, command, this,
                                       "stop rbd mirror " + name);
    if (r == 0) {
      commands[command] = new StopCommand(pool_replayer);
    }

    command = "rbd mirror restart " + name;
    r = admin_socket->register_command(command, command, this,
                                       "restart rbd mirror " + name);
    if (r == 0) {
      commands[command] = new RestartCommand(pool_replayer);
    }

    command = "rbd mirror flush " + name;
    r = admin_socket->register_command(command, command, this,
                                       "flush rbd mirror " + name);
    if (r == 0) {
      commands[command] = new FlushCommand(pool_replayer);
    }

    command = "rbd mirror leader release " + name;
    r = admin_socket->register_command(command, command, this,
                                       "release rbd mirror leader " + name);
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(pool_replayer);
    }
  }

  ~PoolReplayerAdminSocketHook() override {
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      (void)admin_socket->unregister_command(i->first);
      delete i->second;
    }
  }

  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
            bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    assert(i != commands.end());
    Formatter *f = Formatter::create(format);
    stringstream ss;
    bool r = i->second->call(f, &ss);
    delete f;
    out.append(ss);
    return r;
  }

private:
  typedef std::map<std::string, PoolReplayerAdminSocketCommand*> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

} // anonymous namespace

PoolReplayer::PoolReplayer(Threads<librbd::ImageCtx> *threads,
                           std::shared_ptr<ImageDeleter> image_deleter,
                           ImageSyncThrottlerRef<> image_sync_throttler,
                           int64_t local_pool_id, const peer_t &peer,
                           const std::vector<const char*> &args) :
  m_threads(threads),
  m_image_deleter(image_deleter),
  m_image_sync_throttler(image_sync_throttler),
  m_lock(stringify("rbd::mirror::PoolReplayer ") + stringify(peer)),
  m_peer(peer),
  m_args(args),
  m_local_pool_id(local_pool_id),
  m_local_pool_watcher_listener(this, true),
  m_remote_pool_watcher_listener(this, false),
  m_asok_hook(nullptr),
  m_pool_replayer_thread(this),
  m_leader_listener(this)
{
}

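// Tear down in dependency order: request the run() thread to stop and join
// it, then shut down the leader watcher, instance watcher and instance
// replayer. The pool watchers must already have been released (via the
// pre-release-leader path) before destruction, as asserted below.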
PoolReplayer::~PoolReplayer()
{
  delete m_asok_hook;

  m_stopping = true;
  {
    Mutex::Locker l(m_lock);
    m_cond.Signal();
  }
  if (m_pool_replayer_thread.is_started()) {
    m_pool_replayer_thread.join();
  }
  if (m_leader_watcher) {
    m_leader_watcher->shut_down();
  }
  if (m_instance_watcher) {
    m_instance_watcher->shut_down();
  }
  if (m_instance_replayer) {
    m_instance_replayer->shut_down();
  }

  assert(!m_local_pool_watcher);
  assert(!m_remote_pool_watcher);
}

bool PoolReplayer::is_blacklisted() const {
  Mutex::Locker locker(m_lock);
  return m_blacklisted;
}

bool PoolReplayer::is_leader() const {
  Mutex::Locker locker(m_lock);
  return m_leader_watcher && m_leader_watcher->is_leader();
}

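// Connect to the local and remote (peer) clusters, open an IoCtx for the
// mirrored pool on each side, fetch the local pool's mirror uuid, then bring
// up the instance replayer/watcher and the leader watcher before starting
// the pool replayer thread.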
int PoolReplayer::init()
{
  dout(20) << "replaying for " << m_peer << dendl;

  int r = init_rados(g_ceph_context->_conf->cluster,
                     g_ceph_context->_conf->name.to_str(),
                     "local cluster", &m_local_rados);
  if (r < 0) {
    return r;
  }

  r = init_rados(m_peer.cluster_name, m_peer.client_name,
                 std::string("remote peer ") + stringify(m_peer),
                 &m_remote_rados);
  if (r < 0) {
    return r;
  }

  r = m_local_rados->ioctx_create2(m_local_pool_id, m_local_io_ctx);
  if (r < 0) {
    derr << "error accessing local pool " << m_local_pool_id << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  std::string local_mirror_uuid;
  r = librbd::cls_client::mirror_uuid_get(&m_local_io_ctx,
                                          &local_mirror_uuid);
  if (r < 0) {
    derr << "failed to retrieve local mirror uuid from pool "
         << m_local_io_ctx.get_pool_name() << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  r = m_remote_rados->ioctx_create(m_local_io_ctx.get_pool_name().c_str(),
                                   m_remote_io_ctx);
  if (r < 0) {
    derr << "error accessing remote pool " << m_local_io_ctx.get_pool_name()
         << ": " << cpp_strerror(r) << dendl;
    return r;
  }

  dout(20) << "connected to " << m_peer << dendl;

  m_instance_replayer.reset(
    InstanceReplayer<>::create(m_threads, m_image_deleter,
                               m_image_sync_throttler, m_local_rados,
                               local_mirror_uuid, m_local_pool_id));
  m_instance_replayer->init();
  m_instance_replayer->add_peer(m_peer.uuid, m_remote_io_ctx);

  m_instance_watcher.reset(InstanceWatcher<>::create(m_local_io_ctx,
                                                     m_threads->work_queue,
                                                     m_instance_replayer.get()));
  r = m_instance_watcher->init();
  if (r < 0) {
    derr << "error initializing instance watcher: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_leader_watcher.reset(new LeaderWatcher<>(m_threads, m_local_io_ctx,
                                             &m_leader_listener));
  r = m_leader_watcher->init();
  if (r < 0) {
    derr << "error initializing leader watcher: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_pool_replayer_thread.create("pool replayer");

  return 0;
}

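// Build a librados handle for the given cluster/client name. Configuration
// is layered in the same order librados itself would apply it: cluster
// config file, then environment, then any extra command-line arguments
// passed to the daemon.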
int PoolReplayer::init_rados(const std::string &cluster_name,
                             const std::string &client_name,
                             const std::string &description,
                             RadosRef *rados_ref) {
  rados_ref->reset(new librados::Rados());

  // NOTE: manually bootstrap a CephContext here instead of via
  // the librados API to avoid mixing global singletons between
  // the librados shared library and the daemon
  // TODO: eliminate intermingling of global singletons within Ceph APIs
  CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT);
  if (client_name.empty() || !iparams.name.from_str(client_name)) {
    derr << "error initializing cluster handle for " << description << dendl;
    return -EINVAL;
  }

  CephContext *cct = common_preinit(iparams, CODE_ENVIRONMENT_LIBRARY,
                                    CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
  cct->_conf->cluster = cluster_name;

  // librados::Rados::conf_read_file
  int r = cct->_conf->parse_config_files(nullptr, nullptr, 0);
  if (r < 0) {
    derr << "could not read ceph conf for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }
  cct->_conf->parse_env();

  // librados::Rados::conf_parse_env
  std::vector<const char*> args;
  env_to_vec(args, nullptr);
  r = cct->_conf->parse_argv(args);
  if (r < 0) {
    derr << "could not parse environment for " << description << ": "
         << cpp_strerror(r) << dendl;
    cct->put();
    return r;
  }

  if (!m_args.empty()) {
    // librados::Rados::conf_parse_argv
    args = m_args;
    r = cct->_conf->parse_argv(args);
    if (r < 0) {
      derr << "could not parse command line args for " << description << ": "
           << cpp_strerror(r) << dendl;
      cct->put();
      return r;
    }
  }

  if (!g_ceph_context->_conf->admin_socket.empty()) {
    cct->_conf->set_val_or_die("admin_socket",
                               "$run_dir/$name.$pid.$cluster.$cctid.asok");
  }

  // disable unnecessary librbd cache
  cct->_conf->set_val_or_die("rbd_cache", "false");
  cct->_conf->apply_changes(nullptr);
  cct->_conf->complain_about_parse_errors(cct);

  r = (*rados_ref)->init_with_context(cct);
  assert(r == 0);
  cct->put();

  r = (*rados_ref)->connect();
  if (r < 0) {
    derr << "error connecting to " << description << ": "
         << cpp_strerror(r) << dendl;
    return r;
  }

  return 0;
}

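// Main thread loop: (re)register the per-pool admin socket hook whenever the
// pool/peer name changes, exit once a pool watcher reports this instance as
// blacklisted or a stop is requested, and otherwise wake up roughly once a
// second (or when signalled).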
void PoolReplayer::run()
{
  dout(20) << "enter" << dendl;

  while (!m_stopping) {
    std::string asok_hook_name = m_local_io_ctx.get_pool_name() + " " +
                                 m_peer.cluster_name;
    if (m_asok_hook_name != asok_hook_name || m_asok_hook == nullptr) {
      m_asok_hook_name = asok_hook_name;
      delete m_asok_hook;

      m_asok_hook = new PoolReplayerAdminSocketHook(g_ceph_context,
                                                    m_asok_hook_name, this);
    }

    Mutex::Locker locker(m_lock);
    if ((m_local_pool_watcher && m_local_pool_watcher->is_blacklisted()) ||
        (m_remote_pool_watcher && m_remote_pool_watcher->is_blacklisted())) {
      m_blacklisted = true;
      m_stopping = true;
      break;
    }

    m_cond.WaitInterval(m_lock, utime_t(1, 0));
  }
}

void PoolReplayer::print_status(Formatter *f, stringstream *ss)
{
  dout(20) << "enter" << dendl;

  if (!f) {
    return;
  }

  Mutex::Locker l(m_lock);

  f->open_object_section("pool_replayer_status");
  f->dump_string("pool", m_local_io_ctx.get_pool_name());
  f->dump_stream("peer") << m_peer;
  f->dump_string("instance_id", m_instance_watcher->get_instance_id());

  std::string leader_instance_id;
  m_leader_watcher->get_leader_instance_id(&leader_instance_id);
  f->dump_string("leader_instance_id", leader_instance_id);

  bool leader = m_leader_watcher->is_leader();
  f->dump_bool("leader", leader);
  if (leader) {
    std::vector<std::string> instance_ids;
    m_leader_watcher->list_instances(&instance_ids);
    f->open_array_section("instances");
    for (auto instance_id : instance_ids) {
      f->dump_string("instance_id", instance_id);
    }
    f->close_section();
  }

  f->dump_string("local_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_local_io_ctx.cct())->_conf->
                   admin_socket);
  f->dump_string("remote_cluster_admin_socket",
                 reinterpret_cast<CephContext *>(m_remote_io_ctx.cct())->_conf->
                   admin_socket);

  m_instance_replayer->print_status(f, ss);

  f->close_section();
  f->flush(*ss);
}

void PoolReplayer::start()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->start();
}

void PoolReplayer::stop(bool manual)
{
  dout(20) << "enter: manual=" << manual << dendl;

  Mutex::Locker l(m_lock);
  if (!manual) {
    m_stopping = true;
    m_cond.Signal();
    return;
  } else if (m_stopping) {
    return;
  }

  m_instance_replayer->stop();
}

void PoolReplayer::restart()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping) {
    return;
  }

  m_instance_replayer->restart();
}

void PoolReplayer::flush()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || m_manual_stop) {
    return;
  }

  m_instance_replayer->flush();
}

void PoolReplayer::release_leader()
{
  dout(20) << "enter" << dendl;

  Mutex::Locker l(m_lock);

  if (m_stopping || !m_leader_watcher) {
    return;
  }

  m_leader_watcher->release_leader();
}

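// Pool watcher callback, invoked with the images added to or removed from
// either the local pool or the remote peer pool. Only the leader acts on
// updates; once both initial pool listings have arrived, local images that
// are missing from the remote listing are turned into removal notifications.
// For now every added or removed image is acquired or released by this
// instance (the leader) via the instance watcher.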
void PoolReplayer::handle_update(const std::string &mirror_uuid,
                                 ImageIds &&added_image_ids,
                                 ImageIds &&removed_image_ids) {
  if (m_stopping) {
    return;
  }

  dout(10) << "mirror_uuid=" << mirror_uuid << ", "
           << "added_count=" << added_image_ids.size() << ", "
           << "removed_count=" << removed_image_ids.size() << dendl;
  Mutex::Locker locker(m_lock);
  if (!m_leader_watcher->is_leader()) {
    return;
  }

  if (m_initial_mirror_image_ids.find(mirror_uuid) ==
        m_initial_mirror_image_ids.end() &&
      m_initial_mirror_image_ids.size() < 2) {
    m_initial_mirror_image_ids[mirror_uuid] = added_image_ids;

    if (m_initial_mirror_image_ids.size() == 2) {
      dout(10) << "local and remote pools refreshed" << dendl;

      // both local and remote initial pool listing received. derive
      // removal notifications for the remote pool
      auto &local_image_ids = m_initial_mirror_image_ids.begin()->second;
      auto &remote_image_ids = m_initial_mirror_image_ids.rbegin()->second;
      for (auto &local_image_id : local_image_ids) {
        if (remote_image_ids.find(local_image_id) == remote_image_ids.end()) {
          removed_image_ids.emplace(local_image_id.global_id, "");
        }
      }
      local_image_ids.clear();
      remote_image_ids.clear();
    }
  }

  if (!mirror_uuid.empty() && m_peer.uuid != mirror_uuid) {
    m_instance_replayer->remove_peer(m_peer.uuid);
    m_instance_replayer->add_peer(mirror_uuid, m_remote_io_ctx);
    m_peer.uuid = mirror_uuid;
  }

  m_update_op_tracker.start_op();
  Context *ctx = new FunctionContext([this](int r) {
      dout(20) << "complete handle_update: r=" << r << dendl;
      m_update_op_tracker.finish_op();
    });

  C_Gather *gather_ctx = new C_Gather(g_ceph_context, ctx);

  for (auto &image_id : added_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_acquire(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id,
                                             gather_ctx->new_sub());
  }

  for (auto &image_id : removed_image_ids) {
    // for now always send to myself (the leader)
    std::string &instance_id = m_instance_watcher->get_instance_id();
    m_instance_watcher->notify_image_release(instance_id, image_id.global_id,
                                             mirror_uuid, image_id.id, true,
                                             gather_ctx->new_sub());
  }

  gather_ctx->activate();
}

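// Leader role transitions: acquiring the leader lock initializes the local
// pool watcher followed by the remote pool watcher; releasing it shuts both
// watchers down, waits for in-flight update operations, and finally releases
// all locally replayed images.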
void PoolReplayer::handle_post_acquire_leader(Context *on_finish) {
  dout(20) << dendl;
  init_local_pool_watcher(on_finish);
}

void PoolReplayer::handle_pre_release_leader(Context *on_finish) {
  dout(20) << dendl;
  shut_down_pool_watchers(on_finish);
}

void PoolReplayer::init_local_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_local_pool_watcher);
  m_local_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_local_io_ctx, m_local_pool_watcher_listener));
  m_initial_mirror_image_ids.clear();

  // ensure the initial set of local images is up-to-date
  // after acquiring the leader role
  auto ctx = new FunctionContext([this, on_finish](int r) {
      handle_init_local_pool_watcher(r, on_finish);
    });
  m_local_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, ctx));
}

void PoolReplayer::handle_init_local_pool_watcher(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;
  if (r < 0) {
    derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  init_remote_pool_watcher(on_finish);
}

void PoolReplayer::init_remote_pool_watcher(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);
  assert(!m_remote_pool_watcher);
  m_remote_pool_watcher.reset(new PoolWatcher<>(
    m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
  m_remote_pool_watcher->init(create_async_context_callback(
    m_threads->work_queue, on_finish));

  m_cond.Signal();
}

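// Shut down both pool watchers (if present) behind a gather context so the
// completion only fires after each watcher has finished its async shutdown.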
void PoolReplayer::shut_down_pool_watchers(Context *on_finish) {
  dout(20) << dendl;

  {
    Mutex::Locker locker(m_lock);
    if (m_local_pool_watcher) {
      Context *ctx = new FunctionContext([this, on_finish](int r) {
          handle_shut_down_pool_watchers(r, on_finish);
        });
      ctx = create_async_context_callback(m_threads->work_queue, ctx);

      auto gather_ctx = new C_Gather(g_ceph_context, ctx);
      m_local_pool_watcher->shut_down(gather_ctx->new_sub());
      if (m_remote_pool_watcher) {
        m_remote_pool_watcher->shut_down(gather_ctx->new_sub());
      }
      gather_ctx->activate();
      return;
    }
  }

  on_finish->complete(0);
}

void PoolReplayer::handle_shut_down_pool_watchers(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  {
    Mutex::Locker locker(m_lock);
    assert(m_local_pool_watcher);
    m_local_pool_watcher.reset();

    if (m_remote_pool_watcher) {
      m_remote_pool_watcher.reset();
    }
  }
  wait_for_update_ops(on_finish);
}

void PoolReplayer::wait_for_update_ops(Context *on_finish) {
  dout(20) << dendl;

  Mutex::Locker locker(m_lock);

  Context *ctx = new FunctionContext([this, on_finish](int r) {
      handle_wait_for_update_ops(r, on_finish);
    });
  ctx = create_async_context_callback(m_threads->work_queue, ctx);

  m_update_op_tracker.wait_for_ops(ctx);
}

void PoolReplayer::handle_wait_for_update_ops(int r, Context *on_finish) {
  dout(20) << "r=" << r << dendl;

  assert(r == 0);

  Mutex::Locker locker(m_lock);
  m_instance_replayer->release_all(on_finish);
}

} // namespace mirror
} // namespace rbd