]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2017 Red Hat, Inc | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "SocketMessenger.h" | |
16 | ||
17 | #include <tuple> | |
18 | #include <boost/functional/hash.hpp> | |
19 | ||
20 | #include "auth/Auth.h" | |
21 | #include "Errors.h" | |
22 | #include "Dispatcher.h" | |
23 | #include "Socket.h" | |
24 | ||
25 | using namespace ceph::net; | |
26 | ||
27 | namespace { | |
28 | seastar::logger& logger() { | |
29 | return ceph::get_logger(ceph_subsys_ms); | |
30 | } | |
31 | } | |
32 | ||
33 | SocketMessenger::SocketMessenger(const entity_name_t& myname, | |
34 | const std::string& logic_name, | |
35 | uint32_t nonce, | |
36 | int master_sid) | |
37 | : Messenger{myname}, | |
38 | master_sid{master_sid}, | |
39 | sid{seastar::engine().cpu_id()}, | |
40 | logic_name{logic_name}, | |
41 | nonce{nonce} | |
42 | {} | |
43 | ||
44 | seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs) | |
45 | { | |
46 | auto my_addrs = addrs; | |
47 | for (auto& addr : my_addrs.v) { | |
48 | addr.nonce = nonce; | |
49 | } | |
50 | return container().invoke_on_all([my_addrs](auto& msgr) { | |
51 | return msgr.Messenger::set_myaddrs(my_addrs); | |
52 | }); | |
53 | } | |
54 | ||
55 | seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs) | |
56 | { | |
57 | ceph_assert(addrs.legacy_addr().get_family() == AF_INET); | |
58 | auto my_addrs = addrs; | |
59 | for (auto& addr : my_addrs.v) { | |
60 | addr.nonce = nonce; | |
61 | } | |
62 | logger().info("listening on {}", my_addrs.legacy_addr().in4_addr()); | |
63 | return container().invoke_on_all([my_addrs](auto& msgr) { | |
64 | msgr.do_bind(my_addrs); | |
65 | }); | |
66 | } | |
67 | ||
68 | seastar::future<> | |
69 | SocketMessenger::try_bind(const entity_addrvec_t& addrs, | |
70 | uint32_t min_port, uint32_t max_port) | |
71 | { | |
72 | auto addr = addrs.legacy_or_front_addr(); | |
73 | if (addr.get_port() != 0) { | |
74 | return bind(addrs); | |
75 | } | |
76 | ceph_assert(min_port <= max_port); | |
77 | return seastar::do_with(uint32_t(min_port), | |
78 | [this, max_port, addr] (auto& port) { | |
79 | return seastar::repeat([this, max_port, addr, &port] { | |
80 | auto to_bind = addr; | |
81 | to_bind.set_port(port); | |
82 | return bind(entity_addrvec_t{to_bind}) | |
83 | .then([this] { | |
84 | logger().info("{}: try_bind: done", *this); | |
85 | return stop_t::yes; | |
86 | }).handle_exception_type([this, max_port, &port] (const std::system_error& e) { | |
87 | logger().debug("{}: try_bind: {} already used", *this, port); | |
88 | if (port == max_port) { | |
89 | throw e; | |
90 | } | |
91 | ++port; | |
92 | return stop_t::no; | |
93 | }); | |
94 | }); | |
95 | }); | |
96 | } | |
97 | ||
98 | seastar::future<> SocketMessenger::start(Dispatcher *disp) { | |
99 | return container().invoke_on_all([disp](auto& msgr) { | |
100 | return msgr.do_start(disp->get_local_shard()); | |
101 | }); | |
102 | } | |
103 | ||
104 | seastar::future<ceph::net::ConnectionXRef> | |
105 | SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) | |
106 | { | |
107 | auto shard = locate_shard(peer_addr); | |
108 | return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { | |
109 | return msgr.do_connect(peer_addr, peer_type); | |
110 | }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) { | |
111 | return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn)); | |
112 | }); | |
113 | } | |
114 | ||
115 | seastar::future<> SocketMessenger::shutdown() | |
116 | { | |
117 | return container().invoke_on_all([](auto& msgr) { | |
118 | return msgr.do_shutdown(); | |
119 | }).finally([this] { | |
120 | return container().invoke_on_all([](auto& msgr) { | |
121 | msgr.shutdown_promise.set_value(); | |
122 | }); | |
123 | }); | |
124 | } | |
125 | ||
126 | void SocketMessenger::do_bind(const entity_addrvec_t& addrs) | |
127 | { | |
128 | Messenger::set_myaddrs(addrs); | |
129 | ||
130 | // TODO: v2: listen on multiple addresses | |
131 | seastar::socket_address address(addrs.legacy_addr().in4_addr()); | |
132 | seastar::listen_options lo; | |
133 | lo.reuse_address = true; | |
134 | listener = seastar::listen(address, lo); | |
135 | } | |
136 | ||
137 | seastar::future<> SocketMessenger::do_start(Dispatcher *disp) | |
138 | { | |
139 | dispatcher = disp; | |
140 | ||
141 | // start listening if bind() was called | |
142 | if (listener) { | |
143 | seastar::keep_doing([this] { | |
144 | return listener->accept() | |
145 | .then([this] (seastar::connected_socket socket, | |
146 | seastar::socket_address paddr) { | |
147 | // allocate the connection | |
148 | entity_addr_t peer_addr; | |
149 | peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); | |
150 | auto shard = locate_shard(peer_addr); | |
151 | #warning fixme | |
152 | // we currently do dangerous i/o from a Connection core, different from the Socket core. | |
153 | auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket))); | |
154 | // don't wait before accepting another | |
155 | container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable { | |
156 | SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher); | |
157 | conn->start_accept(std::move(sock), peer_addr); | |
158 | }); | |
159 | }); | |
160 | }).handle_exception_type([this] (const std::system_error& e) { | |
161 | // stop gracefully on connection_aborted | |
162 | if (e.code() != error::connection_aborted) { | |
163 | logger().error("{} unexpected error during accept: {}", *this, e); | |
164 | } | |
165 | }); | |
166 | } | |
167 | ||
168 | return seastar::now(); | |
169 | } | |
170 | ||
171 | seastar::foreign_ptr<ceph::net::ConnectionRef> | |
172 | SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) | |
173 | { | |
174 | if (auto found = lookup_conn(peer_addr); found) { | |
175 | return seastar::make_foreign(found->shared_from_this()); | |
176 | } | |
177 | SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher); | |
178 | conn->start_connect(peer_addr, peer_type); | |
179 | return seastar::make_foreign(conn->shared_from_this()); | |
180 | } | |
181 | ||
182 | seastar::future<> SocketMessenger::do_shutdown() | |
183 | { | |
184 | if (listener) { | |
185 | listener->abort_accept(); | |
186 | } | |
187 | // close all connections | |
188 | return seastar::parallel_for_each(accepting_conns, [] (auto conn) { | |
189 | return conn->close(); | |
190 | }).then([this] { | |
191 | ceph_assert(accepting_conns.empty()); | |
192 | return seastar::parallel_for_each(connections, [] (auto conn) { | |
193 | return conn.second->close(); | |
194 | }); | |
195 | }).finally([this] { | |
196 | ceph_assert(connections.empty()); | |
197 | }); | |
198 | } | |
199 | ||
200 | seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) | |
201 | { | |
202 | if (!get_myaddr().is_blank_ip()) { | |
203 | // already learned or binded | |
204 | return seastar::now(); | |
205 | } | |
206 | ||
207 | // Only learn IP address if blank. | |
208 | entity_addr_t addr = get_myaddr(); | |
209 | addr.u = peer_addr_for_me.u; | |
210 | addr.set_type(peer_addr_for_me.get_type()); | |
211 | addr.set_port(get_myaddr().get_port()); | |
212 | return set_myaddrs(entity_addrvec_t{addr}); | |
213 | } | |
214 | ||
215 | void SocketMessenger::set_default_policy(const SocketPolicy& p) | |
216 | { | |
217 | policy_set.set_default(p); | |
218 | } | |
219 | ||
220 | void SocketMessenger::set_policy(entity_type_t peer_type, | |
221 | const SocketPolicy& p) | |
222 | { | |
223 | policy_set.set(peer_type, p); | |
224 | } | |
225 | ||
226 | void SocketMessenger::set_policy_throttler(entity_type_t peer_type, | |
227 | Throttle* throttle) | |
228 | { | |
229 | // only byte throttler is used in OSD | |
230 | policy_set.set_throttlers(peer_type, throttle, nullptr); | |
231 | } | |
232 | ||
233 | seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) | |
234 | { | |
235 | ceph_assert(addr.get_family() == AF_INET); | |
236 | if (master_sid >= 0) { | |
237 | return master_sid; | |
238 | } | |
239 | std::size_t seed = 0; | |
240 | boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); | |
241 | //boost::hash_combine(seed, addr.u.sin.sin_port); | |
242 | //boost::hash_combine(seed, addr.nonce); | |
243 | return seed % seastar::smp::count; | |
244 | } | |
245 | ||
246 | ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) | |
247 | { | |
248 | if (auto found = connections.find(addr); | |
249 | found != connections.end()) { | |
250 | return found->second; | |
251 | } else { | |
252 | return nullptr; | |
253 | } | |
254 | } | |
255 | ||
256 | void SocketMessenger::accept_conn(SocketConnectionRef conn) | |
257 | { | |
258 | accepting_conns.insert(conn); | |
259 | } | |
260 | ||
261 | void SocketMessenger::unaccept_conn(SocketConnectionRef conn) | |
262 | { | |
263 | accepting_conns.erase(conn); | |
264 | } | |
265 | ||
266 | void SocketMessenger::register_conn(SocketConnectionRef conn) | |
267 | { | |
268 | if (master_sid >= 0) { | |
269 | ceph_assert(static_cast<int>(sid) == master_sid); | |
270 | } | |
271 | auto [i, added] = connections.emplace(conn->get_peer_addr(), conn); | |
272 | std::ignore = i; | |
273 | ceph_assert(added); | |
274 | } | |
275 | ||
276 | void SocketMessenger::unregister_conn(SocketConnectionRef conn) | |
277 | { | |
278 | ceph_assert(conn); | |
279 | auto found = connections.find(conn->get_peer_addr()); | |
280 | ceph_assert(found != connections.end()); | |
281 | ceph_assert(found->second == conn); | |
282 | connections.erase(found); | |
283 | } |