]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
92f5a8d4 TL |
4 | #include <signal.h> |
5 | ||
7c673cae FG |
6 | #include <boost/range/adaptor/map.hpp> |
7 | ||
8 | #include "common/Formatter.h" | |
9f95a23c | 9 | #include "common/PriorityCache.h" |
7c673cae FG |
10 | #include "common/admin_socket.h" |
11 | #include "common/debug.h" | |
12 | #include "common/errno.h" | |
9f95a23c | 13 | #include "journal/Types.h" |
7c673cae | 14 | #include "librbd/ImageCtx.h" |
9f95a23c | 15 | #include "perfglue/heap_profiler.h" |
7c673cae | 16 | #include "Mirror.h" |
9f95a23c | 17 | #include "PoolMetaCache.h" |
c07f9fc5 | 18 | #include "ServiceDaemon.h" |
7c673cae | 19 | #include "Threads.h" |
7c673cae FG |
20 | |
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_rbd_mirror | |
7c673cae FG |
23 | |
24 | using std::list; | |
25 | using std::map; | |
26 | using std::set; | |
27 | using std::string; | |
28 | using std::unique_ptr; | |
29 | using std::vector; | |
30 | ||
31 | using librados::Rados; | |
32 | using librados::IoCtx; | |
33 | using librbd::mirror_peer_t; | |
34 | ||
35 | namespace rbd { | |
36 | namespace mirror { | |
37 | ||
38 | namespace { | |
39 | ||
40 | class MirrorAdminSocketCommand { | |
41 | public: | |
42 | virtual ~MirrorAdminSocketCommand() {} | |
9f95a23c | 43 | virtual int call(Formatter *f) = 0; |
7c673cae FG |
44 | }; |
45 | ||
46 | class StatusCommand : public MirrorAdminSocketCommand { | |
47 | public: | |
48 | explicit StatusCommand(Mirror *mirror) : mirror(mirror) {} | |
49 | ||
9f95a23c TL |
50 | int call(Formatter *f) override { |
51 | mirror->print_status(f); | |
52 | return 0; | |
7c673cae FG |
53 | } |
54 | ||
55 | private: | |
56 | Mirror *mirror; | |
57 | }; | |
58 | ||
59 | class StartCommand : public MirrorAdminSocketCommand { | |
60 | public: | |
61 | explicit StartCommand(Mirror *mirror) : mirror(mirror) {} | |
62 | ||
9f95a23c | 63 | int call(Formatter *f) override { |
7c673cae | 64 | mirror->start(); |
9f95a23c | 65 | return 0; |
7c673cae FG |
66 | } |
67 | ||
68 | private: | |
69 | Mirror *mirror; | |
70 | }; | |
71 | ||
72 | class StopCommand : public MirrorAdminSocketCommand { | |
73 | public: | |
74 | explicit StopCommand(Mirror *mirror) : mirror(mirror) {} | |
75 | ||
9f95a23c | 76 | int call(Formatter *f) override { |
7c673cae | 77 | mirror->stop(); |
9f95a23c | 78 | return 0; |
7c673cae FG |
79 | } |
80 | ||
81 | private: | |
82 | Mirror *mirror; | |
83 | }; | |
84 | ||
85 | class RestartCommand : public MirrorAdminSocketCommand { | |
86 | public: | |
87 | explicit RestartCommand(Mirror *mirror) : mirror(mirror) {} | |
88 | ||
9f95a23c | 89 | int call(Formatter *f) override { |
7c673cae | 90 | mirror->restart(); |
9f95a23c | 91 | return 0; |
7c673cae FG |
92 | } |
93 | ||
94 | private: | |
95 | Mirror *mirror; | |
96 | }; | |
97 | ||
98 | class FlushCommand : public MirrorAdminSocketCommand { | |
99 | public: | |
100 | explicit FlushCommand(Mirror *mirror) : mirror(mirror) {} | |
101 | ||
9f95a23c | 102 | int call(Formatter *f) override { |
7c673cae | 103 | mirror->flush(); |
9f95a23c | 104 | return 0; |
7c673cae FG |
105 | } |
106 | ||
107 | private: | |
108 | Mirror *mirror; | |
109 | }; | |
110 | ||
111 | class LeaderReleaseCommand : public MirrorAdminSocketCommand { | |
112 | public: | |
113 | explicit LeaderReleaseCommand(Mirror *mirror) : mirror(mirror) {} | |
114 | ||
9f95a23c | 115 | int call(Formatter *f) override { |
7c673cae | 116 | mirror->release_leader(); |
9f95a23c | 117 | return 0; |
7c673cae FG |
118 | } |
119 | ||
120 | private: | |
121 | Mirror *mirror; | |
122 | }; | |
123 | ||
9f95a23c TL |
124 | #undef dout_prefix |
125 | #define dout_prefix *_dout << "rbd::mirror::PriCache: " << this << " " \ | |
126 | << m_name << " " << __func__ << ": " | |
127 | ||
128 | struct PriCache : public PriorityCache::PriCache { | |
129 | std::string m_name; | |
130 | int64_t m_base_cache_max_size; | |
131 | int64_t m_extra_cache_max_size; | |
132 | ||
133 | PriorityCache::Priority m_base_cache_pri = PriorityCache::Priority::PRI10; | |
134 | PriorityCache::Priority m_extra_cache_pri = PriorityCache::Priority::PRI10; | |
135 | int64_t m_base_cache_bytes = 0; | |
136 | int64_t m_extra_cache_bytes = 0; | |
137 | int64_t m_committed_bytes = 0; | |
138 | double m_cache_ratio = 0; | |
139 | ||
140 | PriCache(const std::string &name, uint64_t min_size, uint64_t max_size) | |
141 | : m_name(name), m_base_cache_max_size(min_size), | |
142 | m_extra_cache_max_size(max_size - min_size) { | |
143 | ceph_assert(max_size >= min_size); | |
144 | } | |
145 | ||
146 | void prioritize() { | |
147 | if (m_base_cache_pri == PriorityCache::Priority::PRI0) { | |
148 | return; | |
149 | } | |
150 | auto pri = static_cast<uint8_t>(m_base_cache_pri); | |
151 | m_base_cache_pri = static_cast<PriorityCache::Priority>(--pri); | |
152 | ||
153 | dout(30) << m_base_cache_pri << dendl; | |
154 | } | |
155 | ||
156 | int64_t request_cache_bytes(PriorityCache::Priority pri, | |
157 | uint64_t total_cache) const override { | |
158 | int64_t cache_bytes = 0; | |
159 | ||
160 | if (pri == m_base_cache_pri) { | |
161 | cache_bytes += m_base_cache_max_size; | |
162 | } | |
163 | if (pri == m_extra_cache_pri) { | |
164 | cache_bytes += m_extra_cache_max_size; | |
165 | } | |
166 | ||
167 | dout(30) << cache_bytes << dendl; | |
168 | ||
169 | return cache_bytes; | |
170 | } | |
171 | ||
172 | int64_t get_cache_bytes(PriorityCache::Priority pri) const override { | |
173 | int64_t cache_bytes = 0; | |
174 | ||
175 | if (pri == m_base_cache_pri) { | |
176 | cache_bytes += m_base_cache_bytes; | |
177 | } | |
178 | if (pri == m_extra_cache_pri) { | |
179 | cache_bytes += m_extra_cache_bytes; | |
180 | } | |
181 | ||
182 | dout(30) << "pri=" << pri << " " << cache_bytes << dendl; | |
183 | ||
184 | return cache_bytes; | |
185 | } | |
186 | ||
187 | int64_t get_cache_bytes() const override { | |
188 | auto cache_bytes = m_base_cache_bytes + m_extra_cache_bytes; | |
189 | ||
190 | dout(30) << m_base_cache_bytes << "+" << m_extra_cache_bytes << "=" | |
191 | << cache_bytes << dendl; | |
192 | ||
193 | return cache_bytes; | |
194 | } | |
195 | ||
196 | void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override { | |
197 | ceph_assert(bytes >= 0); | |
198 | ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri || | |
199 | bytes == 0); | |
200 | ||
201 | dout(30) << "pri=" << pri << " " << bytes << dendl; | |
202 | ||
203 | if (pri == m_base_cache_pri) { | |
204 | m_base_cache_bytes = std::min(m_base_cache_max_size, bytes); | |
205 | bytes -= std::min(m_base_cache_bytes, bytes); | |
206 | } | |
207 | ||
208 | if (pri == m_extra_cache_pri) { | |
209 | m_extra_cache_bytes = bytes; | |
210 | } | |
211 | } | |
212 | ||
213 | void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) override { | |
214 | ceph_assert(bytes >= 0); | |
215 | ceph_assert(pri == m_base_cache_pri || pri == m_extra_cache_pri); | |
216 | ||
217 | dout(30) << "pri=" << pri << " " << bytes << dendl; | |
218 | ||
219 | if (pri == m_base_cache_pri) { | |
220 | ceph_assert(m_base_cache_bytes <= m_base_cache_max_size); | |
221 | ||
222 | auto chunk = std::min(m_base_cache_max_size - m_base_cache_bytes, bytes); | |
223 | m_base_cache_bytes += chunk; | |
224 | bytes -= chunk; | |
225 | } | |
226 | ||
227 | if (pri == m_extra_cache_pri) { | |
228 | m_extra_cache_bytes += bytes; | |
229 | } | |
230 | } | |
231 | ||
232 | int64_t commit_cache_size(uint64_t total_cache) override { | |
233 | m_committed_bytes = p2roundup<int64_t>(get_cache_bytes(), 4096); | |
234 | ||
235 | dout(30) << m_committed_bytes << dendl; | |
236 | ||
237 | return m_committed_bytes; | |
238 | } | |
239 | ||
240 | int64_t get_committed_size() const override { | |
241 | dout(30) << m_committed_bytes << dendl; | |
242 | ||
243 | return m_committed_bytes; | |
244 | } | |
245 | ||
246 | double get_cache_ratio() const override { | |
247 | dout(30) << m_cache_ratio << dendl; | |
248 | ||
249 | return m_cache_ratio; | |
250 | } | |
251 | ||
252 | void set_cache_ratio(double ratio) override { | |
253 | dout(30) << m_cache_ratio << dendl; | |
254 | ||
255 | m_cache_ratio = ratio; | |
256 | } | |
257 | ||
20effc67 TL |
258 | void shift_bins() override { |
259 | } | |
260 | ||
261 | void import_bins(const std::vector<uint64_t> &intervals) override { | |
262 | } | |
263 | ||
264 | void set_bins(PriorityCache::Priority pri, uint64_t end_interval) override { | |
265 | } | |
266 | ||
267 | uint64_t get_bins(PriorityCache::Priority pri) const override { | |
268 | return 0; | |
269 | } | |
270 | ||
9f95a23c TL |
271 | std::string get_cache_name() const override { |
272 | return m_name; | |
273 | } | |
274 | }; | |
275 | ||
7c673cae FG |
276 | } // anonymous namespace |
277 | ||
9f95a23c TL |
278 | #undef dout_prefix |
279 | #define dout_prefix *_dout << "rbd::mirror::Mirror: " << this << " " \ | |
280 | << __func__ << ": " | |
281 | ||
7c673cae FG |
282 | class MirrorAdminSocketHook : public AdminSocketHook { |
283 | public: | |
284 | MirrorAdminSocketHook(CephContext *cct, Mirror *mirror) : | |
285 | admin_socket(cct->get_admin_socket()) { | |
286 | std::string command; | |
287 | int r; | |
288 | ||
289 | command = "rbd mirror status"; | |
9f95a23c | 290 | r = admin_socket->register_command(command, this, |
7c673cae FG |
291 | "get status for rbd mirror"); |
292 | if (r == 0) { | |
293 | commands[command] = new StatusCommand(mirror); | |
294 | } | |
295 | ||
296 | command = "rbd mirror start"; | |
9f95a23c | 297 | r = admin_socket->register_command(command, this, |
7c673cae FG |
298 | "start rbd mirror"); |
299 | if (r == 0) { | |
300 | commands[command] = new StartCommand(mirror); | |
301 | } | |
302 | ||
303 | command = "rbd mirror stop"; | |
9f95a23c | 304 | r = admin_socket->register_command(command, this, |
7c673cae FG |
305 | "stop rbd mirror"); |
306 | if (r == 0) { | |
307 | commands[command] = new StopCommand(mirror); | |
308 | } | |
309 | ||
310 | command = "rbd mirror restart"; | |
9f95a23c | 311 | r = admin_socket->register_command(command, this, |
7c673cae FG |
312 | "restart rbd mirror"); |
313 | if (r == 0) { | |
314 | commands[command] = new RestartCommand(mirror); | |
315 | } | |
316 | ||
317 | command = "rbd mirror flush"; | |
9f95a23c | 318 | r = admin_socket->register_command(command, this, |
7c673cae FG |
319 | "flush rbd mirror"); |
320 | if (r == 0) { | |
321 | commands[command] = new FlushCommand(mirror); | |
322 | } | |
323 | ||
324 | command = "rbd mirror leader release"; | |
9f95a23c | 325 | r = admin_socket->register_command(command, this, |
7c673cae FG |
326 | "release rbd mirror leader"); |
327 | if (r == 0) { | |
328 | commands[command] = new LeaderReleaseCommand(mirror); | |
329 | } | |
330 | } | |
331 | ||
332 | ~MirrorAdminSocketHook() override { | |
9f95a23c | 333 | (void)admin_socket->unregister_commands(this); |
7c673cae FG |
334 | for (Commands::const_iterator i = commands.begin(); i != commands.end(); |
335 | ++i) { | |
7c673cae FG |
336 | delete i->second; |
337 | } | |
338 | } | |
339 | ||
9f95a23c TL |
340 | int call(std::string_view command, const cmdmap_t& cmdmap, |
341 | Formatter *f, | |
342 | std::ostream& errss, | |
343 | bufferlist& out) override { | |
7c673cae | 344 | Commands::const_iterator i = commands.find(command); |
11fdf7f2 | 345 | ceph_assert(i != commands.end()); |
9f95a23c | 346 | return i->second->call(f); |
7c673cae FG |
347 | } |
348 | ||
349 | private: | |
11fdf7f2 | 350 | typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands; |
7c673cae FG |
351 | |
352 | AdminSocket *admin_socket; | |
353 | Commands commands; | |
354 | }; | |
355 | ||
9f95a23c TL |
356 | class CacheManagerHandler : public journal::CacheManagerHandler { |
357 | public: | |
358 | CacheManagerHandler(CephContext *cct) | |
359 | : m_cct(cct) { | |
360 | ||
361 | if (!m_cct->_conf.get_val<bool>("rbd_mirror_memory_autotune")) { | |
362 | return; | |
363 | } | |
364 | ||
365 | uint64_t base = m_cct->_conf.get_val<Option::size_t>( | |
366 | "rbd_mirror_memory_base"); | |
367 | double fragmentation = m_cct->_conf.get_val<double>( | |
368 | "rbd_mirror_memory_expected_fragmentation"); | |
369 | uint64_t target = m_cct->_conf.get_val<Option::size_t>( | |
370 | "rbd_mirror_memory_target"); | |
371 | uint64_t min = m_cct->_conf.get_val<Option::size_t>( | |
372 | "rbd_mirror_memory_cache_min"); | |
373 | uint64_t max = min; | |
374 | ||
375 | // When setting the maximum amount of memory to use for cache, first | |
376 | // assume some base amount of memory for the daemon and then fudge in | |
377 | // some overhead for fragmentation that scales with cache usage. | |
378 | uint64_t ltarget = (1.0 - fragmentation) * target; | |
379 | if (ltarget > base + min) { | |
380 | max = ltarget - base; | |
381 | } | |
382 | ||
383 | m_next_balance = ceph_clock_now(); | |
384 | m_next_resize = ceph_clock_now(); | |
385 | ||
386 | m_cache_manager = std::make_unique<PriorityCache::Manager>( | |
387 | m_cct, min, max, target, false); | |
388 | } | |
389 | ||
390 | ~CacheManagerHandler() { | |
391 | std::lock_guard locker{m_lock}; | |
392 | ||
393 | ceph_assert(m_caches.empty()); | |
394 | } | |
395 | ||
396 | void register_cache(const std::string &cache_name, | |
397 | uint64_t min_size, uint64_t max_size, | |
398 | journal::CacheRebalanceHandler* handler) override { | |
399 | if (!m_cache_manager) { | |
400 | handler->handle_cache_rebalanced(max_size); | |
401 | return; | |
402 | } | |
403 | ||
404 | dout(20) << cache_name << " min_size=" << min_size << " max_size=" | |
405 | << max_size << " handler=" << handler << dendl; | |
406 | ||
407 | std::lock_guard locker{m_lock}; | |
408 | ||
409 | auto p = m_caches.insert( | |
410 | {cache_name, {cache_name, min_size, max_size, handler}}); | |
411 | ceph_assert(p.second == true); | |
412 | ||
413 | m_cache_manager->insert(cache_name, p.first->second.pri_cache, false); | |
414 | m_next_balance = ceph_clock_now(); | |
415 | } | |
416 | ||
417 | void unregister_cache(const std::string &cache_name) override { | |
418 | if (!m_cache_manager) { | |
419 | return; | |
420 | } | |
421 | ||
422 | dout(20) << cache_name << dendl; | |
423 | ||
424 | std::lock_guard locker{m_lock}; | |
425 | ||
426 | auto it = m_caches.find(cache_name); | |
427 | ceph_assert(it != m_caches.end()); | |
428 | ||
429 | m_cache_manager->erase(cache_name); | |
430 | m_caches.erase(it); | |
431 | m_next_balance = ceph_clock_now(); | |
432 | } | |
433 | ||
434 | void run_cache_manager() { | |
435 | if (!m_cache_manager) { | |
436 | return; | |
437 | } | |
438 | ||
439 | std::lock_guard locker{m_lock}; | |
440 | ||
441 | // Before we trim, check and see if it's time to rebalance/resize. | |
442 | auto autotune_interval = m_cct->_conf.get_val<double>( | |
443 | "rbd_mirror_memory_cache_autotune_interval"); | |
444 | auto resize_interval = m_cct->_conf.get_val<double>( | |
445 | "rbd_mirror_memory_cache_resize_interval"); | |
446 | ||
447 | utime_t now = ceph_clock_now(); | |
448 | ||
449 | if (autotune_interval > 0 && m_next_balance <= now) { | |
450 | dout(20) << "balance" << dendl; | |
451 | m_cache_manager->balance(); | |
452 | ||
453 | for (auto &it : m_caches) { | |
454 | auto pri_cache = static_cast<PriCache *>(it.second.pri_cache.get()); | |
455 | auto new_cache_bytes = pri_cache->get_cache_bytes(); | |
456 | it.second.handler->handle_cache_rebalanced(new_cache_bytes); | |
457 | pri_cache->prioritize(); | |
458 | } | |
459 | ||
460 | m_next_balance = ceph_clock_now(); | |
461 | m_next_balance += autotune_interval; | |
462 | } | |
463 | ||
464 | if (resize_interval > 0 && m_next_resize < now) { | |
465 | if (ceph_using_tcmalloc()) { | |
466 | dout(20) << "tune memory" << dendl; | |
467 | m_cache_manager->tune_memory(); | |
468 | } | |
469 | ||
470 | m_next_resize = ceph_clock_now(); | |
471 | m_next_resize += resize_interval; | |
472 | } | |
473 | } | |
474 | ||
475 | private: | |
476 | struct Cache { | |
477 | std::shared_ptr<PriorityCache::PriCache> pri_cache; | |
478 | journal::CacheRebalanceHandler *handler; | |
479 | ||
480 | Cache(const std::string name, uint64_t min_size, uint64_t max_size, | |
481 | journal::CacheRebalanceHandler *handler) | |
482 | : pri_cache(new PriCache(name, min_size, max_size)), handler(handler) { | |
483 | } | |
484 | }; | |
485 | ||
486 | CephContext *m_cct; | |
487 | ||
488 | mutable ceph::mutex m_lock = | |
489 | ceph::make_mutex("rbd::mirror::CacheManagerHandler"); | |
490 | std::unique_ptr<PriorityCache::Manager> m_cache_manager; | |
491 | std::map<std::string, Cache> m_caches; | |
492 | ||
493 | utime_t m_next_balance; | |
494 | utime_t m_next_resize; | |
495 | }; | |
496 | ||
7c673cae FG |
497 | Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args) : |
498 | m_cct(cct), | |
499 | m_args(args), | |
7c673cae | 500 | m_local(new librados::Rados()), |
9f95a23c TL |
501 | m_cache_manager_handler(new CacheManagerHandler(cct)), |
502 | m_pool_meta_cache(new PoolMetaCache(cct)), | |
f67539c2 | 503 | m_asok_hook(new MirrorAdminSocketHook(cct, this)) { |
7c673cae FG |
504 | } |
505 | ||
506 | Mirror::~Mirror() | |
507 | { | |
508 | delete m_asok_hook; | |
509 | } | |
510 | ||
511 | void Mirror::handle_signal(int signum) | |
512 | { | |
92f5a8d4 TL |
513 | dout(20) << signum << dendl; |
514 | ||
9f95a23c | 515 | std::lock_guard l{m_lock}; |
92f5a8d4 TL |
516 | |
517 | switch (signum) { | |
518 | case SIGHUP: | |
519 | for (auto &it : m_pool_replayers) { | |
520 | it.second->reopen_logs(); | |
521 | } | |
522 | g_ceph_context->reopen_logs(); | |
523 | break; | |
524 | ||
525 | case SIGINT: | |
526 | case SIGTERM: | |
527 | m_stopping = true; | |
9f95a23c | 528 | m_cond.notify_all(); |
92f5a8d4 TL |
529 | break; |
530 | ||
531 | default: | |
532 | ceph_abort_msgf("unexpected signal %d", signum); | |
7c673cae FG |
533 | } |
534 | } | |
535 | ||
536 | int Mirror::init() | |
537 | { | |
538 | int r = m_local->init_with_context(m_cct); | |
539 | if (r < 0) { | |
540 | derr << "could not initialize rados handle" << dendl; | |
541 | return r; | |
542 | } | |
543 | ||
544 | r = m_local->connect(); | |
545 | if (r < 0) { | |
546 | derr << "error connecting to local cluster" << dendl; | |
547 | return r; | |
548 | } | |
549 | ||
f67539c2 TL |
550 | m_threads = &(m_cct->lookup_or_create_singleton_object< |
551 | Threads<librbd::ImageCtx>>("rbd_mirror::threads", false, m_local)); | |
552 | m_service_daemon.reset(new ServiceDaemon<>(m_cct, m_local, m_threads)); | |
553 | ||
c07f9fc5 FG |
554 | r = m_service_daemon->init(); |
555 | if (r < 0) { | |
556 | derr << "error registering service daemon: " << cpp_strerror(r) << dendl; | |
557 | return r; | |
558 | } | |
7c673cae | 559 | |
c07f9fc5 FG |
560 | m_local_cluster_watcher.reset(new ClusterWatcher(m_local, m_lock, |
561 | m_service_daemon.get())); | |
7c673cae FG |
562 | return r; |
563 | } | |
564 | ||
565 | void Mirror::run() | |
566 | { | |
567 | dout(20) << "enter" << dendl; | |
9f95a23c | 568 | |
20effc67 | 569 | using namespace std::chrono_literals; |
9f95a23c TL |
570 | utime_t next_refresh_pools = ceph_clock_now(); |
571 | ||
7c673cae | 572 | while (!m_stopping) { |
9f95a23c TL |
573 | utime_t now = ceph_clock_now(); |
574 | bool refresh_pools = next_refresh_pools <= now; | |
575 | if (refresh_pools) { | |
576 | m_local_cluster_watcher->refresh_pools(); | |
577 | next_refresh_pools = ceph_clock_now(); | |
578 | next_refresh_pools += m_cct->_conf.get_val<uint64_t>( | |
579 | "rbd_mirror_pool_replayers_refresh_interval"); | |
580 | } | |
581 | std::unique_lock l{m_lock}; | |
7c673cae | 582 | if (!m_manual_stop) { |
9f95a23c TL |
583 | if (refresh_pools) { |
584 | update_pool_replayers(m_local_cluster_watcher->get_pool_peers(), | |
585 | m_local_cluster_watcher->get_site_name()); | |
586 | } | |
587 | m_cache_manager_handler->run_cache_manager(); | |
7c673cae | 588 | } |
9f95a23c | 589 | m_cond.wait_for(l, 1s); |
7c673cae FG |
590 | } |
591 | ||
592 | // stop all pool replayers in parallel | |
9f95a23c | 593 | std::lock_guard locker{m_lock}; |
7c673cae FG |
594 | for (auto &pool_replayer : m_pool_replayers) { |
595 | pool_replayer.second->stop(false); | |
596 | } | |
597 | dout(20) << "return" << dendl; | |
598 | } | |
599 | ||
9f95a23c | 600 | void Mirror::print_status(Formatter *f) |
7c673cae FG |
601 | { |
602 | dout(20) << "enter" << dendl; | |
603 | ||
9f95a23c | 604 | std::lock_guard l{m_lock}; |
7c673cae FG |
605 | |
606 | if (m_stopping) { | |
607 | return; | |
608 | } | |
609 | ||
9f95a23c TL |
610 | f->open_object_section("mirror_status"); |
611 | f->open_array_section("pool_replayers"); | |
7c673cae | 612 | for (auto &pool_replayer : m_pool_replayers) { |
9f95a23c | 613 | pool_replayer.second->print_status(f); |
7c673cae | 614 | } |
9f95a23c TL |
615 | f->close_section(); |
616 | f->close_section(); | |
7c673cae FG |
617 | } |
618 | ||
619 | void Mirror::start() | |
620 | { | |
621 | dout(20) << "enter" << dendl; | |
9f95a23c | 622 | std::lock_guard l{m_lock}; |
7c673cae FG |
623 | |
624 | if (m_stopping) { | |
625 | return; | |
626 | } | |
627 | ||
628 | m_manual_stop = false; | |
629 | ||
630 | for (auto &pool_replayer : m_pool_replayers) { | |
631 | pool_replayer.second->start(); | |
632 | } | |
633 | } | |
634 | ||
635 | void Mirror::stop() | |
636 | { | |
637 | dout(20) << "enter" << dendl; | |
9f95a23c | 638 | std::lock_guard l{m_lock}; |
7c673cae FG |
639 | |
640 | if (m_stopping) { | |
641 | return; | |
642 | } | |
643 | ||
644 | m_manual_stop = true; | |
645 | ||
646 | for (auto &pool_replayer : m_pool_replayers) { | |
647 | pool_replayer.second->stop(true); | |
648 | } | |
649 | } | |
650 | ||
651 | void Mirror::restart() | |
652 | { | |
653 | dout(20) << "enter" << dendl; | |
9f95a23c | 654 | std::lock_guard l{m_lock}; |
7c673cae FG |
655 | |
656 | if (m_stopping) { | |
657 | return; | |
658 | } | |
659 | ||
660 | m_manual_stop = false; | |
661 | ||
662 | for (auto &pool_replayer : m_pool_replayers) { | |
663 | pool_replayer.second->restart(); | |
664 | } | |
665 | } | |
666 | ||
667 | void Mirror::flush() | |
668 | { | |
669 | dout(20) << "enter" << dendl; | |
9f95a23c | 670 | std::lock_guard l{m_lock}; |
7c673cae FG |
671 | |
672 | if (m_stopping || m_manual_stop) { | |
673 | return; | |
674 | } | |
675 | ||
676 | for (auto &pool_replayer : m_pool_replayers) { | |
677 | pool_replayer.second->flush(); | |
678 | } | |
679 | } | |
680 | ||
681 | void Mirror::release_leader() | |
682 | { | |
683 | dout(20) << "enter" << dendl; | |
9f95a23c | 684 | std::lock_guard l{m_lock}; |
7c673cae FG |
685 | |
686 | if (m_stopping) { | |
687 | return; | |
688 | } | |
689 | ||
690 | for (auto &pool_replayer : m_pool_replayers) { | |
691 | pool_replayer.second->release_leader(); | |
692 | } | |
693 | } | |
694 | ||
9f95a23c TL |
695 | void Mirror::update_pool_replayers(const PoolPeers &pool_peers, |
696 | const std::string& site_name) | |
7c673cae FG |
697 | { |
698 | dout(20) << "enter" << dendl; | |
9f95a23c | 699 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
700 | |
701 | // remove stale pool replayers before creating new pool replayers | |
702 | for (auto it = m_pool_replayers.begin(); it != m_pool_replayers.end();) { | |
703 | auto &peer = it->first.second; | |
704 | auto pool_peer_it = pool_peers.find(it->first.first); | |
c07f9fc5 FG |
705 | if (pool_peer_it == pool_peers.end() || |
706 | pool_peer_it->second.find(peer) == pool_peer_it->second.end()) { | |
7c673cae FG |
707 | dout(20) << "removing pool replayer for " << peer << dendl; |
708 | // TODO: make async | |
c07f9fc5 | 709 | it->second->shut_down(); |
7c673cae FG |
710 | it = m_pool_replayers.erase(it); |
711 | } else { | |
712 | ++it; | |
713 | } | |
714 | } | |
715 | ||
716 | for (auto &kv : pool_peers) { | |
717 | for (auto &peer : kv.second) { | |
718 | PoolPeer pool_peer(kv.first, peer); | |
c07f9fc5 FG |
719 | |
720 | auto pool_replayers_it = m_pool_replayers.find(pool_peer); | |
721 | if (pool_replayers_it != m_pool_replayers.end()) { | |
722 | auto& pool_replayer = pool_replayers_it->second; | |
9f95a23c TL |
723 | if (!m_site_name.empty() && !site_name.empty() && |
724 | m_site_name != site_name) { | |
725 | dout(0) << "restarting pool replayer for " << peer << " due to " | |
726 | << "updated site name" << dendl; | |
727 | // TODO: make async | |
728 | pool_replayer->shut_down(); | |
729 | pool_replayer->init(site_name); | |
f67539c2 TL |
730 | } else if (pool_replayer->is_blocklisted()) { |
731 | derr << "restarting blocklisted pool replayer for " << peer << dendl; | |
c07f9fc5 FG |
732 | // TODO: make async |
733 | pool_replayer->shut_down(); | |
9f95a23c | 734 | pool_replayer->init(site_name); |
c07f9fc5 FG |
735 | } else if (!pool_replayer->is_running()) { |
736 | derr << "restarting failed pool replayer for " << peer << dendl; | |
737 | // TODO: make async | |
738 | pool_replayer->shut_down(); | |
9f95a23c | 739 | pool_replayer->init(site_name); |
c07f9fc5 FG |
740 | } |
741 | } else { | |
7c673cae | 742 | dout(20) << "starting pool replayer for " << peer << dendl; |
9f95a23c TL |
743 | unique_ptr<PoolReplayer<>> pool_replayer( |
744 | new PoolReplayer<>(m_threads, m_service_daemon.get(), | |
745 | m_cache_manager_handler.get(), | |
746 | m_pool_meta_cache.get(), kv.first, peer, | |
747 | m_args)); | |
7c673cae | 748 | |
c07f9fc5 | 749 | // TODO: make async |
9f95a23c | 750 | pool_replayer->init(site_name); |
7c673cae FG |
751 | m_pool_replayers.emplace(pool_peer, std::move(pool_replayer)); |
752 | } | |
753 | } | |
754 | ||
755 | // TODO currently only support a single peer | |
756 | } | |
9f95a23c TL |
757 | |
758 | m_site_name = site_name; | |
7c673cae FG |
759 | } |
760 | ||
761 | } // namespace mirror | |
762 | } // namespace rbd |