]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/cephfs_mirror/FSMirror.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / tools / cephfs_mirror / FSMirror.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "common/admin_socket.h"
5#include "common/ceph_argparse.h"
6#include "common/ceph_context.h"
7#include "common/common_init.h"
8#include "common/debug.h"
9#include "common/errno.h"
10#include "common/WorkQueue.h"
11#include "include/stringify.h"
12#include "msg/Messenger.h"
13#include "FSMirror.h"
14#include "PeerReplayer.h"
15#include "aio_utils.h"
16#include "ServiceDaemon.h"
17#include "Utils.h"
18
19#include "common/Cond.h"
20
21#define dout_context g_ceph_context
22#define dout_subsys ceph_subsys_cephfs_mirror
23#undef dout_prefix
24#define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__
25
20effc67
TL
26using namespace std;
27
f67539c2
TL
28namespace cephfs {
29namespace mirror {
30
31namespace {
32
33const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
34const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");
35
36class MirrorAdminSocketCommand {
37public:
38 virtual ~MirrorAdminSocketCommand() {
39 }
40 virtual int call(Formatter *f) = 0;
41};
42
43class StatusCommand : public MirrorAdminSocketCommand {
44public:
45 explicit StatusCommand(FSMirror *fs_mirror)
46 : fs_mirror(fs_mirror) {
47 }
48
49 int call(Formatter *f) override {
50 fs_mirror->mirror_status(f);
51 return 0;
52 }
53
54private:
55 FSMirror *fs_mirror;
56};
57
58} // anonymous namespace
59
60class MirrorAdminSocketHook : public AdminSocketHook {
61public:
62 MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror)
63 : admin_socket(cct->get_admin_socket()) {
64 int r;
65 std::string cmd;
66
67 // mirror status format is name@fscid
68 cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid);
69 r = admin_socket->register_command(
70 cmd, this, "get filesystem mirror status");
71 if (r == 0) {
72 commands[cmd] = new StatusCommand(fs_mirror);
73 }
74 }
75
76 ~MirrorAdminSocketHook() override {
77 admin_socket->unregister_commands(this);
78 for (auto &[command, cmdptr] : commands) {
79 delete cmdptr;
80 }
81 }
82
83 int call(std::string_view command, const cmdmap_t& cmdmap,
84 Formatter *f, std::ostream &errss, bufferlist &out) override {
85 auto p = commands.at(std::string(command));
86 return p->call(f);
87 }
88
89private:
90 typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
91
92 AdminSocket *admin_socket;
93 Commands commands;
94};
95
96FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
97 ServiceDaemon *service_daemon, std::vector<const char*> args,
98 ContextWQ *work_queue)
99 : m_cct(cct),
100 m_filesystem(filesystem),
101 m_pool_id(pool_id),
102 m_service_daemon(service_daemon),
103 m_args(args),
104 m_work_queue(work_queue),
105 m_snap_listener(this),
106 m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
107 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
108 (uint64_t)0);
109}
110
111FSMirror::~FSMirror() {
112 dout(20) << dendl;
113
114 {
115 std::scoped_lock locker(m_lock);
116 delete m_instance_watcher;
117 delete m_mirror_watcher;
118 }
119 // outside the lock so that in-progress commands can acquire
120 // lock and finish executing.
121 delete m_asok_hook;
122}
123
124int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
125 ceph_assert(ceph_mutex_is_locked(m_lock));
126 return peer_replayer->init();
127}
128
129void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
130 peer_replayer->shutdown();
131}
132
133void FSMirror::cleanup() {
134 dout(20) << dendl;
135 ceph_unmount(m_mount);
136 ceph_release(m_mount);
137 m_ioctx.close();
138 m_cluster.reset();
139}
140
b3b6e05e
TL
141void FSMirror::reopen_logs() {
142 std::scoped_lock locker(m_lock);
143
144 if (m_cluster) {
145 reinterpret_cast<CephContext *>(m_cluster->cct())->reopen_logs();
146 }
147 for (auto &[peer, replayer] : m_peer_replayers) {
148 replayer->reopen_logs();
149 }
150}
151
f67539c2
TL
152void FSMirror::init(Context *on_finish) {
153 dout(20) << dendl;
154
155 std::scoped_lock locker(m_lock);
156 int r = connect(g_ceph_context->_conf->name.to_str(),
b3b6e05e 157 g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args);
f67539c2
TL
158 if (r < 0) {
159 m_init_failed = true;
160 on_finish->complete(r);
161 return;
162 }
163
164 r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
165 if (r < 0) {
166 m_init_failed = true;
167 m_cluster.reset();
168 derr << ": error accessing local pool (id=" << m_pool_id << "): "
169 << cpp_strerror(r) << dendl;
170 on_finish->complete(r);
171 return;
172 }
173
174 r = mount(m_cluster, m_filesystem, true, &m_mount);
175 if (r < 0) {
176 m_init_failed = true;
177 m_ioctx.close();
178 m_cluster.reset();
179 on_finish->complete(r);
180 return;
181 }
182
183 m_addrs = m_cluster->get_addrs();
184 dout(10) << ": rados addrs=" << m_addrs << dendl;
185
186 init_instance_watcher(on_finish);
187}
188
189void FSMirror::shutdown(Context *on_finish) {
190 dout(20) << dendl;
191
192 {
193 std::scoped_lock locker(m_lock);
194 m_stopping = true;
195 if (m_on_init_finish != nullptr) {
196 dout(10) << ": delaying shutdown -- init in progress" << dendl;
197 m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
198 if (r < 0) {
199 on_finish->complete(0);
200 return;
201 }
202 m_on_shutdown_finish = on_finish;
203 shutdown_peer_replayers();
204 });
205 return;
206 }
207
208 m_on_shutdown_finish = on_finish;
209 }
210
211 shutdown_peer_replayers();
212}
213
214void FSMirror::shutdown_peer_replayers() {
215 dout(20) << dendl;
216
217 for (auto &[peer, peer_replayer] : m_peer_replayers) {
218 dout(5) << ": shutting down replayer for peer=" << peer << dendl;
219 shutdown_replayer(peer_replayer.get());
220 }
221 m_peer_replayers.clear();
222
223 shutdown_mirror_watcher();
224}
225
226void FSMirror::init_instance_watcher(Context *on_finish) {
227 dout(20) << dendl;
228
229 m_on_init_finish = new LambdaContext([this, on_finish](int r) {
230 {
231 std::scoped_lock locker(m_lock);
232 if (r < 0) {
233 m_init_failed = true;
234 }
235 }
236 on_finish->complete(r);
237 if (m_on_shutdown_finish != nullptr) {
238 m_on_shutdown_finish->complete(r);
239 }
240 });
241
242 Context *ctx = new C_CallbackAdapter<
243 FSMirror, &FSMirror::handle_init_instance_watcher>(this);
244 m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
245 m_instance_watcher->init(ctx);
246}
247
248void FSMirror::handle_init_instance_watcher(int r) {
249 dout(20) << ": r=" << r << dendl;
250
251 Context *on_init_finish = nullptr;
252 {
253 std::scoped_lock locker(m_lock);
254 if (r < 0) {
255 std::swap(on_init_finish, m_on_init_finish);
256 }
257 }
258
259 if (on_init_finish != nullptr) {
260 on_init_finish->complete(r);
261 return;
262 }
263
264 init_mirror_watcher();
265}
266
267void FSMirror::init_mirror_watcher() {
268 dout(20) << dendl;
269
270 std::scoped_lock locker(m_lock);
271 Context *ctx = new C_CallbackAdapter<
272 FSMirror, &FSMirror::handle_init_mirror_watcher>(this);
273 m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue);
274 m_mirror_watcher->init(ctx);
275}
276
277void FSMirror::handle_init_mirror_watcher(int r) {
278 dout(20) << ": r=" << r << dendl;
279
280 Context *on_init_finish = nullptr;
281 {
282 std::scoped_lock locker(m_lock);
283 if (r == 0) {
284 std::swap(on_init_finish, m_on_init_finish);
285 }
286 }
287
288 if (on_init_finish != nullptr) {
289 on_init_finish->complete(r);
290 return;
291 }
292
293 m_retval = r; // save errcode for init context callback
294 shutdown_instance_watcher();
295}
296
297void FSMirror::shutdown_mirror_watcher() {
298 dout(20) << dendl;
299
300 std::scoped_lock locker(m_lock);
301 Context *ctx = new C_CallbackAdapter<
302 FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this);
303 m_mirror_watcher->shutdown(ctx);
304}
305
306void FSMirror::handle_shutdown_mirror_watcher(int r) {
307 dout(20) << ": r=" << r << dendl;
308
309 shutdown_instance_watcher();
310}
311
312void FSMirror::shutdown_instance_watcher() {
313 dout(20) << dendl;
314
315 std::scoped_lock locker(m_lock);
316 Context *ctx = new C_CallbackAdapter<
317 FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this);
318 m_instance_watcher->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, ctx));
319}
320
321void FSMirror::handle_shutdown_instance_watcher(int r) {
322 dout(20) << ": r=" << r << dendl;
323
324 cleanup();
325
326 Context *on_init_finish = nullptr;
327 Context *on_shutdown_finish = nullptr;
328
329 {
330 std::scoped_lock locker(m_lock);
331 std::swap(on_init_finish, m_on_init_finish);
332 std::swap(on_shutdown_finish, m_on_shutdown_finish);
333 }
334
335 if (on_init_finish != nullptr) {
336 on_init_finish->complete(m_retval);
337 }
338 if (on_shutdown_finish != nullptr) {
339 on_shutdown_finish->complete(r);
340 }
341}
342
343void FSMirror::handle_acquire_directory(string_view dir_path) {
344 dout(5) << ": dir_path=" << dir_path << dendl;
345
346 {
347 std::scoped_lock locker(m_lock);
348 m_directories.emplace(dir_path);
349 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
350 m_directories.size());
351
352 for (auto &[peer, peer_replayer] : m_peer_replayers) {
353 dout(10) << ": peer=" << peer << dendl;
354 peer_replayer->add_directory(dir_path);
355 }
356 }
357}
358
359void FSMirror::handle_release_directory(string_view dir_path) {
360 dout(5) << ": dir_path=" << dir_path << dendl;
361
362 {
363 std::scoped_lock locker(m_lock);
364 auto it = m_directories.find(dir_path);
365 if (it != m_directories.end()) {
366 m_directories.erase(it);
367 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
368 m_directories.size());
369 for (auto &[peer, peer_replayer] : m_peer_replayers) {
370 dout(10) << ": peer=" << peer << dendl;
371 peer_replayer->remove_directory(dir_path);
372 }
373 }
374 }
375}
376
377void FSMirror::add_peer(const Peer &peer) {
378 dout(10) << ": peer=" << peer << dendl;
379
380 std::scoped_lock locker(m_lock);
381 m_all_peers.emplace(peer);
382 if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
383 return;
384 }
385
386 auto replayer = std::make_unique<PeerReplayer>(
387 m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
388 int r = init_replayer(replayer.get());
389 if (r < 0) {
390 m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer,
391 SERVICE_DAEMON_PEER_INIT_FAILED_KEY,
392 true);
393 return;
394 }
395 m_peer_replayers.emplace(peer, std::move(replayer));
396 ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
397}
398
399void FSMirror::remove_peer(const Peer &peer) {
400 dout(10) << ": peer=" << peer << dendl;
401
402 std::unique_ptr<PeerReplayer> replayer;
403 {
404 std::scoped_lock locker(m_lock);
405 m_all_peers.erase(peer);
406 auto it = m_peer_replayers.find(peer);
407 if (it != m_peer_replayers.end()) {
408 replayer = std::move(it->second);
409 m_peer_replayers.erase(it);
410 }
411 }
412
413 if (replayer) {
414 dout(5) << ": shutting down replayers for peer=" << peer << dendl;
415 shutdown_replayer(replayer.get());
416 }
417}
418
419void FSMirror::mirror_status(Formatter *f) {
420 std::scoped_lock locker(m_lock);
421 f->open_object_section("status");
422 if (m_init_failed) {
423 f->dump_string("state", "failed");
424 } else if (is_blocklisted(locker)) {
425 f->dump_string("state", "blocklisted");
426 } else {
427 // dump rados addr for blocklist test
428 f->dump_string("rados_inst", m_addrs);
429 f->open_object_section("peers");
430 for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
431 peer.dump(f);
432 }
433 f->close_section(); // peers
434 f->open_object_section("snap_dirs");
435 f->dump_int("dir_count", m_directories.size());
436 f->close_section(); // snap_dirs
437 }
438 f->close_section(); // status
439}
440
441
442} // namespace mirror
443} // namespace cephfs