// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <signal.h>

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"
#include "Mirror.h"
#include "PoolMetaCache.h"
#include "ServiceDaemon.h"
#include "Threads.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd_mirror

using std::list;
using std::map;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;

using librados::Rados;
using librados::IoCtx;
using librbd::mirror_peer_t;

namespace rbd {
namespace mirror {

namespace {

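// Commands exposed through the rbd-mirror admin socket. Each command
// object wraps a single Mirror operation and is dispatched by
// MirrorAdminSocketHook::call() below.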
class MirrorAdminSocketCommand {
public:
  virtual ~MirrorAdminSocketCommand() {}
  virtual int call(Formatter *f) = 0;
};

class StatusCommand : public MirrorAdminSocketCommand {
public:
  explicit StatusCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->print_status(f);
    return 0;
  }

private:
  Mirror *mirror;
};

class StartCommand : public MirrorAdminSocketCommand {
public:
  explicit StartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->start();
    return 0;
  }

private:
  Mirror *mirror;
};

class StopCommand : public MirrorAdminSocketCommand {
public:
  explicit StopCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->stop();
    return 0;
  }

private:
  Mirror *mirror;
};

class RestartCommand : public MirrorAdminSocketCommand {
public:
  explicit RestartCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->restart();
    return 0;
  }

private:
  Mirror *mirror;
};

class FlushCommand : public MirrorAdminSocketCommand {
public:
  explicit FlushCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->flush();
    return 0;
  }

private:
  Mirror *mirror;
};

class LeaderReleaseCommand : public MirrorAdminSocketCommand {
public:
  explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {}

  int call(Formatter *f) override {
    mirror->release_leader();
    return 0;
  }

private:
  Mirror *mirror;
};

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \
                           << m_name << " " << __func__ << ": "

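// Priority-cache adapter for a registered journal cache. Each cache is
// modelled as two tiers: a "base" tier of up to m_base_cache_max_size
// bytes whose priority decays one level per balancing round (see
// prioritize()), and an "extra" tier for anything beyond the base
// allocation.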
struct PriCache : public PriorityCache::PriCache {
  std::string m_name;
  int64_t m_base_cache_max_size;
  int64_t m_extra_cache_max_size;

  PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10;
  PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10;
  int64_t m_base_cache_bytes = 0;
  int64_t m_extra_cache_bytes = 0;
  int64_t m_committed_bytes = 0;
  double m_cache_ratio = 0;

  PriCache(const std::string &name, uint64_t min_size, uint64_t max_size)
    : m_name(name), m_base_cache_max_size(min_size),
      m_extra_cache_max_size(max_size - min_size) {
    ceph_assert(max_size >= min_size);
  }

  void prioritize() {
    if (m_base_cache_pri == PriorityCache::Priority::PRI0) {
      return;
    }
    auto pri = static_cast<uint8_t>(m_base_cache_pri);
    m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri);

    dout(30) << m_base_cache_pri << dendl;
  }

  int64_t request_cache_bytes(PriorityCache::Priority pri,
                              uint64_t total_cache) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_max_size;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_max_size;
    }

    dout(30) << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes(PriorityCache::Priority pri) const override {
    int64_t cache_bytes = 0;

    if (pri == m_base_cache_pri) {
      cache_bytes += m_base_cache_bytes;
    }
    if (pri == m_extra_cache_pri) {
      cache_bytes += m_extra_cache_bytes;
    }

    dout(30) << "pri=" << pri << " " << cache_bytes << dendl;

    return cache_bytes;
  }

  int64_t get_cache_bytes() const override {
    auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes;

    dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "="
             << cache_bytes << dendl;

    return cache_bytes;
  }

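  // Granted bytes fill the base tier first, capped at
  // m_base_cache_max_size; while the extra tier is still at the same
  // priority level (both start at PRI10), the remainder spills over
  // into it.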
  void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri ||
                bytes == 0);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      m_base_cache_bytes = std::min(m_base_cache_max_size, bytes);
      bytes -= std::min(m_base_cache_bytes, bytes);
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes = bytes;
    }
  }

  void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override {
    ceph_assert(bytes >= 0);
    ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri);

    dout(30) << "pri=" << pri << " " << bytes << dendl;

    if (pri == m_base_cache_pri) {
      ceph_assert(m_base_cache_bytes <= m_base_cache_max_size);

      auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes);
      m_base_cache_bytes += chunk;
      bytes -= chunk;
    }

    if (pri == m_extra_cache_pri) {
      m_extra_cache_bytes += bytes;
    }
  }

  int64_t commit_cache_size(uint64_t total_cache) override {
    m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096);

    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  int64_t get_committed_size() const override {
    dout(30) << m_committed_bytes << dendl;

    return m_committed_bytes;
  }

  double get_cache_ratio() const override {
    dout(30) << m_cache_ratio << dendl;

    return m_cache_ratio;
  }

  void set_cache_ratio(double ratio) override {
    dout(30) << m_cache_ratio << dendl;

    m_cache_ratio = ratio;
  }

  void shift_bins() override {
  }

  void import_bins(const std::vector<uint64_t> &intervals) override {
  }

  void set_bins(PriorityCache::Priority pri, uint64_t end_interval) override {
  }

  uint64_t get_bins(PriorityCache::Priority pri) const override {
    return 0;
  }

  std::string get_cache_name() const override {
    return m_name;
  }
};

} // anonymous namespace

#undef dout_prefix
#define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \
                           << __func__ << ": "

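// Registers the "rbd mirror ..." admin socket commands and dispatches
// them to the command objects defined above. A command whose
// registration fails (e.g. on a name collision) is simply skipped.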
class MirrorAdminSocketHook : public AdminSocketHook {
public:
  MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) :
    admin_socket(cct->get_admin_socket()) {
    std::string command;
    int r;

    command = "rbd mirror status";
    r = admin_socket->register_command(command, this,
                                       "get status for rbd mirror");
    if (r == 0) {
      commands[command] = new StatusCommand(mirror);
    }

    command = "rbd mirror start";
    r = admin_socket->register_command(command, this,
                                       "start rbd mirror");
    if (r == 0) {
      commands[command] = new StartCommand(mirror);
    }

    command = "rbd mirror stop";
    r = admin_socket->register_command(command, this,
                                       "stop rbd mirror");
    if (r == 0) {
      commands[command] = new StopCommand(mirror);
    }

    command = "rbd mirror restart";
    r = admin_socket->register_command(command, this,
                                       "restart rbd mirror");
    if (r == 0) {
      commands[command] = new RestartCommand(mirror);
    }

    command = "rbd mirror flush";
    r = admin_socket->register_command(command, this,
                                       "flush rbd mirror");
    if (r == 0) {
      commands[command] = new FlushCommand(mirror);
    }

    command = "rbd mirror leader release";
    r = admin_socket->register_command(command, this,
                                       "release rbd mirror leader");
    if (r == 0) {
      commands[command] = new LeaderReleaseCommand(mirror);
    }
  }

  ~MirrorAdminSocketHook() override {
    (void)admin_socket->unregister_commands(this);
    for (Commands::const_iterator i = commands.begin(); i != commands.end();
         ++i) {
      delete i->second;
    }
  }

  int call(std::string_view command, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& errss,
           bufferlist& out) override {
    Commands::const_iterator i = commands.find(command);
    ceph_assert(i != commands.end());
    return i->second->call(f);
  }

private:
  typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;

  AdminSocket *admin_socket;
  Commands commands;
};

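// Arbitrates cache memory between the registered journal caches. If
// rbd_mirror_memory_autotune is disabled, no manager is created and
// each cache is simply granted its requested maximum.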
class CacheManagerHandler : public journal::CacheManagerHandler {
public:
  CacheManagerHandler(CephContext *cct)
    : m_cct(cct) {

    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
      return;
    }

    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_base");
    double fragmentation = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_expected_fragmentation");
    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_target");
    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_cache_min");
    uint64_t max = min;

    // When setting the maximum amount of memory to use for cache, first
    // assume some base amount of memory for the daemon and then fudge in
    // some overhead for fragmentation that scales with cache usage.
    uint64_t ltarget = (1.0 - fragmentation) * target;
    if (ltarget > base + min) {
      max = ltarget - base;
    }
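    // Illustrative numbers (not necessarily the configured defaults):
    // with target = 4 GiB, fragmentation = 0.2 and base = 512 MiB,
    // ltarget = 0.8 * 4 GiB = 3.2 GiB > base + min, so the caches may
    // collectively grow to max = 3.2 GiB - 512 MiB ≈ 2.7 GiB.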

    m_next_balance = ceph_clock_now();
    m_next_resize = ceph_clock_now();

    m_cache_manager = std::make_unique<PriorityCache::Manager>(
        m_cct, min, max, target, false);
  }

  ~CacheManagerHandler() {
    std::lock_guard locker{m_lock};

    ceph_assert(m_caches.empty());
  }

  void register_cache(const std::string &cache_name,
                      uint64_t min_size, uint64_t max_size,
                      journal::CacheRebalanceHandler* handler) override {
    if (!m_cache_manager) {
      handler->handle_cache_rebalanced(max_size);
      return;
    }

    dout(20) << cache_name << " min_size=" << min_size << " max_size="
             << max_size << " handler=" << handler << dendl;

    std::lock_guard locker{m_lock};

    auto p = m_caches.insert(
        {cache_name, {cache_name, min_size, max_size, handler}});
    ceph_assert(p.second == true);

    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
    m_next_balance = ceph_clock_now();
  }

  void unregister_cache(const std::string &cache_name) override {
    if (!m_cache_manager) {
      return;
    }

    dout(20) << cache_name << dendl;

    std::lock_guard locker{m_lock};

    auto it = m_caches.find(cache_name);
    ceph_assert(it != m_caches.end());

    m_cache_manager->erase(cache_name);
    m_caches.erase(it);
    m_next_balance = ceph_clock_now();
  }

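  // Called periodically from Mirror::run(): rebalances allocations
  // across the registered caches once per autotune interval and, when
  // running with tcmalloc, retunes overall memory usage once per
  // resize interval.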
  void run_cache_manager() {
    if (!m_cache_manager) {
      return;
    }

    std::lock_guard locker{m_lock};

    // Before we trim, check and see if it's time to rebalance/resize.
    auto autotune_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_autotune_interval");
    auto resize_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_resize_interval");

    utime_t now = ceph_clock_now();

    if (autotune_interval > 0 && m_next_balance <= now) {
      dout(20) << "balance" << dendl;
      m_cache_manager->balance();

      for (auto &it : m_caches) {
        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
        auto new_cache_bytes = pri_cache->get_cache_bytes();
        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
        pri_cache->prioritize();
      }

      m_next_balance = ceph_clock_now();
      m_next_balance += autotune_interval;
    }

    if (resize_interval > 0 && m_next_resize < now) {
      if (ceph_using_tcmalloc()) {
        dout(20) << "tune memory" << dendl;
        m_cache_manager->tune_memory();
      }

      m_next_resize = ceph_clock_now();
      m_next_resize += resize_interval;
    }
  }

private:
  struct Cache {
    std::shared_ptr<PriorityCache::PriCache> pri_cache;
    journal::CacheRebalanceHandler *handler;

    Cache(const std::string name, uint64_t min_size, uint64_t max_size,
          journal::CacheRebalanceHandler *handler)
      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
    }
  };

  CephContext *m_cct;

  mutable ceph::mutex m_lock =
    ceph::make_mutex("rbd::mirror::CacheManagerHandler");
  std::unique_ptr<PriorityCache::Manager> m_cache_manager;
  std::map<std::string, Cache> m_caches;

  utime_t m_next_balance;
  utime_t m_next_resize;
};

Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
  m_cct(cct),
  m_args(args),
  m_local(new librados::Rados()),
  m_cache_manager_handler(new CacheManagerHandler(cct)),
  m_pool_meta_cache(new PoolMetaCache(cct)),
  m_asok_hook(new MirrorAdminSocketHook(cct, this)) {
}

Mirror::~Mirror()
{
  delete m_asok_hook;
}

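// SIGHUP reopens the daemon and pool replayer logs (for log rotation);
// SIGINT/SIGTERM request an orderly shutdown of the main loop in run().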
void Mirror::handle_signal(int signum)
{
  dout(20) << signum << dendl;

  std::lock_guard l{m_lock};

  switch (signum) {
  case SIGHUP:
    for (auto &it : m_pool_replayers) {
      it.second->reopen_logs();
    }
    g_ceph_context->reopen_logs();
    break;

  case SIGINT:
  case SIGTERM:
    m_stopping = true;
    m_cond.notify_all();
    break;

  default:
    ceph_abort_msgf("unexpected signal %d", signum);
  }
}

int Mirror::init()
{
  int r = m_local->init_with_context(m_cct);
  if (r < 0) {
    derr << "could not initialize rados handle" << dendl;
    return r;
  }

  r = m_local->connect();
  if (r < 0) {
    derr << "error connecting to local cluster" << dendl;
    return r;
  }

  m_threads = &(m_cct->lookup_or_create_singleton_object<
      Threads<librbd::ImageCtx>>("rbd_mirror::threads", false, m_local));
  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));

  r = m_service_daemon->init();
  if (r < 0) {
    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
    return r;
  }

  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
                                                   m_service_daemon.get()));
  return r;
}

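// Main loop: wakes up at least once per second, periodically refreshes
// the set of mirror-enabled pools and their peers, reconciles the pool
// replayers against it and drives the cache manager, until a signal or
// admin socket command requests shutdown.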
void Mirror::run()
{
  dout(20) << "enter" << dendl;

  using namespace std::chrono_literals;
  utime_t next_refresh_pools = ceph_clock_now();

  while (!m_stopping) {
    utime_t now = ceph_clock_now();
    bool refresh_pools = next_refresh_pools <= now;
    if (refresh_pools) {
      m_local_cluster_watcher->refresh_pools();
      next_refresh_pools = ceph_clock_now();
      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
          "rbd_mirror_pool_replayers_refresh_interval");
    }
    std::unique_lock l{m_lock};
    if (!m_manual_stop) {
      if (refresh_pools) {
        update_pool_replayers(m_local_cluster_watcher->get_pool_peers(),
                              m_local_cluster_watcher->get_site_name());
      }
      m_cache_manager_handler->run_cache_manager();
    }
    m_cond.wait_for(l, 1s);
  }

  // stop all pool replayers in parallel
  std::lock_guard locker{m_lock};
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(false);
  }
  dout(20) << "return" << dendl;
}

void Mirror::print_status(Formatter *f)
{
  dout(20) << "enter" << dendl;

  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  f->open_object_section("mirror_status");
  f->open_array_section("pool_replayers");
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->print_status(f);
  }
  f->close_section();
  f->close_section();
}

void Mirror::start()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->start();
  }
}

void Mirror::stop()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = true;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(true);
  }
}

void Mirror::restart()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  m_manual_stop = false;

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->restart();
  }
}

void Mirror::flush()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping || m_manual_stop) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->flush();
  }
}

void Mirror::release_leader()
{
  dout(20) << "enter" << dendl;
  std::lock_guard l{m_lock};

  if (m_stopping) {
    return;
  }

  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->release_leader();
  }
}

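// Reconciles the running pool replayers against the desired
// (pool, peer) set: stale replayers are shut down, blocklisted or
// failed replayers and replayers with an outdated site name are
// restarted, and missing ones are created.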
void Mirror::update_pool_replayers(const PoolPeers &pool_peers,
                                   const std::string& site_name)
{
  dout(20) << "enter" << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // remove stale pool replayers before creating new pool replayers
  for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) {
    auto &peer = it->first.second;
    auto pool_peer_it = pool_peers.find(it->first.first);
    if (pool_peer_it == pool_peers.end() ||
        pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
      dout(20) << "removing pool replayer for " << peer << dendl;
      // TODO: make async
      it->second->shut_down();
      it = m_pool_replayers.erase(it);
    } else {
      ++it;
    }
  }

  for (auto &kv : pool_peers) {
    for (auto &peer : kv.second) {
      PoolPeer pool_peer(kv.first, peer);

      auto pool_replayers_it = m_pool_replayers.find(pool_peer);
      if (pool_replayers_it != m_pool_replayers.end()) {
        auto& pool_replayer = pool_replayers_it->second;
        if (!m_site_name.empty() && !site_name.empty() &&
            m_site_name != site_name) {
          dout(0) << "restarting pool replayer for " << peer << " due to "
                  << "updated site name" << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        } else if (pool_replayer->is_blocklisted()) {
          derr << "restarting blocklisted pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        } else if (!pool_replayer->is_running()) {
          derr << "restarting failed pool replayer for " << peer << dendl;
          // TODO: make async
          pool_replayer->shut_down();
          pool_replayer->init(site_name);
        }
      } else {
        dout(20) << "starting pool replayer for " << peer << dendl;
        unique_ptr<PoolReplayer<>> pool_replayer(
            new PoolReplayer<>(m_threads, m_service_daemon.get(),
                               m_cache_manager_handler.get(),
                               m_pool_meta_cache.get(), kv.first, peer,
                               m_args));

        // TODO: make async
        pool_replayer->init(site_name);
        m_pool_replayers.emplace(pool_peer, std::move(pool_replayer));
      }
    }

    // TODO currently only support a single peer
  }

  m_site_name = site_name;
}

} // namespace mirror
} // namespace rbd