]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/SocketMessenger.cc
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "SocketMessenger.h"
16
20effc67
TL
17#include <seastar/core/sleep.hh>
18
11fdf7f2
TL
19#include <tuple>
20#include <boost/functional/hash.hpp>
1e59de90 21#include <fmt/os.h>
11fdf7f2
TL
22
23#include "auth/Auth.h"
24#include "Errors.h"
11fdf7f2
TL
25#include "Socket.h"
26
11fdf7f2
TL
27namespace {
28 seastar::logger& logger() {
9f95a23c 29 return crimson::get_logger(ceph_subsys_ms);
11fdf7f2
TL
30 }
31}
32
9f95a23c
TL
33namespace crimson::net {
34
11fdf7f2
TL
35SocketMessenger::SocketMessenger(const entity_name_t& myname,
36 const std::string& logic_name,
aee94f69
TL
37 uint32_t nonce,
38 bool dispatch_only_on_this_shard)
39 : sid{seastar::this_shard_id()},
11fdf7f2 40 logic_name{logic_name},
1e59de90 41 nonce{nonce},
aee94f69 42 dispatch_only_on_sid{dispatch_only_on_this_shard},
1e59de90 43 my_name{myname}
11fdf7f2
TL
44{}
45
20effc67
TL
46SocketMessenger::~SocketMessenger()
47{
1e59de90 48 logger().debug("~SocketMessenger: {}", logic_name);
aee94f69 49 ceph_assert_always(seastar::this_shard_id() == sid);
20effc67
TL
50 ceph_assert(!listener);
51}
52
53bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
54{
aee94f69 55 assert(seastar::this_shard_id() == sid);
20effc67
TL
56 bool ret = false;
57
58 entity_addrvec_t newaddrs = my_addrs;
59 for (auto& a : newaddrs.v) {
60 if (a.is_blank_ip()) {
61 int type = a.get_type();
62 int port = a.get_port();
63 uint32_t nonce = a.get_nonce();
64 for (auto& b : addrs.v) {
65 if (a.get_family() == b.get_family()) {
66 logger().debug(" assuming my addr {} matches provided addr {}", a, b);
67 a = b;
68 a.set_nonce(nonce);
69 a.set_type(type);
70 a.set_port(port);
71 ret = true;
72 break;
73 }
74 }
75 }
76 }
77 my_addrs = newaddrs;
78 return ret;
79}
80
1e59de90 81void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
11fdf7f2 82{
aee94f69 83 assert(seastar::this_shard_id() == sid);
1e59de90 84 my_addrs = addrs;
11fdf7f2
TL
85 for (auto& addr : my_addrs.v) {
86 addr.nonce = nonce;
87 }
9f95a23c
TL
88}
89
20effc67
TL
90crimson::net::listen_ertr::future<>
91SocketMessenger::do_listen(const entity_addrvec_t& addrs)
9f95a23c 92{
9f95a23c 93 ceph_assert(addrs.front().get_family() == AF_INET);
1e59de90
TL
94 set_myaddrs(addrs);
95 return seastar::futurize_invoke([this] {
9f95a23c 96 if (!listener) {
aee94f69
TL
97 return ShardedServerSocket::create(dispatch_only_on_sid
98 ).then([this] (auto _listener) {
9f95a23c
TL
99 listener = _listener;
100 });
101 } else {
102 return seastar::now();
103 }
20effc67 104 }).then([this] () -> listen_ertr::future<> {
f67539c2 105 const entity_addr_t listen_addr = get_myaddr();
20effc67 106 logger().debug("{} do_listen: try listen {}...", *this, listen_addr);
9f95a23c 107 if (!listener) {
20effc67
TL
108 logger().warn("{} do_listen: listener doesn't exist", *this);
109 return listen_ertr::now();
9f95a23c
TL
110 }
111 return listener->listen(listen_addr);
112 });
11fdf7f2
TL
113}
114
f67539c2 115SocketMessenger::bind_ertr::future<>
11fdf7f2
TL
116SocketMessenger::try_bind(const entity_addrvec_t& addrs,
117 uint32_t min_port, uint32_t max_port)
118{
20effc67
TL
119 // the classical OSD iterates over the addrvec and tries to listen on each
120 // addr. crimson doesn't need to follow as there is a consensus we need to
121 // worry only about proto v2.
122 assert(addrs.size() == 1);
123 auto addr = addrs.msgr2_addr();
11fdf7f2 124 if (addr.get_port() != 0) {
20effc67 125 return do_listen(addrs).safe_then([this] {
9f95a23c
TL
126 logger().info("{} try_bind: done", *this);
127 });
11fdf7f2
TL
128 }
129 ceph_assert(min_port <= max_port);
130 return seastar::do_with(uint32_t(min_port),
9f95a23c 131 [this, max_port, addr] (auto& port) {
f67539c2 132 return seastar::repeat_until_value([this, max_port, addr, &port] {
9f95a23c
TL
133 auto to_bind = addr;
134 to_bind.set_port(port);
20effc67
TL
135 return do_listen(entity_addrvec_t{to_bind}
136 ).safe_then([this] () -> seastar::future<std::optional<std::error_code>> {
9f95a23c 137 logger().info("{} try_bind: done", *this);
20effc67
TL
138 return seastar::make_ready_future<std::optional<std::error_code>>(
139 std::make_optional<std::error_code>(std::error_code{/* success! */}));
140 }, listen_ertr::all_same_way([this, max_port, &port]
141 (const std::error_code& e) mutable
142 -> seastar::future<std::optional<std::error_code>> {
143 logger().trace("{} try_bind: {} got error {}", *this, port, e);
9f95a23c 144 if (port == max_port) {
20effc67
TL
145 return seastar::make_ready_future<std::optional<std::error_code>>(
146 std::make_optional<std::error_code>(e));
9f95a23c
TL
147 }
148 ++port;
20effc67
TL
149 return seastar::make_ready_future<std::optional<std::error_code>>(
150 std::optional<std::error_code>{std::nullopt});
f67539c2 151 }));
20effc67
TL
152 }).then([] (const std::error_code e) -> bind_ertr::future<> {
153 if (!e) {
154 return bind_ertr::now(); // success!
155 } else if (e == std::errc::address_in_use) {
f67539c2 156 return crimson::ct_error::address_in_use::make();
20effc67
TL
157 } else if (e == std::errc::address_not_available) {
158 return crimson::ct_error::address_not_available::make();
f67539c2 159 }
20effc67
TL
160 ceph_abort();
161 });
162 });
163}
164
165SocketMessenger::bind_ertr::future<>
166SocketMessenger::bind(const entity_addrvec_t& addrs)
167{
aee94f69 168 assert(seastar::this_shard_id() == sid);
20effc67
TL
169 using crimson::common::local_conf;
170 return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
171 [this, addrs] (auto& count) {
172 return seastar::repeat_until_value([this, addrs, &count] {
173 assert(count >= 0);
174 return try_bind(addrs,
175 local_conf()->ms_bind_port_min,
176 local_conf()->ms_bind_port_max)
177 .safe_then([this] {
178 logger().info("{} try_bind: done", *this);
179 return seastar::make_ready_future<std::optional<std::error_code>>(
180 std::make_optional<std::error_code>(std::error_code{/* success! */}));
181 }, bind_ertr::all_same_way([this, &count] (const std::error_code error) {
182 if (count-- > 0) {
183 logger().info("{} was unable to bind. Trying again in {} seconds",
184 *this, local_conf()->ms_bind_retry_delay);
185 return seastar::sleep(
186 std::chrono::seconds(local_conf()->ms_bind_retry_delay)
187 ).then([] {
188 // one more time, please
189 return seastar::make_ready_future<std::optional<std::error_code>>(
190 std::optional<std::error_code>{std::nullopt});
191 });
192 } else {
193 logger().info("{} was unable to bind after {} attempts: {}",
194 *this, local_conf()->ms_bind_retry_count, error);
195 return seastar::make_ready_future<std::optional<std::error_code>>(
196 std::make_optional<std::error_code>(error));
197 }
198 }));
199 }).then([] (const std::error_code error) -> bind_ertr::future<> {
200 if (!error) {
201 return bind_ertr::now(); // success!
202 } else if (error == std::errc::address_in_use) {
203 return crimson::ct_error::address_in_use::make();
204 } else if (error == std::errc::address_not_available) {
205 return crimson::ct_error::address_not_available::make();
206 }
207 ceph_abort();
11fdf7f2 208 });
9f95a23c 209 });
11fdf7f2
TL
210}
211
aee94f69
TL
212seastar::future<> SocketMessenger::accept(
213 SocketFRef &&socket, const entity_addr_t &peer_addr)
214{
215 assert(seastar::this_shard_id() == sid);
216 SocketConnectionRef conn =
217 seastar::make_shared<SocketConnection>(*this, dispatchers);
218 conn->start_accept(std::move(socket), peer_addr);
219 return seastar::now();
220}
221
f67539c2
TL
222seastar::future<> SocketMessenger::start(
223 const dispatchers_t& _dispatchers) {
aee94f69 224 assert(seastar::this_shard_id() == sid);
11fdf7f2 225
f67539c2 226 dispatchers.assign(_dispatchers);
11fdf7f2 227 if (listener) {
9f95a23c 228 // make sure we have already bound to a valid address
20effc67 229 ceph_assert(get_myaddr().is_msgr2());
9f95a23c 230 ceph_assert(get_myaddr().get_port() > 0);
11fdf7f2 231
aee94f69 232 return listener->accept([this](SocketRef _socket, entity_addr_t peer_addr) {
20effc67 233 assert(get_myaddr().is_msgr2());
aee94f69
TL
234 SocketFRef socket = seastar::make_foreign(std::move(_socket));
235 if (listener->is_fixed_shard_dispatching()) {
236 return accept(std::move(socket), peer_addr);
237 } else {
238 return seastar::smp::submit_to(sid,
239 [this, peer_addr, socket = std::move(socket)]() mutable {
240 return accept(std::move(socket), peer_addr);
241 });
242 }
9f95a23c
TL
243 });
244 }
11fdf7f2
TL
245 return seastar::now();
246}
247
9f95a23c 248crimson::net::ConnectionRef
f67539c2 249SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
11fdf7f2 250{
aee94f69 251 assert(seastar::this_shard_id() == sid);
9f95a23c
TL
252
253 // make sure we connect to a valid peer_addr
20effc67
TL
254 if (!peer_addr.is_msgr2()) {
255 ceph_abort_msg("ProtocolV1 is no longer supported");
256 }
9f95a23c
TL
257 ceph_assert(peer_addr.get_port() > 0);
258
11fdf7f2 259 if (auto found = lookup_conn(peer_addr); found) {
f67539c2 260 logger().debug("{} connect to existing", *found);
1e59de90 261 return found->get_local_shared_foreign_from_this();
11fdf7f2 262 }
20effc67
TL
263 SocketConnectionRef conn =
264 seastar::make_shared<SocketConnection>(*this, dispatchers);
f67539c2 265 conn->start_connect(peer_addr, peer_name);
1e59de90 266 return conn->get_local_shared_foreign_from_this();
11fdf7f2
TL
267}
268
9f95a23c 269seastar::future<> SocketMessenger::shutdown()
11fdf7f2 270{
aee94f69 271 assert(seastar::this_shard_id() == sid);
f67539c2
TL
272 return seastar::futurize_invoke([this] {
273 assert(dispatchers.empty());
9f95a23c
TL
274 if (listener) {
275 auto d_listener = listener;
276 listener = nullptr;
aee94f69 277 return d_listener->shutdown_destroy();
9f95a23c
TL
278 } else {
279 return seastar::now();
280 }
11fdf7f2 281 // close all connections
9f95a23c
TL
282 }).then([this] {
283 return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
1e59de90 284 return conn->close_clean_yielded();
11fdf7f2 285 });
9f95a23c
TL
286 }).then([this] {
287 ceph_assert(accepting_conns.empty());
288 return seastar::parallel_for_each(connections, [] (auto conn) {
1e59de90 289 return conn.second->close_clean_yielded();
f67539c2
TL
290 });
291 }).then([this] {
292 return seastar::parallel_for_each(closing_conns, [] (auto conn) {
1e59de90 293 return conn->close_clean_yielded();
9f95a23c
TL
294 });
295 }).then([this] {
296 ceph_assert(connections.empty());
297 shutdown_promise.set_value();
298 });
11fdf7f2
TL
299}
300
20effc67
TL
301static entity_addr_t choose_addr(
302 const entity_addr_t &peer_addr_for_me,
303 const SocketConnection& conn)
304{
305 using crimson::common::local_conf;
306 // XXX: a syscall is here
307 if (const auto local_addr = conn.get_local_address();
308 local_conf()->ms_learn_addr_from_peer) {
309 logger().info("{} peer {} says I am {} (socket says {})",
310 conn, conn.get_peer_socket_addr(), peer_addr_for_me,
311 local_addr);
312 return peer_addr_for_me;
313 } else {
314 const auto local_addr_for_me = conn.get_local_address();
315 logger().info("{} socket to {} says I am {} (peer says {})",
316 conn, conn.get_peer_socket_addr(),
317 local_addr, peer_addr_for_me);
318 entity_addr_t addr;
319 addr.set_sockaddr(&local_addr_for_me.as_posix_sockaddr());
320 return addr;
321 }
322}
323
1e59de90
TL
324void SocketMessenger::learned_addr(
325 const entity_addr_t &peer_addr_for_me,
326 const SocketConnection& conn)
11fdf7f2 327{
aee94f69 328 assert(seastar::this_shard_id() == sid);
9f95a23c
TL
329 if (!need_addr) {
330 if ((!get_myaddr().is_any() &&
331 get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
332 get_myaddr().get_family() != peer_addr_for_me.get_family() ||
333 !get_myaddr().is_same_host(peer_addr_for_me)) {
334 logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
335 conn, peer_addr_for_me, get_myaddr());
336 throw std::system_error(
337 make_error_code(crimson::net::error::bad_peer_address));
338 }
1e59de90 339 return;
11fdf7f2 340 }
9f95a23c
TL
341
342 if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
343 // Not bound
20effc67 344 auto addr = choose_addr(peer_addr_for_me, conn);
9f95a23c
TL
345 addr.set_type(entity_addr_t::TYPE_ANY);
346 addr.set_port(0);
f67539c2 347 need_addr = false;
1e59de90
TL
348 set_myaddrs(entity_addrvec_t{addr});
349 logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr());
9f95a23c
TL
350 } else {
351 // Already bound
352 if (!get_myaddr().is_any() &&
353 get_myaddr().get_type() != peer_addr_for_me.get_type()) {
354 logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
355 conn, peer_addr_for_me, get_myaddr());
356 throw std::system_error(
357 make_error_code(crimson::net::error::bad_peer_address));
358 }
359 if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
360 logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
361 conn, peer_addr_for_me, get_myaddr());
362 throw std::system_error(
363 make_error_code(crimson::net::error::bad_peer_address));
364 }
365 if (get_myaddr().is_blank_ip()) {
20effc67 366 auto addr = choose_addr(peer_addr_for_me, conn);
9f95a23c
TL
367 addr.set_type(get_myaddr().get_type());
368 addr.set_port(get_myaddr().get_port());
f67539c2 369 need_addr = false;
1e59de90
TL
370 set_myaddrs(entity_addrvec_t{addr});
371 logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr());
9f95a23c
TL
372 } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
373 logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
374 conn, peer_addr_for_me, get_myaddr());
375 throw std::system_error(
376 make_error_code(crimson::net::error::bad_peer_address));
377 } else {
f67539c2 378 need_addr = false;
9f95a23c
TL
379 }
380 }
381}
382
383SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
384{
aee94f69 385 assert(seastar::this_shard_id() == sid);
9f95a23c
TL
386 return policy_set.get(peer_type);
387}
11fdf7f2 388
9f95a23c
TL
389SocketPolicy SocketMessenger::get_default_policy() const
390{
aee94f69 391 assert(seastar::this_shard_id() == sid);
9f95a23c 392 return policy_set.get_default();
11fdf7f2
TL
393}
394
395void SocketMessenger::set_default_policy(const SocketPolicy& p)
396{
aee94f69 397 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
398 policy_set.set_default(p);
399}
400
401void SocketMessenger::set_policy(entity_type_t peer_type,
402 const SocketPolicy& p)
403{
aee94f69 404 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
405 policy_set.set(peer_type, p);
406}
407
408void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
409 Throttle* throttle)
410{
aee94f69 411 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
412 // only byte throttler is used in OSD
413 policy_set.set_throttlers(peer_type, throttle, nullptr);
414}
415
9f95a23c 416crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
11fdf7f2 417{
aee94f69 418 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
419 if (auto found = connections.find(addr);
420 found != connections.end()) {
421 return found->second;
422 } else {
423 return nullptr;
424 }
425}
426
427void SocketMessenger::accept_conn(SocketConnectionRef conn)
428{
aee94f69 429 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
430 accepting_conns.insert(conn);
431}
432
433void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
434{
aee94f69 435 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
436 accepting_conns.erase(conn);
437}
438
439void SocketMessenger::register_conn(SocketConnectionRef conn)
440{
aee94f69 441 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
442 auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
443 std::ignore = i;
444 ceph_assert(added);
445}
446
447void SocketMessenger::unregister_conn(SocketConnectionRef conn)
448{
aee94f69 449 assert(seastar::this_shard_id() == sid);
11fdf7f2
TL
450 ceph_assert(conn);
451 auto found = connections.find(conn->get_peer_addr());
452 ceph_assert(found != connections.end());
453 ceph_assert(found->second == conn);
454 connections.erase(found);
455}
9f95a23c 456
f67539c2
TL
457void SocketMessenger::closing_conn(SocketConnectionRef conn)
458{
aee94f69 459 assert(seastar::this_shard_id() == sid);
f67539c2
TL
460 closing_conns.push_back(conn);
461}
462
463void SocketMessenger::closed_conn(SocketConnectionRef conn)
464{
aee94f69 465 assert(seastar::this_shard_id() == sid);
f67539c2
TL
466 for (auto it = closing_conns.begin();
467 it != closing_conns.end();) {
468 if (*it == conn) {
469 it = closing_conns.erase(it);
470 } else {
471 it++;
472 }
473 }
474}
475
1e59de90 476uint32_t SocketMessenger::get_global_seq(uint32_t old)
9f95a23c 477{
aee94f69 478 assert(seastar::this_shard_id() == sid);
9f95a23c
TL
479 if (old > global_seq) {
480 global_seq = old;
481 }
1e59de90 482 return ++global_seq;
9f95a23c
TL
483}
484
485} // namespace crimson::net