]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2017 Red Hat, Inc | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "SocketMessenger.h" | |
16 | ||
17 | #include <tuple> | |
18 | #include <boost/functional/hash.hpp> | |
19 | ||
20 | #include "auth/Auth.h" | |
21 | #include "Errors.h" | |
22 | #include "Dispatcher.h" | |
23 | #include "Socket.h" | |
24 | ||
11fdf7f2 TL |
25 | namespace { |
26 | seastar::logger& logger() { | |
9f95a23c | 27 | return crimson::get_logger(ceph_subsys_ms); |
11fdf7f2 TL |
28 | } |
29 | } | |
30 | ||
9f95a23c TL |
31 | namespace crimson::net { |
32 | ||
11fdf7f2 TL |
33 | SocketMessenger::SocketMessenger(const entity_name_t& myname, |
34 | const std::string& logic_name, | |
9f95a23c | 35 | uint32_t nonce) |
11fdf7f2 | 36 | : Messenger{myname}, |
9f95a23c | 37 | master_sid{seastar::engine().cpu_id()}, |
11fdf7f2 TL |
38 | logic_name{logic_name}, |
39 | nonce{nonce} | |
40 | {} | |
41 | ||
42 | seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) | |
43 | { | |
9f95a23c | 44 | assert(seastar::engine().cpu_id() == master_sid); |
11fdf7f2 TL |
45 | auto my_addrs = addrs; |
46 | for (auto& addr : my_addrs.v) { | |
47 | addr.nonce = nonce; | |
48 | } | |
9f95a23c TL |
49 | return Messenger::set_myaddrs(my_addrs); |
50 | } | |
51 | ||
52 | seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs) | |
53 | { | |
54 | assert(seastar::engine().cpu_id() == master_sid); | |
55 | ceph_assert(addrs.front().get_family() == AF_INET); | |
56 | return set_myaddrs(addrs).then([this] { | |
57 | if (!listener) { | |
58 | return FixedCPUServerSocket::create().then([this] (auto _listener) { | |
59 | listener = _listener; | |
60 | }); | |
61 | } else { | |
62 | return seastar::now(); | |
63 | } | |
64 | }).then([this] { | |
65 | auto listen_addr = get_myaddr(); | |
66 | logger().debug("{} do_bind: try listen {}...", *this, listen_addr.in4_addr()); | |
67 | if (!listener) { | |
68 | logger().warn("{} do_bind: listener doesn't exist", *this); | |
69 | return seastar::now(); | |
70 | } | |
71 | return listener->listen(listen_addr); | |
72 | }); | |
11fdf7f2 TL |
73 | } |
74 | ||
75 | seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) | |
76 | { | |
9f95a23c TL |
77 | return do_bind(addrs).then([this] { |
78 | logger().info("{} bind: done", *this); | |
79 | }); | |
11fdf7f2 TL |
80 | } |
81 | ||
82 | seastar::future<> | |
83 | SocketMessenger::try_bind(const entity_addrvec_t& addrs, | |
84 | uint32_t min_port, uint32_t max_port) | |
85 | { | |
9f95a23c | 86 | auto addr = addrs.front(); |
11fdf7f2 | 87 | if (addr.get_port() != 0) { |
9f95a23c TL |
88 | return do_bind(addrs).then([this] { |
89 | logger().info("{} try_bind: done", *this); | |
90 | }); | |
11fdf7f2 TL |
91 | } |
92 | ceph_assert(min_port <= max_port); | |
93 | return seastar::do_with(uint32_t(min_port), | |
9f95a23c TL |
94 | [this, max_port, addr] (auto& port) { |
95 | return seastar::repeat([this, max_port, addr, &port] { | |
96 | auto to_bind = addr; | |
97 | to_bind.set_port(port); | |
98 | return do_bind(entity_addrvec_t{to_bind}).then([this] { | |
99 | logger().info("{} try_bind: done", *this); | |
100 | return stop_t::yes; | |
101 | }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { | |
102 | assert(e.code() == std::errc::address_in_use); | |
103 | logger().trace("{} try_bind: {} already used", *this, port); | |
104 | if (port == max_port) { | |
105 | throw; | |
106 | } | |
107 | ++port; | |
108 | return stop_t::no; | |
109 | }); | |
11fdf7f2 | 110 | }); |
9f95a23c | 111 | }); |
11fdf7f2 TL |
112 | } |
113 | ||
114 | seastar::future<> SocketMessenger::start(Dispatcher *disp) { | |
9f95a23c | 115 | assert(seastar::engine().cpu_id() == master_sid); |
11fdf7f2 | 116 | |
11fdf7f2 | 117 | dispatcher = disp; |
11fdf7f2 | 118 | if (listener) { |
9f95a23c TL |
119 | // make sure we have already bound to a valid address |
120 | ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2()); | |
121 | ceph_assert(get_myaddr().get_port() > 0); | |
11fdf7f2 | 122 | |
9f95a23c TL |
123 | return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) { |
124 | assert(seastar::engine().cpu_id() == master_sid); | |
125 | SocketConnectionRef conn = seastar::make_shared<SocketConnection>( | |
126 | *this, *dispatcher, get_myaddr().is_msgr2()); | |
127 | conn->start_accept(std::move(socket), peer_addr); | |
128 | return seastar::now(); | |
129 | }); | |
130 | } | |
11fdf7f2 TL |
131 | return seastar::now(); |
132 | } | |
133 | ||
9f95a23c TL |
134 | crimson::net::ConnectionRef |
135 | SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) | |
11fdf7f2 | 136 | { |
9f95a23c TL |
137 | assert(seastar::engine().cpu_id() == master_sid); |
138 | ||
139 | // make sure we connect to a valid peer_addr | |
140 | ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2()); | |
141 | ceph_assert(peer_addr.get_port() > 0); | |
142 | ||
11fdf7f2 | 143 | if (auto found = lookup_conn(peer_addr); found) { |
9f95a23c | 144 | return found->shared_from_this(); |
11fdf7f2 | 145 | } |
9f95a23c TL |
146 | SocketConnectionRef conn = seastar::make_shared<SocketConnection>( |
147 | *this, *dispatcher, peer_addr.is_msgr2()); | |
11fdf7f2 | 148 | conn->start_connect(peer_addr, peer_type); |
9f95a23c | 149 | return conn->shared_from_this(); |
11fdf7f2 TL |
150 | } |
151 | ||
9f95a23c | 152 | seastar::future<> SocketMessenger::shutdown() |
11fdf7f2 | 153 | { |
9f95a23c TL |
154 | assert(seastar::engine().cpu_id() == master_sid); |
155 | return seastar::futurize_apply([this] { | |
156 | if (listener) { | |
157 | auto d_listener = listener; | |
158 | listener = nullptr; | |
159 | return d_listener->destroy(); | |
160 | } else { | |
161 | return seastar::now(); | |
162 | } | |
11fdf7f2 | 163 | // close all connections |
9f95a23c TL |
164 | }).then([this] { |
165 | return seastar::parallel_for_each(accepting_conns, [] (auto conn) { | |
11fdf7f2 | 166 | return conn->close(); |
11fdf7f2 | 167 | }); |
9f95a23c TL |
168 | }).then([this] { |
169 | ceph_assert(accepting_conns.empty()); | |
170 | return seastar::parallel_for_each(connections, [] (auto conn) { | |
171 | return conn.second->close(); | |
172 | }); | |
173 | }).then([this] { | |
174 | ceph_assert(connections.empty()); | |
175 | shutdown_promise.set_value(); | |
176 | }); | |
11fdf7f2 TL |
177 | } |
178 | ||
9f95a23c | 179 | seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn) |
11fdf7f2 | 180 | { |
9f95a23c TL |
181 | assert(seastar::engine().cpu_id() == master_sid); |
182 | if (!need_addr) { | |
183 | if ((!get_myaddr().is_any() && | |
184 | get_myaddr().get_type() != peer_addr_for_me.get_type()) || | |
185 | get_myaddr().get_family() != peer_addr_for_me.get_family() || | |
186 | !get_myaddr().is_same_host(peer_addr_for_me)) { | |
187 | logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}", | |
188 | conn, peer_addr_for_me, get_myaddr()); | |
189 | throw std::system_error( | |
190 | make_error_code(crimson::net::error::bad_peer_address)); | |
191 | } | |
11fdf7f2 TL |
192 | return seastar::now(); |
193 | } | |
9f95a23c TL |
194 | need_addr = false; |
195 | ||
196 | if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) { | |
197 | // Not bound | |
198 | entity_addr_t addr = peer_addr_for_me; | |
199 | addr.set_type(entity_addr_t::TYPE_ANY); | |
200 | addr.set_port(0); | |
201 | return set_myaddrs(entity_addrvec_t{addr} | |
202 | ).then([this, &conn, peer_addr_for_me] { | |
203 | logger().info("{} learned myaddr={} (unbound) from {}", | |
204 | conn, get_myaddr(), peer_addr_for_me); | |
205 | }); | |
206 | } else { | |
207 | // Already bound | |
208 | if (!get_myaddr().is_any() && | |
209 | get_myaddr().get_type() != peer_addr_for_me.get_type()) { | |
210 | logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}", | |
211 | conn, peer_addr_for_me, get_myaddr()); | |
212 | throw std::system_error( | |
213 | make_error_code(crimson::net::error::bad_peer_address)); | |
214 | } | |
215 | if (get_myaddr().get_family() != peer_addr_for_me.get_family()) { | |
216 | logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}", | |
217 | conn, peer_addr_for_me, get_myaddr()); | |
218 | throw std::system_error( | |
219 | make_error_code(crimson::net::error::bad_peer_address)); | |
220 | } | |
221 | if (get_myaddr().is_blank_ip()) { | |
222 | entity_addr_t addr = peer_addr_for_me; | |
223 | addr.set_type(get_myaddr().get_type()); | |
224 | addr.set_port(get_myaddr().get_port()); | |
225 | return set_myaddrs(entity_addrvec_t{addr} | |
226 | ).then([this, &conn, peer_addr_for_me] { | |
227 | logger().info("{} learned myaddr={} (blank IP) from {}", | |
228 | conn, get_myaddr(), peer_addr_for_me); | |
229 | }); | |
230 | } else if (!get_myaddr().is_same_host(peer_addr_for_me)) { | |
231 | logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}", | |
232 | conn, peer_addr_for_me, get_myaddr()); | |
233 | throw std::system_error( | |
234 | make_error_code(crimson::net::error::bad_peer_address)); | |
235 | } else { | |
236 | return seastar::now(); | |
237 | } | |
238 | } | |
239 | } | |
240 | ||
241 | SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const | |
242 | { | |
243 | return policy_set.get(peer_type); | |
244 | } | |
11fdf7f2 | 245 | |
9f95a23c TL |
246 | SocketPolicy SocketMessenger::get_default_policy() const |
247 | { | |
248 | return policy_set.get_default(); | |
11fdf7f2 TL |
249 | } |
250 | ||
251 | void SocketMessenger::set_default_policy(const SocketPolicy& p) | |
252 | { | |
253 | policy_set.set_default(p); | |
254 | } | |
255 | ||
256 | void SocketMessenger::set_policy(entity_type_t peer_type, | |
257 | const SocketPolicy& p) | |
258 | { | |
259 | policy_set.set(peer_type, p); | |
260 | } | |
261 | ||
262 | void SocketMessenger::set_policy_throttler(entity_type_t peer_type, | |
263 | Throttle* throttle) | |
264 | { | |
265 | // only byte throttler is used in OSD | |
266 | policy_set.set_throttlers(peer_type, throttle, nullptr); | |
267 | } | |
268 | ||
9f95a23c | 269 | crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) |
11fdf7f2 TL |
270 | { |
271 | if (auto found = connections.find(addr); | |
272 | found != connections.end()) { | |
273 | return found->second; | |
274 | } else { | |
275 | return nullptr; | |
276 | } | |
277 | } | |
278 | ||
279 | void SocketMessenger::accept_conn(SocketConnectionRef conn) | |
280 | { | |
281 | accepting_conns.insert(conn); | |
282 | } | |
283 | ||
284 | void SocketMessenger::unaccept_conn(SocketConnectionRef conn) | |
285 | { | |
286 | accepting_conns.erase(conn); | |
287 | } | |
288 | ||
289 | void SocketMessenger::register_conn(SocketConnectionRef conn) | |
290 | { | |
11fdf7f2 TL |
291 | auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); |
292 | std::ignore = i; | |
293 | ceph_assert(added); | |
294 | } | |
295 | ||
296 | void SocketMessenger::unregister_conn(SocketConnectionRef conn) | |
297 | { | |
298 | ceph_assert(conn); | |
299 | auto found = connections.find(conn->get_peer_addr()); | |
300 | ceph_assert(found != connections.end()); | |
301 | ceph_assert(found->second == conn); | |
302 | connections.erase(found); | |
303 | } | |
9f95a23c TL |
304 | |
305 | seastar::future<uint32_t> | |
306 | SocketMessenger::get_global_seq(uint32_t old) | |
307 | { | |
308 | if (old > global_seq) { | |
309 | global_seq = old; | |
310 | } | |
311 | return seastar::make_ready_future<uint32_t>(++global_seq); | |
312 | } | |
313 | ||
314 | } // namespace crimson::net |