1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
#include <csignal>

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"

#include "PoolMetaCache.h"
#include "ServiceDaemon.h"
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rbd_mirror
28 using std::unique_ptr
;
31 using librados::Rados
;
32 using librados::IoCtx
;
33 using librbd::mirror_peer_t
;
40 class MirrorAdminSocketCommand
{
42 virtual ~MirrorAdminSocketCommand() {}
43 virtual int call(Formatter
*f
) = 0;
46 class StatusCommand
: public MirrorAdminSocketCommand
{
48 explicit StatusCommand(Mirror
*mirror
) : mirror(mirror
) {}
50 int call(Formatter
*f
) override
{
51 mirror
->print_status(f
);
59 class StartCommand
: public MirrorAdminSocketCommand
{
61 explicit StartCommand(Mirror
*mirror
) : mirror(mirror
) {}
63 int call(Formatter
*f
) override
{
72 class StopCommand
: public MirrorAdminSocketCommand
{
74 explicit StopCommand(Mirror
*mirror
) : mirror(mirror
) {}
76 int call(Formatter
*f
) override
{
85 class RestartCommand
: public MirrorAdminSocketCommand
{
87 explicit RestartCommand(Mirror
*mirror
) : mirror(mirror
) {}
89 int call(Formatter
*f
) override
{
98 class FlushCommand
: public MirrorAdminSocketCommand
{
100 explicit FlushCommand(Mirror
*mirror
) : mirror(mirror
) {}
102 int call(Formatter
*f
) override
{
111 class LeaderReleaseCommand
: public MirrorAdminSocketCommand
{
113 explicit LeaderReleaseCommand(Mirror
*mirror
) : mirror(mirror
) {}
115 int call(Formatter
*f
) override
{
116 mirror
->release_leader();
125 #define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
126 << m_name << " " << __func__ << ": "
128 struct PriCache
: public PriorityCache::PriCache
{
130 int64_t m_base_cache_max_size
;
131 int64_t m_extra_cache_max_size
;
133 PriorityCache::Priority m_base_cache_pri
= PriorityCache::Priority::PRI10
;
134 PriorityCache::Priority m_extra_cache_pri
= PriorityCache::Priority::PRI10
;
135 int64_t m_base_cache_bytes
= 0;
136 int64_t m_extra_cache_bytes
= 0;
137 int64_t m_committed_bytes
= 0;
138 double m_cache_ratio
= 0;
140 PriCache(const std::string
&name
, uint64_t min_size
, uint64_t max_size
)
141 : m_name(name
), m_base_cache_max_size(min_size
),
142 m_extra_cache_max_size(max_size
- min_size
) {
143 ceph_assert(max_size
>= min_size
);
147 if (m_base_cache_pri
== PriorityCache::Priority::PRI0
) {
150 auto pri
= static_cast<uint8_t>(m_base_cache_pri
);
151 m_base_cache_pri
= static_cast<PriorityCache::Priority
>(--pri
);
153 dout(30) << m_base_cache_pri
<< dendl
;
156 int64_t request_cache_bytes(PriorityCache::Priority pri
,
157 uint64_t total_cache
) const override
{
158 int64_t cache_bytes
= 0;
160 if (pri
== m_base_cache_pri
) {
161 cache_bytes
+= m_base_cache_max_size
;
163 if (pri
== m_extra_cache_pri
) {
164 cache_bytes
+= m_extra_cache_max_size
;
167 dout(30) << cache_bytes
<< dendl
;
172 int64_t get_cache_bytes(PriorityCache::Priority pri
) const override
{
173 int64_t cache_bytes
= 0;
175 if (pri
== m_base_cache_pri
) {
176 cache_bytes
+= m_base_cache_bytes
;
178 if (pri
== m_extra_cache_pri
) {
179 cache_bytes
+= m_extra_cache_bytes
;
182 dout(30) << "pri=" << pri
<< " " << cache_bytes
<< dendl
;
187 int64_t get_cache_bytes() const override
{
188 auto cache_bytes
= m_base_cache_bytes
+ m_extra_cache_bytes
;
190 dout(30) << m_base_cache_bytes
<< "+" << m_extra_cache_bytes
<< "="
191 << cache_bytes
<< dendl
;
196 void set_cache_bytes(PriorityCache::Priority pri
, int64_t bytes
) override
{
197 ceph_assert(bytes
>= 0);
198 ceph_assert(pri
== m_base_cache_pri
|| pri
== m_extra_cache_pri
||
201 dout(30) << "pri=" << pri
<< " " << bytes
<< dendl
;
203 if (pri
== m_base_cache_pri
) {
204 m_base_cache_bytes
= std::min(m_base_cache_max_size
, bytes
);
205 bytes
-= std::min(m_base_cache_bytes
, bytes
);
208 if (pri
== m_extra_cache_pri
) {
209 m_extra_cache_bytes
= bytes
;
213 void add_cache_bytes(PriorityCache::Priority pri
, int64_t bytes
) override
{
214 ceph_assert(bytes
>= 0);
215 ceph_assert(pri
== m_base_cache_pri
|| pri
== m_extra_cache_pri
);
217 dout(30) << "pri=" << pri
<< " " << bytes
<< dendl
;
219 if (pri
== m_base_cache_pri
) {
220 ceph_assert(m_base_cache_bytes
<= m_base_cache_max_size
);
222 auto chunk
= std::min(m_base_cache_max_size
- m_base_cache_bytes
, bytes
);
223 m_base_cache_bytes
+= chunk
;
227 if (pri
== m_extra_cache_pri
) {
228 m_extra_cache_bytes
+= bytes
;
232 int64_t commit_cache_size(uint64_t total_cache
) override
{
233 m_committed_bytes
= p2roundup
<int64_t>(get_cache_bytes(), 4096);
235 dout(30) << m_committed_bytes
<< dendl
;
237 return m_committed_bytes
;
240 int64_t get_committed_size() const override
{
241 dout(30) << m_committed_bytes
<< dendl
;
243 return m_committed_bytes
;
246 double get_cache_ratio() const override
{
247 dout(30) << m_cache_ratio
<< dendl
;
249 return m_cache_ratio
;
252 void set_cache_ratio(double ratio
) override
{
253 dout(30) << m_cache_ratio
<< dendl
;
255 m_cache_ratio
= ratio
;
258 void shift_bins() override
{
261 void import_bins(const std::vector
<uint64_t> &intervals
) override
{
264 void set_bins(PriorityCache::Priority pri
, uint64_t end_interval
) override
{
267 uint64_t get_bins(PriorityCache::Priority pri
) const override
{
271 std::string
get_cache_name() const override
{
276 } // anonymous namespace
279 #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
282 class MirrorAdminSocketHook
: public AdminSocketHook
{
284 MirrorAdminSocketHook(CephContext
*cct
, Mirror
*mirror
) :
285 admin_socket(cct
->get_admin_socket()) {
289 command
= "rbd mirror status";
290 r
= admin_socket
->register_command(command
, this,
291 "get status for rbd mirror");
293 commands
[command
] = new StatusCommand(mirror
);
296 command
= "rbd mirror start";
297 r
= admin_socket
->register_command(command
, this,
300 commands
[command
] = new StartCommand(mirror
);
303 command
= "rbd mirror stop";
304 r
= admin_socket
->register_command(command
, this,
307 commands
[command
] = new StopCommand(mirror
);
310 command
= "rbd mirror restart";
311 r
= admin_socket
->register_command(command
, this,
312 "restart rbd mirror");
314 commands
[command
] = new RestartCommand(mirror
);
317 command
= "rbd mirror flush";
318 r
= admin_socket
->register_command(command
, this,
321 commands
[command
] = new FlushCommand(mirror
);
324 command
= "rbd mirror leader release";
325 r
= admin_socket
->register_command(command
, this,
326 "release rbd mirror leader");
328 commands
[command
] = new LeaderReleaseCommand(mirror
);
332 ~MirrorAdminSocketHook() override
{
333 (void)admin_socket
->unregister_commands(this);
334 for (Commands::const_iterator i
= commands
.begin(); i
!= commands
.end();
340 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
344 bufferlist
& out
) override
{
345 Commands::const_iterator i
= commands
.find(command
);
346 ceph_assert(i
!= commands
.end());
347 return i
->second
->call(f
);
351 typedef std::map
<std::string
, MirrorAdminSocketCommand
*, std::less
<>> Commands
;
353 AdminSocket
*admin_socket
;
357 class CacheManagerHandler
: public journal::CacheManagerHandler
{
359 CacheManagerHandler(CephContext
*cct
)
362 if (!m_cct
->_conf
.get_val
<bool>("rbd_mirror_memory_autotune")) {
366 uint64_t base
= m_cct
->_conf
.get_val
<Option::size_t>(
367 "rbd_mirror_memory_base");
368 double fragmentation
= m_cct
->_conf
.get_val
<double>(
369 "rbd_mirror_memory_expected_fragmentation");
370 uint64_t target
= m_cct
->_conf
.get_val
<Option::size_t>(
371 "rbd_mirror_memory_target");
372 uint64_t min
= m_cct
->_conf
.get_val
<Option::size_t>(
373 "rbd_mirror_memory_cache_min");
376 // When setting the maximum amount of memory to use for cache, first
377 // assume some base amount of memory for the daemon and then fudge in
378 // some overhead for fragmentation that scales with cache usage.
379 uint64_t ltarget
= (1.0 - fragmentation
) * target
;
380 if (ltarget
> base
+ min
) {
381 max
= ltarget
- base
;
384 m_next_balance
= ceph_clock_now();
385 m_next_resize
= ceph_clock_now();
387 m_cache_manager
= std::make_unique
<PriorityCache::Manager
>(
388 m_cct
, min
, max
, target
, false);
391 ~CacheManagerHandler() {
392 std::lock_guard locker
{m_lock
};
394 ceph_assert(m_caches
.empty());
397 void register_cache(const std::string
&cache_name
,
398 uint64_t min_size
, uint64_t max_size
,
399 journal::CacheRebalanceHandler
* handler
) override
{
400 if (!m_cache_manager
) {
401 handler
->handle_cache_rebalanced(max_size
);
405 dout(20) << cache_name
<< " min_size=" << min_size
<< " max_size="
406 << max_size
<< " handler=" << handler
<< dendl
;
408 std::lock_guard locker
{m_lock
};
410 auto p
= m_caches
.insert(
411 {cache_name
, {cache_name
, min_size
, max_size
, handler
}});
412 ceph_assert(p
.second
== true);
414 m_cache_manager
->insert(cache_name
, p
.first
->second
.pri_cache
, false);
415 m_next_balance
= ceph_clock_now();
418 void unregister_cache(const std::string
&cache_name
) override
{
419 if (!m_cache_manager
) {
423 dout(20) << cache_name
<< dendl
;
425 std::lock_guard locker
{m_lock
};
427 auto it
= m_caches
.find(cache_name
);
428 ceph_assert(it
!= m_caches
.end());
430 m_cache_manager
->erase(cache_name
);
432 m_next_balance
= ceph_clock_now();
435 void run_cache_manager() {
436 if (!m_cache_manager
) {
440 std::lock_guard locker
{m_lock
};
442 // Before we trim, check and see if it's time to rebalance/resize.
443 auto autotune_interval
= m_cct
->_conf
.get_val
<double>(
444 "rbd_mirror_memory_cache_autotune_interval");
445 auto resize_interval
= m_cct
->_conf
.get_val
<double>(
446 "rbd_mirror_memory_cache_resize_interval");
448 utime_t now
= ceph_clock_now();
450 if (autotune_interval
> 0 && m_next_balance
<= now
) {
451 dout(20) << "balance" << dendl
;
452 m_cache_manager
->balance();
454 for (auto &it
: m_caches
) {
455 auto pri_cache
= static_cast<PriCache
*>(it
.second
.pri_cache
.get());
456 auto new_cache_bytes
= pri_cache
->get_cache_bytes();
457 it
.second
.handler
->handle_cache_rebalanced(new_cache_bytes
);
458 pri_cache
->prioritize();
461 m_next_balance
= ceph_clock_now();
462 m_next_balance
+= autotune_interval
;
465 if (resize_interval
> 0 && m_next_resize
< now
) {
466 if (ceph_using_tcmalloc()) {
467 dout(20) << "tune memory" << dendl
;
468 m_cache_manager
->tune_memory();
471 m_next_resize
= ceph_clock_now();
472 m_next_resize
+= resize_interval
;
478 std::shared_ptr
<PriorityCache::PriCache
> pri_cache
;
479 journal::CacheRebalanceHandler
*handler
;
481 Cache(const std::string name
, uint64_t min_size
, uint64_t max_size
,
482 journal::CacheRebalanceHandler
*handler
)
483 : pri_cache(new PriCache(name
, min_size
, max_size
)), handler(handler
) {
489 mutable ceph::mutex m_lock
=
490 ceph::make_mutex("rbd::mirror::CacheManagerHandler");
491 std::unique_ptr
<PriorityCache::Manager
> m_cache_manager
;
492 std::map
<std::string
, Cache
> m_caches
;
494 utime_t m_next_balance
;
495 utime_t m_next_resize
;
498 Mirror::Mirror(CephContext
*cct
, const std::vector
<const char*> &args
) :
501 m_local(new librados::Rados()),
502 m_cache_manager_handler(new CacheManagerHandler(cct
)),
503 m_pool_meta_cache(new PoolMetaCache(cct
)),
504 m_asok_hook(new MirrorAdminSocketHook(cct
, this)) {
512 void Mirror::handle_signal(int signum
)
514 dout(20) << signum
<< dendl
;
516 std::lock_guard l
{m_lock
};
520 for (auto &it
: m_pool_replayers
) {
521 it
.second
->reopen_logs();
523 g_ceph_context
->reopen_logs();
533 ceph_abort_msgf("unexpected signal %d", signum
);
539 int r
= m_local
->init_with_context(m_cct
);
541 derr
<< "could not initialize rados handle" << dendl
;
545 r
= m_local
->connect();
547 derr
<< "error connecting to local cluster" << dendl
;
551 m_threads
= &(m_cct
->lookup_or_create_singleton_object
<
552 Threads
<librbd::ImageCtx
>>("rbd_mirror::threads", false, m_local
));
553 m_service_daemon
.reset(new ServiceDaemon
<>(m_cct
, m_local
, m_threads
));
555 r
= m_service_daemon
->init();
557 derr
<< "error registering service daemon: " << cpp_strerror(r
) << dendl
;
561 m_local_cluster_watcher
.reset(new ClusterWatcher(m_local
, m_lock
,
562 m_service_daemon
.get()));
568 dout(20) << "enter" << dendl
;
570 using namespace std::chrono_literals
;
571 utime_t next_refresh_pools
= ceph_clock_now();
573 while (!m_stopping
) {
574 utime_t now
= ceph_clock_now();
575 bool refresh_pools
= next_refresh_pools
<= now
;
577 m_local_cluster_watcher
->refresh_pools();
578 next_refresh_pools
= ceph_clock_now();
579 next_refresh_pools
+= m_cct
->_conf
.get_val
<uint64_t>(
580 "rbd_mirror_pool_replayers_refresh_interval");
582 std::unique_lock l
{m_lock
};
583 if (!m_manual_stop
) {
585 update_pool_replayers(m_local_cluster_watcher
->get_pool_peers(),
586 m_local_cluster_watcher
->get_site_name());
588 m_cache_manager_handler
->run_cache_manager();
590 m_cond
.wait_for(l
, 1s
);
593 // stop all pool replayers in parallel
594 std::lock_guard locker
{m_lock
};
595 for (auto &pool_replayer
: m_pool_replayers
) {
596 pool_replayer
.second
->stop(false);
598 dout(20) << "return" << dendl
;
601 void Mirror::print_status(Formatter
*f
)
603 dout(20) << "enter" << dendl
;
605 std::lock_guard l
{m_lock
};
611 f
->open_object_section("mirror_status");
612 f
->open_array_section("pool_replayers");
613 for (auto &pool_replayer
: m_pool_replayers
) {
614 pool_replayer
.second
->print_status(f
);
622 dout(20) << "enter" << dendl
;
623 std::lock_guard l
{m_lock
};
629 m_manual_stop
= false;
631 for (auto &pool_replayer
: m_pool_replayers
) {
632 pool_replayer
.second
->start();
638 dout(20) << "enter" << dendl
;
639 std::lock_guard l
{m_lock
};
645 m_manual_stop
= true;
647 for (auto &pool_replayer
: m_pool_replayers
) {
648 pool_replayer
.second
->stop(true);
652 void Mirror::restart()
654 dout(20) << "enter" << dendl
;
655 std::lock_guard l
{m_lock
};
661 m_manual_stop
= false;
663 for (auto &pool_replayer
: m_pool_replayers
) {
664 pool_replayer
.second
->restart();
670 dout(20) << "enter" << dendl
;
671 std::lock_guard l
{m_lock
};
673 if (m_stopping
|| m_manual_stop
) {
677 for (auto &pool_replayer
: m_pool_replayers
) {
678 pool_replayer
.second
->flush();
682 void Mirror::release_leader()
684 dout(20) << "enter" << dendl
;
685 std::lock_guard l
{m_lock
};
691 for (auto &pool_replayer
: m_pool_replayers
) {
692 pool_replayer
.second
->release_leader();
696 void Mirror::update_pool_replayers(const PoolPeers
&pool_peers
,
697 const std::string
& site_name
)
699 dout(20) << "enter" << dendl
;
700 ceph_assert(ceph_mutex_is_locked(m_lock
));
702 // remove stale pool replayers before creating new pool replayers
703 for (auto it
= m_pool_replayers
.begin(); it
!= m_pool_replayers
.end();) {
704 auto &peer
= it
->first
.second
;
705 auto pool_peer_it
= pool_peers
.find(it
->first
.first
);
706 if (pool_peer_it
== pool_peers
.end() ||
707 pool_peer_it
->second
.find(peer
) == pool_peer_it
->second
.end()) {
708 dout(20) << "removing pool replayer for " << peer
<< dendl
;
710 it
->second
->shut_down();
711 it
= m_pool_replayers
.erase(it
);
717 for (auto &kv
: pool_peers
) {
718 for (auto &peer
: kv
.second
) {
719 PoolPeer
pool_peer(kv
.first
, peer
);
721 auto pool_replayers_it
= m_pool_replayers
.find(pool_peer
);
722 if (pool_replayers_it
!= m_pool_replayers
.end()) {
723 auto& pool_replayer
= pool_replayers_it
->second
;
724 if (!m_site_name
.empty() && !site_name
.empty() &&
725 m_site_name
!= site_name
) {
726 dout(0) << "restarting pool replayer for " << peer
<< " due to "
727 << "updated site name" << dendl
;
729 pool_replayer
->shut_down();
730 pool_replayer
->init(site_name
);
731 } else if (pool_replayer
->is_blocklisted()) {
732 derr
<< "restarting blocklisted pool replayer for " << peer
<< dendl
;
734 pool_replayer
->shut_down();
735 pool_replayer
->init(site_name
);
736 } else if (!pool_replayer
->is_running()) {
737 derr
<< "restarting failed pool replayer for " << peer
<< dendl
;
739 pool_replayer
->shut_down();
740 pool_replayer
->init(site_name
);
743 dout(20) << "starting pool replayer for " << peer
<< dendl
;
744 unique_ptr
<PoolReplayer
<>> pool_replayer(
745 new PoolReplayer
<>(m_threads
, m_service_daemon
.get(),
746 m_cache_manager_handler
.get(),
747 m_pool_meta_cache
.get(), kv
.first
, peer
,
751 pool_replayer
->init(site_name
);
752 m_pool_replayers
.emplace(pool_peer
, std::move(pool_replayer
));
756 // TODO currently only support a single peer
759 m_site_name
= site_name
;
762 } // namespace mirror