]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/SocketMessenger.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "SocketMessenger.h"
16
20effc67
TL
17#include <seastar/core/sleep.hh>
18
11fdf7f2
TL
19#include <tuple>
20#include <boost/functional/hash.hpp>
21
22#include "auth/Auth.h"
23#include "Errors.h"
11fdf7f2
TL
24#include "Socket.h"
25
11fdf7f2
TL
26namespace {
27 seastar::logger& logger() {
9f95a23c 28 return crimson::get_logger(ceph_subsys_ms);
11fdf7f2
TL
29 }
30}
31
9f95a23c
TL
32namespace crimson::net {
33
11fdf7f2
TL
34SocketMessenger::SocketMessenger(const entity_name_t& myname,
35 const std::string& logic_name,
9f95a23c 36 uint32_t nonce)
11fdf7f2 37 : Messenger{myname},
f67539c2 38 master_sid{seastar::this_shard_id()},
11fdf7f2
TL
39 logic_name{logic_name},
40 nonce{nonce}
41{}
42
20effc67
TL
43SocketMessenger::~SocketMessenger()
44{
45 ceph_assert(!listener);
46}
47
48bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t &addrs)
49{
50 bool ret = false;
51
52 entity_addrvec_t newaddrs = my_addrs;
53 for (auto& a : newaddrs.v) {
54 if (a.is_blank_ip()) {
55 int type = a.get_type();
56 int port = a.get_port();
57 uint32_t nonce = a.get_nonce();
58 for (auto& b : addrs.v) {
59 if (a.get_family() == b.get_family()) {
60 logger().debug(" assuming my addr {} matches provided addr {}", a, b);
61 a = b;
62 a.set_nonce(nonce);
63 a.set_type(type);
64 a.set_port(port);
65 ret = true;
66 break;
67 }
68 }
69 }
70 }
71 my_addrs = newaddrs;
72 return ret;
73}
74
11fdf7f2
TL
75seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
76{
f67539c2 77 assert(seastar::this_shard_id() == master_sid);
11fdf7f2
TL
78 auto my_addrs = addrs;
79 for (auto& addr : my_addrs.v) {
80 addr.nonce = nonce;
81 }
9f95a23c
TL
82 return Messenger::set_myaddrs(my_addrs);
83}
84
20effc67
TL
85crimson::net::listen_ertr::future<>
86SocketMessenger::do_listen(const entity_addrvec_t& addrs)
9f95a23c 87{
f67539c2 88 assert(seastar::this_shard_id() == master_sid);
9f95a23c
TL
89 ceph_assert(addrs.front().get_family() == AF_INET);
90 return set_myaddrs(addrs).then([this] {
91 if (!listener) {
92 return FixedCPUServerSocket::create().then([this] (auto _listener) {
93 listener = _listener;
94 });
95 } else {
96 return seastar::now();
97 }
20effc67 98 }).then([this] () -> listen_ertr::future<> {
f67539c2 99 const entity_addr_t listen_addr = get_myaddr();
20effc67 100 logger().debug("{} do_listen: try listen {}...", *this, listen_addr);
9f95a23c 101 if (!listener) {
20effc67
TL
102 logger().warn("{} do_listen: listener doesn't exist", *this);
103 return listen_ertr::now();
9f95a23c
TL
104 }
105 return listener->listen(listen_addr);
106 });
11fdf7f2
TL
107}
108
f67539c2 109SocketMessenger::bind_ertr::future<>
11fdf7f2
TL
110SocketMessenger::try_bind(const entity_addrvec_t& addrs,
111 uint32_t min_port, uint32_t max_port)
112{
20effc67
TL
113 // the classical OSD iterates over the addrvec and tries to listen on each
114 // addr. crimson doesn't need to follow as there is a consensus we need to
115 // worry only about proto v2.
116 assert(addrs.size() == 1);
117 auto addr = addrs.msgr2_addr();
11fdf7f2 118 if (addr.get_port() != 0) {
20effc67 119 return do_listen(addrs).safe_then([this] {
9f95a23c
TL
120 logger().info("{} try_bind: done", *this);
121 });
11fdf7f2
TL
122 }
123 ceph_assert(min_port <= max_port);
124 return seastar::do_with(uint32_t(min_port),
9f95a23c 125 [this, max_port, addr] (auto& port) {
f67539c2 126 return seastar::repeat_until_value([this, max_port, addr, &port] {
9f95a23c
TL
127 auto to_bind = addr;
128 to_bind.set_port(port);
20effc67
TL
129 return do_listen(entity_addrvec_t{to_bind}
130 ).safe_then([this] () -> seastar::future<std::optional<std::error_code>> {
9f95a23c 131 logger().info("{} try_bind: done", *this);
20effc67
TL
132 return seastar::make_ready_future<std::optional<std::error_code>>(
133 std::make_optional<std::error_code>(std::error_code{/* success! */}));
134 }, listen_ertr::all_same_way([this, max_port, &port]
135 (const std::error_code& e) mutable
136 -> seastar::future<std::optional<std::error_code>> {
137 logger().trace("{} try_bind: {} got error {}", *this, port, e);
9f95a23c 138 if (port == max_port) {
20effc67
TL
139 return seastar::make_ready_future<std::optional<std::error_code>>(
140 std::make_optional<std::error_code>(e));
9f95a23c
TL
141 }
142 ++port;
20effc67
TL
143 return seastar::make_ready_future<std::optional<std::error_code>>(
144 std::optional<std::error_code>{std::nullopt});
f67539c2 145 }));
20effc67
TL
146 }).then([] (const std::error_code e) -> bind_ertr::future<> {
147 if (!e) {
148 return bind_ertr::now(); // success!
149 } else if (e == std::errc::address_in_use) {
f67539c2 150 return crimson::ct_error::address_in_use::make();
20effc67
TL
151 } else if (e == std::errc::address_not_available) {
152 return crimson::ct_error::address_not_available::make();
f67539c2 153 }
20effc67
TL
154 ceph_abort();
155 });
156 });
157}
158
159SocketMessenger::bind_ertr::future<>
160SocketMessenger::bind(const entity_addrvec_t& addrs)
161{
162 using crimson::common::local_conf;
163 return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count},
164 [this, addrs] (auto& count) {
165 return seastar::repeat_until_value([this, addrs, &count] {
166 assert(count >= 0);
167 return try_bind(addrs,
168 local_conf()->ms_bind_port_min,
169 local_conf()->ms_bind_port_max)
170 .safe_then([this] {
171 logger().info("{} try_bind: done", *this);
172 return seastar::make_ready_future<std::optional<std::error_code>>(
173 std::make_optional<std::error_code>(std::error_code{/* success! */}));
174 }, bind_ertr::all_same_way([this, &count] (const std::error_code error) {
175 if (count-- > 0) {
176 logger().info("{} was unable to bind. Trying again in {} seconds",
177 *this, local_conf()->ms_bind_retry_delay);
178 return seastar::sleep(
179 std::chrono::seconds(local_conf()->ms_bind_retry_delay)
180 ).then([] {
181 // one more time, please
182 return seastar::make_ready_future<std::optional<std::error_code>>(
183 std::optional<std::error_code>{std::nullopt});
184 });
185 } else {
186 logger().info("{} was unable to bind after {} attempts: {}",
187 *this, local_conf()->ms_bind_retry_count, error);
188 return seastar::make_ready_future<std::optional<std::error_code>>(
189 std::make_optional<std::error_code>(error));
190 }
191 }));
192 }).then([] (const std::error_code error) -> bind_ertr::future<> {
193 if (!error) {
194 return bind_ertr::now(); // success!
195 } else if (error == std::errc::address_in_use) {
196 return crimson::ct_error::address_in_use::make();
197 } else if (error == std::errc::address_not_available) {
198 return crimson::ct_error::address_not_available::make();
199 }
200 ceph_abort();
11fdf7f2 201 });
9f95a23c 202 });
11fdf7f2
TL
203}
204
f67539c2
TL
205seastar::future<> SocketMessenger::start(
206 const dispatchers_t& _dispatchers) {
207 assert(seastar::this_shard_id() == master_sid);
11fdf7f2 208
f67539c2 209 dispatchers.assign(_dispatchers);
11fdf7f2 210 if (listener) {
9f95a23c 211 // make sure we have already bound to a valid address
20effc67 212 ceph_assert(get_myaddr().is_msgr2());
9f95a23c 213 ceph_assert(get_myaddr().get_port() > 0);
11fdf7f2 214
9f95a23c 215 return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
f67539c2 216 assert(seastar::this_shard_id() == master_sid);
20effc67
TL
217 assert(get_myaddr().is_msgr2());
218 SocketConnectionRef conn =
219 seastar::make_shared<SocketConnection>(*this, dispatchers);
9f95a23c
TL
220 conn->start_accept(std::move(socket), peer_addr);
221 return seastar::now();
222 });
223 }
11fdf7f2
TL
224 return seastar::now();
225}
226
9f95a23c 227crimson::net::ConnectionRef
f67539c2 228SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_name_t& peer_name)
11fdf7f2 229{
f67539c2 230 assert(seastar::this_shard_id() == master_sid);
9f95a23c
TL
231
232 // make sure we connect to a valid peer_addr
20effc67
TL
233 if (!peer_addr.is_msgr2()) {
234 ceph_abort_msg("ProtocolV1 is no longer supported");
235 }
9f95a23c
TL
236 ceph_assert(peer_addr.get_port() > 0);
237
11fdf7f2 238 if (auto found = lookup_conn(peer_addr); found) {
f67539c2 239 logger().debug("{} connect to existing", *found);
9f95a23c 240 return found->shared_from_this();
11fdf7f2 241 }
20effc67
TL
242 SocketConnectionRef conn =
243 seastar::make_shared<SocketConnection>(*this, dispatchers);
f67539c2 244 conn->start_connect(peer_addr, peer_name);
9f95a23c 245 return conn->shared_from_this();
11fdf7f2
TL
246}
247
9f95a23c 248seastar::future<> SocketMessenger::shutdown()
11fdf7f2 249{
f67539c2
TL
250 assert(seastar::this_shard_id() == master_sid);
251 return seastar::futurize_invoke([this] {
252 assert(dispatchers.empty());
9f95a23c
TL
253 if (listener) {
254 auto d_listener = listener;
255 listener = nullptr;
256 return d_listener->destroy();
257 } else {
258 return seastar::now();
259 }
11fdf7f2 260 // close all connections
9f95a23c
TL
261 }).then([this] {
262 return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
f67539c2 263 return conn->close_clean(false);
11fdf7f2 264 });
9f95a23c
TL
265 }).then([this] {
266 ceph_assert(accepting_conns.empty());
267 return seastar::parallel_for_each(connections, [] (auto conn) {
f67539c2
TL
268 return conn.second->close_clean(false);
269 });
270 }).then([this] {
271 return seastar::parallel_for_each(closing_conns, [] (auto conn) {
272 return conn->close_clean(false);
9f95a23c
TL
273 });
274 }).then([this] {
275 ceph_assert(connections.empty());
276 shutdown_promise.set_value();
277 });
11fdf7f2
TL
278}
279
20effc67
TL
280static entity_addr_t choose_addr(
281 const entity_addr_t &peer_addr_for_me,
282 const SocketConnection& conn)
283{
284 using crimson::common::local_conf;
285 // XXX: a syscall is here
286 if (const auto local_addr = conn.get_local_address();
287 local_conf()->ms_learn_addr_from_peer) {
288 logger().info("{} peer {} says I am {} (socket says {})",
289 conn, conn.get_peer_socket_addr(), peer_addr_for_me,
290 local_addr);
291 return peer_addr_for_me;
292 } else {
293 const auto local_addr_for_me = conn.get_local_address();
294 logger().info("{} socket to {} says I am {} (peer says {})",
295 conn, conn.get_peer_socket_addr(),
296 local_addr, peer_addr_for_me);
297 entity_addr_t addr;
298 addr.set_sockaddr(&local_addr_for_me.as_posix_sockaddr());
299 return addr;
300 }
301}
302
9f95a23c 303seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
11fdf7f2 304{
f67539c2 305 assert(seastar::this_shard_id() == master_sid);
9f95a23c
TL
306 if (!need_addr) {
307 if ((!get_myaddr().is_any() &&
308 get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
309 get_myaddr().get_family() != peer_addr_for_me.get_family() ||
310 !get_myaddr().is_same_host(peer_addr_for_me)) {
311 logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
312 conn, peer_addr_for_me, get_myaddr());
313 throw std::system_error(
314 make_error_code(crimson::net::error::bad_peer_address));
315 }
11fdf7f2
TL
316 return seastar::now();
317 }
9f95a23c
TL
318
319 if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
320 // Not bound
20effc67 321 auto addr = choose_addr(peer_addr_for_me, conn);
9f95a23c
TL
322 addr.set_type(entity_addr_t::TYPE_ANY);
323 addr.set_port(0);
f67539c2 324 need_addr = false;
9f95a23c 325 return set_myaddrs(entity_addrvec_t{addr}
20effc67
TL
326 ).then([this, &conn] {
327 logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr());
9f95a23c
TL
328 });
329 } else {
330 // Already bound
331 if (!get_myaddr().is_any() &&
332 get_myaddr().get_type() != peer_addr_for_me.get_type()) {
333 logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
334 conn, peer_addr_for_me, get_myaddr());
335 throw std::system_error(
336 make_error_code(crimson::net::error::bad_peer_address));
337 }
338 if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
339 logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
340 conn, peer_addr_for_me, get_myaddr());
341 throw std::system_error(
342 make_error_code(crimson::net::error::bad_peer_address));
343 }
344 if (get_myaddr().is_blank_ip()) {
20effc67 345 auto addr = choose_addr(peer_addr_for_me, conn);
9f95a23c
TL
346 addr.set_type(get_myaddr().get_type());
347 addr.set_port(get_myaddr().get_port());
f67539c2 348 need_addr = false;
9f95a23c 349 return set_myaddrs(entity_addrvec_t{addr}
20effc67
TL
350 ).then([this, &conn] {
351 logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr());
9f95a23c
TL
352 });
353 } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
354 logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
355 conn, peer_addr_for_me, get_myaddr());
356 throw std::system_error(
357 make_error_code(crimson::net::error::bad_peer_address));
358 } else {
f67539c2 359 need_addr = false;
9f95a23c
TL
360 return seastar::now();
361 }
362 }
363}
364
365SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
366{
367 return policy_set.get(peer_type);
368}
11fdf7f2 369
9f95a23c
TL
370SocketPolicy SocketMessenger::get_default_policy() const
371{
372 return policy_set.get_default();
11fdf7f2
TL
373}
374
375void SocketMessenger::set_default_policy(const SocketPolicy& p)
376{
377 policy_set.set_default(p);
378}
379
380void SocketMessenger::set_policy(entity_type_t peer_type,
381 const SocketPolicy& p)
382{
383 policy_set.set(peer_type, p);
384}
385
386void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
387 Throttle* throttle)
388{
389 // only byte throttler is used in OSD
390 policy_set.set_throttlers(peer_type, throttle, nullptr);
391}
392
9f95a23c 393crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
11fdf7f2
TL
394{
395 if (auto found = connections.find(addr);
396 found != connections.end()) {
397 return found->second;
398 } else {
399 return nullptr;
400 }
401}
402
403void SocketMessenger::accept_conn(SocketConnectionRef conn)
404{
405 accepting_conns.insert(conn);
406}
407
408void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
409{
410 accepting_conns.erase(conn);
411}
412
413void SocketMessenger::register_conn(SocketConnectionRef conn)
414{
11fdf7f2
TL
415 auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
416 std::ignore = i;
417 ceph_assert(added);
418}
419
420void SocketMessenger::unregister_conn(SocketConnectionRef conn)
421{
422 ceph_assert(conn);
423 auto found = connections.find(conn->get_peer_addr());
424 ceph_assert(found != connections.end());
425 ceph_assert(found->second == conn);
426 connections.erase(found);
427}
9f95a23c 428
f67539c2
TL
429void SocketMessenger::closing_conn(SocketConnectionRef conn)
430{
431 closing_conns.push_back(conn);
432}
433
434void SocketMessenger::closed_conn(SocketConnectionRef conn)
435{
436 for (auto it = closing_conns.begin();
437 it != closing_conns.end();) {
438 if (*it == conn) {
439 it = closing_conns.erase(it);
440 } else {
441 it++;
442 }
443 }
444}
445
9f95a23c
TL
446seastar::future<uint32_t>
447SocketMessenger::get_global_seq(uint32_t old)
448{
449 if (old > global_seq) {
450 global_seq = old;
451 }
452 return seastar::make_ready_future<uint32_t>(++global_seq);
453}
454
455} // namespace crimson::net