1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
#include <csignal>

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"

#include "PoolMetaCache.h"
#include "ServiceDaemon.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
28 using std::unique_ptr
;
31 using librados::Rados
;
32 using librados::IoCtx
;
33 using librbd::mirror_peer_t
;
40 class MirrorAdminSocketCommand
{
42 virtual ~MirrorAdminSocketCommand() {}
43 virtual int call(Formatter
*f
) = 0;
46 class StatusCommand
: public MirrorAdminSocketCommand
{
48 explicit StatusCommand(Mirror
*mirror
) : mirror(mirror
) {}
50 int call(Formatter
*f
) override
{
51 mirror
->print_status(f
);
59 class StartCommand
: public MirrorAdminSocketCommand
{
61 explicit StartCommand(Mirror
*mirror
) : mirror(mirror
) {}
63 int call(Formatter
*f
) override
{
72 class StopCommand
: public MirrorAdminSocketCommand
{
74 explicit StopCommand(Mirror
*mirror
) : mirror(mirror
) {}
76 int call(Formatter
*f
) override
{
85 class RestartCommand
: public MirrorAdminSocketCommand
{
87 explicit RestartCommand(Mirror
*mirror
) : mirror(mirror
) {}
89 int call(Formatter
*f
) override
{
98 class FlushCommand
: public MirrorAdminSocketCommand
{
100 explicit FlushCommand(Mirror
*mirror
) : mirror(mirror
) {}
102 int call(Formatter
*f
) override
{
111 class LeaderReleaseCommand
: public MirrorAdminSocketCommand
{
113 explicit LeaderReleaseCommand(Mirror
*mirror
) : mirror(mirror
) {}
115 int call(Formatter
*f
) override
{
116 mirror
->release_leader();
125 #define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
126 << m_name << " " << __func__ << ": "
128 struct PriCache
: public PriorityCache::PriCache
{
130 int64_t m_base_cache_max_size
;
131 int64_t m_extra_cache_max_size
;
133 PriorityCache::Priority m_base_cache_pri
= PriorityCache::Priority::PRI10
;
134 PriorityCache::Priority m_extra_cache_pri
= PriorityCache::Priority::PRI10
;
135 int64_t m_base_cache_bytes
= 0;
136 int64_t m_extra_cache_bytes
= 0;
137 int64_t m_committed_bytes
= 0;
138 double m_cache_ratio
= 0;
140 PriCache(const std::string
&name
, uint64_t min_size
, uint64_t max_size
)
141 : m_name(name
), m_base_cache_max_size(min_size
),
142 m_extra_cache_max_size(max_size
- min_size
) {
143 ceph_assert(max_size
>= min_size
);
147 if (m_base_cache_pri
== PriorityCache::Priority::PRI0
) {
150 auto pri
= static_cast<uint8_t>(m_base_cache_pri
);
151 m_base_cache_pri
= static_cast<PriorityCache::Priority
>(--pri
);
153 dout(30) << m_base_cache_pri
<< dendl
;
156 int64_t request_cache_bytes(PriorityCache::Priority pri
,
157 uint64_t total_cache
) const override
{
158 int64_t cache_bytes
= 0;
160 if (pri
== m_base_cache_pri
) {
161 cache_bytes
+= m_base_cache_max_size
;
163 if (pri
== m_extra_cache_pri
) {
164 cache_bytes
+= m_extra_cache_max_size
;
167 dout(30) << cache_bytes
<< dendl
;
172 int64_t get_cache_bytes(PriorityCache::Priority pri
) const override
{
173 int64_t cache_bytes
= 0;
175 if (pri
== m_base_cache_pri
) {
176 cache_bytes
+= m_base_cache_bytes
;
178 if (pri
== m_extra_cache_pri
) {
179 cache_bytes
+= m_extra_cache_bytes
;
182 dout(30) << "pri=" << pri
<< " " << cache_bytes
<< dendl
;
187 int64_t get_cache_bytes() const override
{
188 auto cache_bytes
= m_base_cache_bytes
+ m_extra_cache_bytes
;
190 dout(30) << m_base_cache_bytes
<< "+" << m_extra_cache_bytes
<< "="
191 << cache_bytes
<< dendl
;
196 void set_cache_bytes(PriorityCache::Priority pri
, int64_t bytes
) override
{
197 ceph_assert(bytes
>= 0);
198 ceph_assert(pri
== m_base_cache_pri
|| pri
== m_extra_cache_pri
||
201 dout(30) << "pri=" << pri
<< " " << bytes
<< dendl
;
203 if (pri
== m_base_cache_pri
) {
204 m_base_cache_bytes
= std::min(m_base_cache_max_size
, bytes
);
205 bytes
-= std::min(m_base_cache_bytes
, bytes
);
208 if (pri
== m_extra_cache_pri
) {
209 m_extra_cache_bytes
= bytes
;
213 void add_cache_bytes(PriorityCache::Priority pri
, int64_t bytes
) override
{
214 ceph_assert(bytes
>= 0);
215 ceph_assert(pri
== m_base_cache_pri
|| pri
== m_extra_cache_pri
);
217 dout(30) << "pri=" << pri
<< " " << bytes
<< dendl
;
219 if (pri
== m_base_cache_pri
) {
220 ceph_assert(m_base_cache_bytes
<= m_base_cache_max_size
);
222 auto chunk
= std::min(m_base_cache_max_size
- m_base_cache_bytes
, bytes
);
223 m_base_cache_bytes
+= chunk
;
227 if (pri
== m_extra_cache_pri
) {
228 m_extra_cache_bytes
+= bytes
;
232 int64_t commit_cache_size(uint64_t total_cache
) override
{
233 m_committed_bytes
= p2roundup
<int64_t>(get_cache_bytes(), 4096);
235 dout(30) << m_committed_bytes
<< dendl
;
237 return m_committed_bytes
;
240 int64_t get_committed_size() const override
{
241 dout(30) << m_committed_bytes
<< dendl
;
243 return m_committed_bytes
;
246 double get_cache_ratio() const override
{
247 dout(30) << m_cache_ratio
<< dendl
;
249 return m_cache_ratio
;
252 void set_cache_ratio(double ratio
) override
{
253 dout(30) << m_cache_ratio
<< dendl
;
255 m_cache_ratio
= ratio
;
258 void shift_bins() override
{
261 void import_bins(const std::vector
<uint64_t> &intervals
) override
{
264 void set_bins(PriorityCache::Priority pri
, uint64_t end_interval
) override
{
267 uint64_t get_bins(PriorityCache::Priority pri
) const override
{
271 std::string
get_cache_name() const override
{
276 } // anonymous namespace
279 #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
282 class MirrorAdminSocketHook
: public AdminSocketHook
{
284 MirrorAdminSocketHook(CephContext
*cct
, Mirror
*mirror
) :
285 admin_socket(cct
->get_admin_socket()) {
289 command
= "rbd mirror status";
290 r
= admin_socket
->register_command(command
, this,
291 "get status for rbd mirror");
293 commands
[command
] = new StatusCommand(mirror
);
296 command
= "rbd mirror start";
297 r
= admin_socket
->register_command(command
, this,
300 commands
[command
] = new StartCommand(mirror
);
303 command
= "rbd mirror stop";
304 r
= admin_socket
->register_command(command
, this,
307 commands
[command
] = new StopCommand(mirror
);
310 command
= "rbd mirror restart";
311 r
= admin_socket
->register_command(command
, this,
312 "restart rbd mirror");
314 commands
[command
] = new RestartCommand(mirror
);
317 command
= "rbd mirror flush";
318 r
= admin_socket
->register_command(command
, this,
321 commands
[command
] = new FlushCommand(mirror
);
324 command
= "rbd mirror leader release";
325 r
= admin_socket
->register_command(command
, this,
326 "release rbd mirror leader");
328 commands
[command
] = new LeaderReleaseCommand(mirror
);
332 ~MirrorAdminSocketHook() override
{
333 (void)admin_socket
->unregister_commands(this);
334 for (Commands::const_iterator i
= commands
.begin(); i
!= commands
.end();
340 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
343 bufferlist
& out
) override
{
344 Commands::const_iterator i
= commands
.find(command
);
345 ceph_assert(i
!= commands
.end());
346 return i
->second
->call(f
);
350 typedef std::map
<std::string
, MirrorAdminSocketCommand
*, std::less
<>> Commands
;
352 AdminSocket
*admin_socket
;
356 class CacheManagerHandler
: public journal::CacheManagerHandler
{
358 CacheManagerHandler(CephContext
*cct
)
361 if (!m_cct
->_conf
.get_val
<bool>("rbd_mirror_memory_autotune")) {
365 uint64_t base
= m_cct
->_conf
.get_val
<Option::size_t>(
366 "rbd_mirror_memory_base");
367 double fragmentation
= m_cct
->_conf
.get_val
<double>(
368 "rbd_mirror_memory_expected_fragmentation");
369 uint64_t target
= m_cct
->_conf
.get_val
<Option::size_t>(
370 "rbd_mirror_memory_target");
371 uint64_t min
= m_cct
->_conf
.get_val
<Option::size_t>(
372 "rbd_mirror_memory_cache_min");
375 // When setting the maximum amount of memory to use for cache, first
376 // assume some base amount of memory for the daemon and then fudge in
377 // some overhead for fragmentation that scales with cache usage.
378 uint64_t ltarget
= (1.0 - fragmentation
) * target
;
379 if (ltarget
> base
+ min
) {
380 max
= ltarget
- base
;
383 m_next_balance
= ceph_clock_now();
384 m_next_resize
= ceph_clock_now();
386 m_cache_manager
= std::make_unique
<PriorityCache::Manager
>(
387 m_cct
, min
, max
, target
, false);
390 ~CacheManagerHandler() {
391 std::lock_guard locker
{m_lock
};
393 ceph_assert(m_caches
.empty());
396 void register_cache(const std::string
&cache_name
,
397 uint64_t min_size
, uint64_t max_size
,
398 journal::CacheRebalanceHandler
* handler
) override
{
399 if (!m_cache_manager
) {
400 handler
->handle_cache_rebalanced(max_size
);
404 dout(20) << cache_name
<< " min_size=" << min_size
<< " max_size="
405 << max_size
<< " handler=" << handler
<< dendl
;
407 std::lock_guard locker
{m_lock
};
409 auto p
= m_caches
.insert(
410 {cache_name
, {cache_name
, min_size
, max_size
, handler
}});
411 ceph_assert(p
.second
== true);
413 m_cache_manager
->insert(cache_name
, p
.first
->second
.pri_cache
, false);
414 m_next_balance
= ceph_clock_now();
417 void unregister_cache(const std::string
&cache_name
) override
{
418 if (!m_cache_manager
) {
422 dout(20) << cache_name
<< dendl
;
424 std::lock_guard locker
{m_lock
};
426 auto it
= m_caches
.find(cache_name
);
427 ceph_assert(it
!= m_caches
.end());
429 m_cache_manager
->erase(cache_name
);
431 m_next_balance
= ceph_clock_now();
434 void run_cache_manager() {
435 if (!m_cache_manager
) {
439 std::lock_guard locker
{m_lock
};
441 // Before we trim, check and see if it's time to rebalance/resize.
442 auto autotune_interval
= m_cct
->_conf
.get_val
<double>(
443 "rbd_mirror_memory_cache_autotune_interval");
444 auto resize_interval
= m_cct
->_conf
.get_val
<double>(
445 "rbd_mirror_memory_cache_resize_interval");
447 utime_t now
= ceph_clock_now();
449 if (autotune_interval
> 0 && m_next_balance
<= now
) {
450 dout(20) << "balance" << dendl
;
451 m_cache_manager
->balance();
453 for (auto &it
: m_caches
) {
454 auto pri_cache
= static_cast<PriCache
*>(it
.second
.pri_cache
.get());
455 auto new_cache_bytes
= pri_cache
->get_cache_bytes();
456 it
.second
.handler
->handle_cache_rebalanced(new_cache_bytes
);
457 pri_cache
->prioritize();
460 m_next_balance
= ceph_clock_now();
461 m_next_balance
+= autotune_interval
;
464 if (resize_interval
> 0 && m_next_resize
< now
) {
465 if (ceph_using_tcmalloc()) {
466 dout(20) << "tune memory" << dendl
;
467 m_cache_manager
->tune_memory();
470 m_next_resize
= ceph_clock_now();
471 m_next_resize
+= resize_interval
;
477 std::shared_ptr
<PriorityCache::PriCache
> pri_cache
;
478 journal::CacheRebalanceHandler
*handler
;
480 Cache(const std::string name
, uint64_t min_size
, uint64_t max_size
,
481 journal::CacheRebalanceHandler
*handler
)
482 : pri_cache(new PriCache(name
, min_size
, max_size
)), handler(handler
) {
488 mutable ceph::mutex m_lock
=
489 ceph::make_mutex("rbd::mirror::CacheManagerHandler");
490 std::unique_ptr
<PriorityCache::Manager
> m_cache_manager
;
491 std::map
<std::string
, Cache
> m_caches
;
493 utime_t m_next_balance
;
494 utime_t m_next_resize
;
497 Mirror::Mirror(CephContext
*cct
, const std::vector
<const char*> &args
) :
500 m_local(new librados::Rados()),
501 m_cache_manager_handler(new CacheManagerHandler(cct
)),
502 m_pool_meta_cache(new PoolMetaCache(cct
)),
503 m_asok_hook(new MirrorAdminSocketHook(cct
, this)) {
511 void Mirror::handle_signal(int signum
)
513 dout(20) << signum
<< dendl
;
515 std::lock_guard l
{m_lock
};
519 for (auto &it
: m_pool_replayers
) {
520 it
.second
->reopen_logs();
522 g_ceph_context
->reopen_logs();
532 ceph_abort_msgf("unexpected signal %d", signum
);
538 int r
= m_local
->init_with_context(m_cct
);
540 derr
<< "could not initialize rados handle" << dendl
;
544 r
= m_local
->connect();
546 derr
<< "error connecting to local cluster" << dendl
;
550 m_threads
= &(m_cct
->lookup_or_create_singleton_object
<
551 Threads
<librbd::ImageCtx
>>("rbd_mirror::threads", false, m_local
));
552 m_service_daemon
.reset(new ServiceDaemon
<>(m_cct
, m_local
, m_threads
));
554 r
= m_service_daemon
->init();
556 derr
<< "error registering service daemon: " << cpp_strerror(r
) << dendl
;
560 m_local_cluster_watcher
.reset(new ClusterWatcher(m_local
, m_lock
,
561 m_service_daemon
.get()));
567 dout(20) << "enter" << dendl
;
569 using namespace std::chrono_literals
;
570 utime_t next_refresh_pools
= ceph_clock_now();
572 while (!m_stopping
) {
573 utime_t now
= ceph_clock_now();
574 bool refresh_pools
= next_refresh_pools
<= now
;
576 m_local_cluster_watcher
->refresh_pools();
577 next_refresh_pools
= ceph_clock_now();
578 next_refresh_pools
+= m_cct
->_conf
.get_val
<uint64_t>(
579 "rbd_mirror_pool_replayers_refresh_interval");
581 std::unique_lock l
{m_lock
};
582 if (!m_manual_stop
) {
584 update_pool_replayers(m_local_cluster_watcher
->get_pool_peers(),
585 m_local_cluster_watcher
->get_site_name());
587 m_cache_manager_handler
->run_cache_manager();
589 m_cond
.wait_for(l
, 1s
);
592 // stop all pool replayers in parallel
593 std::lock_guard locker
{m_lock
};
594 for (auto &pool_replayer
: m_pool_replayers
) {
595 pool_replayer
.second
->stop(false);
597 dout(20) << "return" << dendl
;
600 void Mirror::print_status(Formatter
*f
)
602 dout(20) << "enter" << dendl
;
604 std::lock_guard l
{m_lock
};
610 f
->open_object_section("mirror_status");
611 f
->open_array_section("pool_replayers");
612 for (auto &pool_replayer
: m_pool_replayers
) {
613 pool_replayer
.second
->print_status(f
);
621 dout(20) << "enter" << dendl
;
622 std::lock_guard l
{m_lock
};
628 m_manual_stop
= false;
630 for (auto &pool_replayer
: m_pool_replayers
) {
631 pool_replayer
.second
->start();
637 dout(20) << "enter" << dendl
;
638 std::lock_guard l
{m_lock
};
644 m_manual_stop
= true;
646 for (auto &pool_replayer
: m_pool_replayers
) {
647 pool_replayer
.second
->stop(true);
651 void Mirror::restart()
653 dout(20) << "enter" << dendl
;
654 std::lock_guard l
{m_lock
};
660 m_manual_stop
= false;
662 for (auto &pool_replayer
: m_pool_replayers
) {
663 pool_replayer
.second
->restart();
669 dout(20) << "enter" << dendl
;
670 std::lock_guard l
{m_lock
};
672 if (m_stopping
|| m_manual_stop
) {
676 for (auto &pool_replayer
: m_pool_replayers
) {
677 pool_replayer
.second
->flush();
681 void Mirror::release_leader()
683 dout(20) << "enter" << dendl
;
684 std::lock_guard l
{m_lock
};
690 for (auto &pool_replayer
: m_pool_replayers
) {
691 pool_replayer
.second
->release_leader();
695 void Mirror::update_pool_replayers(const PoolPeers
&pool_peers
,
696 const std::string
& site_name
)
698 dout(20) << "enter" << dendl
;
699 ceph_assert(ceph_mutex_is_locked(m_lock
));
701 // remove stale pool replayers before creating new pool replayers
702 for (auto it
= m_pool_replayers
.begin(); it
!= m_pool_replayers
.end();) {
703 auto &peer
= it
->first
.second
;
704 auto pool_peer_it
= pool_peers
.find(it
->first
.first
);
705 if (pool_peer_it
== pool_peers
.end() ||
706 pool_peer_it
->second
.find(peer
) == pool_peer_it
->second
.end()) {
707 dout(20) << "removing pool replayer for " << peer
<< dendl
;
709 it
->second
->shut_down();
710 it
= m_pool_replayers
.erase(it
);
716 for (auto &kv
: pool_peers
) {
717 for (auto &peer
: kv
.second
) {
718 PoolPeer
pool_peer(kv
.first
, peer
);
720 auto pool_replayers_it
= m_pool_replayers
.find(pool_peer
);
721 if (pool_replayers_it
!= m_pool_replayers
.end()) {
722 auto& pool_replayer
= pool_replayers_it
->second
;
723 if (!m_site_name
.empty() && !site_name
.empty() &&
724 m_site_name
!= site_name
) {
725 dout(0) << "restarting pool replayer for " << peer
<< " due to "
726 << "updated site name" << dendl
;
728 pool_replayer
->shut_down();
729 pool_replayer
->init(site_name
);
730 } else if (pool_replayer
->is_blocklisted()) {
731 derr
<< "restarting blocklisted pool replayer for " << peer
<< dendl
;
733 pool_replayer
->shut_down();
734 pool_replayer
->init(site_name
);
735 } else if (!pool_replayer
->is_running()) {
736 derr
<< "restarting failed pool replayer for " << peer
<< dendl
;
738 pool_replayer
->shut_down();
739 pool_replayer
->init(site_name
);
742 dout(20) << "starting pool replayer for " << peer
<< dendl
;
743 unique_ptr
<PoolReplayer
<>> pool_replayer(
744 new PoolReplayer
<>(m_threads
, m_service_daemon
.get(),
745 m_cache_manager_handler
.get(),
746 m_pool_meta_cache
.get(), kv
.first
, peer
,
750 pool_replayer
->init(site_name
);
751 m_pool_replayers
.emplace(pool_peer
, std::move(pool_replayer
));
755 // TODO currently only support a single peer
758 m_site_name
= site_name
;
761 } // namespace mirror