]> git.proxmox.com Git - ceph.git/blob - ceph/src/tools/cephfs_mirror/FSMirror.cc
a0bf75d1e29778a207e48d9c0fd700dca43e629b
[ceph.git] / ceph / src / tools / cephfs_mirror / FSMirror.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
#include <cerrno>
#include <utility>

#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/ceph_context.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "include/stringify.h"
#include "msg/Messenger.h"
#include "FSMirror.h"
#include "PeerReplayer.h"
#include "aio_utils.h"
#include "ServiceDaemon.h"
#include "Utils.h"

#include "common/Cond.h"
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_cephfs_mirror
23 #undef dout_prefix
24 #define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__
25
26 using namespace std;
27
28 namespace cephfs {
29 namespace mirror {
30
31 namespace {
32
// attribute keys reported to the service daemon: per-filesystem snapshot
// directory count and per-peer "replayer failed to initialize" flag
const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
35
// Interface implemented by admin socket commands registered through
// MirrorAdminSocketHook.
class MirrorAdminSocketCommand {
public:
  virtual ~MirrorAdminSocketCommand() {
  }
  // execute the command, dumping output via the formatter; returns 0 on
  // success, negative errno otherwise
  virtual int call(Formatter *f) = 0;
};
42
43 class StatusCommand : public MirrorAdminSocketCommand {
44 public:
45 explicit StatusCommand(FSMirror *fs_mirror)
46 : fs_mirror(fs_mirror) {
47 }
48
49 int call(Formatter *f) override {
50 fs_mirror->mirror_status(f);
51 return 0;
52 }
53
54 private:
55 FSMirror *fs_mirror;
56 };
57
58 } // anonymous namespace
59
60 class MirrorAdminSocketHook : public AdminSocketHook {
61 public:
62 MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror)
63 : admin_socket(cct->get_admin_socket()) {
64 int r;
65 std::string cmd;
66
67 // mirror status format is name@fscid
68 cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid);
69 r = admin_socket->register_command(
70 cmd, this, "get filesystem mirror status");
71 if (r == 0) {
72 commands[cmd] = new StatusCommand(fs_mirror);
73 }
74 }
75
76 ~MirrorAdminSocketHook() override {
77 admin_socket->unregister_commands(this);
78 for (auto &[command, cmdptr] : commands) {
79 delete cmdptr;
80 }
81 }
82
83 int call(std::string_view command, const cmdmap_t& cmdmap,
84 Formatter *f, std::ostream &errss, bufferlist &out) override {
85 auto p = commands.at(std::string(command));
86 return p->call(f);
87 }
88
89 private:
90 typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
91
92 AdminSocket *admin_socket;
93 Commands commands;
94 };
95
96 FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
97 ServiceDaemon *service_daemon, std::vector<const char*> args,
98 ContextWQ *work_queue)
99 : m_cct(cct),
100 m_filesystem(filesystem),
101 m_pool_id(pool_id),
102 m_service_daemon(service_daemon),
103 m_args(args),
104 m_work_queue(work_queue),
105 m_snap_listener(this),
106 m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
107 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
108 (uint64_t)0);
109 }
110
// Destroy the watchers under the lock; the admin socket hook is destroyed
// last and unlocked so a status command racing with destruction can still
// take m_lock and finish.
FSMirror::~FSMirror() {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    delete m_instance_watcher;
    delete m_mirror_watcher;
  }
  // outside the lock so that in-progress commands can acquire
  // lock and finish executing.
  delete m_asok_hook;
}
123
// Initialize a peer replayer; returns its init() result.
// Caller must hold m_lock (asserted).
int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return peer_replayer->init();
}
128
// Shut down a peer replayer. Unlike init_replayer(), callers invoke this
// without m_lock held (see remove_peer() / shutdown_peer_replayers()).
void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
  peer_replayer->shutdown();
}
132
// Release the cephfs mount and rados resources acquired in init().
// NOTE(review): called from handle_shutdown_instance_watcher() without
// m_lock held, and m_mount is not nulled afterwards -- confirm no path can
// use these handles after this point.
void FSMirror::cleanup() {
  dout(20) << dendl;
  ceph_unmount(m_mount);
  ceph_release(m_mount);
  m_ioctx.close();
  m_cluster.reset();
}
140
// Reopen log files on the local cluster connection's context and in every
// peer replayer (e.g. after log rotation).
void FSMirror::reopen_logs() {
  std::scoped_lock locker(m_lock);

  if (m_cluster) {
    reinterpret_cast<CephContext *>(m_cluster->cct())->reopen_logs();
  }
  for (auto &[peer, replayer] : m_peer_replayers) {
    replayer->reopen_logs();
  }
}
151
// Bootstrap the mirror: connect to the local cluster, open the pool ioctx,
// mount the file system, then kick off asynchronous instance-watcher init.
// on_finish is completed with the final result -- synchronously on any
// early failure (after releasing whatever was acquired so far), otherwise
// later via the watcher init callback chain.
void FSMirror::init(Context *on_finish) {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  int r = connect(g_ceph_context->_conf->name.to_str(),
                  g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args);
  if (r < 0) {
    m_init_failed = true;
    on_finish->complete(r);
    return;
  }

  r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
  if (r < 0) {
    m_init_failed = true;
    m_cluster.reset();  // drop the connection acquired above
    derr << ": error accessing local pool (id=" << m_pool_id << "): "
         << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  r = mount(m_cluster, m_filesystem, true, &m_mount);
  if (r < 0) {
    m_init_failed = true;
    m_ioctx.close();
    m_cluster.reset();
    on_finish->complete(r);
    return;
  }

  m_addrs = m_cluster->get_addrs();
  dout(10) << ": rados addrs=" << m_addrs << dendl;

  // remainder of init is asynchronous; on_finish completes from the
  // instance/mirror watcher callbacks
  init_instance_watcher(on_finish);
}
188
// Initiate asynchronous shutdown. If init() is still in flight
// (m_on_init_finish set), teardown is deferred: a trampoline context is
// parked in m_on_shutdown_finish and fired by the init completion lambda
// (see init_instance_watcher()) with the init result.
void FSMirror::shutdown(Context *on_finish) {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    m_stopping = true;
    if (m_on_init_finish != nullptr) {
      dout(10) << ": delaying shutdown -- init in progress" << dendl;
      m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
        if (r < 0) {
          // init failed -- the watchers never came up, so there is nothing
          // to tear down; report shutdown success
          on_finish->complete(0);
          return;
        }
        m_on_shutdown_finish = on_finish;
        shutdown_peer_replayers();
      });
      return;
    }

    m_on_shutdown_finish = on_finish;
  }

  // teardown proceeds outside the lock
  shutdown_peer_replayers();
}
213
// Shut down and drop all peer replayers, then continue the teardown chain
// with the mirror watcher.
// NOTE(review): m_peer_replayers is iterated and cleared without m_lock;
// this appears to rely on m_stopping having been set in shutdown() --
// confirm no concurrent add_peer()/remove_peer() is possible here.
void FSMirror::shutdown_peer_replayers() {
  dout(20) << dendl;

  for (auto &[peer, peer_replayer] : m_peer_replayers) {
    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
    shutdown_replayer(peer_replayer.get());
  }
  m_peer_replayers.clear();

  shutdown_mirror_watcher();
}
225
// Create and initialize the instance watcher. Wraps the caller's context in
// m_on_init_finish, which records init failure and, if a shutdown was
// parked while init was in flight (see shutdown()), fires that parked
// context with the init result. Called with m_lock held (from init()).
void FSMirror::init_instance_watcher(Context *on_finish) {
  dout(20) << dendl;

  m_on_init_finish = new LambdaContext([this, on_finish](int r) {
    {
      std::scoped_lock locker(m_lock);
      if (r < 0) {
        m_init_failed = true;
      }
    }
    on_finish->complete(r);
    // kick a shutdown that was delayed because init was in progress
    if (m_on_shutdown_finish != nullptr) {
      m_on_shutdown_finish->complete(r);
    }
  });

  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_init_instance_watcher>(this);
  m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
  m_instance_watcher->init(ctx);
}
247
// Completion callback for instance watcher init. On error, claim and fire
// the init context with the error; on success proceed to mirror watcher
// init (the init context fires later in that chain).
void FSMirror::handle_init_instance_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  Context *on_init_finish = nullptr;
  {
    std::scoped_lock locker(m_lock);
    if (r < 0) {
      // swap so the init context is completed exactly once
      std::swap(on_init_finish, m_on_init_finish);
    }
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(r);
    return;
  }

  init_mirror_watcher();
}
266
// Create and initialize the mirror watcher; completion is routed to
// handle_init_mirror_watcher().
void FSMirror::init_mirror_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_init_mirror_watcher>(this);
  m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue);
  m_mirror_watcher->init(ctx);
}
276
// Completion callback for mirror watcher init. Note the condition is
// inverted relative to handle_init_instance_watcher(): on success the init
// context is claimed and fired here; on failure the error is stashed in
// m_retval and the already-initialized instance watcher is unwound (the
// init context then fires from handle_shutdown_instance_watcher()).
void FSMirror::handle_init_mirror_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  Context *on_init_finish = nullptr;
  {
    std::scoped_lock locker(m_lock);
    if (r == 0) {
      std::swap(on_init_finish, m_on_init_finish);
    }
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(r);
    return;
  }

  m_retval = r; // save errcode for init context callback
  shutdown_instance_watcher();
}
296
// Begin mirror watcher shutdown; continues the teardown chain in
// handle_shutdown_mirror_watcher().
void FSMirror::shutdown_mirror_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this);
  m_mirror_watcher->shutdown(ctx);
}
305
// Mirror watcher shutdown finished (result is logged but otherwise
// ignored); continue by shutting down the instance watcher.
void FSMirror::handle_shutdown_mirror_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  shutdown_instance_watcher();
}
311
// Begin instance watcher shutdown. The completion is bounced through the
// work queue so handle_shutdown_instance_watcher() runs in that context.
void FSMirror::shutdown_instance_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this);
  m_instance_watcher->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, ctx));
}
320
// Final teardown stage: release mount/rados resources and complete
// whichever contexts are still pending. A pending init context receives
// m_retval (the error saved when mirror watcher init failed); a pending
// shutdown context receives the instance watcher shutdown result.
void FSMirror::handle_shutdown_instance_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  cleanup();

  Context *on_init_finish = nullptr;
  Context *on_shutdown_finish = nullptr;

  {
    std::scoped_lock locker(m_lock);
    std::swap(on_init_finish, m_on_init_finish);
    std::swap(on_shutdown_finish, m_on_shutdown_finish);
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(m_retval);
  }
  if (on_shutdown_finish != nullptr) {
    on_shutdown_finish->complete(r);
  }
}
342
343 void FSMirror::handle_acquire_directory(string_view dir_path) {
344 dout(5) << ": dir_path=" << dir_path << dendl;
345
346 {
347 std::scoped_lock locker(m_lock);
348 m_directories.emplace(dir_path);
349 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
350 m_directories.size());
351
352 for (auto &[peer, peer_replayer] : m_peer_replayers) {
353 dout(10) << ": peer=" << peer << dendl;
354 peer_replayer->add_directory(dir_path);
355 }
356 }
357 }
358
359 void FSMirror::handle_release_directory(string_view dir_path) {
360 dout(5) << ": dir_path=" << dir_path << dendl;
361
362 {
363 std::scoped_lock locker(m_lock);
364 auto it = m_directories.find(dir_path);
365 if (it != m_directories.end()) {
366 m_directories.erase(it);
367 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
368 m_directories.size());
369 for (auto &[peer, peer_replayer] : m_peer_replayers) {
370 dout(10) << ": peer=" << peer << dendl;
371 peer_replayer->remove_directory(dir_path);
372 }
373 }
374 }
375 }
376
// Register a peer and spin up its replayer. No-op if a replayer for the
// peer already exists. On replayer init failure the failure is recorded in
// the service daemon and the replayer is discarded.
void FSMirror::add_peer(const Peer &peer) {
  dout(10) << ": peer=" << peer << dendl;

  std::scoped_lock locker(m_lock);
  m_all_peers.emplace(peer);
  if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
    return;
  }

  auto replayer = std::make_unique<PeerReplayer>(
    m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
  int r = init_replayer(replayer.get());
  if (r < 0) {
    m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer,
                                                   SERVICE_DAEMON_PEER_INIT_FAILED_KEY,
                                                   true);
    return;
  }
  m_peer_replayers.emplace(peer, std::move(replayer));
  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
}
398
399 void FSMirror::remove_peer(const Peer &peer) {
400 dout(10) << ": peer=" << peer << dendl;
401
402 std::unique_ptr<PeerReplayer> replayer;
403 {
404 std::scoped_lock locker(m_lock);
405 m_all_peers.erase(peer);
406 auto it = m_peer_replayers.find(peer);
407 if (it != m_peer_replayers.end()) {
408 replayer = std::move(it->second);
409 m_peer_replayers.erase(it);
410 }
411 }
412
413 if (replayer) {
414 dout(5) << ": shutting down replayers for peer=" << peer << dendl;
415 shutdown_replayer(replayer.get());
416 }
417 }
418
// Dump mirror state for the admin socket status command: "failed" when
// init failed, "blocklisted" when the rados instance is blocklisted,
// otherwise the rados address, per-peer details and the snapshot directory
// count.
void FSMirror::mirror_status(Formatter *f) {
  std::scoped_lock locker(m_lock);
  f->open_object_section("status");
  if (m_init_failed) {
    f->dump_string("state", "failed");
  } else if (is_blocklisted(locker)) {
    f->dump_string("state", "blocklisted");
  } else {
    // dump rados addr for blocklist test
    f->dump_string("rados_inst", m_addrs);
    f->open_object_section("peers");
    for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
      peer.dump(f);
    }
    f->close_section(); // peers
    f->open_object_section("snap_dirs");
    f->dump_int("dir_count", m_directories.size());
    f->close_section(); // snap_dirs
  }
  f->close_section(); // status
}
440
441
442 } // namespace mirror
443 } // namespace cephfs