// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <signal.h>

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"
#include "Mirror.h"
#include "PoolMetaCache.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror

using std::list;
using std::map;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using librados::Rados;
using librados::IoCtx;
using librbd::mirror_peer_t;

namespace rbd {
namespace mirror {

namespace {

class MirrorAdminSocketCommand {
public:
  virtual ~MirrorAdminSocketCommand() {}
  virtual int call(Formatter *f) = 0;
};

class StatusCommand : public MirrorAdminSocketCommand {
public:
  explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->print_status(f);
    return 0;
  }

private:
  Mirror *mirror;
};

class StartCommand : public MirrorAdminSocketCommand {
public:
  explicit StartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->start();
    return 0;
  }

private:
  Mirror *mirror;
};

class StopCommand : public MirrorAdminSocketCommand {
public:
  explicit StopCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->stop();
    return 0;
  }

private:
  Mirror *mirror;
};

class RestartCommand : public MirrorAdminSocketCommand {
public:
  explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->restart();
    return 0;
  }

private:
  Mirror *mirror;
};

class FlushCommand : public MirrorAdminSocketCommand {
public:
  explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->flush();
    return 0;
  }

private:
  Mirror *mirror;
};

class LeaderReleaseCommand : public MirrorAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->release_leader();
    return 0;
  }

private:
  Mirror *mirror;
};

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
                           << m_name << " " << __func__ << ": "

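// A two-tier cache entry for the PriorityCache manager: a "base" tier
// sized up to min_size and an "extra" tier for the remainder up to
// max_size.  Each balancing round, prioritize() promotes the base tier
// one level toward PRI0 (the highest PriorityCache priority), so caches
// that remain registered gradually gain first claim on the shared
// memory budget.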
struct PriCache : public PriorityCache::PriCache {
  std::string m_name;
  int64_t m_base_cache_max_size;
  int64_t m_extra_cache_max_size;

  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
  int64_t m_base_cache_bytes = 0;
  int64_t m_extra_cache_bytes = 0;
  int64_t m_committed_bytes = 0;
  double m_cache_ratio = 0;

  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
    : m_name(name), m_base_cache_max_size(min_size),
      m_extra_cache_max_size(max_size - min_size) {
    ceph_assert(max_size >= min_size);
  }

  void prioritize() {
    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
      return;
    }
    auto pri = static_cast<uint8_t>(m_base_cache_pri);
    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);

    dout(30) << m_base_cache_pri << dendl;
  }

  int64_t request_cache_bytes(PriorityCache::Priority pri,
                              uint64_t total_cache) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_max_size;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_max_size;
    }

    dout(30) << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_bytes;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_bytes;
    }

    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes() const override {
    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;

    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
             << cache_bytes << dendl;

    return cache_bytes;
  }

  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
                bytes == 0);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
      bytes -= std::min(m_base_cache_bytes, bytes);
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes = bytes;
    }
  }

  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);

      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
      m_base_cache_bytes += chunk;
      bytes -= chunk;
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes += bytes;
    }
  }

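  // Committed sizes are rounded up to a 4 KiB boundary (page-sized
  // granularity, inferred from the 4096-byte p2roundup chunk; the
  // rationale is not documented here).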
  int64_t commit_cache_size(uint64_t total_cache) override {
    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);

    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  int64_t get_committed_size() const override {
    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  double get_cache_ratio() const override {
    dout(30) << m_cache_ratio << dendl;

    return m_cache_ratio;
  }

  void set_cache_ratio(double ratio) override {
    dout(30) << m_cache_ratio << dendl;

    m_cache_ratio = ratio;
  }

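  // Bin tracking is not used by rbd-mirror's caches; the overrides
  // below are no-ops that only satisfy the PriCache interface.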
  void shift_bins() override {
  }

  void import_bins(const std::vector<uint64_t> &intervals) override {
  }

  void set_bins(PriorityCache::Priority pri, uint64_t end_interval) override {
  }

  uint64_t get_bins(PriorityCache::Priority pri) const override {
    return 0;
  }

  std::string get_cache_name() const override {
    return m_name;
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
                           << __func__ << ": "

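// Exposes the "rbd mirror ..." commands above through the daemon's
// admin socket.  Example invocation (the socket path varies by
// deployment; the path shown is illustrative):
//
//   ceph daemon /var/run/ceph/ceph-client.rbd-mirror.<id>.asok \
//       rbd mirror status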
class MirrorAdminSocketHook : public AdminSocketHook {
public:
  MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
    admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status";
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror");
    if (r == 0) {
      commands[command] = new StatusCommand(mirror);
    }

    command = "rbd mirror start";
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror");
    if (r == 0) {
      commands[command] = new StartCommand(mirror);
    }

    command = "rbd mirror stop";
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror");
    if (r == 0) {
      commands[command] = new StopCommand(mirror);
    }

    command = "rbd mirror restart";
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror");
    if (r == 0) {
      commands[command] = new RestartCommand(mirror);
    }

    command = "rbd mirror flush";
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror");
    if (r == 0) {
      commands[command] = new FlushCommand(mirror);
    }

    command = "rbd mirror leader release";
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader");
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(mirror);
    }
  }

  ~MirrorAdminSocketHook() override {
    (void)admin_socket->unregister_commands(this);
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

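// Bridges the journaler's cache consumers to a shared
// PriorityCache::Manager so that all registered caches are balanced
// against one autotuned memory budget (the rbd_mirror_memory_* options).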
class CacheManagerHandler : public journal::CacheManagerHandler {
public:
  CacheManagerHandler(CephContext *cct)
    : m_cct(cct) {

    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
      return;
    }

    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_base");
    double fragmentation = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_expected_fragmentation");
    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_target");
    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_cache_min");
    uint64_t max = min;

    // When setting the maximum amount of memory to use for cache, first
    // assume some base amount of memory for the daemon and then fudge in
    // some overhead for fragmentation that scales with cache usage.
    uint64_t ltarget = (1.0 - fragmentation) * target;
    if (ltarget > base + min) {
      max = ltarget - base;
    }
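
    // Illustrative arithmetic (values chosen for the example, not
    // defaults): target = 4 GiB, fragmentation = 0.2 and base = 512 MiB
    // give ltarget = 3.2 GiB and max = ltarget - base ~= 2.7 GiB.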

    m_next_balance = ceph_clock_now();
    m_next_resize = ceph_clock_now();

    m_cache_manager = std::make_unique<PriorityCache::Manager>(
        m_cct, min, max, target, false);
  }

  ~CacheManagerHandler() {
    std::lock_guard locker{m_lock};

    ceph_assert(m_caches.empty());
  }

  void register_cache(const std::string &cache_name,
                      uint64_t min_size, uint64_t max_size,
                      journal::CacheRebalanceHandler* handler) override {
    if (!m_cache_manager) {
      handler->handle_cache_rebalanced(max_size);
      return;
    }

    dout(20) << cache_name << " min_size=" << min_size << " max_size="
             << max_size << " handler=" << handler << dendl;

    std::lock_guard locker{m_lock};

    auto p = m_caches.insert(
        {cache_name, {cache_name, min_size, max_size, handler}});
    ceph_assert(p.second == true);

    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
    m_next_balance = ceph_clock_now();
  }

  void unregister_cache(const std::string &cache_name) override {
    if (!m_cache_manager) {
      return;
    }

    dout(20) << cache_name << dendl;

    std::lock_guard locker{m_lock};

    auto it = m_caches.find(cache_name);
    ceph_assert(it != m_caches.end());

    m_cache_manager->erase(cache_name);
    m_caches.erase(it);
    m_next_balance = ceph_clock_now();
  }

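  // Called from Mirror::run() roughly once per second; rebalances the
  // registered caches on rbd_mirror_memory_cache_autotune_interval and,
  // when running under tcmalloc, resizes process memory on
  // rbd_mirror_memory_cache_resize_interval.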
  void run_cache_manager() {
    if (!m_cache_manager) {
      return;
    }

    std::lock_guard locker{m_lock};

    // Before we trim, check and see if it's time to rebalance/resize.
    auto autotune_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_autotune_interval");
    auto resize_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_resize_interval");

    utime_t now = ceph_clock_now();

    if (autotune_interval > 0 && m_next_balance <= now) {
      dout(20) << "balance" << dendl;
      m_cache_manager->balance();

      for (auto &it : m_caches) {
        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
        auto new_cache_bytes = pri_cache->get_cache_bytes();
        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
        pri_cache->prioritize();
      }

      m_next_balance = ceph_clock_now();
      m_next_balance += autotune_interval;
    }

    if (resize_interval > 0 && m_next_resize < now) {
      if (ceph_using_tcmalloc()) {
        dout(20) << "tune memory" << dendl;
        m_cache_manager->tune_memory();
      }

      m_next_resize = ceph_clock_now();
      m_next_resize += resize_interval;
    }
  }

private:
  struct Cache {
    std::shared_ptr<PriorityCache::PriCache> pri_cache;
    journal::CacheRebalanceHandler *handler;

    Cache(const std::string name, uint64_t min_size, uint64_t max_size,
          journal::CacheRebalanceHandler *handler)
      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
    }
  };

  CephContext *m_cct;

  mutable ceph::mutex m_lock =
    ceph::make_mutex("rbd::mirror::CacheManagerHandler");
  std::unique_ptr<PriorityCache::Manager> m_cache_manager;
  std::map<std::string, Cache> m_caches;

  utime_t m_next_balance;
  utime_t m_next_resize;
};

Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
  m_cct(cct),
  m_args(args),
  m_local(new librados::Rados()),
  m_cache_manager_handler(new CacheManagerHandler(cct)),
  m_pool_meta_cache(new PoolMetaCache(cct)),
  m_asok_hook(new MirrorAdminSocketHook(cct, this)) {
}

Mirror::~Mirror()
{
  delete m_asok_hook;
}

void Mirror::handle_signal(int signum)
{
  dout(20) << signum << dendl;

  std::lock_guard l{m_lock};

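  // SIGHUP triggers log rotation; SIGINT/SIGTERM flag the run() loop
  // to begin an orderly shutdown.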
  switch (signum) {
  case SIGHUP:
    for (auto &it : m_pool_replayers) {
      it.second->reopen_logs();
    }
    g_ceph_context->reopen_logs();
    break;

  case SIGINT:
  case SIGTERM:
    m_stopping = true;
    m_cond.notify_all();
    break;

  default:
    ceph_abort_msgf("unexpected signal %d", signum);
  }
}

int Mirror::init()
{
  int r = m_local->init_with_context(m_cct);
  if (r < 0) {
    derr << "could not initialize rados handle" << dendl;
    return r;
  }

  r = m_local->connect();
  if (r < 0) {
    derr << "error connecting to local cluster" << dendl;
    return r;
  }

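  // Threads<> is a per-CephContext singleton, so components sharing
  // this context reuse one set of worker threads and timers.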
  m_threads = &(m_cct->lookup_or_create_singleton_object<
      Threads<librbd::ImageCtx>>("rbd_mirror::threads", false, m_local));
  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));

  r = m_service_daemon->init();
  if (r < 0) {
    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
                                                   m_service_daemon.get()));
  return r;
}

void Mirror::run()
{
  dout(20) << "enter" << dendl;

  using namespace std::chrono_literals;
  utime_t next_refresh_pools = ceph_clock_now();

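  // Poll loop: wake at least once per second.  Pool/peer refresh runs
  // on rbd_mirror_pool_replayers_refresh_interval; the cache manager
  // runs every iteration unless mirroring was manually stopped.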
  while (!m_stopping) {
    utime_t now = ceph_clock_now();
    bool refresh_pools = next_refresh_pools <= now;
    if (refresh_pools) {
      m_local_cluster_watcher->refresh_pools();
      next_refresh_pools = ceph_clock_now();
      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
          "rbd_mirror_pool_replayers_refresh_interval");
    }
    std::unique_lock l{m_lock};
    if (!m_manual_stop) {
      if (refresh_pools) {
        update_pool_replayers(m_local_cluster_watcher->get_pool_peers(),
                              m_local_cluster_watcher->get_site_name());
      }
      m_cache_manager_handler->run_cache_manager();
    }
    m_cond.wait_for(l, 1s);
  }

  // stop all pool replayers in parallel
  std::lock_guard locker{m_lock};
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(false);
  }
  dout(20) << "return" << dendl;
}

void Mirror::print_status(Formatter *f)
{
  dout(20) << "enter" << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  f->open_object_section("mirror_status");
  f->open_array_section("pool_replayers");
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->print_status(f);
  }
  f->close_section();
  f->close_section();
}

void Mirror::start()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->start();
  }
}

void Mirror::stop()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = true;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(true);
  }
}

void Mirror::restart()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->restart();
  }
}

void Mirror::flush()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping || m_manual_stop) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->flush();
  }
}

void Mirror::release_leader()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->release_leader();
  }
}

void Mirror::update_pool_replayers(const PoolPeers &pool_peers,
                                   const std::string& site_name)
{
  dout(20) << "enter" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // remove stale pool replayers before creating new pool replayers
  for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
    auto &peer = it->first.second;
    auto pool_peer_it = pool_peers.find(it->first.first);
    if (pool_peer_it == pool_peers.end() ||
        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
      dout(20) << "removing pool replayer for " << peer << dendl;
      // TODO: make async
      it->second->shut_down();
      it = m_pool_replayers.erase(it);
    } else {
      ++it;
    }
  }

  for (auto &kv : pool_peers) {
    for (auto &peer : kv.second) {
      PoolPeer pool_peer(kv.first, peer);

      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
      if (pool_replayers_it != m_pool_replayers.end()) {
        auto& pool_replayer = pool_replayers_it->second;
        if (!m_site_name.empty() && !site_name.empty() &&
            m_site_name != site_name) {
          dout(0) << "restarting pool replayer for " << peer << " due to "
                  << "updated site name" << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        } else if (pool_replayer->is_blocklisted()) {
          derr << "restarting blocklisted pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        } else if (!pool_replayer->is_running()) {
          derr << "restarting failed pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        }
      } else {
        dout(20) << "starting pool replayer for " << peer << dendl;
        unique_ptr<PoolReplayer<>> pool_replayer(
            new PoolReplayer<>(m_threads, m_service_daemon.get(),
                               m_cache_manager_handler.get(),
                               m_pool_meta_cache.get(), kv.first, peer,
                               m_args));

        // TODO: make async
        pool_replayer->init(site_name);
        m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
      }
    }

    // TODO currently only support a single peer
  }

  m_site_name = site_name;
}

} // namespace mirror
} // namespace rbd