]> git.proxmox.com Git - ceph.git/blame - ceph/src/tools/cephfs_mirror/ClusterWatcher.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / tools / cephfs_mirror / ClusterWatcher.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <mutex>
5#include <vector>
6
7#include "common/ceph_context.h"
8#include "common/debug.h"
9#include "common/errno.h"
10#include "mon/MonClient.h"
11
12#include "ClusterWatcher.h"
13#include "ServiceDaemon.h"
14
15#define dout_context g_ceph_context
16#define dout_subsys ceph_subsys_cephfs_mirror
17#undef dout_prefix
18#define dout_prefix *_dout << "cephfs::mirror::ClusterWatcher " << __func__
19
20namespace cephfs {
21namespace mirror {
22
23ClusterWatcher::ClusterWatcher(CephContext *cct, MonClient *monc, ServiceDaemon *service_daemon,
24 Listener &listener)
25 : Dispatcher(cct),
26 m_monc(monc),
27 m_service_daemon(service_daemon),
28 m_listener(listener) {
29}
30
31ClusterWatcher::~ClusterWatcher() {
32}
33
34bool ClusterWatcher::ms_can_fast_dispatch2(const cref_t<Message> &m) const {
35 return m->get_type() == CEPH_MSG_FS_MAP;
36}
37
38void ClusterWatcher::ms_fast_dispatch2(const ref_t<Message> &m) {
39 bool handled = ms_dispatch2(m);
40 ceph_assert(handled);
41}
42
43bool ClusterWatcher::ms_dispatch2(const ref_t<Message> &m) {
44 if (m->get_type() == CEPH_MSG_FS_MAP) {
45 if (m->get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
46 handle_fsmap(ref_cast<MFSMap>(m));
47 }
48 return true;
49 }
50
51 return false;
52}
53
54int ClusterWatcher::init() {
55 dout(20) << dendl;
56
57 bool sub = m_monc->sub_want("fsmap", 0, 0);
58 if (!sub) {
59 derr << ": failed subscribing to FSMap" << dendl;
60 return -1;
61 }
62
63 m_monc->renew_subs();
64 dout(10) << ": subscribed to FSMap" << dendl;
65 return 0;
66}
67
68void ClusterWatcher::shutdown() {
69 dout(20) << dendl;
70 std::scoped_lock locker(m_lock);
a4b75251 71 m_stopping = true;
f67539c2
TL
72 m_monc->sub_unwant("fsmap");
73}
74
75void ClusterWatcher::handle_fsmap(const cref_t<MFSMap> &m) {
76 dout(20) << dendl;
77
78 auto fsmap = m->get_fsmap();
79 auto filesystems = fsmap.get_filesystems();
80
81 std::vector<Filesystem> mirroring_enabled;
82 std::vector<Filesystem> mirroring_disabled;
83 std::map<Filesystem, Peers> peers_added;
84 std::map<Filesystem, Peers> peers_removed;
85 std::map<Filesystem, uint64_t> fs_metadata_pools;
86 {
87 std::scoped_lock locker(m_lock);
a4b75251
TL
88 if (m_stopping) {
89 return;
90 }
91
f67539c2
TL
92 // deleted filesystems are considered mirroring disabled
93 for (auto it = m_filesystem_peers.begin(); it != m_filesystem_peers.end();) {
94 if (!fsmap.filesystem_exists(it->first.fscid)) {
95 mirroring_disabled.emplace_back(it->first);
96 it = m_filesystem_peers.erase(it);
97 continue;
98 }
99 ++it;
100 }
101
102 for (auto &filesystem : filesystems) {
103 auto fs = Filesystem{filesystem->fscid,
104 std::string(filesystem->mds_map.get_fs_name())};
105 auto pool_id = filesystem->mds_map.get_metadata_pool();
106 auto &mirror_info = filesystem->mirror_info;
107
108 if (!mirror_info.is_mirrored()) {
109 auto it = m_filesystem_peers.find(fs);
110 if (it != m_filesystem_peers.end()) {
111 mirroring_disabled.emplace_back(fs);
112 m_filesystem_peers.erase(it);
113 }
114 } else {
115 auto [fspeersit, enabled] = m_filesystem_peers.emplace(fs, Peers{});
116 auto &peers = fspeersit->second;
117
118 if (enabled) {
119 mirroring_enabled.emplace_back(fs);
120 fs_metadata_pools.emplace(fs, pool_id);
121 }
122
123 // peers added
124 Peers added;
125 std::set_difference(mirror_info.peers.begin(), mirror_info.peers.end(),
126 peers.begin(), peers.end(), std::inserter(added, added.end()));
127
128 // peers removed
129 Peers removed;
130 std::set_difference(peers.begin(), peers.end(),
131 mirror_info.peers.begin(), mirror_info.peers.end(),
132 std::inserter(removed, removed.end()));
133
134 // update set
135 if (!added.empty()) {
136 peers_added.emplace(fs, added);
137 peers.insert(added.begin(), added.end());
138 }
139 if (!removed.empty()) {
140 peers_removed.emplace(fs, removed);
141 for (auto &p : removed) {
142 peers.erase(p);
143 }
144 }
145 }
146 }
147 }
148
149 dout(5) << ": mirroring enabled=" << mirroring_enabled << ", mirroring_disabled="
150 << mirroring_disabled << dendl;
151 for (auto &fs : mirroring_enabled) {
152 m_service_daemon->add_filesystem(fs.fscid, fs.fs_name);
153 m_listener.handle_mirroring_enabled(FilesystemSpec(fs, fs_metadata_pools.at(fs)));
154 }
155 for (auto &fs : mirroring_disabled) {
156 m_service_daemon->remove_filesystem(fs.fscid);
157 m_listener.handle_mirroring_disabled(fs);
158 }
159
160 dout(5) << ": peers added=" << peers_added << ", peers removed=" << peers_removed << dendl;
161
162 for (auto &[fs, peers] : peers_added) {
163 for (auto &peer : peers) {
164 m_service_daemon->add_peer(fs.fscid, peer);
165 m_listener.handle_peers_added(fs, peer);
166 }
167 }
168 for (auto &[fs, peers] : peers_removed) {
169 for (auto &peer : peers) {
170 m_service_daemon->remove_peer(fs.fscid, peer);
171 m_listener.handle_peers_removed(fs, peer);
172 }
173 }
174
a4b75251
TL
175 std::scoped_lock locker(m_lock);
176 if (!m_stopping) {
177 m_monc->sub_got("fsmap", fsmap.get_epoch());
178 } // else we have already done a sub_unwant()
f67539c2
TL
179}
180
181} // namespace mirror
182} // namespace cephfs