// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "common/admin_socket.h"
#include "common/ceph_argparse.h"
#include "common/ceph_context.h"
#include "common/common_init.h"
#include "common/debug.h"
#include "common/errno.h"
#include "common/WorkQueue.h"
#include "include/stringify.h"
#include "msg/Messenger.h"
#include "FSMirror.h"
#include "PeerReplayer.h"
#include "aio_utils.h"
#include "ServiceDaemon.h"
#include "Utils.h"

#include "common/Cond.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_cephfs_mirror
#undef dout_prefix
#define dout_prefix *_dout << "cephfs::mirror::FSMirror " << __func__

namespace cephfs {
namespace mirror {

namespace {

const std::string SERVICE_DAEMON_DIR_COUNT_KEY("directory_count");
const std::string SERVICE_DAEMON_PEER_INIT_FAILED_KEY("peer_init_failed");

class MirrorAdminSocketCommand {
public:
  virtual ~MirrorAdminSocketCommand() {
  }
  virtual int call(Formatter *f) = 0;
};

class StatusCommand : public MirrorAdminSocketCommand {
public:
  explicit StatusCommand(FSMirror *fs_mirror)
    : fs_mirror(fs_mirror) {
  }

  int call(Formatter *f) override {
    fs_mirror->mirror_status(f);
    return 0;
  }

private:
  FSMirror *fs_mirror;
};

} // anonymous namespace

class MirrorAdminSocketHook : public AdminSocketHook {
public:
  MirrorAdminSocketHook(CephContext *cct, const Filesystem &filesystem, FSMirror *fs_mirror)
    : admin_socket(cct->get_admin_socket()) {
    int r;
    std::string cmd;

    // mirror status format is name@fscid
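    // e.g. (illustrative only; socket path and names depend on the local setup):
    //   ceph --admin-daemon /path/to/cephfs-mirror.asok fs mirror status <fs_name>@<fscid>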
66 cmd = "fs mirror status " + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid);
67 r = admin_socket->register_command(
68 cmd, this, "get filesystem mirror status");
69 if (r == 0) {
70 commands[cmd] = new StatusCommand(fs_mirror);
71 }
72 }
73
74 ~MirrorAdminSocketHook() override {
75 admin_socket->unregister_commands(this);
76 for (auto &[command, cmdptr] : commands) {
77 delete cmdptr;
78 }
79 }
80
81 int call(std::string_view command, const cmdmap_t& cmdmap,
82 Formatter *f, std::ostream &errss, bufferlist &out) override {
83 auto p = commands.at(std::string(command));
84 return p->call(f);
85 }
86
87 private:
88 typedef std::map<std::string, MirrorAdminSocketCommand*, std::less<>> Commands;
89
90 AdminSocket *admin_socket;
91 Commands commands;
92 };
93
94 FSMirror::FSMirror(CephContext *cct, const Filesystem &filesystem, uint64_t pool_id,
95 ServiceDaemon *service_daemon, std::vector<const char*> args,
96 ContextWQ *work_queue)
97 : m_cct(cct),
98 m_filesystem(filesystem),
99 m_pool_id(pool_id),
100 m_service_daemon(service_daemon),
101 m_args(args),
102 m_work_queue(work_queue),
103 m_snap_listener(this),
104 m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
105 m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
106 (uint64_t)0);
107 }
108
109 FSMirror::~FSMirror() {
110 dout(20) << dendl;
111
112 {
113 std::scoped_lock locker(m_lock);
114 delete m_instance_watcher;
115 delete m_mirror_watcher;
116 }
117 // outside the lock so that in-progress commands can acquire
118 // lock and finish executing.
119 delete m_asok_hook;
120 }
121
122 int FSMirror::init_replayer(PeerReplayer *peer_replayer) {
123 ceph_assert(ceph_mutex_is_locked(m_lock));
124 return peer_replayer->init();
125 }
126
127 void FSMirror::shutdown_replayer(PeerReplayer *peer_replayer) {
128 peer_replayer->shutdown();
129 }
130
131 void FSMirror::cleanup() {
132 dout(20) << dendl;
133 ceph_unmount(m_mount);
134 ceph_release(m_mount);
135 m_ioctx.close();
136 m_cluster.reset();
137 }
138
139 void FSMirror::reopen_logs() {
140 std::scoped_lock locker(m_lock);
141
142 if (m_cluster) {
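    // librados exposes its CephContext only as an opaque handle; cast it back
    // so the context's log files can be reopened (e.g. after log rotation).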
    reinterpret_cast<CephContext *>(m_cluster->cct())->reopen_logs();
  }
  for (auto &[peer, replayer] : m_peer_replayers) {
    replayer->reopen_logs();
  }
}

void FSMirror::init(Context *on_finish) {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
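  // connect to the local cluster using this daemon's own identity, open the
  // pool backing the mirror instance/watcher objects, then mount the
  // filesystem being mirrored.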
  int r = connect(g_ceph_context->_conf->name.to_str(),
                  g_ceph_context->_conf->cluster, &m_cluster, "", "", m_args);
  if (r < 0) {
    m_init_failed = true;
    on_finish->complete(r);
    return;
  }

  r = m_cluster->ioctx_create2(m_pool_id, m_ioctx);
  if (r < 0) {
    m_init_failed = true;
    m_cluster.reset();
    derr << ": error accessing local pool (id=" << m_pool_id << "): "
         << cpp_strerror(r) << dendl;
    on_finish->complete(r);
    return;
  }

  r = mount(m_cluster, m_filesystem, true, &m_mount);
  if (r < 0) {
    m_init_failed = true;
    m_ioctx.close();
    m_cluster.reset();
    on_finish->complete(r);
    return;
  }

  m_addrs = m_cluster->get_addrs();
  dout(10) << ": rados addrs=" << m_addrs << dendl;

  init_instance_watcher(on_finish);
}

void FSMirror::shutdown(Context *on_finish) {
  dout(20) << dendl;

  {
    std::scoped_lock locker(m_lock);
    m_stopping = true;
    if (m_on_init_finish != nullptr) {
      dout(10) << ": delaying shutdown -- init in progress" << dendl;
      m_on_shutdown_finish = new LambdaContext([this, on_finish](int r) {
          if (r < 0) {
            on_finish->complete(0);
            return;
          }
          m_on_shutdown_finish = on_finish;
          shutdown_peer_replayers();
        });
      return;
    }

    m_on_shutdown_finish = on_finish;
  }

  shutdown_peer_replayers();
}

void FSMirror::shutdown_peer_replayers() {
  dout(20) << dendl;

  for (auto &[peer, peer_replayer] : m_peer_replayers) {
    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
    shutdown_replayer(peer_replayer.get());
  }
  m_peer_replayers.clear();

  shutdown_mirror_watcher();
}

void FSMirror::init_instance_watcher(Context *on_finish) {
  dout(20) << dendl;

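  // completion context for the whole init sequence; besides completing the
  // caller's context it also fires any shutdown that was requested while
  // init was still in progress.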
  m_on_init_finish = new LambdaContext([this, on_finish](int r) {
      {
        std::scoped_lock locker(m_lock);
        if (r < 0) {
          m_init_failed = true;
        }
      }
      on_finish->complete(r);
      if (m_on_shutdown_finish != nullptr) {
        m_on_shutdown_finish->complete(r);
      }
    });

  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_init_instance_watcher>(this);
  m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
  m_instance_watcher->init(ctx);
}

void FSMirror::handle_init_instance_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

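  // on error, steal the pending init context under the lock and complete it
  // outside of it; on success fall through and bring up the mirror watcher.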
  Context *on_init_finish = nullptr;
  {
    std::scoped_lock locker(m_lock);
    if (r < 0) {
      std::swap(on_init_finish, m_on_init_finish);
    }
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(r);
    return;
  }

  init_mirror_watcher();
}

void FSMirror::init_mirror_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_init_mirror_watcher>(this);
  m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue);
  m_mirror_watcher->init(ctx);
}

void FSMirror::handle_init_mirror_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  Context *on_init_finish = nullptr;
  {
    std::scoped_lock locker(m_lock);
    if (r == 0) {
      std::swap(on_init_finish, m_on_init_finish);
    }
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(r);
    return;
  }

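  // mirror watcher init failed: remember the error and unwind the instance
  // watcher as well; the pending init context is completed from
  // handle_shutdown_instance_watcher().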
  m_retval = r; // save errcode for init context callback
  shutdown_instance_watcher();
}

void FSMirror::shutdown_mirror_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_shutdown_mirror_watcher>(this);
  m_mirror_watcher->shutdown(ctx);
}

void FSMirror::handle_shutdown_mirror_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  shutdown_instance_watcher();
}

void FSMirror::shutdown_instance_watcher() {
  dout(20) << dendl;

  std::scoped_lock locker(m_lock);
  Context *ctx = new C_CallbackAdapter<
    FSMirror, &FSMirror::handle_shutdown_instance_watcher>(this);
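  // complete the handler through the work queue rather than inline in the
  // watcher's own callback context.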
  m_instance_watcher->shutdown(new C_AsyncCallback<ContextWQ>(m_work_queue, ctx));
}

void FSMirror::handle_shutdown_instance_watcher(int r) {
  dout(20) << ": r=" << r << dendl;

  cleanup();

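  // common tail of both the failed-init and shutdown paths: complete whichever
  // waiters are pending -- the init waiter with the error stashed in m_retval,
  // the shutdown waiter with the watcher shutdown result.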
  Context *on_init_finish = nullptr;
  Context *on_shutdown_finish = nullptr;

  {
    std::scoped_lock locker(m_lock);
    std::swap(on_init_finish, m_on_init_finish);
    std::swap(on_shutdown_finish, m_on_shutdown_finish);
  }

  if (on_init_finish != nullptr) {
    on_init_finish->complete(m_retval);
  }
  if (on_shutdown_finish != nullptr) {
    on_shutdown_finish->complete(r);
  }
}

void FSMirror::handle_acquire_directory(string_view dir_path) {
  dout(5) << ": dir_path=" << dir_path << dendl;

  {
    std::scoped_lock locker(m_lock);
    m_directories.emplace(dir_path);
    m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
                                                 m_directories.size());

    for (auto &[peer, peer_replayer] : m_peer_replayers) {
      dout(10) << ": peer=" << peer << dendl;
      peer_replayer->add_directory(dir_path);
    }
  }
}

void FSMirror::handle_release_directory(string_view dir_path) {
  dout(5) << ": dir_path=" << dir_path << dendl;

  {
    std::scoped_lock locker(m_lock);
    auto it = m_directories.find(dir_path);
    if (it != m_directories.end()) {
      m_directories.erase(it);
      m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
                                                   m_directories.size());
      for (auto &[peer, peer_replayer] : m_peer_replayers) {
        dout(10) << ": peer=" << peer << dendl;
        peer_replayer->remove_directory(dir_path);
      }
    }
  }
}

void FSMirror::add_peer(const Peer &peer) {
  dout(10) << ": peer=" << peer << dendl;

  std::scoped_lock locker(m_lock);
  m_all_peers.emplace(peer);
  if (m_peer_replayers.find(peer) != m_peer_replayers.end()) {
    return;
  }

  auto replayer = std::make_unique<PeerReplayer>(
    m_cct, this, m_cluster, m_filesystem, peer, m_directories, m_mount, m_service_daemon);
  int r = init_replayer(replayer.get());
  if (r < 0) {
    m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, peer,
                                                   SERVICE_DAEMON_PEER_INIT_FAILED_KEY,
                                                   true);
    return;
  }
  m_peer_replayers.emplace(peer, std::move(replayer));
  ceph_assert(m_peer_replayers.size() == 1); // support only a single peer
}

void FSMirror::remove_peer(const Peer &peer) {
  dout(10) << ": peer=" << peer << dendl;

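  // detach the replayer under the lock, but run its (potentially blocking)
  // shutdown outside of it.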
  std::unique_ptr<PeerReplayer> replayer;
  {
    std::scoped_lock locker(m_lock);
    m_all_peers.erase(peer);
    auto it = m_peer_replayers.find(peer);
    if (it != m_peer_replayers.end()) {
      replayer = std::move(it->second);
      m_peer_replayers.erase(it);
    }
  }

  if (replayer) {
    dout(5) << ": shutting down replayer for peer=" << peer << dendl;
    shutdown_replayer(replayer.get());
  }
}

void FSMirror::mirror_status(Formatter *f) {
  std::scoped_lock locker(m_lock);
  f->open_object_section("status");
  if (m_init_failed) {
    f->dump_string("state", "failed");
  } else if (is_blocklisted(locker)) {
    f->dump_string("state", "blocklisted");
  } else {
    // dump rados addr for blocklist test
    f->dump_string("rados_inst", m_addrs);
    f->open_object_section("peers");
    for ([[maybe_unused]] auto &[peer, peer_replayer] : m_peer_replayers) {
      peer.dump(f);
    }
    f->close_section(); // peers
    f->open_object_section("snap_dirs");
    f->dump_int("dir_count", m_directories.size());
    f->close_section(); // snap_dirs
  }
  f->close_section(); // status
}

} // namespace mirror
} // namespace cephfs