// ceph/src/tools/rbd_mirror/Mirror.cc (ceph quincy 17.2.6)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <signal.h>

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"
#include "Mirror.h"
#include "PoolMetaCache.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror

using std::list;
using std::map;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using librados::Rados;
using librados::IoCtx;
using librbd::mirror_peer_t;

namespace rbd {
namespace mirror {

namespace {

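// Each of the following commands is registered on the daemon admin socket
// and simply forwards to the owning Mirror instance.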
class MirrorAdminSocketCommand {
public:
  virtual ~MirrorAdminSocketCommand() {}
  virtual int call(Formatter *f) = 0;
};

class StatusCommand : public MirrorAdminSocketCommand {
public:
  explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->print_status(f);
    return 0;
  }

private:
  Mirror *mirror;
};

class StartCommand : public MirrorAdminSocketCommand {
public:
  explicit StartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->start();
    return 0;
  }

private:
  Mirror *mirror;
};

class StopCommand : public MirrorAdminSocketCommand {
public:
  explicit StopCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->stop();
    return 0;
  }

private:
  Mirror *mirror;
};

class RestartCommand : public MirrorAdminSocketCommand {
public:
  explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->restart();
    return 0;
  }

private:
  Mirror *mirror;
};

class FlushCommand : public MirrorAdminSocketCommand {
public:
  explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->flush();
    return 0;
  }

private:
  Mirror *mirror;
};

class LeaderReleaseCommand : public MirrorAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->release_leader();
    return 0;
  }

private:
  Mirror *mirror;
};

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
                           << m_name << " " << __func__ << ": "

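// Tracks the cache allotment for one registered journal cache. The cache is
// split into a "base" chunk (up to min_size) requested at the cache's
// current priority, and an "extra" chunk (the remainder up to max_size)
// that stays at the initial PRI10 priority.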
struct PriCache : public PriorityCache::PriCache {
  std::string m_name;
  int64_t m_base_cache_max_size;
  int64_t m_extra_cache_max_size;

  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
  int64_t m_base_cache_bytes = 0;
  int64_t m_extra_cache_bytes = 0;
  int64_t m_committed_bytes = 0;
  double m_cache_ratio = 0;

  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
    : m_name(name), m_base_cache_max_size(min_size),
      m_extra_cache_max_size(max_size - min_size) {
    ceph_assert(max_size >= min_size);
  }

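  // Promote the base chunk by one priority level per balance round, capped
  // at PRI0 (the highest priority).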
  void prioritize() {
    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
      return;
    }
    auto pri = static_cast<uint8_t>(m_base_cache_pri);
    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);

    dout(30) << m_base_cache_pri << dendl;
  }

  int64_t request_cache_bytes(PriorityCache::Priority pri,
                              uint64_t total_cache) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_max_size;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_max_size;
    }

    dout(30) << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_bytes;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_bytes;
    }

    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes() const override {
    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;

    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
             << cache_bytes << dendl;

    return cache_bytes;
  }

  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
                bytes == 0);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
      bytes -= std::min(m_base_cache_bytes, bytes);
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes = bytes;
    }
  }

  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);

      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
      m_base_cache_bytes += chunk;
      bytes -= chunk;
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes += bytes;
    }
  }

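  // Committed sizes are rounded up to a 4 KiB boundary.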
  int64_t commit_cache_size(uint64_t total_cache) override {
    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);

    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  int64_t get_committed_size() const override {
    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  double get_cache_ratio() const override {
    dout(30) << m_cache_ratio << dendl;

    return m_cache_ratio;
  }

  void set_cache_ratio(double ratio) override {
    dout(30) << ratio << dendl;

    m_cache_ratio = ratio;
  }

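  // The interval-bin accounting hooks below are deliberately no-ops;
  // rbd-mirror does not use them.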
  void shift_bins() override {
  }

  void import_bins(const std::vector<uint64_t> &intervals) override {
  }

  void set_bins(PriorityCache::Priority pri, uint64_t end_interval) override {
  }

  uint64_t get_bins(PriorityCache::Priority pri) const override {
    return 0;
  }

  std::string get_cache_name() const override {
    return m_name;
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
                           << __func__ << ": "

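// Registers the "rbd mirror ..." admin socket commands and dispatches each
// incoming call to the matching MirrorAdminSocketCommand.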
class MirrorAdminSocketHook : public AdminSocketHook {
public:
  MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
    admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status";
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror");
    if (r == 0) {
      commands[command] = new StatusCommand(mirror);
    }

    command = "rbd mirror start";
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror");
    if (r == 0) {
      commands[command] = new StartCommand(mirror);
    }

    command = "rbd mirror stop";
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror");
    if (r == 0) {
      commands[command] = new StopCommand(mirror);
    }

    command = "rbd mirror restart";
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror");
    if (r == 0) {
      commands[command] = new RestartCommand(mirror);
    }

    command = "rbd mirror flush";
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror");
    if (r == 0) {
      commands[command] = new FlushCommand(mirror);
    }

    command = "rbd mirror leader release";
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader");
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(mirror);
    }
  }

  ~MirrorAdminSocketHook() override {
    (void)admin_socket->unregister_commands(this);
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           const bufferlist&,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

class CacheManagerHandler : public journal::CacheManagerHandler {
public:
  CacheManagerHandler(CephContext *cct)
    : m_cct(cct) {

    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
      return;
    }

    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_base");
    double fragmentation = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_expected_fragmentation");
    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_target");
    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_cache_min");
    uint64_t max = min;

    // When setting the maximum amount of memory to use for cache, first
    // assume some base amount of memory for the daemon and then fudge in
    // some overhead for fragmentation that scales with cache usage.
    uint64_t ltarget = (1.0 - fragmentation) * target;
    if (ltarget > base + min) {
      max = ltarget - base;
    }
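    // Illustrative numbers (not necessarily the shipped defaults): with
    // target = 4 GiB, base = 768 MiB, fragmentation = 0.15 and
    // min = 128 MiB, ltarget = 0.85 * 4 GiB ~= 3.4 GiB exceeds
    // base + min = 896 MiB, so max = 3.4 GiB - 768 MiB ~= 2.65 GiB.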

    m_next_balance = ceph_clock_now();
    m_next_resize = ceph_clock_now();

    m_cache_manager = std::make_unique<PriorityCache::Manager>(
        m_cct, min, max, target, false);
  }

  ~CacheManagerHandler() {
    std::lock_guard locker{m_lock};

    ceph_assert(m_caches.empty());
  }

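  // With autotuning disabled each cache is immediately granted its requested
  // maximum; otherwise it joins the shared PriorityCache manager and is
  // rebalanced on the next run_cache_manager() pass.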
  void register_cache(const std::string &cache_name,
                      uint64_t min_size, uint64_t max_size,
                      journal::CacheRebalanceHandler* handler) override {
    if (!m_cache_manager) {
      handler->handle_cache_rebalanced(max_size);
      return;
    }

    dout(20) << cache_name << " min_size=" << min_size << " max_size="
             << max_size << " handler=" << handler << dendl;

    std::lock_guard locker{m_lock};

    auto p = m_caches.insert(
        {cache_name, {cache_name, min_size, max_size, handler}});
    ceph_assert(p.second == true);

    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
    m_next_balance = ceph_clock_now();
  }

  void unregister_cache(const std::string &cache_name) override {
    if (!m_cache_manager) {
      return;
    }

    dout(20) << cache_name << dendl;

    std::lock_guard locker{m_lock};

    auto it = m_caches.find(cache_name);
    ceph_assert(it != m_caches.end());

    m_cache_manager->erase(cache_name);
    m_caches.erase(it);
    m_next_balance = ceph_clock_now();
  }

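  // Driven from the main Mirror::run() loop: rebalances cache allocations
  // every rbd_mirror_memory_cache_autotune_interval seconds and, under
  // tcmalloc, resizes memory every rbd_mirror_memory_cache_resize_interval
  // seconds.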
  void run_cache_manager() {
    if (!m_cache_manager) {
      return;
    }

    std::lock_guard locker{m_lock};

    // Before we trim, check and see if it's time to rebalance/resize.
    auto autotune_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_autotune_interval");
    auto resize_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_resize_interval");

    utime_t now = ceph_clock_now();

    if (autotune_interval > 0 && m_next_balance <= now) {
      dout(20) << "balance" << dendl;
      m_cache_manager->balance();

      for (auto &it : m_caches) {
        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
        auto new_cache_bytes = pri_cache->get_cache_bytes();
        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
        pri_cache->prioritize();
      }

      m_next_balance = ceph_clock_now();
      m_next_balance += autotune_interval;
    }

    if (resize_interval > 0 && m_next_resize < now) {
      if (ceph_using_tcmalloc()) {
        dout(20) << "tune memory" << dendl;
        m_cache_manager->tune_memory();
      }

      m_next_resize = ceph_clock_now();
      m_next_resize += resize_interval;
    }
  }

private:
  struct Cache {
    std::shared_ptr<PriorityCache::PriCache> pri_cache;
    journal::CacheRebalanceHandler *handler;

    Cache(const std::string &name, uint64_t min_size, uint64_t max_size,
          journal::CacheRebalanceHandler *handler)
      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
    }
  };

  CephContext *m_cct;

  mutable ceph::mutex m_lock =
    ceph::make_mutex("rbd::mirror::CacheManagerHandler");
  std::unique_ptr<PriorityCache::Manager> m_cache_manager;
  std::map<std::string, Cache> m_caches;

  utime_t m_next_balance;
  utime_t m_next_resize;
};

Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
  m_cct(cct),
  m_args(args),
  m_local(new librados::Rados()),
  m_cache_manager_handler(new CacheManagerHandler(cct)),
  m_pool_meta_cache(new PoolMetaCache(cct)),
  m_asok_hook(new MirrorAdminSocketHook(cct, this)) {
}

Mirror::~Mirror()
{
  delete m_asok_hook;
}

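// SIGHUP reopens the log files (e.g. after rotation); SIGINT/SIGTERM ask the
// main run() loop to shut down cleanly.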
void Mirror::handle_signal(int signum)
{
  dout(20) << signum << dendl;

  std::lock_guard l{m_lock};

  switch (signum) {
  case SIGHUP:
    for (auto &it : m_pool_replayers) {
      it.second->reopen_logs();
    }
    g_ceph_context->reopen_logs();
    break;

  case SIGINT:
  case SIGTERM:
    m_stopping = true;
    m_cond.notify_all();
    break;

  default:
    ceph_abort_msgf("unexpected signal %d", signum);
  }
}

int Mirror::init()
{
  int r = m_local->init_with_context(m_cct);
  if (r < 0) {
    derr << "could not initialize rados handle" << dendl;
    return r;
  }

  r = m_local->connect();
  if (r < 0) {
    derr << "error connecting to local cluster" << dendl;
    return r;
  }

  m_threads = &(m_cct->lookup_or_create_singleton_object<
    Threads<librbd::ImageCtx>>("rbd_mirror::threads", false, m_local));
  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));

  r = m_service_daemon->init();
  if (r < 0) {
    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
                                                   m_service_daemon.get()));
  return r;
}

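// Main loop: refresh the set of mirrored pools/peers every
// rbd_mirror_pool_replayers_refresh_interval seconds and drive the cache
// manager, until SIGINT/SIGTERM sets m_stopping.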
void Mirror::run()
{
  dout(20) << "enter" << dendl;

  using namespace std::chrono_literals;
  utime_t next_refresh_pools = ceph_clock_now();

  while (!m_stopping) {
    utime_t now = ceph_clock_now();
    bool refresh_pools = next_refresh_pools <= now;
    if (refresh_pools) {
      m_local_cluster_watcher->refresh_pools();
      next_refresh_pools = ceph_clock_now();
      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
        "rbd_mirror_pool_replayers_refresh_interval");
    }
    std::unique_lock l{m_lock};
    if (!m_manual_stop) {
      if (refresh_pools) {
        update_pool_replayers(m_local_cluster_watcher->get_pool_peers(),
                              m_local_cluster_watcher->get_site_name());
      }
      m_cache_manager_handler->run_cache_manager();
    }
    m_cond.wait_for(l, 1s);
  }

  // stop all pool replayers in parallel
  std::lock_guard locker{m_lock};
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(false);
  }
  dout(20) << "return" << dendl;
}

void Mirror::print_status(Formatter *f)
{
  dout(20) << "enter" << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  f->open_object_section("mirror_status");
  f->open_array_section("pool_replayers");
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->print_status(f);
  }
  f->close_section();
  f->close_section();
}

void Mirror::start()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->start();
  }
}

void Mirror::stop()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = true;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(true);
  }
}

void Mirror::restart()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->restart();
  }
}

void Mirror::flush()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping || m_manual_stop) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->flush();
  }
}

void Mirror::release_leader()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->release_leader();
  }
}

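// Reconcile the running PoolReplayers against the (pool, peer) pairs
// reported by the ClusterWatcher: shut down stale replayers, bounce
// blocklisted or failed ones, and start a replayer for each new pair.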
void Mirror::update_pool_replayers(const PoolPeers &pool_peers,
                                   const std::string& site_name)
{
  dout(20) << "enter" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // remove stale pool replayers before creating new pool replayers
  for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
    auto &peer = it->first.second;
    auto pool_peer_it = pool_peers.find(it->first.first);
    if (pool_peer_it == pool_peers.end() ||
        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
      dout(20) << "removing pool replayer for " << peer << dendl;
      // TODO: make async
      it->second->shut_down();
      it = m_pool_replayers.erase(it);
    } else {
      ++it;
    }
  }

  for (auto &kv : pool_peers) {
    for (auto &peer : kv.second) {
      PoolPeer pool_peer(kv.first, peer);

      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
      if (pool_replayers_it != m_pool_replayers.end()) {
        auto& pool_replayer = pool_replayers_it->second;
        if (!m_site_name.empty() && !site_name.empty() &&
            m_site_name != site_name) {
          dout(0) << "restarting pool replayer for " << peer << " due to "
                  << "updated site name" << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        } else if (pool_replayer->is_blocklisted()) {
          derr << "restarting blocklisted pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        } else if (!pool_replayer->is_running()) {
          derr << "restarting failed pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        }
      } else {
        dout(20) << "starting pool replayer for " << peer << dendl;
        unique_ptr<PoolReplayer<>> pool_replayer(
            new PoolReplayer<>(m_threads, m_service_daemon.get(),
                               m_cache_manager_handler.get(),
                               m_pool_meta_cache.get(), kv.first, peer,
                               m_args));

        // TODO: make async
        pool_replayer->init(site_name);
        m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
      }
    }

    // TODO currently only support a single peer
  }

  m_site_name = site_name;
}

} // namespace mirror
} // namespace rbd