1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #include "SocketMessenger.h"
17 #include <seastar/core/sleep.hh>
20 #include <boost/functional/hash.hpp>
23 #include "auth/Auth.h"
// Returns the seastar logger that crimson::get_logger() hands out for the
// ceph_subsys_ms subsystem; the log statements below all go through it.
28 seastar::logger
& logger() {
29 return crimson::get_logger(ceph_subsys_ms
);
33 namespace crimson::net
{
// Constructor. The visible initializers record the shard that constructs the
// messenger as master_sid and store the human-readable logic_name.
// NOTE(review): the parameter list and the member-initializer list are only
// partially visible in this listing; further parameters/members may exist.
35 SocketMessenger::SocketMessenger(const entity_name_t
& myname
,
36 const std::string
& logic_name
,
38 : master_sid
{seastar::this_shard_id()},
39 logic_name
{logic_name
},
// Destructor. Logs the messenger's logic_name at debug level and asserts that
// no listener is still held at destruction time.
44 SocketMessenger::~SocketMessenger()
46 logger().debug("~SocketMessenger: {}", logic_name
);
// the listener must already be gone (see shutdown(), which calls destroy()
// on it)
47 ceph_assert(!listener
);
// Works on a copy of my_addrs: for every entry whose IP is blank, the entry's
// type, port and nonce are saved and `addrs` is scanned for an address of the
// same family; a match is logged at debug level.
// NOTE(review): the statements that apply the match, restore the saved
// type/port/nonce, store the result and compute the bool return value are not
// visible in this listing -- confirm against the full source.
50 bool SocketMessenger::set_addr_unknowns(const entity_addrvec_t
&addrs
)
// operate on a copy so my_addrs is untouched while scanning
54 entity_addrvec_t newaddrs
= my_addrs
;
55 for (auto& a
: newaddrs
.v
) {
56 if (a
.is_blank_ip()) {
// remember the fields of our own (blank-IP) address before matching
57 int type
= a
.get_type();
58 int port
= a
.get_port();
59 uint32_t nonce
= a
.get_nonce();
60 for (auto& b
: addrs
.v
) {
// only the address family is compared when looking for a candidate
61 if (a
.get_family() == b
.get_family()) {
62 logger().debug(" assuming my addr {} matches provided addr {}", a
, b
);
// Must be called on the master shard (asserted). The visible part then walks
// every entry of my_addrs.v.
// NOTE(review): the assignment from `addrs` and the loop body are not visible
// in this listing.
77 void SocketMessenger::set_myaddrs(const entity_addrvec_t
& addrs
)
79 assert(seastar::this_shard_id() == master_sid
);
81 for (auto& addr
: my_addrs
.v
) {
// Starts listening; returns a listen_ertr future.
// Preconditions asserted below: runs on the master shard, and the first
// address in `addrs` is of family AF_INET.
// Visible flow: FixedCPUServerSocket::create() is invoked, then the address
// returned by get_myaddr() is passed to listener->listen().
// NOTE(review): the conditions guarding the create() call and the
// "listener doesn't exist" warning path are not visible in this listing.
86 crimson::net::listen_ertr::future
<>
87 SocketMessenger::do_listen(const entity_addrvec_t
& addrs
)
89 assert(seastar::this_shard_id() == master_sid
);
90 ceph_assert(addrs
.front().get_family() == AF_INET
);
// futurize_invoke wraps the callable's result in a future
92 return seastar::futurize_invoke([this] {
94 return FixedCPUServerSocket::create().then([this] (auto _listener
) {
98 return seastar::now();
100 }).then([this] () -> listen_ertr::future
<> {
101 const entity_addr_t listen_addr
= get_myaddr();
102 logger().debug("{} do_listen: try listen {}...", *this, listen_addr
);
// warning path: completes successfully without listening
104 logger().warn("{} do_listen: listener doesn't exist", *this);
105 return listen_ertr::now();
107 return listener
->listen(listen_addr
);
// Tries to listen on the single msgr2 address in `addrs`.
//  - If that address already carries a non-zero port, do_listen() is called
//    once with `addrs` as given.
//  - Otherwise ports are tried starting at min_port: each attempt sets the
//    candidate port on `to_bind` and calls do_listen(); the loop ends with an
//    empty std::error_code on success, or with the last error once
//    port == max_port. Any other failure yields std::nullopt, which makes
//    seastar::repeat_until_value() run another iteration.
// The final std::error_code is then mapped onto bind_ertr: success,
// address_in_use or address_not_available.
// NOTE(review): the declaration of `to_bind`, the port increment and the
// fallback branch of the error mapping are not visible in this listing.
111 SocketMessenger::bind_ertr::future
<>
112 SocketMessenger::try_bind(const entity_addrvec_t
& addrs
,
113 uint32_t min_port
, uint32_t max_port
)
115 // the classical OSD iterates over the addrvec and tries to listen on each
116 // addr. crimson doesn't need to follow as there is a consensus we need to
117 // worry only about proto v2.
118 assert(addrs
.size() == 1);
119 auto addr
= addrs
.msgr2_addr();
// an explicit port was requested: a single attempt, no port scanning
120 if (addr
.get_port() != 0) {
121 return do_listen(addrs
).safe_then([this] {
122 logger().info("{} try_bind: done", *this);
// no port given: scan from min_port up to max_port
125 ceph_assert(min_port
<= max_port
);
// do_with keeps the current candidate port alive across the async loop
126 return seastar::do_with(uint32_t(min_port
),
127 [this, max_port
, addr
] (auto& port
) {
128 return seastar::repeat_until_value([this, max_port
, addr
, &port
] {
130 to_bind
.set_port(port
);
131 return do_listen(entity_addrvec_t
{to_bind
}
132 ).safe_then([this] () -> seastar::future
<std::optional
<std::error_code
>> {
133 logger().info("{} try_bind: done", *this);
// an engaged optional stops the loop; a default error_code means success
134 return seastar::make_ready_future
<std::optional
<std::error_code
>>(
135 std::make_optional
<std::error_code
>(std::error_code
{/* success! */}));
136 }, listen_ertr::all_same_way([this, max_port
, &port
]
137 (const std::error_code
& e
) mutable
138 -> seastar::future
<std::optional
<std::error_code
>> {
139 logger().trace("{} try_bind: {} got error {}", *this, port
, e
);
// last candidate port failed: stop the loop and surface the error
140 if (port
== max_port
) {
141 return seastar::make_ready_future
<std::optional
<std::error_code
>>(
142 std::make_optional
<std::error_code
>(e
));
// disengaged optional: repeat_until_value() iterates again
145 return seastar::make_ready_future
<std::optional
<std::error_code
>>(
146 std::optional
<std::error_code
>{std::nullopt
});
// translate the std::error_code result into the bind_ertr error set
148 }).then([] (const std::error_code e
) -> bind_ertr::future
<> {
150 return bind_ertr::now(); // success!
151 } else if (e
== std::errc::address_in_use
) {
152 return crimson::ct_error::address_in_use::make();
153 } else if (e
== std::errc::address_not_available
) {
154 return crimson::ct_error::address_not_available::make();
// Binds to `addrs`, retrying on failure.
// A counter initialised from the ms_bind_retry_count option is kept alive with
// seastar::do_with(); each iteration calls try_bind() with the
// ms_bind_port_min / ms_bind_port_max options as the port range. On failure
// the visible code either sleeps ms_bind_retry_delay seconds and asks for
// another iteration (std::nullopt), or gives up and ends the loop with the
// error. The resulting std::error_code is finally mapped onto bind_ertr:
// success, address_in_use or address_not_available.
// NOTE(review): the test on `count` that selects between retrying and giving
// up, and the fallback branch of the error mapping, are not visible in this
// listing.
161 SocketMessenger::bind_ertr::future
<>
162 SocketMessenger::bind(const entity_addrvec_t
& addrs
)
164 using crimson::common::local_conf
;
165 return seastar::do_with(int64_t{local_conf()->ms_bind_retry_count
},
166 [this, addrs
] (auto& count
) {
167 return seastar::repeat_until_value([this, addrs
, &count
] {
169 return try_bind(addrs
,
170 local_conf()->ms_bind_port_min
,
171 local_conf()->ms_bind_port_max
)
173 logger().info("{} try_bind: done", *this);
// an engaged optional stops the loop; a default error_code means success
174 return seastar::make_ready_future
<std::optional
<std::error_code
>>(
175 std::make_optional
<std::error_code
>(std::error_code
{/* success! */}));
176 }, bind_ertr::all_same_way([this, &count
] (const std::error_code error
) {
// retry path: wait ms_bind_retry_delay seconds before the next attempt
178 logger().info("{} was unable to bind. Trying again in {} seconds",
179 *this, local_conf()->ms_bind_retry_delay
);
180 return seastar::sleep(
181 std::chrono::seconds(local_conf()->ms_bind_retry_delay
)
183 // one more time, please
184 return seastar::make_ready_future
<std::optional
<std::error_code
>>(
185 std::optional
<std::error_code
>{std::nullopt
});
// give-up path: end the loop and surface the last error
188 logger().info("{} was unable to bind after {} attempts: {}",
189 *this, local_conf()->ms_bind_retry_count
, error
);
190 return seastar::make_ready_future
<std::optional
<std::error_code
>>(
191 std::make_optional
<std::error_code
>(error
));
// translate the std::error_code result into the bind_ertr error set
194 }).then([] (const std::error_code error
) -> bind_ertr::future
<> {
196 return bind_ertr::now(); // success!
197 } else if (error
== std::errc::address_in_use
) {
198 return crimson::ct_error::address_in_use::make();
199 } else if (error
== std::errc::address_not_available
) {
200 return crimson::ct_error::address_not_available::make();
// Installs the given dispatchers and starts accepting connections.
// Must run on the master shard (asserted). The accept path additionally
// asserts that get_myaddr() is a msgr2 address with a port > 0.
// For every accepted socket a SocketConnection is created and handed the
// socket together with the peer address via start_accept().
// NOTE(review): the condition choosing between the accept path and the plain
// `return seastar::now()` at the end is not visible in this listing.
207 seastar::future
<> SocketMessenger::start(
208 const dispatchers_t
& _dispatchers
) {
209 assert(seastar::this_shard_id() == master_sid
);
211 dispatchers
.assign(_dispatchers
);
213 // make sure we have already bound to a valid address
214 ceph_assert(get_myaddr().is_msgr2());
215 ceph_assert(get_myaddr().get_port() > 0);
// callback invoked by the listener for each accepted socket
217 return listener
->accept([this] (SocketRef socket
, entity_addr_t peer_addr
) {
218 assert(seastar::this_shard_id() == master_sid
);
219 assert(get_myaddr().is_msgr2());
220 SocketConnectionRef conn
=
221 seastar::make_shared
<SocketConnection
>(*this, dispatchers
);
222 conn
->start_accept(std::move(socket
), peer_addr
);
223 return seastar::now();
226 return seastar::now();
// Returns a connection to peer_addr, reusing a registered one when
// lookup_conn() finds it and otherwise creating a new SocketConnection and
// starting the connect via start_connect(peer_addr, peer_name).
// Must run on the master shard (asserted). Aborts when peer_addr is not a
// msgr2 address and asserts that its port is > 0.
229 crimson::net::ConnectionRef
230 SocketMessenger::connect(const entity_addr_t
& peer_addr
, const entity_name_t
& peer_name
)
232 assert(seastar::this_shard_id() == master_sid
);
234 // make sure we connect to a valid peer_addr
235 if (!peer_addr
.is_msgr2()) {
236 ceph_abort_msg("ProtocolV1 is no longer supported");
238 ceph_assert(peer_addr
.get_port() > 0);
// reuse an already registered connection to this peer if there is one
240 if (auto found
= lookup_conn(peer_addr
); found
) {
241 logger().debug("{} connect to existing", *found
);
242 return found
->get_local_shared_foreign_from_this();
// no existing connection: create one and initiate the connect
244 SocketConnectionRef conn
=
245 seastar::make_shared
<SocketConnection
>(*this, dispatchers
);
246 conn
->start_connect(peer_addr
, peer_name
);
247 return conn
->get_local_shared_foreign_from_this();
// Shuts the messenger down; must run on the master shard (asserted).
// Visible steps, in listing order: assert that no dispatchers remain, destroy
// the listener through a local copy of the handle, call
// close_clean_yielded() on every connection in accepting_conns, connections
// and closing_conns (each set via seastar::parallel_for_each), assert that
// accepting_conns and connections are empty, and finally resolve
// shutdown_promise.
// NOTE(review): the continuation chaining between these steps and the
// condition guarding the listener destruction are not visible in this
// listing.
250 seastar::future
<> SocketMessenger::shutdown()
252 assert(seastar::this_shard_id() == master_sid
);
253 return seastar::futurize_invoke([this] {
254 assert(dispatchers
.empty());
// work on a local copy of the listener handle
256 auto d_listener
= listener
;
258 return d_listener
->destroy();
260 return seastar::now();
262 // close all connections
264 return seastar::parallel_for_each(accepting_conns
, [] (auto conn
) {
265 return conn
->close_clean_yielded();
268 ceph_assert(accepting_conns
.empty());
// `connections` is a map: the connection is the mapped value (.second)
269 return seastar::parallel_for_each(connections
, [] (auto conn
) {
270 return conn
.second
->close_clean_yielded();
273 return seastar::parallel_for_each(closing_conns
, [] (auto conn
) {
274 return conn
->close_clean_yielded();
277 ceph_assert(connections
.empty());
// signal that shutdown has completed
278 shutdown_promise
.set_value();
// Picks the address this messenger should adopt for itself.
// When the ms_learn_addr_from_peer option is set, the address reported by the
// peer (peer_addr_for_me) is returned as is. Otherwise an address is built
// from the connection's local socket address.
// Both paths log the peer's view and the socket's view at info level.
// NOTE(review): conn.get_local_address() is called in the if-initializer
// (marked "a syscall is here") and again for local_addr_for_me in the
// fallback path; the second call looks redundant because local_addr is still
// in scope there -- confirm and consider reusing it.
// NOTE(review): the declaration of `addr`, any other fields set on it and the
// final return are not visible in this listing.
282 static entity_addr_t
choose_addr(
283 const entity_addr_t
&peer_addr_for_me
,
284 const SocketConnection
& conn
)
286 using crimson::common::local_conf
;
287 // XXX: a syscall is here
288 if (const auto local_addr
= conn
.get_local_address();
289 local_conf()->ms_learn_addr_from_peer
) {
290 logger().info("{} peer {} says I am {} (socket says {})",
291 conn
, conn
.get_peer_socket_addr(), peer_addr_for_me
,
// trust the peer's view of our address
293 return peer_addr_for_me
;
// fallback: derive our address from the local end of the socket
295 const auto local_addr_for_me
= conn
.get_local_address();
296 logger().info("{} socket to {} says I am {} (peer says {})",
297 conn
, conn
.get_peer_socket_addr(),
298 local_addr
, peer_addr_for_me
);
300 addr
.set_sockaddr(&local_addr_for_me
.as_posix_sockaddr());
// Reconciles this messenger's own address with the address a peer reports for
// us (peer_addr_for_me). Must run on the master shard (asserted).
// Visible cases:
//  - combined type/family/IP mismatch check -> warn and throw
//    std::system_error(bad_peer_address);
//  - own address type is TYPE_NONE -> adopt choose_addr()'s result with type
//    TYPE_ANY ("unbound");
//  - otherwise type and family are checked individually (mismatch -> throw);
//    a blank own IP is filled in from choose_addr() while keeping our type and
//    port ("blank IP"); a non-blank IP on a different host -> throw.
// Throws: std::system_error carrying crimson::net::error::bad_peer_address.
// NOTE(review): the same type/family/host checks appear twice, so the first
// combined check is presumably guarded by a condition that is not visible in
// this listing -- confirm against the full source.
305 void SocketMessenger::learned_addr(
306 const entity_addr_t
&peer_addr_for_me
,
307 const SocketConnection
& conn
)
309 assert(seastar::this_shard_id() == master_sid
);
// combined sanity check of the peer-reported address against ours
311 if ((!get_myaddr().is_any() &&
312 get_myaddr().get_type() != peer_addr_for_me
.get_type()) ||
313 get_myaddr().get_family() != peer_addr_for_me
.get_family() ||
314 !get_myaddr().is_same_host(peer_addr_for_me
)) {
315 logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
316 conn
, peer_addr_for_me
, get_myaddr());
317 throw std::system_error(
318 make_error_code(crimson::net::error::bad_peer_address
));
// our address has no type yet: learn it entirely
323 if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE
) {
325 auto addr
= choose_addr(peer_addr_for_me
, conn
);
326 addr
.set_type(entity_addr_t::TYPE_ANY
);
329 set_myaddrs(entity_addrvec_t
{addr
});
330 logger().info("{} learned myaddr={} (unbound)", conn
, get_myaddr());
// our address already has a type: it must agree with what the peer reports
333 if (!get_myaddr().is_any() &&
334 get_myaddr().get_type() != peer_addr_for_me
.get_type()) {
335 logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
336 conn
, peer_addr_for_me
, get_myaddr());
337 throw std::system_error(
338 make_error_code(crimson::net::error::bad_peer_address
));
340 if (get_myaddr().get_family() != peer_addr_for_me
.get_family()) {
341 logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
342 conn
, peer_addr_for_me
, get_myaddr());
343 throw std::system_error(
344 make_error_code(crimson::net::error::bad_peer_address
));
// IP still blank: fill it in but keep our own type and port
346 if (get_myaddr().is_blank_ip()) {
347 auto addr
= choose_addr(peer_addr_for_me
, conn
);
348 addr
.set_type(get_myaddr().get_type());
349 addr
.set_port(get_myaddr().get_port());
351 set_myaddrs(entity_addrvec_t
{addr
});
352 logger().info("{} learned myaddr={} (blank IP)", conn
, get_myaddr());
353 } else if (!get_myaddr().is_same_host(peer_addr_for_me
)) {
354 logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
355 conn
, peer_addr_for_me
, get_myaddr());
356 throw std::system_error(
357 make_error_code(crimson::net::error::bad_peer_address
));
// Returns the SocketPolicy that policy_set holds for the given peer type.
364 SocketPolicy
SocketMessenger::get_policy(entity_type_t peer_type
) const
366 return policy_set
.get(peer_type
);
// Returns the default SocketPolicy held by policy_set.
369 SocketPolicy
SocketMessenger::get_default_policy() const
371 return policy_set
.get_default();
// Replaces the default SocketPolicy held by policy_set with `p`.
374 void SocketMessenger::set_default_policy(const SocketPolicy
& p
)
376 policy_set
.set_default(p
);
// Stores `p` in policy_set as the SocketPolicy for the given peer type.
379 void SocketMessenger::set_policy(entity_type_t peer_type
,
380 const SocketPolicy
& p
)
382 policy_set
.set(peer_type
, p
);
// Registers `throttle` as the first throttler for the given peer type and
// passes nullptr for the second throttler slot of policy_set.set_throttlers().
// NOTE(review): the declaration of the `throttle` parameter is not visible in
// this listing.
385 void SocketMessenger::set_policy_throttler(entity_type_t peer_type
,
388 // only byte throttler is used in OSD
389 policy_set
.set_throttlers(peer_type
, throttle
, nullptr);
// Looks `addr` up in the `connections` map and returns the mapped connection
// when an entry exists.
// NOTE(review): the return for the not-found case is not visible in this
// listing; connect() tests the result for truthiness, so it is presumably a
// null reference -- confirm.
392 crimson::net::SocketConnectionRef
SocketMessenger::lookup_conn(const entity_addr_t
& addr
)
394 if (auto found
= connections
.find(addr
);
395 found
!= connections
.end()) {
396 return found
->second
;
// Adds `conn` to the accepting_conns set.
402 void SocketMessenger::accept_conn(SocketConnectionRef conn
)
404 accepting_conns
.insert(conn
);
// Removes `conn` from the accepting_conns set.
407 void SocketMessenger::unaccept_conn(SocketConnectionRef conn
)
409 accepting_conns
.erase(conn
);
// Inserts `conn` into the `connections` map, keyed by its peer address.
// NOTE(review): what is done with the `added` flag returned by emplace() is
// not visible in this listing.
412 void SocketMessenger::register_conn(SocketConnectionRef conn
)
414 auto [i
, added
] = connections
.emplace(conn
->get_peer_addr(), conn
);
// Removes `conn` from the `connections` map. Asserts that an entry exists for
// the connection's peer address and that the entry refers to this very
// connection before erasing it.
419 void SocketMessenger::unregister_conn(SocketConnectionRef conn
)
422 auto found
= connections
.find(conn
->get_peer_addr());
423 ceph_assert(found
!= connections
.end());
// the registered entry must be this connection, not another one to the peer
424 ceph_assert(found
->second
== conn
);
425 connections
.erase(found
);
// Appends `conn` to closing_conns.
428 void SocketMessenger::closing_conn(SocketConnectionRef conn
)
430 closing_conns
.push_back(conn
);
// Walks closing_conns with an explicit iterator and erases entries from it,
// continuing from the iterator that erase() returns.
// NOTE(review): the condition selecting which entries are erased (presumably
// those equal to `conn`) and the iterator advance for non-matching entries
// are not visible in this listing.
433 void SocketMessenger::closed_conn(SocketConnectionRef conn
)
435 for (auto it
= closing_conns
.begin();
436 it
!= closing_conns
.end();) {
438 it
= closing_conns
.erase(it
);
// Compares the caller-supplied `old` sequence number with the member
// global_seq.
// NOTE(review): the action taken when `old` is larger and the value returned
// are not visible in this listing.
445 uint32_t SocketMessenger::get_global_seq(uint32_t old
)
447 if (old
> global_seq
) {
453 } // namespace crimson::net