]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "common/admin_socket.h" | |
5 | #include "common/ceph_argparse.h" | |
6 | #include "common/ceph_context.h" | |
7 | #include "common/common_init.h" | |
8 | #include "common/debug.h" | |
9 | #include "common/errno.h" | |
10 | #include "common/WorkQueue.h" | |
11 | #include "include/stringify.h" | |
12 | #include "msg/Messenger.h" | |
13 | #include "FSMirror.h" | |
14 | #include "PeerReplayer.h" | |
15 | #include "aio_utils.h" | |
16 | #include "ServiceDaemon.h" | |
17 | #include "Utils.h" | |
18 | ||
19 | #include "common/Cond.h" | |
20 | ||
21 | #define dout_context g_ceph_context | |
22 | #define dout_subsys ceph_subsys_cephfs_mirror | |
23 | #undef dout_prefix | |
24 | #define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__ | |
25 | ||
20effc67 TL |
26 | using namespace std; |
27 | ||
f67539c2 TL |
28 | namespace cephfs { |
29 | namespace mirror { | |
30 | ||
31 | namespace { | |
32 | ||
33 | const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count"); | |
34 | const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed"); | |
35 | ||
36 | class MirrorAdminSocketCommand { | |
37 | public: | |
38 | virtual ~MirrorAdminSocketCommand() { | |
39 | } | |
40 | virtual int call(Formatter *f) = 0; | |
41 | }; | |
42 | ||
43 | class StatusCommand : public MirrorAdminSocketCommand { | |
44 | public: | |
45 | explicit StatusCommand(FSMirror *fs_mirror) | |
46 | : fs_mirror(fs_mirror) { | |
47 | } | |
48 | ||
49 | int call(Formatter *f) override { | |
50 | fs_mirror->mirror_status(f); | |
51 | return 0; | |
52 | } | |
53 | ||
54 | private: | |
55 | FSMirror *fs_mirror; | |
56 | }; | |
57 | ||
58 | } // anonymous namespace | |
59 | ||
60 | class MirrorAdminSocketHook : public AdminSocketHook { | |
61 | public: | |
62 | MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror) | |
63 | : admin_socket(cct->get_admin_socket()) { | |
64 | int r; | |
65 | std::string cmd; | |
66 | ||
67 | // mirror status format is name@fscid | |
68 | cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid); | |
69 | r = admin_socket->register_command( | |
70 | cmd, this, "get filesystem mirror status"); | |
71 | if (r == 0) { | |
72 | commands[cmd] = new StatusCommand(fs_mirror); | |
73 | } | |
74 | } | |
75 | ||
76 | ~MirrorAdminSocketHook() override { | |
77 | admin_socket->unregister_commands(this); | |
78 | for (auto &[command, cmdptr] : commands) { | |
79 | delete cmdptr; | |
80 | } | |
81 | } | |
82 | ||
83 | int call(std::string_view command, const cmdmap_t& cmdmap, | |
84 | Formatter *f, std::ostream &errss, bufferlist &out) override { | |
85 | auto p = commands.at(std::string(command)); | |
86 | return p->call(f); | |
87 | } | |
88 | ||
89 | private: | |
90 | typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands; | |
91 | ||
92 | AdminSocket *admin_socket; | |
93 | Commands commands; | |
94 | }; | |
95 | ||
// Construct a mirror instance for a single filesystem. Registers the
// admin socket hook and seeds the service daemon's directory_count
// attribute with zero.
FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
                   ServiceDaemon *service_daemon, std::vector<const char*> args,
                   ContextWQ *work_queue)
  : m_cct(cct),
    m_filesystem(filesystem),
    m_pool_id(pool_id),
    m_service_daemon(service_daemon),
    m_args(args),
    m_work_queue(work_queue),
    m_snap_listener(this),
    m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
  // report an initial directory count of 0 for this fscid
  m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
                                               (uint64_t)0);
}
110 | ||
FSMirror::~FSMirror() {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    // watchers are raw-owned; destroy them under the lock
    delete m_instance_watcher;
    delete m_mirror_watcher;
  }
  // outside the lock so that in-progress commands can acquire
  // lock and finish executing.
  delete m_asok_hook;
}
123 | ||
// Initialize a peer replayer. m_lock must be held by the caller
// (enforced by the assert).
int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return peer_replayer->init();
}
128 | ||
// Shut down a peer replayer. NOTE: call sites (remove_peer(),
// shutdown_peer_replayers()) invoke this without m_lock held.
void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
  peer_replayer->shutdown();
}
132 | ||
// Release cluster resources in teardown order: unmount the client,
// release its handle, close the pool ioctx, drop the cluster connection.
void FSMirror::cleanup() {
  dout(20) << dendl;
  ceph_unmount(m_mount);
  ceph_release(m_mount);
  m_ioctx.close();
  m_cluster.reset();
}
140 | ||
// Reopen log files after rotation: the local cluster connection's
// context plus every peer replayer's remote connection.
void FSMirror::reopen_logs() {
  std::scoped_lock locker(m_lock);

  if (m_cluster) {
    reinterpret_cast<CephContext *>(m_cluster->cct())->reopen_logs();
  }
  for (auto &[peer, replayer] : m_peer_replayers) {
    replayer->reopen_logs();
  }
}
151 | ||
// Bring up the mirror: connect to the cluster, open the pool ioctx,
// mount the filesystem, then kick off the (asynchronous) instance
// watcher init. on_finish is completed with the result -- immediately
// on early failure, otherwise via the wrapper context installed in
// init_instance_watcher(). Each failure path unwinds what was set up
// before it and marks m_init_failed.
void FSMirror::init(Context *on_finish) {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  // connect using the daemon's own name/cluster configuration
  int r = connect(g_ceph_context->_conf->name.to_str(),
                  g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args);
  if (r < 0) {
    m_init_failed = true;
    on_finish->complete(r);
    return;
  }

  // ioctx on the pool id passed at construction; used by the watchers
  r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
  if (r < 0) {
    m_init_failed = true;
    m_cluster.reset();
    derr << ": error accessing local pool (id=" << m_pool_id << "): "
         << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  // mount the filesystem (third argument presumably a flag defined by
  // mount() in Utils.h -- confirm its meaning there)
  r = mount(m_cluster, m_filesystem, true, &m_mount);
  if (r < 0) {
    m_init_failed = true;
    m_ioctx.close();
    m_cluster.reset();
    on_finish->complete(r);
    return;
  }

  m_addrs = m_cluster->get_addrs();
  dout(10) << ": rados addrs=" << m_addrs << dendl;

  init_instance_watcher(on_finish);
}
188 | ||
// Shut down the mirror. If init is still in flight (m_on_init_finish is
// set), defer: install a wrapper that runs once init's completion path
// fires (see init_instance_watcher()). In that wrapper, an init failure
// (r < 0) means nothing was brought up, so shutdown trivially succeeds
// with 0; otherwise the real shutdown sequence starts then.
void FSMirror::shutdown(Context *on_finish) {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    m_stopping = true;
    if (m_on_init_finish != nullptr) {
      dout(10) << ": delaying shutdown -- init in progress" << dendl;
      m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
          if (r < 0) {
            on_finish->complete(0);
            return;
          }
          m_on_shutdown_finish = on_finish;
          shutdown_peer_replayers();
        });
      return;
    }

    m_on_shutdown_finish = on_finish;
  }

  // lock released -- replayer shutdown may block
  shutdown_peer_replayers();
}
213 | ||
// Shut down and drop every peer replayer, then continue teardown with
// the mirror watcher. Runs without m_lock held (see shutdown()).
void FSMirror::shutdown_peer_replayers() {
  dout(20) << dendl;

  for (auto &[peer, peer_replayer] : m_peer_replayers) {
    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
    shutdown_replayer(peer_replayer.get());
  }
  m_peer_replayers.clear();

  shutdown_mirror_watcher();
}
225 | ||
// Start the instance watcher. Wraps the caller's on_finish: on
// completion it records init failure (under the lock), completes the
// caller, then fires a deferred shutdown if one was queued while init
// was in progress (see shutdown()).
void FSMirror::init_instance_watcher(Context *on_finish) {
  dout(20) << dendl;

  m_on_init_finish = new LambdaContext([this, on_finish](int r) {
    {
      std::scoped_lock locker(m_lock);
      if (r < 0) {
        m_init_failed = true;
      }
    }
    on_finish->complete(r);
    // NOTE(review): m_on_shutdown_finish is read outside m_lock here --
    // presumably safe given callback ordering, but worth confirming.
    if (m_on_shutdown_finish != nullptr) {
      m_on_shutdown_finish->complete(r);
    }
  });

  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_init_instance_watcher>(this);
  m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
  m_instance_watcher->init(ctx);
}
247 | ||
248 | void FSMirror::handle_init_instance_watcher(int r) { | |
249 | dout(20) << ": r=" << r << dendl; | |
250 | ||
251 | Context *on_init_finish = nullptr; | |
252 | { | |
253 | std::scoped_lock locker(m_lock); | |
254 | if (r < 0) { | |
255 | std::swap(on_init_finish, m_on_init_finish); | |
256 | } | |
257 | } | |
258 | ||
259 | if (on_init_finish != nullptr) { | |
260 | on_init_finish->complete(r); | |
261 | return; | |
262 | } | |
263 | ||
264 | init_mirror_watcher(); | |
265 | } | |
266 | ||
267 | void FSMirror::init_mirror_watcher() { | |
268 | dout(20) << dendl; | |
269 | ||
270 | std::scoped_lock locker(m_lock); | |
271 | Context *ctx = new C_CallbackAdapter< | |
272 | FSMirror, &FSMirror::handle_init_mirror_watcher>(this); | |
273 | m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue); | |
274 | m_mirror_watcher->init(ctx); | |
275 | } | |
276 | ||
277 | void FSMirror::handle_init_mirror_watcher(int r) { | |
278 | dout(20) << ": r=" << r << dendl; | |
279 | ||
280 | Context *on_init_finish = nullptr; | |
281 | { | |
282 | std::scoped_lock locker(m_lock); | |
283 | if (r == 0) { | |
284 | std::swap(on_init_finish, m_on_init_finish); | |
285 | } | |
286 | } | |
287 | ||
288 | if (on_init_finish != nullptr) { | |
289 | on_init_finish->complete(r); | |
290 | return; | |
291 | } | |
292 | ||
293 | m_retval = r; // save errcode for init context callback | |
294 | shutdown_instance_watcher(); | |
295 | } | |
296 | ||
297 | void FSMirror::shutdown_mirror_watcher() { | |
298 | dout(20) << dendl; | |
299 | ||
300 | std::scoped_lock locker(m_lock); | |
301 | Context *ctx = new C_CallbackAdapter< | |
302 | FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this); | |
303 | m_mirror_watcher->shutdown(ctx); | |
304 | } | |
305 | ||
// Completion of MirrorWatcher::shutdown(). Teardown continues with the
// instance watcher regardless of r.
void FSMirror::handle_shutdown_mirror_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  shutdown_instance_watcher();
}
311 | ||
// Tear down the instance watcher. The completion is bounced through
// m_work_queue (C_AsyncCallback) so handle_shutdown_instance_watcher()
// runs from the work queue rather than the watcher's own callback path.
void FSMirror::shutdown_instance_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this);
  m_instance_watcher->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, ctx));
}
320 | ||
// Final step of teardown: watchers are gone, so release the mount/
// ioctx/cluster, then complete whichever contexts are still pending.
// The init context (if any) gets the error saved in m_retval by
// handle_init_mirror_watcher(); the shutdown context gets r.
void FSMirror::handle_shutdown_instance_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  cleanup();

  Context *on_init_finish = nullptr;
  Context *on_shutdown_finish = nullptr;

  {
    std::scoped_lock locker(m_lock);
    std::swap(on_init_finish, m_on_init_finish);
    std::swap(on_shutdown_finish, m_on_shutdown_finish);
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(m_retval);
  }
  if (on_shutdown_finish != nullptr) {
    on_shutdown_finish->complete(r);
  }
}
342 | ||
343 | void FSMirror::handle_acquire_directory(string_view dir_path) { | |
344 | dout(5) << ": dir_path=" << dir_path << dendl; | |
345 | ||
346 | { | |
347 | std::scoped_lock locker(m_lock); | |
348 | m_directories.emplace(dir_path); | |
349 | m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, | |
350 | m_directories.size()); | |
351 | ||
352 | for (auto &[peer, peer_replayer] : m_peer_replayers) { | |
353 | dout(10) << ": peer=" << peer << dendl; | |
354 | peer_replayer->add_directory(dir_path); | |
355 | } | |
356 | } | |
357 | } | |
358 | ||
359 | void FSMirror::handle_release_directory(string_view dir_path) { | |
360 | dout(5) << ": dir_path=" << dir_path << dendl; | |
361 | ||
362 | { | |
363 | std::scoped_lock locker(m_lock); | |
364 | auto it = m_directories.find(dir_path); | |
365 | if (it != m_directories.end()) { | |
366 | m_directories.erase(it); | |
367 | m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY, | |
368 | m_directories.size()); | |
369 | for (auto &[peer, peer_replayer] : m_peer_replayers) { | |
370 | dout(10) << ": peer=" << peer << dendl; | |
371 | peer_replayer->remove_directory(dir_path); | |
372 | } | |
373 | } | |
374 | } | |
375 | } | |
376 | ||
// Add a peer and spin up its replayer. Idempotent for an already-known
// peer. On replayer init failure the peer remains in m_all_peers but
// gets no replayer, and the failure is surfaced via the service daemon.
void FSMirror::add_peer(const Peer &peer) {
  dout(10) << ": peer=" << peer << dendl;

  std::scoped_lock locker(m_lock);
  m_all_peers.emplace(peer);
  // replayer already running for this peer -- nothing to do
  if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
    return;
  }

  auto replayer = std::make_unique<PeerReplayer>(
    m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
  int r = init_replayer(replayer.get());
  if (r < 0) {
    m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer,
                                                   SERVICE_DAEMON_PEER_INIT_FAILED_KEY,
                                                   true);
    return;
  }
  m_peer_replayers.emplace(peer, std::move(replayer));
  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
}
398 | ||
399 | void FSMirror::remove_peer(const Peer &peer) { | |
400 | dout(10) << ": peer=" << peer << dendl; | |
401 | ||
402 | std::unique_ptr<PeerReplayer> replayer; | |
403 | { | |
404 | std::scoped_lock locker(m_lock); | |
405 | m_all_peers.erase(peer); | |
406 | auto it = m_peer_replayers.find(peer); | |
407 | if (it != m_peer_replayers.end()) { | |
408 | replayer = std::move(it->second); | |
409 | m_peer_replayers.erase(it); | |
410 | } | |
411 | } | |
412 | ||
413 | if (replayer) { | |
414 | dout(5) << ": shutting down replayers for peer=" << peer << dendl; | |
415 | shutdown_replayer(replayer.get()); | |
416 | } | |
417 | } | |
418 | ||
// Dump mirror status for the admin socket command: either a terminal
// state ("failed"/"blocklisted") or the rados instance address plus
// peer and snap-directory details.
void FSMirror::mirror_status(Formatter *f) {
  std::scoped_lock locker(m_lock);
  f->open_object_section("status");
  if (m_init_failed) {
    f->dump_string("state", "failed");
  } else if (is_blocklisted(locker)) {
    f->dump_string("state", "blocklisted");
  } else {
    // dump rados addr for blocklist test
    f->dump_string("rados_inst", m_addrs);
    f->open_object_section("peers");
    for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
      peer.dump(f);
    }
    f->close_section(); // peers
    f->open_object_section("snap_dirs");
    f->dump_int("dir_count", m_directories.size());
    f->close_section(); // snap_dirs
  }
  f->close_section(); // status
}
440 | ||
441 | ||
442 | } // namespace mirror | |
443 | } // namespace cephfs |