]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
92f5a8d4 TL |
#include <signal.h>

#include <memory>

#include <boost/range/adaptor/map.hpp>

#include "common/Formatter.h"
#include "common/PriorityCache.h"
#include "common/admin_socket.h"
#include "common/debug.h"
#include "common/errno.h"
#include "journal/Types.h"
#include "librbd/ImageCtx.h"
#include "perfglue/heap_profiler.h"
#include "Mirror.h"
#include "PoolMetaCache.h"
#include "ServiceDaemon.h"
#include "Threads.h"
7c673cae FG |
20 | |
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_rbd_mirror | |
7c673cae FG |
23 | |
24 | using std::list; | |
25 | using std::map; | |
26 | using std::set; | |
27 | using std::string; | |
28 | using std::unique_ptr; | |
29 | using std::vector; | |
30 | ||
31 | using librados::Rados; | |
32 | using librados::IoCtx; | |
33 | using librbd::mirror_peer_t; | |
34 | ||
35 | namespace rbd { | |
36 | namespace mirror { | |
37 | ||
38 | namespace { | |
39 | ||
40 | class MirrorAdminSocketCommand { | |
41 | public: | |
42 | virtual ~MirrorAdminSocketCommand() {} | |
9f95a23c | 43 | virtual int call(Formatter *f) = 0; |
7c673cae FG |
44 | }; |
45 | ||
46 | class StatusCommand : public MirrorAdminSocketCommand { | |
47 | public: | |
48 | explicit StatusCommand(Mirror *mirror) : mirror(mirror) {} | |
49 | ||
9f95a23c TL |
50 | int call(Formatter *f) override { |
51 | mirror->print_status(f); | |
52 | return 0; | |
7c673cae FG |
53 | } |
54 | ||
55 | private: | |
56 | Mirror *mirror; | |
57 | }; | |
58 | ||
59 | class StartCommand : public MirrorAdminSocketCommand { | |
60 | public: | |
61 | explicit StartCommand(Mirror *mirror) : mirror(mirror) {} | |
62 | ||
9f95a23c | 63 | int call(Formatter *f) override { |
7c673cae | 64 | mirror->start(); |
9f95a23c | 65 | return 0; |
7c673cae FG |
66 | } |
67 | ||
68 | private: | |
69 | Mirror *mirror; | |
70 | }; | |
71 | ||
72 | class StopCommand : public MirrorAdminSocketCommand { | |
73 | public: | |
74 | explicit StopCommand(Mirror *mirror) : mirror(mirror) {} | |
75 | ||
9f95a23c | 76 | int call(Formatter *f) override { |
7c673cae | 77 | mirror->stop(); |
9f95a23c | 78 | return 0; |
7c673cae FG |
79 | } |
80 | ||
81 | private: | |
82 | Mirror *mirror; | |
83 | }; | |
84 | ||
85 | class RestartCommand : public MirrorAdminSocketCommand { | |
86 | public: | |
87 | explicit RestartCommand(Mirror *mirror) : mirror(mirror) {} | |
88 | ||
9f95a23c | 89 | int call(Formatter *f) override { |
7c673cae | 90 | mirror->restart(); |
9f95a23c | 91 | return 0; |
7c673cae FG |
92 | } |
93 | ||
94 | private: | |
95 | Mirror *mirror; | |
96 | }; | |
97 | ||
98 | class FlushCommand : public MirrorAdminSocketCommand { | |
99 | public: | |
100 | explicit FlushCommand(Mirror *mirror) : mirror(mirror) {} | |
101 | ||
9f95a23c | 102 | int call(Formatter *f) override { |
7c673cae | 103 | mirror->flush(); |
9f95a23c | 104 | return 0; |
7c673cae FG |
105 | } |
106 | ||
107 | private: | |
108 | Mirror *mirror; | |
109 | }; | |
110 | ||
111 | class LeaderReleaseCommand : public MirrorAdminSocketCommand { | |
112 | public: | |
113 | explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {} | |
114 | ||
9f95a23c | 115 | int call(Formatter *f) override { |
7c673cae | 116 | mirror->release_leader(); |
9f95a23c | 117 | return 0; |
7c673cae FG |
118 | } |
119 | ||
120 | private: | |
121 | Mirror *mirror; | |
122 | }; | |
123 | ||
9f95a23c TL |
124 | #undef dout_prefix |
125 | #define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \ | |
126 | << m_name << " " << __func__ << ": " | |
127 | ||
128 | struct PriCache : public PriorityCache::PriCache { | |
129 | std::string m_name; | |
130 | int64_t m_base_cache_max_size; | |
131 | int64_t m_extra_cache_max_size; | |
132 | ||
133 | PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10; | |
134 | PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10; | |
135 | int64_t m_base_cache_bytes = 0; | |
136 | int64_t m_extra_cache_bytes = 0; | |
137 | int64_t m_committed_bytes = 0; | |
138 | double m_cache_ratio = 0; | |
139 | ||
140 | PriCache(const std::string &name, uint64_t min_size, uint64_t max_size) | |
141 | : m_name(name), m_base_cache_max_size(min_size), | |
142 | m_extra_cache_max_size(max_size - min_size) { | |
143 | ceph_assert(max_size >= min_size); | |
144 | } | |
145 | ||
146 | void prioritize() { | |
147 | if (m_base_cache_pri == PriorityCache::Priority::PRI0) { | |
148 | return; | |
149 | } | |
150 | auto pri = static_cast<uint8_t>(m_base_cache_pri); | |
151 | m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri); | |
152 | ||
153 | dout(30) << m_base_cache_pri << dendl; | |
154 | } | |
155 | ||
156 | int64_t request_cache_bytes(PriorityCache::Priority pri, | |
157 | uint64_t total_cache) const override { | |
158 | int64_t cache_bytes = 0; | |
159 | ||
160 | if (pri == m_base_cache_pri) { | |
161 | cache_bytes += m_base_cache_max_size; | |
162 | } | |
163 | if (pri == m_extra_cache_pri) { | |
164 | cache_bytes += m_extra_cache_max_size; | |
165 | } | |
166 | ||
167 | dout(30) << cache_bytes << dendl; | |
168 | ||
169 | return cache_bytes; | |
170 | } | |
171 | ||
172 | int64_t get_cache_bytes(PriorityCache::Priority pri) const override { | |
173 | int64_t cache_bytes = 0; | |
174 | ||
175 | if (pri == m_base_cache_pri) { | |
176 | cache_bytes += m_base_cache_bytes; | |
177 | } | |
178 | if (pri == m_extra_cache_pri) { | |
179 | cache_bytes += m_extra_cache_bytes; | |
180 | } | |
181 | ||
182 | dout(30) << "pri=" << pri << " " << cache_bytes << dendl; | |
183 | ||
184 | return cache_bytes; | |
185 | } | |
186 | ||
187 | int64_t get_cache_bytes() const override { | |
188 | auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes; | |
189 | ||
190 | dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "=" | |
191 | << cache_bytes << dendl; | |
192 | ||
193 | return cache_bytes; | |
194 | } | |
195 | ||
196 | void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override { | |
197 | ceph_assert(bytes >= 0); | |
198 | ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri || | |
199 | bytes == 0); | |
200 | ||
201 | dout(30) << "pri=" << pri << " " << bytes << dendl; | |
202 | ||
203 | if (pri == m_base_cache_pri) { | |
204 | m_base_cache_bytes = std::min(m_base_cache_max_size, bytes); | |
205 | bytes -= std::min(m_base_cache_bytes, bytes); | |
206 | } | |
207 | ||
208 | if (pri == m_extra_cache_pri) { | |
209 | m_extra_cache_bytes = bytes; | |
210 | } | |
211 | } | |
212 | ||
213 | void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override { | |
214 | ceph_assert(bytes >= 0); | |
215 | ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri); | |
216 | ||
217 | dout(30) << "pri=" << pri << " " << bytes << dendl; | |
218 | ||
219 | if (pri == m_base_cache_pri) { | |
220 | ceph_assert(m_base_cache_bytes <= m_base_cache_max_size); | |
221 | ||
222 | auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes); | |
223 | m_base_cache_bytes += chunk; | |
224 | bytes -= chunk; | |
225 | } | |
226 | ||
227 | if (pri == m_extra_cache_pri) { | |
228 | m_extra_cache_bytes += bytes; | |
229 | } | |
230 | } | |
231 | ||
232 | int64_t commit_cache_size(uint64_t total_cache) override { | |
233 | m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096); | |
234 | ||
235 | dout(30) << m_committed_bytes << dendl; | |
236 | ||
237 | return m_committed_bytes; | |
238 | } | |
239 | ||
240 | int64_t get_committed_size() const override { | |
241 | dout(30) << m_committed_bytes << dendl; | |
242 | ||
243 | return m_committed_bytes; | |
244 | } | |
245 | ||
246 | double get_cache_ratio() const override { | |
247 | dout(30) << m_cache_ratio << dendl; | |
248 | ||
249 | return m_cache_ratio; | |
250 | } | |
251 | ||
252 | void set_cache_ratio(double ratio) override { | |
253 | dout(30) << m_cache_ratio << dendl; | |
254 | ||
255 | m_cache_ratio = ratio; | |
256 | } | |
257 | ||
258 | std::string get_cache_name() const override { | |
259 | return m_name; | |
260 | } | |
261 | }; | |
262 | ||
7c673cae FG |
263 | } // anonymous namespace |
264 | ||
9f95a23c TL |
265 | #undef dout_prefix |
266 | #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \ | |
267 | << __func__ << ": " | |
268 | ||
7c673cae FG |
269 | class MirrorAdminSocketHook : public AdminSocketHook { |
270 | public: | |
271 | MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) : | |
272 | admin_socket(cct->get_admin_socket()) { | |
273 | std::string command; | |
274 | int r; | |
275 | ||
276 | command = "rbd mirror status"; | |
9f95a23c | 277 | r = admin_socket->register_command(command, this, |
7c673cae FG |
278 | "get status for rbd mirror"); |
279 | if (r == 0) { | |
280 | commands[command] = new StatusCommand(mirror); | |
281 | } | |
282 | ||
283 | command = "rbd mirror start"; | |
9f95a23c | 284 | r = admin_socket->register_command(command, this, |
7c673cae FG |
285 | "start rbd mirror"); |
286 | if (r == 0) { | |
287 | commands[command] = new StartCommand(mirror); | |
288 | } | |
289 | ||
290 | command = "rbd mirror stop"; | |
9f95a23c | 291 | r = admin_socket->register_command(command, this, |
7c673cae FG |
292 | "stop rbd mirror"); |
293 | if (r == 0) { | |
294 | commands[command] = new StopCommand(mirror); | |
295 | } | |
296 | ||
297 | command = "rbd mirror restart"; | |
9f95a23c | 298 | r = admin_socket->register_command(command, this, |
7c673cae FG |
299 | "restart rbd mirror"); |
300 | if (r == 0) { | |
301 | commands[command] = new RestartCommand(mirror); | |
302 | } | |
303 | ||
304 | command = "rbd mirror flush"; | |
9f95a23c | 305 | r = admin_socket->register_command(command, this, |
7c673cae FG |
306 | "flush rbd mirror"); |
307 | if (r == 0) { | |
308 | commands[command] = new FlushCommand(mirror); | |
309 | } | |
310 | ||
311 | command = "rbd mirror leader release"; | |
9f95a23c | 312 | r = admin_socket->register_command(command, this, |
7c673cae FG |
313 | "release rbd mirror leader"); |
314 | if (r == 0) { | |
315 | commands[command] = new LeaderReleaseCommand(mirror); | |
316 | } | |
317 | } | |
318 | ||
319 | ~MirrorAdminSocketHook() override { | |
9f95a23c | 320 | (void)admin_socket->unregister_commands(this); |
7c673cae FG |
321 | for (Commands::const_iterator i = commands.begin(); i != commands.end(); |
322 | ++i) { | |
7c673cae FG |
323 | delete i->second; |
324 | } | |
325 | } | |
326 | ||
9f95a23c TL |
327 | int call(std::string_view command, const cmdmap_t& cmdmap, |
328 | Formatter *f, | |
329 | std::ostream& errss, | |
330 | bufferlist& out) override { | |
7c673cae | 331 | Commands::const_iterator i = commands.find(command); |
11fdf7f2 | 332 | ceph_assert(i != commands.end()); |
9f95a23c | 333 | return i->second->call(f); |
7c673cae FG |
334 | } |
335 | ||
336 | private: | |
11fdf7f2 | 337 | typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands; |
7c673cae FG |
338 | |
339 | AdminSocket *admin_socket; | |
340 | Commands commands; | |
341 | }; | |
342 | ||
9f95a23c TL |
// Bridges journal caches into the PriorityCache autotuning framework.
// When rbd_mirror_memory_autotune is disabled, m_cache_manager stays
// null and every registered cache is simply granted its max size.
class CacheManagerHandler : public journal::CacheManagerHandler {
public:
  CacheManagerHandler(CephContext *cct)
    : m_cct(cct) {

    // Autotune disabled: leave m_cache_manager null; register_cache()
    // and run_cache_manager() check for this and short-circuit.
    if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) {
      return;
    }

    uint64_t base = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_base");
    double fragmentation = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_expected_fragmentation");
    uint64_t target = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_target");
    uint64_t min = m_cct->_conf.get_val<Option::size_t>(
        "rbd_mirror_memory_cache_min");
    uint64_t max = min;

    // When setting the maximum amount of memory to use for cache, first
    // assume some base amount of memory for the daemon and then fudge in
    // some overhead for fragmentation that scales with cache usage.
    uint64_t ltarget = (1.0 - fragmentation) * target;
    if (ltarget > base + min) {
      max = ltarget - base;
    }

    // Schedule the first balance/resize pass immediately.
    m_next_balance = ceph_clock_now();
    m_next_resize = ceph_clock_now();

    m_cache_manager = std::make_unique<PriorityCache::Manager>(
        m_cct, min, max, target, false);
  }

  ~CacheManagerHandler() {
    std::lock_guard locker{m_lock};

    // All users must unregister_cache() before destruction.
    ceph_assert(m_caches.empty());
  }

  // Add a cache to the balancer.  Without autotuning the handler is
  // immediately granted max_size and nothing is tracked.
  void register_cache(const std::string &cache_name,
                      uint64_t min_size, uint64_t max_size,
                      journal::CacheRebalanceHandler* handler) override {
    if (!m_cache_manager) {
      handler->handle_cache_rebalanced(max_size);
      return;
    }

    dout(20) << cache_name << " min_size=" << min_size << " max_size="
             << max_size << " handler=" << handler << dendl;

    std::lock_guard locker{m_lock};

    auto p = m_caches.insert(
        {cache_name, {cache_name, min_size, max_size, handler}});
    ceph_assert(p.second == true);  // duplicate registration is a bug

    m_cache_manager->insert(cache_name, p.first->second.pri_cache, false);
    // force a rebalance on the next run_cache_manager() pass
    m_next_balance = ceph_clock_now();
  }

  // Remove a previously registered cache; triggers a rebalance so the
  // freed memory is redistributed on the next pass.
  void unregister_cache(const std::string &cache_name) override {
    if (!m_cache_manager) {
      return;
    }

    dout(20) << cache_name << dendl;

    std::lock_guard locker{m_lock};

    auto it = m_caches.find(cache_name);
    ceph_assert(it != m_caches.end());

    m_cache_manager->erase(cache_name);
    m_caches.erase(it);
    m_next_balance = ceph_clock_now();
  }

  // Periodic driver, called from Mirror::run().  Rebalances cache
  // shares and (under tcmalloc) resizes process memory on their
  // respective configured intervals.
  void run_cache_manager() {
    if (!m_cache_manager) {
      return;
    }

    std::lock_guard locker{m_lock};

    // Before we trim, check and see if it's time to rebalance/resize.
    auto autotune_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_autotune_interval");
    auto resize_interval = m_cct->_conf.get_val<double>(
        "rbd_mirror_memory_cache_resize_interval");

    utime_t now = ceph_clock_now();

    if (autotune_interval > 0 && m_next_balance <= now) {
      dout(20) << "balance" << dendl;
      m_cache_manager->balance();

      // Notify each handler of its new share, then bump the cache's
      // base priority for the next round.
      for (auto &it : m_caches) {
        auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get());
        auto new_cache_bytes = pri_cache->get_cache_bytes();
        it.second.handler->handle_cache_rebalanced(new_cache_bytes);
        pri_cache->prioritize();
      }

      m_next_balance = ceph_clock_now();
      m_next_balance += autotune_interval;
    }

    // NOTE(review): uses '<' here but '<=' for the balance check above —
    // presumably harmless (sub-second skew at most) but confirm the
    // asymmetry is intentional.
    if (resize_interval > 0 && m_next_resize < now) {
      if (ceph_using_tcmalloc()) {
        dout(20) << "tune memory" << dendl;
        m_cache_manager->tune_memory();
      }

      m_next_resize = ceph_clock_now();
      m_next_resize += resize_interval;
    }
  }

private:
  // Pairs a cache's PriCache state with the callback used to report
  // rebalanced sizes back to its owner.
  struct Cache {
    std::shared_ptr<PriorityCache::PriCache> pri_cache;
    journal::CacheRebalanceHandler *handler;  // not owned

    Cache(const std::string name, uint64_t min_size, uint64_t max_size,
          journal::CacheRebalanceHandler *handler)
      : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) {
    }
  };

  CephContext *m_cct;

  // Guards m_caches and the m_next_* deadlines.
  mutable ceph::mutex m_lock =
    ceph::make_mutex("rbd::mirror::CacheManagerHandler");
  std::unique_ptr<PriorityCache::Manager> m_cache_manager;  // null if autotune off
  std::map<std::string, Cache> m_caches;

  utime_t m_next_balance;
  utime_t m_next_resize;
};
483 | ||
7c673cae FG |
// Construct the mirror daemon state: local rados handle, cache manager,
// pool metadata cache, and the admin socket hook.  The thread singleton
// and service daemon are created in the body because they depend on the
// already-initialized members (m_cct, m_local).
Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) :
  m_cct(cct),
  m_args(args),
  m_local(new librados::Rados()),
  m_cache_manager_handler(new CacheManagerHandler(cct)),
  m_pool_meta_cache(new PoolMetaCache(cct)),
  m_asok_hook(new MirrorAdminSocketHook(cct, this))
{
  // shared per-process thread pool singleton for librbd image contexts
  m_threads =
    &(cct->lookup_or_create_singleton_object<Threads<librbd::ImageCtx>>(
	"rbd_mirror::threads", false, cct));
  m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads));
}
497 | ||
Mirror::~Mirror()
{
  // m_asok_hook is a raw owning pointer (see header); delete it here so
  // the admin socket commands are unregistered before teardown proceeds.
  delete m_asok_hook;
}
502 | ||
// Handle a delivered signal: SIGHUP reopens log files for the daemon
// and every pool replayer; SIGINT/SIGTERM flag the run() loop to stop
// and wake it; anything else aborts.
void Mirror::handle_signal(int signum)
{
  dout(20) << signum << dendl;

  std::lock_guard l{m_lock};

  switch (signum) {
  case SIGHUP:
    for (auto &it : m_pool_replayers) {
      it.second->reopen_logs();
    }
    g_ceph_context->reopen_logs();
    break;

  case SIGINT:
  case SIGTERM:
    // request shutdown of the main loop and wake it from its wait
    m_stopping = true;
    m_cond.notify_all();
    break;

  default:
    ceph_abort_msgf("unexpected signal %d", signum);
  }
}
527 | ||
// Initialize in strict order: rados handle -> cluster connection ->
// service daemon registration -> cluster watcher.  Returns the first
// failing step's error code, or 0 on success.
int Mirror::init()
{
  int r = m_local->init_with_context(m_cct);
  if (r < 0) {
    derr << "could not initialize rados handle" << dendl;
    return r;
  }

  r = m_local->connect();
  if (r < 0) {
    derr << "error connecting to local cluster" << dendl;
    return r;
  }

  r = m_service_daemon->init();
  if (r < 0) {
    derr << "error registering service daemon: " << cpp_strerror(r) << dendl;
    return r;
  }

  // watcher depends on the connected rados handle and service daemon
  m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock,
                                                   m_service_daemon.get()));
  return r;
}
552 | ||
// Main loop: periodically refresh the pool list (outside m_lock),
// update the pool replayers and drive the cache manager (under m_lock),
// then wait up to one second or until handle_signal() wakes us.  On
// exit, stop all pool replayers.
void Mirror::run()
{
  dout(20) << "enter" << dendl;

  utime_t next_refresh_pools = ceph_clock_now();

  while (!m_stopping) {
    utime_t now = ceph_clock_now();
    bool refresh_pools = next_refresh_pools <= now;
    if (refresh_pools) {
      // refresh_pools() performs cluster I/O, so it runs before m_lock
      // is taken below
      m_local_cluster_watcher->refresh_pools();
      next_refresh_pools = ceph_clock_now();
      next_refresh_pools += m_cct->_conf.get_val<uint64_t>(
          "rbd_mirror_pool_replayers_refresh_interval");
    }
    std::unique_lock l{m_lock};
    if (!m_manual_stop) {
      if (refresh_pools) {
        update_pool_replayers(m_local_cluster_watcher->get_pool_peers(),
                              m_local_cluster_watcher->get_site_name());
      }
      m_cache_manager_handler->run_cache_manager();
    }
    // releases m_lock while waiting; woken early by handle_signal()
    m_cond.wait_for(l, 1s);
  }

  // stop all pool replayers in parallel
  std::lock_guard locker{m_lock};
  for (auto &pool_replayer : m_pool_replayers) {
    pool_replayer.second->stop(false);
  }
  dout(20) << "return" << dendl;
}
586 | ||
9f95a23c | 587 | void Mirror::print_status(Formatter *f) |
7c673cae FG |
588 | { |
589 | dout(20) << "enter" << dendl; | |
590 | ||
9f95a23c | 591 | std::lock_guard l{m_lock}; |
7c673cae FG |
592 | |
593 | if (m_stopping) { | |
594 | return; | |
595 | } | |
596 | ||
9f95a23c TL |
597 | f->open_object_section("mirror_status"); |
598 | f->open_array_section("pool_replayers"); | |
7c673cae | 599 | for (auto &pool_replayer : m_pool_replayers) { |
9f95a23c | 600 | pool_replayer.second->print_status(f); |
7c673cae | 601 | } |
9f95a23c TL |
602 | f->close_section(); |
603 | f->close_section(); | |
7c673cae FG |
604 | } |
605 | ||
606 | void Mirror::start() | |
607 | { | |
608 | dout(20) << "enter" << dendl; | |
9f95a23c | 609 | std::lock_guard l{m_lock}; |
7c673cae FG |
610 | |
611 | if (m_stopping) { | |
612 | return; | |
613 | } | |
614 | ||
615 | m_manual_stop = false; | |
616 | ||
617 | for (auto &pool_replayer : m_pool_replayers) { | |
618 | pool_replayer.second->start(); | |
619 | } | |
620 | } | |
621 | ||
622 | void Mirror::stop() | |
623 | { | |
624 | dout(20) << "enter" << dendl; | |
9f95a23c | 625 | std::lock_guard l{m_lock}; |
7c673cae FG |
626 | |
627 | if (m_stopping) { | |
628 | return; | |
629 | } | |
630 | ||
631 | m_manual_stop = true; | |
632 | ||
633 | for (auto &pool_replayer : m_pool_replayers) { | |
634 | pool_replayer.second->stop(true); | |
635 | } | |
636 | } | |
637 | ||
638 | void Mirror::restart() | |
639 | { | |
640 | dout(20) << "enter" << dendl; | |
9f95a23c | 641 | std::lock_guard l{m_lock}; |
7c673cae FG |
642 | |
643 | if (m_stopping) { | |
644 | return; | |
645 | } | |
646 | ||
647 | m_manual_stop = false; | |
648 | ||
649 | for (auto &pool_replayer : m_pool_replayers) { | |
650 | pool_replayer.second->restart(); | |
651 | } | |
652 | } | |
653 | ||
654 | void Mirror::flush() | |
655 | { | |
656 | dout(20) << "enter" << dendl; | |
9f95a23c | 657 | std::lock_guard l{m_lock}; |
7c673cae FG |
658 | |
659 | if (m_stopping || m_manual_stop) { | |
660 | return; | |
661 | } | |
662 | ||
663 | for (auto &pool_replayer : m_pool_replayers) { | |
664 | pool_replayer.second->flush(); | |
665 | } | |
666 | } | |
667 | ||
668 | void Mirror::release_leader() | |
669 | { | |
670 | dout(20) << "enter" << dendl; | |
9f95a23c | 671 | std::lock_guard l{m_lock}; |
7c673cae FG |
672 | |
673 | if (m_stopping) { | |
674 | return; | |
675 | } | |
676 | ||
677 | for (auto &pool_replayer : m_pool_replayers) { | |
678 | pool_replayer.second->release_leader(); | |
679 | } | |
680 | } | |
681 | ||
9f95a23c TL |
682 | void Mirror::update_pool_replayers(const PoolPeers &pool_peers, |
683 | const std::string& site_name) | |
7c673cae FG |
684 | { |
685 | dout(20) << "enter" << dendl; | |
9f95a23c | 686 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
687 | |
688 | // remove stale pool replayers before creating new pool replayers | |
689 | for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { | |
690 | auto &peer = it->first.second; | |
691 | auto pool_peer_it = pool_peers.find(it->first.first); | |
c07f9fc5 FG |
692 | if (pool_peer_it == pool_peers.end() || |
693 | pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { | |
7c673cae FG |
694 | dout(20) << "removing pool replayer for " << peer << dendl; |
695 | // TODO: make async | |
c07f9fc5 | 696 | it->second->shut_down(); |
7c673cae FG |
697 | it = m_pool_replayers.erase(it); |
698 | } else { | |
699 | ++it; | |
700 | } | |
701 | } | |
702 | ||
703 | for (auto &kv : pool_peers) { | |
704 | for (auto &peer : kv.second) { | |
705 | PoolPeer pool_peer(kv.first, peer); | |
c07f9fc5 FG |
706 | |
707 | auto pool_replayers_it = m_pool_replayers.find(pool_peer); | |
708 | if (pool_replayers_it != m_pool_replayers.end()) { | |
709 | auto& pool_replayer = pool_replayers_it->second; | |
9f95a23c TL |
710 | if (!m_site_name.empty() && !site_name.empty() && |
711 | m_site_name != site_name) { | |
712 | dout(0) << "restarting pool replayer for " << peer << " due to " | |
713 | << "updated site name" << dendl; | |
714 | // TODO: make async | |
715 | pool_replayer->shut_down(); | |
716 | pool_replayer->init(site_name); | |
717 | } else if (pool_replayer->is_blacklisted()) { | |
c07f9fc5 FG |
718 | derr << "restarting blacklisted pool replayer for " << peer << dendl; |
719 | // TODO: make async | |
720 | pool_replayer->shut_down(); | |
9f95a23c | 721 | pool_replayer->init(site_name); |
c07f9fc5 FG |
722 | } else if (!pool_replayer->is_running()) { |
723 | derr << "restarting failed pool replayer for " << peer << dendl; | |
724 | // TODO: make async | |
725 | pool_replayer->shut_down(); | |
9f95a23c | 726 | pool_replayer->init(site_name); |
c07f9fc5 FG |
727 | } |
728 | } else { | |
7c673cae | 729 | dout(20) << "starting pool replayer for " << peer << dendl; |
9f95a23c TL |
730 | unique_ptr<PoolReplayer<>> pool_replayer( |
731 | new PoolReplayer<>(m_threads, m_service_daemon.get(), | |
732 | m_cache_manager_handler.get(), | |
733 | m_pool_meta_cache.get(), kv.first, peer, | |
734 | m_args)); | |
7c673cae | 735 | |
c07f9fc5 | 736 | // TODO: make async |
9f95a23c | 737 | pool_replayer->init(site_name); |
7c673cae FG |
738 | m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); |
739 | } | |
740 | } | |
741 | ||
742 | // TODO currently only support a single peer | |
743 | } | |
9f95a23c TL |
744 | |
745 | m_site_name = site_name; | |
7c673cae FG |
746 | } |
747 | ||
748 | } // namespace mirror | |
749 | } // namespace rbd |