// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "common/ceph_argparse.h"
#include "common/ceph_context.h"
#include "common/common_init.h"
#include "common/Cond.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "include/types.h"
#include "mon/MonClient.h"
#include "msg/Messenger.h"
#include "aio_utils.h"
#include "Mirror.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_cephfs_mirror
#undef dout_prefix
#define dout_prefix *_dout << "cephfs::mirror::Mirror " << __func__

namespace cephfs {
namespace mirror {

26 | namespace { | |
27 | ||
28 | const std::string SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY("mirroring_failed"); | |
29 | ||
30 | class SafeTimerSingleton : public SafeTimer { | |
31 | public: | |
32 | ceph::mutex timer_lock = ceph::make_mutex("cephfs::mirror::timer_lock"); | |
33 | ||
34 | explicit SafeTimerSingleton(CephContext *cct) | |
35 | : SafeTimer(cct, timer_lock, true) { | |
36 | init(); | |
37 | } | |
38 | }; | |
39 | ||
40 | class ThreadPoolSingleton : public ThreadPool { | |
41 | public: | |
42 | ContextWQ *work_queue = nullptr; | |
43 | ||
44 | explicit ThreadPoolSingleton(CephContext *cct) | |
45 | : ThreadPool(cct, "Mirror::thread_pool", "tp_mirror", 1) { | |
46 | work_queue = new ContextWQ("Mirror::work_queue", ceph::make_timespan(60), this); | |
47 | ||
48 | start(); | |
49 | } | |
50 | }; | |
51 | ||
52 | } // anonymous namespace | |
53 | ||
54 | struct Mirror::C_EnableMirroring : Context { | |
55 | Mirror *mirror; | |
56 | Filesystem filesystem; | |
57 | uint64_t pool_id; | |
58 | ||
59 | C_EnableMirroring(Mirror *mirror, const Filesystem &filesystem, uint64_t pool_id) | |
60 | : mirror(mirror), | |
61 | filesystem(filesystem), | |
62 | pool_id(pool_id) { | |
63 | } | |
64 | ||
65 | void finish(int r) override { | |
66 | enable_mirroring(); | |
67 | } | |
68 | ||
69 | void enable_mirroring() { | |
70 | Context *ctx = new C_CallbackAdapter<C_EnableMirroring, | |
71 | &C_EnableMirroring::handle_enable_mirroring>(this); | |
72 | mirror->enable_mirroring(filesystem, pool_id, ctx); | |
73 | } | |
74 | ||
75 | void handle_enable_mirroring(int r) { | |
76 | mirror->handle_enable_mirroring(filesystem, r); | |
77 | delete this; | |
78 | } | |
79 | ||
80 | // context needs to live post completion | |
81 | void complete(int r) override { | |
82 | finish(r); | |
83 | } | |
84 | }; | |
85 | ||
86 | struct Mirror::C_DisableMirroring : Context { | |
87 | Mirror *mirror; | |
88 | Filesystem filesystem; | |
89 | ||
90 | C_DisableMirroring(Mirror *mirror, const Filesystem &filesystem) | |
91 | : mirror(mirror), | |
92 | filesystem(filesystem) { | |
93 | } | |
94 | ||
95 | void finish(int r) override { | |
96 | disable_mirroring(); | |
97 | } | |
98 | ||
99 | void disable_mirroring() { | |
100 | Context *ctx = new C_CallbackAdapter<C_DisableMirroring, | |
101 | &C_DisableMirroring::handle_disable_mirroring>(this); | |
102 | mirror->disable_mirroring(filesystem, ctx); | |
103 | } | |
104 | ||
105 | void handle_disable_mirroring(int r) { | |
106 | mirror->handle_disable_mirroring(filesystem, r); | |
107 | delete this; | |
108 | } | |
109 | ||
110 | // context needs to live post completion | |
111 | void complete(int r) override { | |
112 | finish(r); | |
113 | } | |
114 | }; | |
115 | ||
116 | struct Mirror::C_PeerUpdate : Context { | |
117 | Mirror *mirror; | |
118 | Filesystem filesystem; | |
119 | Peer peer; | |
120 | bool remove = false; | |
121 | ||
122 | C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem, | |
123 | const Peer &peer) | |
124 | : mirror(mirror), | |
125 | filesystem(filesystem), | |
126 | peer(peer) { | |
127 | } | |
128 | C_PeerUpdate(Mirror *mirror, const Filesystem &filesystem, | |
129 | const Peer &peer, bool remove) | |
130 | : mirror(mirror), | |
131 | filesystem(filesystem), | |
132 | peer(peer), | |
133 | remove(remove) { | |
134 | } | |
135 | ||
136 | void finish(int r) override { | |
137 | if (remove) { | |
138 | mirror->remove_peer(filesystem, peer); | |
139 | } else { | |
140 | mirror->add_peer(filesystem, peer); | |
141 | } | |
142 | } | |
143 | }; | |
144 | ||
145 | struct Mirror::C_RestartMirroring : Context { | |
146 | Mirror *mirror; | |
147 | Filesystem filesystem; | |
148 | uint64_t pool_id; | |
149 | Peers peers; | |
150 | ||
151 | C_RestartMirroring(Mirror *mirror, const Filesystem &filesystem, | |
152 | uint64_t pool_id, const Peers &peers) | |
153 | : mirror(mirror), | |
154 | filesystem(filesystem), | |
155 | pool_id(pool_id), | |
156 | peers(peers) { | |
157 | } | |
158 | ||
159 | void finish(int r) override { | |
160 | disable_mirroring(); | |
161 | } | |
162 | ||
163 | void disable_mirroring() { | |
164 | Context *ctx = new C_CallbackAdapter<C_RestartMirroring, | |
165 | &C_RestartMirroring::handle_disable_mirroring>(this); | |
166 | mirror->disable_mirroring(filesystem, ctx); | |
167 | } | |
168 | ||
169 | void handle_disable_mirroring(int r) { | |
170 | enable_mirroring(); | |
171 | } | |
172 | ||
173 | void enable_mirroring() { | |
174 | std::scoped_lock locker(mirror->m_lock); | |
175 | Context *ctx = new C_CallbackAdapter<C_RestartMirroring, | |
176 | &C_RestartMirroring::handle_enable_mirroring>(this); | |
177 | mirror->enable_mirroring(filesystem, pool_id, ctx, true); | |
178 | } | |
179 | ||
180 | void handle_enable_mirroring(int r) { | |
181 | mirror->handle_enable_mirroring(filesystem, peers, r); | |
182 | delete this; | |
183 | } | |
184 | ||
185 | // context needs to live post completion | |
186 | void complete(int r) override { | |
187 | finish(r); | |
188 | } | |
189 | }; | |
190 | ||
191 | Mirror::Mirror(CephContext *cct, const std::vector<const char*> &args, | |
192 | MonClient *monc, Messenger *msgr) | |
193 | : m_cct(cct), | |
194 | m_args(args), | |
195 | m_monc(monc), | |
196 | m_msgr(msgr), | |
197 | m_listener(this), | |
198 | m_last_blocklist_check(ceph_clock_now()), | |
199 | m_last_failure_check(ceph_clock_now()), | |
200 | m_local(new librados::Rados()) { | |
201 | auto thread_pool = &(cct->lookup_or_create_singleton_object<ThreadPoolSingleton>( | |
202 | "cephfs::mirror::thread_pool", false, cct)); | |
203 | auto safe_timer = &(cct->lookup_or_create_singleton_object<SafeTimerSingleton>( | |
204 | "cephfs::mirror::safe_timer", false, cct)); | |
205 | m_thread_pool = thread_pool; | |
206 | m_work_queue = thread_pool->work_queue; | |
207 | m_timer = safe_timer; | |
208 | m_timer_lock = &safe_timer->timer_lock; | |
209 | std::scoped_lock timer_lock(*m_timer_lock); | |
210 | schedule_mirror_update_task(); | |
211 | } | |
212 | ||
213 | Mirror::~Mirror() { | |
214 | dout(10) << dendl; | |
215 | { | |
216 | std::scoped_lock timer_lock(*m_timer_lock); | |
217 | m_timer->shutdown(); | |
218 | } | |
219 | ||
220 | m_work_queue->drain(); | |
221 | delete m_work_queue; | |
222 | { | |
223 | std::scoped_lock locker(m_lock); | |
224 | m_thread_pool->stop(); | |
225 | m_cluster_watcher.reset(); | |
226 | } | |
227 | } | |
228 | ||
229 | int Mirror::init_mon_client() { | |
230 | dout(20) << dendl; | |
231 | ||
232 | m_monc->set_messenger(m_msgr); | |
233 | m_msgr->add_dispatcher_head(m_monc); | |
234 | m_monc->set_want_keys(CEPH_ENTITY_TYPE_MON); | |
235 | ||
236 | int r = m_monc->init(); | |
237 | if (r < 0) { | |
238 | derr << ": failed to init mon client: " << cpp_strerror(r) << dendl; | |
239 | return r; | |
240 | } | |
241 | ||
242 | r = m_monc->authenticate(m_cct->_conf->client_mount_timeout); | |
243 | if (r < 0) { | |
244 | derr << ": failed to authenticate to monitor: " << cpp_strerror(r) << dendl; | |
245 | return r; | |
246 | } | |
247 | ||
248 | client_t me = m_monc->get_global_id(); | |
249 | m_msgr->set_myname(entity_name_t::CLIENT(me.v)); | |
250 | return 0; | |
251 | } | |
252 | ||
253 | int Mirror::init(std::string &reason) { | |
254 | dout(20) << dendl; | |
255 | ||
256 | std::scoped_lock locker(m_lock); | |
257 | ||
258 | int r = m_local->init_with_context(m_cct); | |
259 | if (r < 0) { | |
260 | derr << ": could not initialize rados handler" << dendl; | |
261 | return r; | |
262 | } | |
263 | ||
264 | r = m_local->connect(); | |
265 | if (r < 0) { | |
266 | derr << ": error connecting to local cluster" << dendl; | |
267 | return r; | |
268 | } | |
269 | ||
270 | m_service_daemon = std::make_unique<ServiceDaemon>(m_cct, m_local); | |
271 | r = m_service_daemon->init(); | |
272 | if (r < 0) { | |
273 | derr << ": error registering service daemon: " << cpp_strerror(r) << dendl; | |
274 | return r; | |
275 | } | |
276 | ||
277 | r = init_mon_client(); | |
278 | if (r < 0) { | |
279 | return r; | |
280 | } | |
281 | ||
282 | return 0; | |
283 | } | |
284 | ||
285 | void Mirror::shutdown() { | |
286 | dout(20) << dendl; | |
f67539c2 TL |
287 | m_stopping = true; |
288 | m_cond.notify_all(); | |
289 | } | |
290 | ||
b3b6e05e TL |
291 | void Mirror::reopen_logs() { |
292 | for (auto &[filesystem, mirror_action] : m_mirror_actions) { | |
293 | mirror_action.fs_mirror->reopen_logs(); | |
294 | } | |
295 | g_ceph_context->reopen_logs(); | |
296 | } | |
297 | ||
f67539c2 TL |
298 | void Mirror::handle_signal(int signum) { |
299 | dout(10) << ": signal=" << signum << dendl; | |
b3b6e05e TL |
300 | |
301 | std::scoped_lock locker(m_lock); | |
302 | switch (signum) { | |
303 | case SIGHUP: | |
304 | reopen_logs(); | |
305 | break; | |
306 | case SIGINT: | |
307 | case SIGTERM: | |
308 | shutdown(); | |
309 | break; | |
310 | default: | |
311 | ceph_abort_msgf("unexpected signal %d", signum); | |
312 | } | |
f67539c2 TL |
313 | } |
314 | ||
315 | void Mirror::handle_enable_mirroring(const Filesystem &filesystem, | |
316 | const Peers &peers, int r) { | |
317 | dout(20) << ": filesystem=" << filesystem << ", peers=" << peers | |
318 | << ", r=" << r << dendl; | |
319 | ||
320 | std::scoped_lock locker(m_lock); | |
321 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
322 | ceph_assert(mirror_action.action_in_progress); | |
323 | ||
324 | mirror_action.action_in_progress = false; | |
325 | m_cond.notify_all(); | |
326 | if (r < 0) { | |
327 | derr << ": failed to initialize FSMirror for filesystem=" << filesystem | |
328 | << ": " << cpp_strerror(r) << dendl; | |
329 | m_service_daemon->add_or_update_fs_attribute(filesystem.fscid, | |
330 | SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY, | |
331 | true); | |
332 | return; | |
333 | } | |
334 | ||
335 | for (auto &peer : peers) { | |
336 | mirror_action.fs_mirror->add_peer(peer); | |
337 | } | |
338 | ||
339 | dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl; | |
340 | } | |
341 | ||
342 | void Mirror::handle_enable_mirroring(const Filesystem &filesystem, int r) { | |
343 | dout(20) << ": filesystem=" << filesystem << ", r=" << r << dendl; | |
344 | ||
345 | std::scoped_lock locker(m_lock); | |
346 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
347 | ceph_assert(mirror_action.action_in_progress); | |
348 | ||
349 | mirror_action.action_in_progress = false; | |
350 | m_cond.notify_all(); | |
351 | if (r < 0) { | |
352 | derr << ": failed to initialize FSMirror for filesystem=" << filesystem | |
353 | << ": " << cpp_strerror(r) << dendl; | |
354 | m_service_daemon->add_or_update_fs_attribute(filesystem.fscid, | |
355 | SERVICE_DAEMON_MIRROR_ENABLE_FAILED_KEY, | |
356 | true); | |
357 | return; | |
358 | } | |
359 | ||
360 | dout(10) << ": Initialized FSMirror for filesystem=" << filesystem << dendl; | |
361 | } | |
362 | ||
363 | void Mirror::enable_mirroring(const Filesystem &filesystem, uint64_t local_pool_id, | |
364 | Context *on_finish, bool is_restart) { | |
365 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
366 | ||
367 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
368 | if (is_restart) { | |
369 | mirror_action.fs_mirror.reset(); | |
370 | } else { | |
371 | ceph_assert(!mirror_action.action_in_progress); | |
372 | } | |
373 | ||
374 | ceph_assert(!mirror_action.fs_mirror); | |
375 | ||
376 | dout(10) << ": starting FSMirror: filesystem=" << filesystem << dendl; | |
377 | ||
378 | mirror_action.action_in_progress = true; | |
379 | mirror_action.fs_mirror = std::make_unique<FSMirror>(m_cct, filesystem, local_pool_id, | |
380 | m_service_daemon.get(), m_args, m_work_queue); | |
381 | mirror_action.fs_mirror->init(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish)); | |
382 | } | |
383 | ||
384 | void Mirror::mirroring_enabled(const Filesystem &filesystem, uint64_t local_pool_id) { | |
385 | dout(10) << ": filesystem=" << filesystem << ", pool_id=" << local_pool_id << dendl; | |
386 | ||
387 | std::scoped_lock locker(m_lock); | |
388 | if (m_stopping) { | |
389 | return; | |
390 | } | |
391 | ||
392 | auto p = m_mirror_actions.emplace(filesystem, MirrorAction(local_pool_id)); | |
393 | auto &mirror_action = p.first->second; | |
394 | mirror_action.action_ctxs.push_back(new C_EnableMirroring(this, filesystem, local_pool_id)); | |
395 | } | |
396 | ||
397 | void Mirror::handle_disable_mirroring(const Filesystem &filesystem, int r) { | |
398 | dout(10) << ": filesystem=" << filesystem << ", r=" << r << dendl; | |
399 | ||
400 | std::scoped_lock locker(m_lock); | |
401 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
402 | ||
403 | if (!mirror_action.fs_mirror->is_init_failed()) { | |
404 | ceph_assert(mirror_action.action_in_progress); | |
405 | mirror_action.action_in_progress = false; | |
406 | m_cond.notify_all(); | |
407 | } | |
408 | ||
409 | if (!m_stopping) { | |
410 | mirror_action.fs_mirror.reset(); | |
411 | if (mirror_action.action_ctxs.empty()) { | |
412 | dout(10) << ": no pending actions for filesystem=" << filesystem << dendl; | |
413 | m_mirror_actions.erase(filesystem); | |
414 | } | |
415 | } | |
416 | } | |
417 | ||
418 | void Mirror::disable_mirroring(const Filesystem &filesystem, Context *on_finish) { | |
419 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
420 | ||
421 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
422 | ceph_assert(mirror_action.fs_mirror); | |
423 | ceph_assert(!mirror_action.action_in_progress); | |
424 | ||
425 | if (mirror_action.fs_mirror->is_init_failed()) { | |
426 | dout(10) << ": init failed for filesystem=" << filesystem << dendl; | |
427 | m_work_queue->queue(on_finish, -EINVAL); | |
428 | return; | |
429 | } | |
430 | ||
431 | mirror_action.action_in_progress = true; | |
432 | mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, on_finish)); | |
433 | } | |
434 | ||
435 | void Mirror::mirroring_disabled(const Filesystem &filesystem) { | |
436 | dout(10) << ": filesystem=" << filesystem << dendl; | |
437 | ||
438 | std::scoped_lock locker(m_lock); | |
439 | if (m_stopping) { | |
440 | dout(5) << "shutting down" << dendl; | |
441 | return; | |
442 | } | |
443 | ||
444 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
445 | mirror_action.action_ctxs.push_back(new C_DisableMirroring(this, filesystem)); | |
446 | } | |
447 | ||
448 | void Mirror::add_peer(const Filesystem &filesystem, const Peer &peer) { | |
449 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
450 | ||
451 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
452 | ceph_assert(mirror_action.fs_mirror); | |
453 | ceph_assert(!mirror_action.action_in_progress); | |
454 | ||
455 | mirror_action.fs_mirror->add_peer(peer); | |
456 | } | |
457 | ||
458 | void Mirror::peer_added(const Filesystem &filesystem, const Peer &peer) { | |
459 | dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl; | |
460 | ||
461 | std::scoped_lock locker(m_lock); | |
462 | if (m_stopping) { | |
463 | dout(5) << "shutting down" << dendl; | |
464 | return; | |
465 | } | |
466 | ||
467 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
468 | mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer)); | |
469 | } | |
470 | ||
471 | void Mirror::remove_peer(const Filesystem &filesystem, const Peer &peer) { | |
472 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
473 | ||
474 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
475 | ceph_assert(mirror_action.fs_mirror); | |
476 | ceph_assert(!mirror_action.action_in_progress); | |
477 | ||
478 | mirror_action.fs_mirror->remove_peer(peer); | |
479 | } | |
480 | ||
481 | void Mirror::peer_removed(const Filesystem &filesystem, const Peer &peer) { | |
482 | dout(20) << ": filesystem=" << filesystem << ", peer=" << peer << dendl; | |
483 | ||
484 | std::scoped_lock locker(m_lock); | |
485 | if (m_stopping) { | |
486 | dout(5) << "shutting down" << dendl; | |
487 | return; | |
488 | } | |
489 | ||
490 | auto &mirror_action = m_mirror_actions.at(filesystem); | |
491 | mirror_action.action_ctxs.push_back(new C_PeerUpdate(this, filesystem, peer, true)); | |
492 | } | |
493 | ||
494 | void Mirror::update_fs_mirrors() { | |
495 | dout(20) << dendl; | |
496 | ||
497 | auto now = ceph_clock_now(); | |
498 | double blocklist_interval = g_ceph_context->_conf.get_val<std::chrono::seconds> | |
499 | ("cephfs_mirror_restart_mirror_on_blocklist_interval").count(); | |
500 | bool check_blocklist = blocklist_interval > 0 && ((now - m_last_blocklist_check) >= blocklist_interval); | |
501 | ||
502 | double failed_interval = g_ceph_context->_conf.get_val<std::chrono::seconds> | |
503 | ("cephfs_mirror_restart_mirror_on_failure_interval").count(); | |
504 | bool check_failure = failed_interval > 0 && ((now - m_last_failure_check) >= failed_interval); | |
505 | ||
506 | { | |
507 | std::scoped_lock locker(m_lock); | |
508 | for (auto &[filesystem, mirror_action] : m_mirror_actions) { | |
509 | if (check_failure && !mirror_action.action_in_progress && | |
510 | mirror_action.fs_mirror && mirror_action.fs_mirror->is_failed()) { | |
511 | // about to restart failed mirror instance -- nothing | |
512 | // should interfere | |
513 | dout(5) << ": filesystem=" << filesystem << " failed mirroring -- restarting" << dendl; | |
514 | auto peers = mirror_action.fs_mirror->get_peers(); | |
515 | mirror_action.action_ctxs.push_front( | |
516 | new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers)); | |
517 | } else if (check_blocklist && !mirror_action.action_in_progress && | |
518 | mirror_action.fs_mirror && mirror_action.fs_mirror->is_blocklisted()) { | |
519 | // about to restart blocklisted mirror instance -- nothing | |
520 | // should interfere | |
521 | dout(5) << ": filesystem=" << filesystem << " is blocklisted -- restarting" << dendl; | |
522 | auto peers = mirror_action.fs_mirror->get_peers(); | |
523 | mirror_action.action_ctxs.push_front( | |
524 | new C_RestartMirroring(this, filesystem, mirror_action.pool_id, peers)); | |
525 | } | |
526 | if (!mirror_action.action_ctxs.empty() && !mirror_action.action_in_progress) { | |
527 | auto ctx = std::move(mirror_action.action_ctxs.front()); | |
528 | mirror_action.action_ctxs.pop_front(); | |
529 | ctx->complete(0); | |
530 | } | |
531 | } | |
532 | ||
533 | if (check_blocklist) { | |
534 | m_last_blocklist_check = now; | |
535 | } | |
536 | if (check_failure) { | |
537 | m_last_failure_check = now; | |
538 | } | |
539 | } | |
540 | ||
541 | schedule_mirror_update_task(); | |
542 | } | |
543 | ||
544 | void Mirror::schedule_mirror_update_task() { | |
545 | ceph_assert(m_timer_task == nullptr); | |
546 | ceph_assert(ceph_mutex_is_locked(*m_timer_lock)); | |
547 | ||
548 | m_timer_task = new LambdaContext([this](int _) { | |
549 | m_timer_task = nullptr; | |
550 | update_fs_mirrors(); | |
551 | }); | |
552 | double after = g_ceph_context->_conf.get_val<std::chrono::seconds> | |
553 | ("cephfs_mirror_action_update_interval").count(); | |
554 | dout(20) << ": scheduling fs mirror update (" << m_timer_task << ") after " | |
555 | << after << " seconds" << dendl; | |
556 | m_timer->add_event_after(after, m_timer_task); | |
557 | } | |
558 | ||
559 | void Mirror::run() { | |
560 | dout(20) << dendl; | |
561 | ||
562 | std::unique_lock locker(m_lock); | |
563 | m_cluster_watcher.reset(new ClusterWatcher(m_cct, m_monc, m_service_daemon.get(), m_listener)); | |
564 | m_msgr->add_dispatcher_tail(m_cluster_watcher.get()); | |
565 | ||
566 | m_cluster_watcher->init(); | |
567 | m_cond.wait(locker, [this]{return m_stopping;}); | |
568 | ||
569 | locker.unlock(); | |
570 | { | |
571 | std::scoped_lock timer_lock(*m_timer_lock); | |
572 | if (m_timer_task != nullptr) { | |
573 | dout(10) << ": canceling timer task=" << m_timer_task << dendl; | |
574 | m_timer->cancel_event(m_timer_task); | |
575 | m_timer_task = nullptr; | |
576 | } | |
577 | } | |
578 | locker.lock(); | |
579 | ||
580 | for (auto &[filesystem, mirror_action] : m_mirror_actions) { | |
581 | dout(10) << ": trying to shutdown filesystem=" << filesystem << dendl; | |
582 | // wait for in-progress action and shutdown | |
583 | m_cond.wait(locker, [&mirror_action=mirror_action] | |
584 | {return !mirror_action.action_in_progress;}); | |
585 | if (mirror_action.fs_mirror && | |
586 | !mirror_action.fs_mirror->is_stopping() && | |
587 | !mirror_action.fs_mirror->is_init_failed()) { | |
588 | C_SaferCond cond; | |
589 | mirror_action.fs_mirror->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, &cond)); | |
590 | int r = cond.wait(); | |
591 | dout(10) << ": shutdown filesystem=" << filesystem << ", r=" << r << dendl; | |
592 | } | |
593 | ||
594 | mirror_action.fs_mirror.reset(); | |
595 | } | |
596 | } | |
597 | ||
} // namespace mirror
} // namespace cephfs