]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/SocketMessenger.cc
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "SocketMessenger.h"
16
20effc67
TL
17#include <seastar/core/sleep.hh>
18
11fdf7f2
TL
19#include <tuple>
20#include <boost/functional/hash.hpp>
1e59de90 21#include <fmt/os.h>
11fdf7f2
TL
22
23#include "auth/Auth.h"
24#include "Errors.h"
11fdf7f2
TL
25#include "Socket.h"
26
11fdf7f2
TL
27namespace {
28 seastar::logger& logger() {
9f95a23c 29 return crimson::get_logger(ceph_subsys_ms);
11fdf7f2
TL
30 }
31}
32
9f95a23c
TL
33namespace crimson::net {
34
11fdf7f2
TL
35SocketMessenger::SocketMessenger(const entity_name_t& myname,
36 const std::string& logic_name,
9f95a23c 37 uint32_t nonce)
1e59de90 38 : master_sid{seastar::this_shard_id()},
11fdf7f2 39 logic_name{logic_name},
1e59de90
TL
40 nonce{nonce},
41 my_name{myname}
11fdf7f2
TL
42{}
43
20effc67
TL
44SocketMessenger::~SocketMessenger()
45{
1e59de90 46 logger().debug("~SocketMessenger: {}", logic_name);
20effc67
TL
47 ceph_assert(!listener);
48}
49
50bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
51{
52 bool ret = false;
53
54 entity_addrvec_t newaddrs = my_addrs;
55 for (auto& a : newaddrs.v) {
56 if (a.is_blank_ip()) {
57 int type = a.get_type();
58 int port = a.get_port();
59 uint32_t nonce = a.get_nonce();
60 for (auto& b : addrs.v) {
61 if (a.get_family() == b.get_family()) {
62 logger().debug(" assuming my addr {} matches provided addr {}", a, b);
63 a = b;
64 a.set_nonce(nonce);
65 a.set_type(type);
66 a.set_port(port);
67 ret = true;
68 break;
69 }
70 }
71 }
72 }
73 my_addrs = newaddrs;
74 return ret;
75}
76
1e59de90 77void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
11fdf7f2 78{
f67539c2 79 assert(seastar::this_shard_id() == master_sid);
1e59de90 80 my_addrs = addrs;
11fdf7f2
TL
81 for (auto& addr : my_addrs.v) {
82 addr.nonce = nonce;
83 }
9f95a23c
TL
84}
85
20effc67
TL
86crimson::net::listen_ertr::future<>
87SocketMessenger::do_listen(const entity_addrvec_t& addrs)
9f95a23c 88{
f67539c2 89 assert(seastar::this_shard_id() == master_sid);
9f95a23c 90 ceph_assert(addrs.front().get_family() == AF_INET);
1e59de90
TL
91 set_myaddrs(addrs);
92 return seastar::futurize_invoke([this] {
9f95a23c
TL
93 if (!listener) {
94 return FixedCPUServerSocket::create().then([this] (auto _listener) {
95 listener = _listener;
96 });
97 } else {
98 return seastar::now();
99 }
20effc67 100 }).then([this] () -> listen_ertr::future<> {
f67539c2 101 const entity_addr_t listen_addr = get_myaddr();
20effc67 102 logger().debug("{} do_listen: try listen {}...", *this, listen_addr);
9f95a23c 103 if (!listener) {
20effc67
TL
104 logger().warn("{} do_listen: listener doesn't exist", *this);
105 return listen_ertr::now();
9f95a23c
TL
106 }
107 return listener->listen(listen_addr);
108 });
11fdf7f2
TL
109}
110
f67539c2 111SocketMessenger::bind_ertr::future<>
11fdf7f2
TL
112SocketMessenger::try_bind(const entity_addrvec_t& addrs,
113 uint32_t min_port, uint32_t max_port)
114{
20effc67
TL
115 // the classical OSD iterates over the addrvec and tries to listen on each
116 // addr. crimson doesn't need to follow as there is a consensus we need to
117 // worry only about proto v2.
118 assert(addrs.size() == 1);
119 auto addr = addrs.msgr2_addr();
11fdf7f2 120 if (addr.get_port() != 0) {
20effc67 121 return do_listen(addrs).safe_then([this] {
9f95a23c
TL
122 logger().info("{} try_bind: done", *this);
123 });
11fdf7f2
TL
124 }
125 ceph_assert(min_port <= max_port);
126 return seastar::do_with(uint32_t(min_port),
9f95a23c 127 [this, max_port, addr] (auto& port) {
f67539c2 128 return seastar::repeat_until_value([this, max_port, addr, &port] {
9f95a23c
TL
129 auto to_bind = addr;
130 to_bind.set_port(port);
20effc67
TL
131 return do_listen(entity_addrvec_t{to_bind}
132 ).safe_then([this] () -> seastar::future<std::optional<std::error_code>> {
9f95a23c 133 logger().info("{} try_bind: done", *this);
20effc67
TL
134 return seastar::make_ready_future<std::optional<std::error_code>>(
135 std::make_optional<std::error_code>(std::error_code{/* success! */}));
136 }, listen_ertr::all_same_way([this, max_port, &port]
137 (const std::error_code& e) mutable
138 -> seastar::future<std::optional<std::error_code>> {
139 logger().trace("{} try_bind: {} got error {}", *this, port, e);
9f95a23c 140 if (port == max_port) {
20effc67
TL
141 return seastar::make_ready_future<std::optional<std::error_code>>(
142 std::make_optional<std::error_code>(e));
9f95a23c
TL
143 }
144 ++port;
20effc67
TL
145 return seastar::make_ready_future<std::optional<std::error_code>>(
146 std::optional<std::error_code>{std::nullopt});
f67539c2 147 }));
20effc67
TL
148 }).then([] (const std::error_code e) -> bind_ertr::future<> {
149 if (!e) {
150 return bind_ertr::now(); // success!
151 } else if (e == std::errc::address_in_use) {
f67539c2 152 return crimson::ct_error::address_in_use::make();
20effc67
TL
153 } else if (e == std::errc::address_not_available) {
154 return crimson::ct_error::address_not_available::make();
f67539c2 155 }
20effc67
TL
156 ceph_abort();
157 });
158 });
159}
160
161SocketMessenger::bind_ertr::future<>
162SocketMessenger::bind(const entity_addrvec_t& addrs)
163{
164 using crimson::common::local_conf;
165 return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
166 [this, addrs] (auto& count) {
167 return seastar::repeat_until_value([this, addrs, &count] {
168 assert(count >= 0);
169 return try_bind(addrs,
170 local_conf()->ms_bind_port_min,
171 local_conf()->ms_bind_port_max)
172 .safe_then([this] {
173 logger().info("{} try_bind: done", *this);
174 return seastar::make_ready_future<std::optional<std::error_code>>(
175 std::make_optional<std::error_code>(std::error_code{/* success! */}));
176 }, bind_ertr::all_same_way([this, &count] (const std::error_code error) {
177 if (count-- > 0) {
178 logger().info("{} was unable to bind. Trying again in {} seconds",
179 *this, local_conf()->ms_bind_retry_delay);
180 return seastar::sleep(
181 std::chrono::seconds(local_conf()->ms_bind_retry_delay)
182 ).then([] {
183 // one more time, please
184 return seastar::make_ready_future<std::optional<std::error_code>>(
185 std::optional<std::error_code>{std::nullopt});
186 });
187 } else {
188 logger().info("{} was unable to bind after {} attempts: {}",
189 *this, local_conf()->ms_bind_retry_count, error);
190 return seastar::make_ready_future<std::optional<std::error_code>>(
191 std::make_optional<std::error_code>(error));
192 }
193 }));
194 }).then([] (const std::error_code error) -> bind_ertr::future<> {
195 if (!error) {
196 return bind_ertr::now(); // success!
197 } else if (error == std::errc::address_in_use) {
198 return crimson::ct_error::address_in_use::make();
199 } else if (error == std::errc::address_not_available) {
200 return crimson::ct_error::address_not_available::make();
201 }
202 ceph_abort();
11fdf7f2 203 });
9f95a23c 204 });
11fdf7f2
TL
205}
206
f67539c2
TL
207seastar::future<> SocketMessenger::start(
208 const dispatchers_t& _dispatchers) {
209 assert(seastar::this_shard_id() == master_sid);
11fdf7f2 210
f67539c2 211 dispatchers.assign(_dispatchers);
11fdf7f2 212 if (listener) {
9f95a23c 213 // make sure we have already bound to a valid address
20effc67 214 ceph_assert(get_myaddr().is_msgr2());
9f95a23c 215 ceph_assert(get_myaddr().get_port() > 0);
11fdf7f2 216
9f95a23c 217 return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
f67539c2 218 assert(seastar::this_shard_id() == master_sid);
20effc67
TL
219 assert(get_myaddr().is_msgr2());
220 SocketConnectionRef conn =
221 seastar::make_shared<SocketConnection>(*this, dispatchers);
9f95a23c
TL
222 conn->start_accept(std::move(socket), peer_addr);
223 return seastar::now();
224 });
225 }
11fdf7f2
TL
226 return seastar::now();
227}
228
9f95a23c 229crimson::net::ConnectionRef
f67539c2 230SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
11fdf7f2 231{
f67539c2 232 assert(seastar::this_shard_id() == master_sid);
9f95a23c
TL
233
234 // make sure we connect to a valid peer_addr
20effc67
TL
235 if (!peer_addr.is_msgr2()) {
236 ceph_abort_msg("ProtocolV1 is no longer supported");
237 }
9f95a23c
TL
238 ceph_assert(peer_addr.get_port() > 0);
239
11fdf7f2 240 if (auto found = lookup_conn(peer_addr); found) {
f67539c2 241 logger().debug("{} connect to existing", *found);
1e59de90 242 return found->get_local_shared_foreign_from_this();
11fdf7f2 243 }
20effc67
TL
244 SocketConnectionRef conn =
245 seastar::make_shared<SocketConnection>(*this, dispatchers);
f67539c2 246 conn->start_connect(peer_addr, peer_name);
1e59de90 247 return conn->get_local_shared_foreign_from_this();
11fdf7f2
TL
248}
249
9f95a23c 250seastar::future<> SocketMessenger::shutdown()
11fdf7f2 251{
f67539c2
TL
252 assert(seastar::this_shard_id() == master_sid);
253 return seastar::futurize_invoke([this] {
254 assert(dispatchers.empty());
9f95a23c
TL
255 if (listener) {
256 auto d_listener = listener;
257 listener = nullptr;
258 return d_listener->destroy();
259 } else {
260 return seastar::now();
261 }
11fdf7f2 262 // close all connections
9f95a23c
TL
263 }).then([this] {
264 return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
1e59de90 265 return conn->close_clean_yielded();
11fdf7f2 266 });
9f95a23c
TL
267 }).then([this] {
268 ceph_assert(accepting_conns.empty());
269 return seastar::parallel_for_each(connections, [] (auto conn) {
1e59de90 270 return conn.second->close_clean_yielded();
f67539c2
TL
271 });
272 }).then([this] {
273 return seastar::parallel_for_each(closing_conns, [] (auto conn) {
1e59de90 274 return conn->close_clean_yielded();
9f95a23c
TL
275 });
276 }).then([this] {
277 ceph_assert(connections.empty());
278 shutdown_promise.set_value();
279 });
11fdf7f2
TL
280}
281
20effc67
TL
282static entity_addr_t choose_addr(
283 const entity_addr_t &peer_addr_for_me,
284 const SocketConnection& conn)
285{
286 using crimson::common::local_conf;
287 // XXX: a syscall is here
288 if (const auto local_addr = conn.get_local_address();
289 local_conf()->ms_learn_addr_from_peer) {
290 logger().info("{} peer {} says I am {} (socket says {})",
291 conn, conn.get_peer_socket_addr(), peer_addr_for_me,
292 local_addr);
293 return peer_addr_for_me;
294 } else {
295 const auto local_addr_for_me = conn.get_local_address();
296 logger().info("{} socket to {} says I am {} (peer says {})",
297 conn, conn.get_peer_socket_addr(),
298 local_addr, peer_addr_for_me);
299 entity_addr_t addr;
300 addr.set_sockaddr(&local_addr_for_me.as_posix_sockaddr());
301 return addr;
302 }
303}
304
1e59de90
TL
305void SocketMessenger::learned_addr(
306 const entity_addr_t &peer_addr_for_me,
307 const SocketConnection& conn)
11fdf7f2 308{
f67539c2 309 assert(seastar::this_shard_id() == master_sid);
9f95a23c
TL
310 if (!need_addr) {
311 if ((!get_myaddr().is_any() &&
312 get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
313 get_myaddr().get_family() != peer_addr_for_me.get_family() ||
314 !get_myaddr().is_same_host(peer_addr_for_me)) {
315 logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
316 conn, peer_addr_for_me, get_myaddr());
317 throw std::system_error(
318 make_error_code(crimson::net::error::bad_peer_address));
319 }
1e59de90 320 return;
11fdf7f2 321 }
9f95a23c
TL
322
323 if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
324 // Not bound
20effc67 325 auto addr = choose_addr(peer_addr_for_me, conn);
9f95a23c
TL
326 addr.set_type(entity_addr_t::TYPE_ANY);
327 addr.set_port(0);
f67539c2 328 need_addr = false;
1e59de90
TL
329 set_myaddrs(entity_addrvec_t{addr});
330 logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr());
9f95a23c
TL
331 } else {
332 // Already bound
333 if (!get_myaddr().is_any() &&
334 get_myaddr().get_type() != peer_addr_for_me.get_type()) {
335 logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
336 conn, peer_addr_for_me, get_myaddr());
337 throw std::system_error(
338 make_error_code(crimson::net::error::bad_peer_address));
339 }
340 if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
341 logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
342 conn, peer_addr_for_me, get_myaddr());
343 throw std::system_error(
344 make_error_code(crimson::net::error::bad_peer_address));
345 }
346 if (get_myaddr().is_blank_ip()) {
20effc67 347 auto addr = choose_addr(peer_addr_for_me, conn);
9f95a23c
TL
348 addr.set_type(get_myaddr().get_type());
349 addr.set_port(get_myaddr().get_port());
f67539c2 350 need_addr = false;
1e59de90
TL
351 set_myaddrs(entity_addrvec_t{addr});
352 logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr());
9f95a23c
TL
353 } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
354 logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
355 conn, peer_addr_for_me, get_myaddr());
356 throw std::system_error(
357 make_error_code(crimson::net::error::bad_peer_address));
358 } else {
f67539c2 359 need_addr = false;
9f95a23c
TL
360 }
361 }
362}
363
364SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
365{
366 return policy_set.get(peer_type);
367}
11fdf7f2 368
9f95a23c
TL
369SocketPolicy SocketMessenger::get_default_policy() const
370{
371 return policy_set.get_default();
11fdf7f2
TL
372}
373
374void SocketMessenger::set_default_policy(const SocketPolicy& p)
375{
376 policy_set.set_default(p);
377}
378
379void SocketMessenger::set_policy(entity_type_t peer_type,
380 const SocketPolicy& p)
381{
382 policy_set.set(peer_type, p);
383}
384
385void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
386 Throttle* throttle)
387{
388 // only byte throttler is used in OSD
389 policy_set.set_throttlers(peer_type, throttle, nullptr);
390}
391
9f95a23c 392crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
11fdf7f2
TL
393{
394 if (auto found = connections.find(addr);
395 found != connections.end()) {
396 return found->second;
397 } else {
398 return nullptr;
399 }
400}
401
402void SocketMessenger::accept_conn(SocketConnectionRef conn)
403{
404 accepting_conns.insert(conn);
405}
406
407void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
408{
409 accepting_conns.erase(conn);
410}
411
412void SocketMessenger::register_conn(SocketConnectionRef conn)
413{
11fdf7f2
TL
414 auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
415 std::ignore = i;
416 ceph_assert(added);
417}
418
419void SocketMessenger::unregister_conn(SocketConnectionRef conn)
420{
421 ceph_assert(conn);
422 auto found = connections.find(conn->get_peer_addr());
423 ceph_assert(found != connections.end());
424 ceph_assert(found->second == conn);
425 connections.erase(found);
426}
9f95a23c 427
f67539c2
TL
428void SocketMessenger::closing_conn(SocketConnectionRef conn)
429{
430 closing_conns.push_back(conn);
431}
432
433void SocketMessenger::closed_conn(SocketConnectionRef conn)
434{
435 for (auto it = closing_conns.begin();
436 it != closing_conns.end();) {
437 if (*it == conn) {
438 it = closing_conns.erase(it);
439 } else {
440 it++;
441 }
442 }
443}
444
1e59de90 445uint32_t SocketMessenger::get_global_seq(uint32_t old)
9f95a23c
TL
446{
447 if (old > global_seq) {
448 global_seq = old;
449 }
1e59de90 450 return ++global_seq;
9f95a23c
TL
451}
452
453} // namespace crimson::net