]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/cephfs_mirror/Mirror.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / tools / cephfs_mirror / Mirror.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "common/ceph_argparse.h"
5#include "common/ceph_context.h"
6#include "common/common_init.h"
7#include "common/Cond.h"
8#include "common/debug.h"
9#include "common/errno.h"
10#include "common/Timer.h"
11#include "common/WorkQueue.h"
12#include "include/types.h"
13#include "mon/MonClient.h"
14#include "msg/Messenger.h"
15#include "aio_utils.h"
16#include "Mirror.h"
17
18#define dout_context g_ceph_context
19#define dout_subsys ceph_subsys_cephfs_mirror
20#undef dout_prefix
21#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__
22
23namespace cephfs {
24namespace mirror {
25
26namespace {
27
// Service-daemon attribute key that Mirror::handle_enable_mirroring() sets
// to true for a filesystem whose FSMirror failed to initialize.
const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY = "mirroring_failed";
29
30class SafeTimerSingleton : public SafeTimer {
31public:
32 ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock");
33
34 explicit SafeTimerSingleton(CephContext *cct)
35 : SafeTimer(cct, timer_lock, true) {
36 init();
37 }
38};
39
40class ThreadPoolSingleton : public ThreadPool {
41public:
42 ContextWQ *work_queue = nullptr;
43
44 explicit ThreadPoolSingleton(CephContext *cct)
45 : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1) {
46 work_queue = new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this);
47
48 start();
49 }
50};
51
52} // anonymous namespace
53
54struct Mirror::C_EnableMirroring : Context {
55 Mirror *mirror;
56 Filesystem filesystem;
57 uint64_t pool_id;
58
59 C_EnableMirroring(Mirror *mirror, const Filesystem &filesystem, uint64_t pool_id)
60 : mirror(mirror),
61 filesystem(filesystem),
62 pool_id(pool_id) {
63 }
64
65 void finish(int r) override {
66 enable_mirroring();
67 }
68
69 void enable_mirroring() {
70 Context *ctx = new C_CallbackAdapter<C_EnableMirroring,
71 &C_EnableMirroring::handle_enable_mirroring>(this);
72 mirror->enable_mirroring(filesystem, pool_id, ctx);
73 }
74
75 void handle_enable_mirroring(int r) {
76 mirror->handle_enable_mirroring(filesystem, r);
77 delete this;
78 }
79
80 // context needs to live post completion
81 void complete(int r) override {
82 finish(r);
83 }
84};
85
86struct Mirror::C_DisableMirroring : Context {
87 Mirror *mirror;
88 Filesystem filesystem;
89
90 C_DisableMirroring(Mirror *mirror, const Filesystem &filesystem)
91 : mirror(mirror),
92 filesystem(filesystem) {
93 }
94
95 void finish(int r) override {
96 disable_mirroring();
97 }
98
99 void disable_mirroring() {
100 Context *ctx = new C_CallbackAdapter<C_DisableMirroring,
101 &C_DisableMirroring::handle_disable_mirroring>(this);
102 mirror->disable_mirroring(filesystem, ctx);
103 }
104
105 void handle_disable_mirroring(int r) {
106 mirror->handle_disable_mirroring(filesystem, r);
107 delete this;
108 }
109
110 // context needs to live post completion
111 void complete(int r) override {
112 finish(r);
113 }
114};
115
116struct Mirror::C_PeerUpdate : Context {
117 Mirror *mirror;
118 Filesystem filesystem;
119 Peer peer;
120 bool remove = false;
121
122 C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
123 const Peer &peer)
124 : mirror(mirror),
125 filesystem(filesystem),
126 peer(peer) {
127 }
128 C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem,
129 const Peer &peer, bool remove)
130 : mirror(mirror),
131 filesystem(filesystem),
132 peer(peer),
133 remove(remove) {
134 }
135
136 void finish(int r) override {
137 if (remove) {
138 mirror->remove_peer(filesystem, peer);
139 } else {
140 mirror->add_peer(filesystem, peer);
141 }
142 }
143};
144
145struct Mirror::C_RestartMirroring : Context {
146 Mirror *mirror;
147 Filesystem filesystem;
148 uint64_t pool_id;
149 Peers peers;
150
151 C_RestartMirroring(Mirror *mirror, const Filesystem &filesystem,
152 uint64_t pool_id, const Peers &peers)
153 : mirror(mirror),
154 filesystem(filesystem),
155 pool_id(pool_id),
156 peers(peers) {
157 }
158
159 void finish(int r) override {
160 disable_mirroring();
161 }
162
163 void disable_mirroring() {
164 Context *ctx = new C_CallbackAdapter<C_RestartMirroring,
165 &C_RestartMirroring::handle_disable_mirroring>(this);
166 mirror->disable_mirroring(filesystem, ctx);
167 }
168
169 void handle_disable_mirroring(int r) {
170 enable_mirroring();
171 }
172
173 void enable_mirroring() {
174 std::scoped_lock locker(mirror->m_lock);
175 Context *ctx = new C_CallbackAdapter<C_RestartMirroring,
176 &C_RestartMirroring::handle_enable_mirroring>(this);
177 mirror->enable_mirroring(filesystem, pool_id, ctx, true);
178 }
179
180 void handle_enable_mirroring(int r) {
181 mirror->handle_enable_mirroring(filesystem, peers, r);
182 delete this;
183 }
184
185 // context needs to live post completion
186 void complete(int r) override {
187 finish(r);
188 }
189};
190
191Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args,
192 MonClient *monc, Messenger *msgr)
193 : m_cct(cct),
194 m_args(args),
195 m_monc(monc),
196 m_msgr(msgr),
197 m_listener(this),
198 m_last_blocklist_check(ceph_clock_now()),
199 m_last_failure_check(ceph_clock_now()),
200 m_local(new librados::Rados()) {
201 auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
202 "cephfs::mirror::thread_pool", false, cct));
203 auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
204 "cephfs::mirror::safe_timer", false, cct));
205 m_thread_pool = thread_pool;
206 m_work_queue = thread_pool->work_queue;
207 m_timer = safe_timer;
208 m_timer_lock = &safe_timer->timer_lock;
209 std::scoped_lock timer_lock(*m_timer_lock);
210 schedule_mirror_update_task();
211}
212
213Mirror::~Mirror() {
214 dout(10) << dendl;
215 {
216 std::scoped_lock timer_lock(*m_timer_lock);
217 m_timer->shutdown();
218 }
219
220 m_work_queue->drain();
221 delete m_work_queue;
222 {
223 std::scoped_lock locker(m_lock);
224 m_thread_pool->stop();
225 m_cluster_watcher.reset();
226 }
227}
228
229int Mirror::init_mon_client() {
230 dout(20) << dendl;
231
232 m_monc->set_messenger(m_msgr);
233 m_msgr->add_dispatcher_head(m_monc);
234 m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON);
235
236 int r = m_monc->init();
237 if (r < 0) {
238 derr << ": failed to init mon client: " << cpp_strerror(r) << dendl;
239 return r;
240 }
241
242 r = m_monc->authenticate(m_cct->_conf->client_mount_timeout);
243 if (r < 0) {
244 derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl;
245 return r;
246 }
247
248 client_t me = m_monc->get_global_id();
249 m_msgr->set_myname(entity_name_t::CLIENT(me.v));
250 return 0;
251}
252
253int Mirror::init(std::string &reason) {
254 dout(20) << dendl;
255
256 std::scoped_lock locker(m_lock);
257
258 int r = m_local->init_with_context(m_cct);
259 if (r < 0) {
260 derr << ": could not initialize rados handler" << dendl;
261 return r;
262 }
263
264 r = m_local->connect();
265 if (r < 0) {
266 derr << ": error connecting to local cluster" << dendl;
267 return r;
268 }
269
270 m_service_daemon = std::make_unique<ServiceDaemon>(m_cct, m_local);
271 r = m_service_daemon->init();
272 if (r < 0) {
273 derr << ": error registering service daemon: " << cpp_strerror(r) << dendl;
274 return r;
275 }
276
277 r = init_mon_client();
278 if (r < 0) {
279 return r;
280 }
281
282 return 0;
283}
284
285void Mirror::shutdown() {
286 dout(20) << dendl;
f67539c2
TL
287 m_stopping = true;
288 m_cond.notify_all();
289}
290
b3b6e05e
TL
291void Mirror::reopen_logs() {
292 for (auto &[filesystem, mirror_action] : m_mirror_actions) {
293 mirror_action.fs_mirror->reopen_logs();
294 }
295 g_ceph_context->reopen_logs();
296}
297
f67539c2
TL
298void Mirror::handle_signal(int signum) {
299 dout(10) << ": signal=" << signum << dendl;
b3b6e05e
TL
300
301 std::scoped_lock locker(m_lock);
302 switch (signum) {
303 case SIGHUP:
304 reopen_logs();
305 break;
306 case SIGINT:
307 case SIGTERM:
308 shutdown();
309 break;
310 default:
311 ceph_abort_msgf("unexpected signal %d", signum);
312 }
f67539c2
TL
313}
314
315void Mirror::handle_enable_mirroring(const Filesystem &filesystem,
316 const Peers &peers, int r) {
317 dout(20) << ": filesystem=" << filesystem << ", peers=" << peers
318 << ", r=" << r << dendl;
319
320 std::scoped_lock locker(m_lock);
321 auto &mirror_action = m_mirror_actions.at(filesystem);
322 ceph_assert(mirror_action.action_in_progress);
323
324 mirror_action.action_in_progress = false;
325 m_cond.notify_all();
326 if (r < 0) {
327 derr << ": failed to initialize FSMirror for filesystem=" << filesystem
328 << ": " << cpp_strerror(r) << dendl;
329 m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
330 SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
331 true);
332 return;
333 }
334
335 for (auto &peer : peers) {
336 mirror_action.fs_mirror->add_peer(peer);
337 }
338
339 dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
340}
341
342void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) {
343 dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl;
344
345 std::scoped_lock locker(m_lock);
346 auto &mirror_action = m_mirror_actions.at(filesystem);
347 ceph_assert(mirror_action.action_in_progress);
348
349 mirror_action.action_in_progress = false;
350 m_cond.notify_all();
351 if (r < 0) {
352 derr << ": failed to initialize FSMirror for filesystem=" << filesystem
353 << ": " << cpp_strerror(r) << dendl;
354 m_service_daemon->add_or_update_fs_attribute(filesystem.fscid,
355 SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY,
356 true);
357 return;
358 }
359
360 dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl;
361}
362
363void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id,
364 Context *on_finish, bool is_restart) {
365 ceph_assert(ceph_mutex_is_locked(m_lock));
366
367 auto &mirror_action = m_mirror_actions.at(filesystem);
368 if (is_restart) {
369 mirror_action.fs_mirror.reset();
370 } else {
371 ceph_assert(!mirror_action.action_in_progress);
372 }
373
374 ceph_assert(!mirror_action.fs_mirror);
375
376 dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl;
377
378 mirror_action.action_in_progress = true;
379 mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id,
380 m_service_daemon.get(), m_args, m_work_queue);
381 mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
382}
383
384void Mirror::mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id) {
385 dout(10) << ": filesystem=" << filesystem << ", pool_id=" << local_pool_id << dendl;
386
387 std::scoped_lock locker(m_lock);
388 if (m_stopping) {
389 return;
390 }
391
392 auto p = m_mirror_actions.emplace(filesystem, MirrorAction(local_pool_id));
393 auto &mirror_action = p.first->second;
394 mirror_action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id));
395}
396
397void Mirror::handle_disable_mirroring(const Filesystem &filesystem, int r) {
398 dout(10) << ": filesystem=" << filesystem << ", r=" << r << dendl;
399
400 std::scoped_lock locker(m_lock);
401 auto &mirror_action = m_mirror_actions.at(filesystem);
402
403 if (!mirror_action.fs_mirror->is_init_failed()) {
404 ceph_assert(mirror_action.action_in_progress);
405 mirror_action.action_in_progress = false;
406 m_cond.notify_all();
407 }
408
409 if (!m_stopping) {
410 mirror_action.fs_mirror.reset();
411 if (mirror_action.action_ctxs.empty()) {
412 dout(10) << ": no pending actions for filesystem=" << filesystem << dendl;
413 m_mirror_actions.erase(filesystem);
414 }
415 }
416}
417
418void Mirror::disable_mirroring(const Filesystem &filesystem, Context *on_finish) {
419 ceph_assert(ceph_mutex_is_locked(m_lock));
420
421 auto &mirror_action = m_mirror_actions.at(filesystem);
422 ceph_assert(mirror_action.fs_mirror);
423 ceph_assert(!mirror_action.action_in_progress);
424
425 if (mirror_action.fs_mirror->is_init_failed()) {
426 dout(10) << ": init failed for filesystem=" << filesystem << dendl;
427 m_work_queue->queue(on_finish, -EINVAL);
428 return;
429 }
430
431 mirror_action.action_in_progress = true;
432 mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish));
433}
434
435void Mirror::mirroring_disabled(const Filesystem &filesystem) {
436 dout(10) << ": filesystem=" << filesystem << dendl;
437
438 std::scoped_lock locker(m_lock);
439 if (m_stopping) {
440 dout(5) << "shutting down" << dendl;
441 return;
442 }
443
444 auto &mirror_action = m_mirror_actions.at(filesystem);
445 mirror_action.action_ctxs.push_back(new C_DisableMirroring(this, filesystem));
446}
447
448void Mirror::add_peer(const Filesystem &filesystem, const Peer &peer) {
449 ceph_assert(ceph_mutex_is_locked(m_lock));
450
451 auto &mirror_action = m_mirror_actions.at(filesystem);
452 ceph_assert(mirror_action.fs_mirror);
453 ceph_assert(!mirror_action.action_in_progress);
454
455 mirror_action.fs_mirror->add_peer(peer);
456}
457
458void Mirror::peer_added(const Filesystem &filesystem, const Peer &peer) {
459 dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
460
461 std::scoped_lock locker(m_lock);
462 if (m_stopping) {
463 dout(5) << "shutting down" << dendl;
464 return;
465 }
466
467 auto &mirror_action = m_mirror_actions.at(filesystem);
468 mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer));
469}
470
471void Mirror::remove_peer(const Filesystem &filesystem, const Peer &peer) {
472 ceph_assert(ceph_mutex_is_locked(m_lock));
473
474 auto &mirror_action = m_mirror_actions.at(filesystem);
475 ceph_assert(mirror_action.fs_mirror);
476 ceph_assert(!mirror_action.action_in_progress);
477
478 mirror_action.fs_mirror->remove_peer(peer);
479}
480
481void Mirror::peer_removed(const Filesystem &filesystem, const Peer &peer) {
482 dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl;
483
484 std::scoped_lock locker(m_lock);
485 if (m_stopping) {
486 dout(5) << "shutting down" << dendl;
487 return;
488 }
489
490 auto &mirror_action = m_mirror_actions.at(filesystem);
491 mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer, true));
492}
493
494void Mirror::update_fs_mirrors() {
495 dout(20) << dendl;
496
497 auto now = ceph_clock_now();
498 double blocklist_interval = g_ceph_context->_conf.get_val<std::chrono::seconds>
499 ("cephfs_mirror_restart_mirror_on_blocklist_interval").count();
500 bool check_blocklist = blocklist_interval > 0 && ((now - m_last_blocklist_check) >= blocklist_interval);
501
502 double failed_interval = g_ceph_context->_conf.get_val<std::chrono::seconds>
503 ("cephfs_mirror_restart_mirror_on_failure_interval").count();
504 bool check_failure = failed_interval > 0 && ((now - m_last_failure_check) >= failed_interval);
505
506 {
507 std::scoped_lock locker(m_lock);
508 for (auto &[filesystem, mirror_action] : m_mirror_actions) {
509 if (check_failure && !mirror_action.action_in_progress &&
510 mirror_action.fs_mirror && mirror_action.fs_mirror->is_failed()) {
511 // about to restart failed mirror instance -- nothing
512 // should interfere
513 dout(5) << ": filesystem=" << filesystem << " failed mirroring -- restarting" << dendl;
514 auto peers = mirror_action.fs_mirror->get_peers();
515 mirror_action.action_ctxs.push_front(
516 new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers));
517 } else if (check_blocklist && !mirror_action.action_in_progress &&
518 mirror_action.fs_mirror && mirror_action.fs_mirror->is_blocklisted()) {
519 // about to restart blocklisted mirror instance -- nothing
520 // should interfere
521 dout(5) << ": filesystem=" << filesystem << " is blocklisted -- restarting" << dendl;
522 auto peers = mirror_action.fs_mirror->get_peers();
523 mirror_action.action_ctxs.push_front(
524 new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers));
525 }
526 if (!mirror_action.action_ctxs.empty() && !mirror_action.action_in_progress) {
527 auto ctx = std::move(mirror_action.action_ctxs.front());
528 mirror_action.action_ctxs.pop_front();
529 ctx->complete(0);
530 }
531 }
532
533 if (check_blocklist) {
534 m_last_blocklist_check = now;
535 }
536 if (check_failure) {
537 m_last_failure_check = now;
538 }
539 }
540
541 schedule_mirror_update_task();
542}
543
544void Mirror::schedule_mirror_update_task() {
545 ceph_assert(m_timer_task == nullptr);
546 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
547
548 m_timer_task = new LambdaContext([this](int _) {
549 m_timer_task = nullptr;
550 update_fs_mirrors();
551 });
552 double after = g_ceph_context->_conf.get_val<std::chrono::seconds>
553 ("cephfs_mirror_action_update_interval").count();
554 dout(20) << ": scheduling fs mirror update (" << m_timer_task << ") after "
555 << after << " seconds" << dendl;
556 m_timer->add_event_after(after, m_timer_task);
557}
558
559void Mirror::run() {
560 dout(20) << dendl;
561
562 std::unique_lock locker(m_lock);
563 m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener));
564 m_msgr->add_dispatcher_tail(m_cluster_watcher.get());
565
566 m_cluster_watcher->init();
567 m_cond.wait(locker, [this]{return m_stopping;});
568
569 locker.unlock();
570 {
571 std::scoped_lock timer_lock(*m_timer_lock);
572 if (m_timer_task != nullptr) {
573 dout(10) << ": canceling timer task=" << m_timer_task << dendl;
574 m_timer->cancel_event(m_timer_task);
575 m_timer_task = nullptr;
576 }
577 }
578 locker.lock();
579
580 for (auto &[filesystem, mirror_action] : m_mirror_actions) {
581 dout(10) << ": trying to shutdown filesystem=" << filesystem << dendl;
582 // wait for in-progress action and shutdown
583 m_cond.wait(locker, [&mirror_action=mirror_action]
584 {return !mirror_action.action_in_progress;});
585 if (mirror_action.fs_mirror &&
586 !mirror_action.fs_mirror->is_stopping() &&
587 !mirror_action.fs_mirror->is_init_failed()) {
588 C_SaferCond cond;
589 mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, &cond));
590 int r = cond.wait();
591 dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl;
592 }
593
594 mirror_action.fs_mirror.reset();
595 }
596}
597
598} // namespace mirror
599} // namespace cephfs
600