]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/rbd_mirror/Mirror.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / tools / rbd_mirror / Mirror.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
92f5a8d4
TL
4#include <signal.h>
5
7c673cae
FG
6#include <boost/range/adaptor/map.hpp>
7
8#include "common/Formatter.h"
9f95a23c 9#include "common/PriorityCache.h"
7c673cae
FG
10#include "common/admin_socket.h"
11#include "common/debug.h"
12#include "common/errno.h"
9f95a23c 13#include "journal/Types.h"
7c673cae 14#include "librbd/ImageCtx.h"
9f95a23c 15#include "perfglue/heap_profiler.h"
7c673cae 16#include "Mirror.h"
9f95a23c 17#include "PoolMetaCache.h"
c07f9fc5 18#include "ServiceDaemon.h"
7c673cae 19#include "Threads.h"
7c673cae
FG
20
21#define dout_context g_ceph_context
22#define dout_subsys ceph_subsys_rbd_mirror
7c673cae
FG
23
24using std::list;
25using std::map;
26using std::set;
27using std::string;
28using std::unique_ptr;
29using std::vector;
30
31using librados::Rados;
32using librados::IoCtx;
33using librbd::mirror_peer_t;
34
35namespace rbd {
36namespace mirror {
37
38namespace {
39
40class MirrorAdminSocketCommand {
41public:
42 virtual ~MirrorAdminSocketCommand() {}
9f95a23c 43 virtual int call(Formatter *f) = 0;
7c673cae
FG
44};
45
46class StatusCommand : public MirrorAdminSocketCommand {
47public:
48 explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}
49
9f95a23c
TL
50 int call(Formatter *f) override {
51 mirror->print_status(f);
52 return 0;
7c673cae
FG
53 }
54
55private:
56 Mirror *mirror;
57};
58
59class StartCommand : public MirrorAdminSocketCommand {
60public:
61 explicit StartCommand(Mirror *mirror) : mirror(mirror) {}
62
9f95a23c 63 int call(Formatter *f) override {
7c673cae 64 mirror->start();
9f95a23c 65 return 0;
7c673cae
FG
66 }
67
68private:
69 Mirror *mirror;
70};
71
72class StopCommand : public MirrorAdminSocketCommand {
73public:
74 explicit StopCommand(Mirror *mirror) : mirror(mirror) {}
75
9f95a23c 76 int call(Formatter *f) override {
7c673cae 77 mirror->stop();
9f95a23c 78 return 0;
7c673cae
FG
79 }
80
81private:
82 Mirror *mirror;
83};
84
85class RestartCommand : public MirrorAdminSocketCommand {
86public:
87 explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}
88
9f95a23c 89 int call(Formatter *f) override {
7c673cae 90 mirror->restart();
9f95a23c 91 return 0;
7c673cae
FG
92 }
93
94private:
95 Mirror *mirror;
96};
97
98class FlushCommand : public MirrorAdminSocketCommand {
99public:
100 explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}
101
9f95a23c 102 int call(Formatter *f) override {
7c673cae 103 mirror->flush();
9f95a23c 104 return 0;
7c673cae
FG
105 }
106
107private:
108 Mirror *mirror;
109};
110
111class LeaderReleaseCommand : public MirrorAdminSocketCommand {
112public:
113 explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}
114
9f95a23c 115 int call(Formatter *f) override {
7c673cae 116 mirror->release_leader();
9f95a23c 117 return 0;
7c673cae
FG
118 }
119
120private:
121 Mirror *mirror;
122};
123
9f95a23c
TL
124#undef dout_prefix
125#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
126 << m_name << " " << __func__ << ": "
127
128struct PriCache : public PriorityCache::PriCache {
129 std::string m_name;
130 int64_t m_base_cache_max_size;
131 int64_t m_extra_cache_max_size;
132
133 PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
134 PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
135 int64_t m_base_cache_bytes = 0;
136 int64_t m_extra_cache_bytes = 0;
137 int64_t m_committed_bytes = 0;
138 double m_cache_ratio = 0;
139
140 PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
141 : m_name(name), m_base_cache_max_size(min_size),
142 m_extra_cache_max_size(max_size - min_size) {
143 ceph_assert(max_size >= min_size);
144 }
145
146 void prioritize() {
147 if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
148 return;
149 }
150 auto pri = static_cast<uint8_t>(m_base_cache_pri);
151 m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);
152
153 dout(30) << m_base_cache_pri << dendl;
154 }
155
156 int64_t request_cache_bytes(PriorityCache::Priority pri,
157 uint64_t total_cache) const override {
158 int64_t cache_bytes = 0;
159
160 if (pri == m_base_cache_pri) {
161 cache_bytes += m_base_cache_max_size;
162 }
163 if (pri == m_extra_cache_pri) {
164 cache_bytes += m_extra_cache_max_size;
165 }
166
167 dout(30) << cache_bytes << dendl;
168
169 return cache_bytes;
170 }
171
172 int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
173 int64_t cache_bytes = 0;
174
175 if (pri == m_base_cache_pri) {
176 cache_bytes += m_base_cache_bytes;
177 }
178 if (pri == m_extra_cache_pri) {
179 cache_bytes += m_extra_cache_bytes;
180 }
181
182 dout(30) << "pri=" << pri << " " << cache_bytes << dendl;
183
184 return cache_bytes;
185 }
186
187 int64_t get_cache_bytes() const override {
188 auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;
189
190 dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
191 << cache_bytes << dendl;
192
193 return cache_bytes;
194 }
195
196 void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
197 ceph_assert(bytes >= 0);
198 ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
199 bytes == 0);
200
201 dout(30) << "pri=" << pri << " " << bytes << dendl;
202
203 if (pri == m_base_cache_pri) {
204 m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
205 bytes -= std::min(m_base_cache_bytes, bytes);
206 }
207
208 if (pri == m_extra_cache_pri) {
209 m_extra_cache_bytes = bytes;
210 }
211 }
212
213 void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
214 ceph_assert(bytes >= 0);
215 ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);
216
217 dout(30) << "pri=" << pri << " " << bytes << dendl;
218
219 if (pri == m_base_cache_pri) {
220 ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);
221
222 auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
223 m_base_cache_bytes += chunk;
224 bytes -= chunk;
225 }
226
227 if (pri == m_extra_cache_pri) {
228 m_extra_cache_bytes += bytes;
229 }
230 }
231
232 int64_t commit_cache_size(uint64_t total_cache) override {
233 m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);
234
235 dout(30) << m_committed_bytes << dendl;
236
237 return m_committed_bytes;
238 }
239
240 int64_t get_committed_size() const override {
241 dout(30) << m_committed_bytes << dendl;
242
243 return m_committed_bytes;
244 }
245
246 double get_cache_ratio() const override {
247 dout(30) << m_cache_ratio << dendl;
248
249 return m_cache_ratio;
250 }
251
252 void set_cache_ratio(double ratio) override {
253 dout(30) << m_cache_ratio << dendl;
254
255 m_cache_ratio = ratio;
256 }
257
258 std::string get_cache_name() const override {
259 return m_name;
260 }
261};
262
7c673cae
FG
263} // anonymous namespace
264
9f95a23c
TL
265#undef dout_prefix
266#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
267 << __func__ << ": "
268
7c673cae
FG
269class MirrorAdminSocketHook : public AdminSocketHook {
270public:
271 MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
272 admin_socket(cct->get_admin_socket()) {
273 std::string command;
274 int r;
275
276 command = "rbd mirror status";
9f95a23c 277 r = admin_socket->register_command(command, this,
7c673cae
FG
278 "get status for rbd mirror");
279 if (r == 0) {
280 commands[command] = new StatusCommand(mirror);
281 }
282
283 command = "rbd mirror start";
9f95a23c 284 r = admin_socket->register_command(command, this,
7c673cae
FG
285 "start rbd mirror");
286 if (r == 0) {
287 commands[command] = new StartCommand(mirror);
288 }
289
290 command = "rbd mirror stop";
9f95a23c 291 r = admin_socket->register_command(command, this,
7c673cae
FG
292 "stop rbd mirror");
293 if (r == 0) {
294 commands[command] = new StopCommand(mirror);
295 }
296
297 command = "rbd mirror restart";
9f95a23c 298 r = admin_socket->register_command(command, this,
7c673cae
FG
299 "restart rbd mirror");
300 if (r == 0) {
301 commands[command] = new RestartCommand(mirror);
302 }
303
304 command = "rbd mirror flush";
9f95a23c 305 r = admin_socket->register_command(command, this,
7c673cae
FG
306 "flush rbd mirror");
307 if (r == 0) {
308 commands[command] = new FlushCommand(mirror);
309 }
310
311 command = "rbd mirror leader release";
9f95a23c 312 r = admin_socket->register_command(command, this,
7c673cae
FG
313 "release rbd mirror leader");
314 if (r == 0) {
315 commands[command] = new LeaderReleaseCommand(mirror);
316 }
317 }
318
319 ~MirrorAdminSocketHook() override {
9f95a23c 320 (void)admin_socket->unregister_commands(this);
7c673cae
FG
321 for (Commands::const_iterator i = commands.begin(); i != commands.end();
322 ++i) {
7c673cae
FG
323 delete i->second;
324 }
325 }
326
9f95a23c
TL
327 int call(std::string_view command, const cmdmap_t& cmdmap,
328 Formatter *f,
329 std::ostream& errss,
330 bufferlist& out) override {
7c673cae 331 Commands::const_iterator i = commands.find(command);
11fdf7f2 332 ceph_assert(i != commands.end());
9f95a23c 333 return i->second->call(f);
7c673cae
FG
334 }
335
336private:
11fdf7f2 337 typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
7c673cae
FG
338
339 AdminSocket *admin_socket;
340 Commands commands;
341};
342
9f95a23c
TL
343class CacheManagerHandler : public journal::CacheManagerHandler {
344public:
345 CacheManagerHandler(CephContext *cct)
346 : m_cct(cct) {
347
348 if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
349 return;
350 }
351
352 uint64_t base = m_cct->_conf.get_val<Option::size_t>(
353 "rbd_mirror_memory_base");
354 double fragmentation = m_cct->_conf.get_val<double>(
355 "rbd_mirror_memory_expected_fragmentation");
356 uint64_t target = m_cct->_conf.get_val<Option::size_t>(
357 "rbd_mirror_memory_target");
358 uint64_t min = m_cct->_conf.get_val<Option::size_t>(
359 "rbd_mirror_memory_cache_min");
360 uint64_t max = min;
361
362 // When setting the maximum amount of memory to use for cache, first
363 // assume some base amount of memory for the daemon and then fudge in
364 // some overhead for fragmentation that scales with cache usage.
365 uint64_t ltarget = (1.0 - fragmentation) * target;
366 if (ltarget > base + min) {
367 max = ltarget - base;
368 }
369
370 m_next_balance = ceph_clock_now();
371 m_next_resize = ceph_clock_now();
372
373 m_cache_manager = std::make_unique<PriorityCache::Manager>(
374 m_cct, min, max, target, false);
375 }
376
377 ~CacheManagerHandler() {
378 std::lock_guard locker{m_lock};
379
380 ceph_assert(m_caches.empty());
381 }
382
383 void register_cache(const std::string &cache_name,
384 uint64_t min_size, uint64_t max_size,
385 journal::CacheRebalanceHandler* handler) override {
386 if (!m_cache_manager) {
387 handler->handle_cache_rebalanced(max_size);
388 return;
389 }
390
391 dout(20) << cache_name << " min_size=" << min_size << " max_size="
392 << max_size << " handler=" << handler << dendl;
393
394 std::lock_guard locker{m_lock};
395
396 auto p = m_caches.insert(
397 {cache_name, {cache_name, min_size, max_size, handler}});
398 ceph_assert(p.second == true);
399
400 m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
401 m_next_balance = ceph_clock_now();
402 }
403
404 void unregister_cache(const std::string &cache_name) override {
405 if (!m_cache_manager) {
406 return;
407 }
408
409 dout(20) << cache_name << dendl;
410
411 std::lock_guard locker{m_lock};
412
413 auto it = m_caches.find(cache_name);
414 ceph_assert(it != m_caches.end());
415
416 m_cache_manager->erase(cache_name);
417 m_caches.erase(it);
418 m_next_balance = ceph_clock_now();
419 }
420
421 void run_cache_manager() {
422 if (!m_cache_manager) {
423 return;
424 }
425
426 std::lock_guard locker{m_lock};
427
428 // Before we trim, check and see if it's time to rebalance/resize.
429 auto autotune_interval = m_cct->_conf.get_val<double>(
430 "rbd_mirror_memory_cache_autotune_interval");
431 auto resize_interval = m_cct->_conf.get_val<double>(
432 "rbd_mirror_memory_cache_resize_interval");
433
434 utime_t now = ceph_clock_now();
435
436 if (autotune_interval > 0 && m_next_balance <= now) {
437 dout(20) << "balance" << dendl;
438 m_cache_manager->balance();
439
440 for (auto &it : m_caches) {
441 auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
442 auto new_cache_bytes = pri_cache->get_cache_bytes();
443 it.second.handler->handle_cache_rebalanced(new_cache_bytes);
444 pri_cache->prioritize();
445 }
446
447 m_next_balance = ceph_clock_now();
448 m_next_balance += autotune_interval;
449 }
450
451 if (resize_interval > 0 && m_next_resize < now) {
452 if (ceph_using_tcmalloc()) {
453 dout(20) << "tune memory" << dendl;
454 m_cache_manager->tune_memory();
455 }
456
457 m_next_resize = ceph_clock_now();
458 m_next_resize += resize_interval;
459 }
460 }
461
462private:
463 struct Cache {
464 std::shared_ptr<PriorityCache::PriCache> pri_cache;
465 journal::CacheRebalanceHandler *handler;
466
467 Cache(const std::string name, uint64_t min_size, uint64_t max_size,
468 journal::CacheRebalanceHandler *handler)
469 : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
470 }
471 };
472
473 CephContext *m_cct;
474
475 mutable ceph::mutex m_lock =
476 ceph::make_mutex("rbd::mirror::CacheManagerHandler");
477 std::unique_ptr<PriorityCache::Manager> m_cache_manager;
478 std::map<std::string, Cache> m_caches;
479
480 utime_t m_next_balance;
481 utime_t m_next_resize;
482};
483
7c673cae
FG
484Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
485 m_cct(cct),
486 m_args(args),
7c673cae 487 m_local(new librados::Rados()),
9f95a23c
TL
488 m_cache_manager_handler(new CacheManagerHandler(cct)),
489 m_pool_meta_cache(new PoolMetaCache(cct)),
7c673cae
FG
490 m_asok_hook(new MirrorAdminSocketHook(cct, this))
491{
11fdf7f2
TL
492 m_threads =
493 &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
494 "rbd_mirror::threads", false, cct));
c07f9fc5 495 m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
7c673cae
FG
496}
497
498Mirror::~Mirror()
499{
500 delete m_asok_hook;
501}
502
503void Mirror::handle_signal(int signum)
504{
92f5a8d4
TL
505 dout(20) << signum << dendl;
506
9f95a23c 507 std::lock_guard l{m_lock};
92f5a8d4
TL
508
509 switch (signum) {
510 case SIGHUP:
511 for (auto &it : m_pool_replayers) {
512 it.second->reopen_logs();
513 }
514 g_ceph_context->reopen_logs();
515 break;
516
517 case SIGINT:
518 case SIGTERM:
519 m_stopping = true;
9f95a23c 520 m_cond.notify_all();
92f5a8d4
TL
521 break;
522
523 default:
524 ceph_abort_msgf("unexpected signal %d", signum);
7c673cae
FG
525 }
526}
527
528int Mirror::init()
529{
530 int r = m_local->init_with_context(m_cct);
531 if (r < 0) {
532 derr << "could not initialize rados handle" << dendl;
533 return r;
534 }
535
536 r = m_local->connect();
537 if (r < 0) {
538 derr << "error connecting to local cluster" << dendl;
539 return r;
540 }
541
c07f9fc5
FG
542 r = m_service_daemon->init();
543 if (r < 0) {
544 derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
545 return r;
546 }
7c673cae 547
c07f9fc5
FG
548 m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
549 m_service_daemon.get()));
7c673cae
FG
550 return r;
551}
552
553void Mirror::run()
554{
555 dout(20) << "enter" << dendl;
9f95a23c
TL
556
557 utime_t next_refresh_pools = ceph_clock_now();
558
7c673cae 559 while (!m_stopping) {
9f95a23c
TL
560 utime_t now = ceph_clock_now();
561 bool refresh_pools = next_refresh_pools <= now;
562 if (refresh_pools) {
563 m_local_cluster_watcher->refresh_pools();
564 next_refresh_pools = ceph_clock_now();
565 next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
566 "rbd_mirror_pool_replayers_refresh_interval");
567 }
568 std::unique_lock l{m_lock};
7c673cae 569 if (!m_manual_stop) {
9f95a23c
TL
570 if (refresh_pools) {
571 update_pool_replayers(m_local_cluster_watcher->get_pool_peers(),
572 m_local_cluster_watcher->get_site_name());
573 }
574 m_cache_manager_handler->run_cache_manager();
7c673cae 575 }
9f95a23c 576 m_cond.wait_for(l, 1s);
7c673cae
FG
577 }
578
579 // stop all pool replayers in parallel
9f95a23c 580 std::lock_guard locker{m_lock};
7c673cae
FG
581 for (auto &pool_replayer : m_pool_replayers) {
582 pool_replayer.second->stop(false);
583 }
584 dout(20) << "return" << dendl;
585}
586
9f95a23c 587void Mirror::print_status(Formatter *f)
7c673cae
FG
588{
589 dout(20) << "enter" << dendl;
590
9f95a23c 591 std::lock_guard l{m_lock};
7c673cae
FG
592
593 if (m_stopping) {
594 return;
595 }
596
9f95a23c
TL
597 f->open_object_section("mirror_status");
598 f->open_array_section("pool_replayers");
7c673cae 599 for (auto &pool_replayer : m_pool_replayers) {
9f95a23c 600 pool_replayer.second->print_status(f);
7c673cae 601 }
9f95a23c
TL
602 f->close_section();
603 f->close_section();
7c673cae
FG
604}
605
606void Mirror::start()
607{
608 dout(20) << "enter" << dendl;
9f95a23c 609 std::lock_guard l{m_lock};
7c673cae
FG
610
611 if (m_stopping) {
612 return;
613 }
614
615 m_manual_stop = false;
616
617 for (auto &pool_replayer : m_pool_replayers) {
618 pool_replayer.second->start();
619 }
620}
621
622void Mirror::stop()
623{
624 dout(20) << "enter" << dendl;
9f95a23c 625 std::lock_guard l{m_lock};
7c673cae
FG
626
627 if (m_stopping) {
628 return;
629 }
630
631 m_manual_stop = true;
632
633 for (auto &pool_replayer : m_pool_replayers) {
634 pool_replayer.second->stop(true);
635 }
636}
637
638void Mirror::restart()
639{
640 dout(20) << "enter" << dendl;
9f95a23c 641 std::lock_guard l{m_lock};
7c673cae
FG
642
643 if (m_stopping) {
644 return;
645 }
646
647 m_manual_stop = false;
648
649 for (auto &pool_replayer : m_pool_replayers) {
650 pool_replayer.second->restart();
651 }
652}
653
654void Mirror::flush()
655{
656 dout(20) << "enter" << dendl;
9f95a23c 657 std::lock_guard l{m_lock};
7c673cae
FG
658
659 if (m_stopping || m_manual_stop) {
660 return;
661 }
662
663 for (auto &pool_replayer : m_pool_replayers) {
664 pool_replayer.second->flush();
665 }
666}
667
668void Mirror::release_leader()
669{
670 dout(20) << "enter" << dendl;
9f95a23c 671 std::lock_guard l{m_lock};
7c673cae
FG
672
673 if (m_stopping) {
674 return;
675 }
676
677 for (auto &pool_replayer : m_pool_replayers) {
678 pool_replayer.second->release_leader();
679 }
680}
681
9f95a23c
TL
682void Mirror::update_pool_replayers(const PoolPeers &pool_peers,
683 const std::string& site_name)
7c673cae
FG
684{
685 dout(20) << "enter" << dendl;
9f95a23c 686 ceph_assert(ceph_mutex_is_locked(m_lock));
7c673cae
FG
687
688 // remove stale pool replayers before creating new pool replayers
689 for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
690 auto &peer = it->first.second;
691 auto pool_peer_it = pool_peers.find(it->first.first);
c07f9fc5
FG
692 if (pool_peer_it == pool_peers.end() ||
693 pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
7c673cae
FG
694 dout(20) << "removing pool replayer for " << peer << dendl;
695 // TODO: make async
c07f9fc5 696 it->second->shut_down();
7c673cae
FG
697 it = m_pool_replayers.erase(it);
698 } else {
699 ++it;
700 }
701 }
702
703 for (auto &kv : pool_peers) {
704 for (auto &peer : kv.second) {
705 PoolPeer pool_peer(kv.first, peer);
c07f9fc5
FG
706
707 auto pool_replayers_it = m_pool_replayers.find(pool_peer);
708 if (pool_replayers_it != m_pool_replayers.end()) {
709 auto& pool_replayer = pool_replayers_it->second;
9f95a23c
TL
710 if (!m_site_name.empty() && !site_name.empty() &&
711 m_site_name != site_name) {
712 dout(0) << "restarting pool replayer for " << peer << " due to "
713 << "updated site name" << dendl;
714 // TODO: make async
715 pool_replayer->shut_down();
716 pool_replayer->init(site_name);
717 } else if (pool_replayer->is_blacklisted()) {
c07f9fc5
FG
718 derr << "restarting blacklisted pool replayer for " << peer << dendl;
719 // TODO: make async
720 pool_replayer->shut_down();
9f95a23c 721 pool_replayer->init(site_name);
c07f9fc5
FG
722 } else if (!pool_replayer->is_running()) {
723 derr << "restarting failed pool replayer for " << peer << dendl;
724 // TODO: make async
725 pool_replayer->shut_down();
9f95a23c 726 pool_replayer->init(site_name);
c07f9fc5
FG
727 }
728 } else {
7c673cae 729 dout(20) << "starting pool replayer for " << peer << dendl;
9f95a23c
TL
730 unique_ptr<PoolReplayer<>> pool_replayer(
731 new PoolReplayer<>(m_threads, m_service_daemon.get(),
732 m_cache_manager_handler.get(),
733 m_pool_meta_cache.get(), kv.first, peer,
734 m_args));
7c673cae 735
c07f9fc5 736 // TODO: make async
9f95a23c 737 pool_replayer->init(site_name);
7c673cae
FG
738 m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
739 }
740 }
741
742 // TODO currently only support a single peer
743 }
9f95a23c
TL
744
745 m_site_name = site_name;
7c673cae
FG
746}
747
748} // namespace mirror
749} // namespace rbd