2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
25 #include <linux/netlink.h>
26 #include <linux/rtnetlink.h>
27 #include <net/route.h>
29 #include <seastar/net/posix-stack.hh>
30 #include <seastar/net/net.hh>
31 #include <seastar/net/packet.hh>
32 #include <seastar/net/api.hh>
33 #include <seastar/net/inet_address.hh>
34 #include <seastar/util/std-compat.hh>
35 #include <netinet/tcp.h>
36 #include <netinet/sctp.h>
// Hash functor specialisation for
// seastar::net::posix_ap_server_socket_impl::protocol_and_socket_address.
// Element 0 of the key is hashed with std::hash<int> (h1) and element 1 with
// std::hash<seastar::net::socket_address> (h2).
// NOTE(review): the statement that combines h1 and h2 into the result is not
// visible in this view — confirm how the two hashes are mixed.
41 struct hash
<seastar::net::posix_ap_server_socket_impl::protocol_and_socket_address
> {
42 size_t operator()(const seastar::net::posix_ap_server_socket_impl::protocol_and_socket_address
& t_sa
) const {
// hash of the first tuple element (hashed as an int)
43 auto h1
= std::hash
<int>()(std::get
<0>(t_sa
));
// hash of the second tuple element (the socket address)
44 auto h2
= std::hash
<seastar::net::socket_address
>()(std::get
<1>(t_sa
));
55 using namespace seastar
;
57 class posix_connected_socket_operations
{
59 virtual ~posix_connected_socket_operations() = default;
60 virtual void set_nodelay(file_desc
& fd
, bool nodelay
) const = 0;
61 virtual bool get_nodelay(file_desc
& fd
) const = 0;
62 virtual void set_keepalive(file_desc
& _fd
, bool keepalive
) const = 0;
63 virtual bool get_keepalive(file_desc
& _fd
) const = 0;
64 virtual void set_keepalive_parameters(file_desc
& _fd
, const keepalive_params
& params
) const = 0;
65 virtual keepalive_params
get_keepalive_parameters(file_desc
& _fd
) const = 0;
68 thread_local
posix_ap_server_socket_impl::sockets_map_t
posix_ap_server_socket_impl::sockets
{};
69 thread_local
posix_ap_server_socket_impl::conn_map_t
posix_ap_server_socket_impl::conn_q
{};
71 class posix_tcp_connected_socket_operations
: public posix_connected_socket_operations
{
73 virtual void set_nodelay(file_desc
& _fd
, bool nodelay
) const override
{
74 _fd
.setsockopt(IPPROTO_TCP
, TCP_NODELAY
, int(nodelay
));
76 virtual bool get_nodelay(file_desc
& _fd
) const override
{
77 return _fd
.getsockopt
<int>(IPPROTO_TCP
, TCP_NODELAY
);
79 virtual void set_keepalive(file_desc
& _fd
, bool keepalive
) const override
{
80 _fd
.setsockopt(SOL_SOCKET
, SO_KEEPALIVE
, int(keepalive
));
82 virtual bool get_keepalive(file_desc
& _fd
) const override
{
83 return _fd
.getsockopt
<int>(SOL_SOCKET
, SO_KEEPALIVE
);
85 virtual void set_keepalive_parameters(file_desc
& _fd
, const keepalive_params
& params
) const override
{
86 const tcp_keepalive_params
& pms
= compat::get
<tcp_keepalive_params
>(params
);
87 _fd
.setsockopt(IPPROTO_TCP
, TCP_KEEPCNT
, pms
.count
);
88 _fd
.setsockopt(IPPROTO_TCP
, TCP_KEEPIDLE
, int(pms
.idle
.count()));
89 _fd
.setsockopt(IPPROTO_TCP
, TCP_KEEPINTVL
, int(pms
.interval
.count()));
91 virtual keepalive_params
get_keepalive_parameters(file_desc
& _fd
) const override
{
92 return tcp_keepalive_params
{
93 std::chrono::seconds(_fd
.getsockopt
<int>(IPPROTO_TCP
, TCP_KEEPIDLE
)),
94 std::chrono::seconds(_fd
.getsockopt
<int>(IPPROTO_TCP
, TCP_KEEPINTVL
)),
95 _fd
.getsockopt
<unsigned>(IPPROTO_TCP
, TCP_KEEPCNT
)
100 class posix_sctp_connected_socket_operations
: public posix_connected_socket_operations
{
102 virtual void set_nodelay(file_desc
& _fd
, bool nodelay
) const override
{
103 _fd
.setsockopt(SOL_SCTP
, SCTP_NODELAY
, int(nodelay
));
105 virtual bool get_nodelay(file_desc
& _fd
) const override
{
106 return _fd
.getsockopt
<int>(SOL_SCTP
, SCTP_NODELAY
);
108 virtual void set_keepalive(file_desc
& _fd
, bool keepalive
) const override
{
109 auto heartbeat
= _fd
.getsockopt
<sctp_paddrparams
>(SOL_SCTP
, SCTP_PEER_ADDR_PARAMS
);
111 heartbeat
.spp_flags
|= SPP_HB_ENABLE
;
113 heartbeat
.spp_flags
&= ~SPP_HB_ENABLE
;
115 _fd
.setsockopt(SOL_SCTP
, SCTP_PEER_ADDR_PARAMS
, heartbeat
);
117 virtual bool get_keepalive(file_desc
& _fd
) const override
{
118 return _fd
.getsockopt
<sctp_paddrparams
>(SOL_SCTP
, SCTP_PEER_ADDR_PARAMS
).spp_flags
& SPP_HB_ENABLE
;
120 virtual void set_keepalive_parameters(file_desc
& _fd
, const keepalive_params
& kpms
) const override
{
121 const sctp_keepalive_params
& pms
= compat::get
<sctp_keepalive_params
>(kpms
);
122 auto params
= _fd
.getsockopt
<sctp_paddrparams
>(SOL_SCTP
, SCTP_PEER_ADDR_PARAMS
);
123 params
.spp_hbinterval
= pms
.interval
.count() * 1000; // in milliseconds
124 params
.spp_pathmaxrxt
= pms
.count
;
125 _fd
.setsockopt(SOL_SCTP
, SCTP_PEER_ADDR_PARAMS
, params
);
127 virtual keepalive_params
get_keepalive_parameters(file_desc
& _fd
) const override
{
128 auto params
= _fd
.getsockopt
<sctp_paddrparams
>(SOL_SCTP
, SCTP_PEER_ADDR_PARAMS
);
129 return sctp_keepalive_params
{
130 std::chrono::seconds(params
.spp_hbinterval
/1000), // in seconds
131 params
.spp_pathmaxrxt
// Socket-option operations for unix-domain stream sockets.
// The keepalive setters visible here have empty bodies, set_nodelay() only
// asserts on its argument, and get_keepalive_parameters() returns a
// value-initialised keepalive_params.
// NOTE(review): the return statements of get_nodelay() and get_keepalive() are
// not visible in this view — confirm the values they report.
136 class posix_unix_stream_connected_socket_operations
: public posix_connected_socket_operations
{
// No socket option is touched; the call is only accepted with nodelay == true.
138 virtual void set_nodelay(file_desc
& fd
, bool nodelay
) const override
{
139 assert(nodelay
); // make sure nobody actually tries to use this non-existing functionality
141 virtual bool get_nodelay(file_desc
& fd
) const override
{
// Intentionally a no-op (empty body).
144 virtual void set_keepalive(file_desc
& fd
, bool keepalive
) const override
{}
145 virtual bool get_keepalive(file_desc
& fd
) const override
{
// Intentionally a no-op (empty body).
148 virtual void set_keepalive_parameters(file_desc
& fd
, const keepalive_params
& p
) const override
{}
// Returns a value-initialised keepalive_params.
149 virtual keepalive_params
get_keepalive_parameters(file_desc
& fd
) const override
{
150 return keepalive_params
{};
// Returns the operations object to use for a given address family / protocol.
// The three operations objects are function-local statics, so the returned
// pointer refers to an object with static storage duration.
// NOTE(review): only the IPPROTO_TCP and IPPROTO_SCTP cases are visible in this
// view; how `family` selects unix_ops and what happens for unknown values
// should be confirmed against the full function.
154 static const posix_connected_socket_operations
*
155 get_posix_connected_socket_ops(sa_family_t family
, int protocol
) {
156 static posix_tcp_connected_socket_operations tcp_ops
;
157 static posix_sctp_connected_socket_operations sctp_ops
;
158 static posix_unix_stream_connected_socket_operations unix_ops
;
163 case IPPROTO_TCP
: return &tcp_ops
;
164 case IPPROTO_SCTP
: return &sctp_ops
;
174 class posix_connected_socket_impl final
: public connected_socket_impl
{
175 lw_shared_ptr
<pollable_fd
> _fd
;
176 const posix_connected_socket_operations
* _ops
;
177 conntrack::handle _handle
;
178 compat::polymorphic_allocator
<char>* _allocator
;
180 explicit posix_connected_socket_impl(sa_family_t family
, int protocol
, lw_shared_ptr
<pollable_fd
> fd
, compat::polymorphic_allocator
<char>* allocator
=memory::malloc_allocator
) :
181 _fd(std::move(fd
)), _ops(get_posix_connected_socket_ops(family
, protocol
)), _allocator(allocator
) {}
182 explicit posix_connected_socket_impl(sa_family_t family
, int protocol
, lw_shared_ptr
<pollable_fd
> fd
, conntrack::handle
&& handle
,
183 compat::polymorphic_allocator
<char>* allocator
=memory::malloc_allocator
) : _fd(std::move(fd
))
184 , _ops(get_posix_connected_socket_ops(family
, protocol
)), _handle(std::move(handle
)), _allocator(allocator
) {}
186 virtual data_source
source() override
{
187 return data_source(std::make_unique
< posix_data_source_impl
>(_fd
, _allocator
));
189 virtual data_sink
sink() override
{
190 return data_sink(std::make_unique
< posix_data_sink_impl
>(_fd
));
192 virtual void shutdown_input() override
{
193 _fd
->shutdown(SHUT_RD
);
195 virtual void shutdown_output() override
{
196 _fd
->shutdown(SHUT_WR
);
198 virtual void set_nodelay(bool nodelay
) override
{
199 return _ops
->set_nodelay(_fd
->get_file_desc(), nodelay
);
201 virtual bool get_nodelay() const override
{
202 return _ops
->get_nodelay(_fd
->get_file_desc());
204 void set_keepalive(bool keepalive
) override
{
205 return _ops
->set_keepalive(_fd
->get_file_desc(), keepalive
);
207 bool get_keepalive() const override
{
208 return _ops
->get_keepalive(_fd
->get_file_desc());
210 void set_keepalive_parameters(const keepalive_params
& p
) override
{
211 return _ops
->set_keepalive_parameters(_fd
->get_file_desc(), p
);
213 keepalive_params
get_keepalive_parameters() const override
{
214 return _ops
->get_keepalive_parameters(_fd
->get_file_desc());
216 friend class posix_server_socket_impl
;
217 friend class posix_ap_server_socket_impl
;
218 friend class posix_reuseport_server_socket_impl
;
219 friend class posix_network_stack
;
220 friend class posix_ap_network_stack
;
221 friend class posix_socket_impl
;
224 static void resolve_outgoing_address(socket_address
& a
) {
225 if (a
.family() != AF_INET6
226 || a
.as_posix_sockaddr_in6().sin6_scope_id
!= inet_address::invalid_scope
227 || !IN6_IS_ADDR_LINKLOCAL(&a
.as_posix_sockaddr_in6().sin6_addr
)
234 if (!(f
= fopen("/proc/net/ipv6_route", "r"))) {
235 throw std::system_error(errno
, std::system_category(), "resolve_address");
238 auto holder
= std::unique_ptr
<FILE, int(*)(FILE *)>(f
, &::fclose
);
241 Here all configured IPv6 routes are shown in a special format. The example displays for loopback interface only. The meaning is shown below (see net/ipv6/route.c for more).
243 # cat /proc/net/ipv6_route
244 00000000000000000000000000000000 00 00000000000000000000000000000000 00 00000000000000000000000000000000 ffffffff 00000001 00000001 00200200 lo
245 +------------------------------+ ++ +------------------------------+ ++ +------------------------------+ +------+ +------+ +------+ +------+ ++
249 1: IPv6 destination network displayed in 32 hexadecimal chars without colons as separator
251 2: IPv6 destination prefix length in hexadecimal
253 3: IPv6 source network displayed in 32 hexadecimal chars without colons as separator
255 4: IPv6 source prefix length in hexadecimal
257 5: IPv6 next hop displayed in 32 hexadecimal chars without colons as separator
259 6: Metric in hexadecimal
271 uint32_t prefix_len
, src_prefix_len
;
277 auto n
= fscanf(f
, "%4s%4s%4s%4s%4s%4s%4s%4s %02x "
278 "%*4s%*4s%*4s%*4s%*4s%*4s%*4s%*4s %02x "
279 "%*4s%*4s%*4s%*4s%*4s%*4s%*4s%*4s "
280 "%*08x %*08x %*08x %08lx %8s",
281 &dest_str
[0], &dest_str
[5], &dest_str
[10], &dest_str
[15],
282 &dest_str
[20], &dest_str
[25], &dest_str
[30], &dest_str
[35],
290 if ((prefix_len
> 128) || (src_prefix_len
!= 0)
291 || (flags
& (RTF_POLICY
| RTF_FLOW
))
292 || ((flags
& RTF_REJECT
) && prefix_len
== 0) /* reject all */) {
296 dest_str
[4] = dest_str
[9] = dest_str
[14] = dest_str
[19] = dest_str
[24] = dest_str
[29] = dest_str
[34] = ':';
299 struct in6_addr addr
;
300 if (inet_pton(AF_INET6
, dest_str
, &addr
) < 0) {
301 /* not an Ipv6 address */
305 auto bytes
= prefix_len
/ 8;
306 auto bits
= prefix_len
% 8;
308 auto& src
= a
.as_posix_sockaddr_in6().sin6_addr
;
310 if (bytes
> 0 && memcmp(&src
, &addr
, bytes
)) {
314 auto c1
= src
.s6_addr
[bytes
];
315 auto c2
= addr
.s6_addr
[bytes
];
316 auto mask
= 0xffu
<< (8 - bits
);
317 if ((c1
& mask
) != (c2
& mask
)) {
323 for (auto& nif
: engine().net().network_interfaces()) {
324 if (nif
.name() == device
|| nif
.display_name() == device
) {
325 a
.as_posix_sockaddr_in6().sin6_scope_id
= nif
.index();
// Client-side socket implementation. connect() creates the descriptor,
// establishes the connection through engine().posix_connect() and wraps the
// result in a posix_connected_socket_impl. The reuse-address flag is kept in
// _reuseaddr and applied to each descriptor that is created.
332 class posix_socket_impl final
: public socket_impl
{
333 lw_shared_ptr
<pollable_fd
> _fd
;
334 compat::polymorphic_allocator
<char>* _allocator
;
335 bool _reuseaddr
= false;
// Connects to `sa` from `local`, repeating the attempt inside repeat().
// For TCP with no requested local port, each of the first five attempts uses
// u(random_engine) * smp::count + engine().cpu_id() as the local port;
// otherwise the requested port is used. A std::system_error with EADDRINUSE or
// EADDRNOTAVAIL on a port that differs from the requested one yields
// stop_iteration::no, i.e. another attempt.
337 future
<> find_port_and_connect(socket_address sa
, socket_address local
, transport proto
= transport::TCP
) {
338 static thread_local
std::default_random_engine random_engine
{std::random_device
{}()};
339 static thread_local
std::uniform_int_distribution
<uint16_t> u(49152/smp::count
+ 1, 65535/smp::count
- 1);
340 // If no explicit local address, set to dest address family wildcard.
341 if (local
.is_unspecified()) {
342 local
= net::inet_address(sa
.addr().in_family());
// Fill in the IPv6 scope id of the destination where applicable.
344 resolve_outgoing_address(sa
);
345 return repeat([this, sa
, local
, proto
, attempts
= 0, requested_port
= ntoh(local
.as_posix_sockaddr_in().sin_port
)] () mutable {
// A new descriptor is created for every attempt.
346 _fd
= engine().make_pollable_fd(sa
, int(proto
));
347 _fd
->get_file_desc().setsockopt(SOL_SOCKET
, SO_REUSEADDR
, int(_reuseaddr
));
348 uint16_t port
= attempts
++ < 5 && requested_port
== 0 && proto
== transport::TCP
? u(random_engine
) * smp::count
+ engine().cpu_id() : requested_port
;
349 local
.as_posix_sockaddr_in().sin_port
= hton(port
);
350 return futurize_apply([this, sa
, local
] { return engine().posix_connect(_fd
, sa
, local
); }).then_wrapped([port
, requested_port
] (future
<> f
) {
353 return stop_iteration::yes
;
354 } catch (std::system_error
& err
) {
355 if (port
!= requested_port
&& (err
.code().value() == EADDRINUSE
|| err
.code().value() == EADDRNOTAVAIL
)) {
356 return stop_iteration::no
;
364 /// an aux function to handle unix-domain-specific requests
365 future
<connected_socket
> connect_unix_domain(socket_address sa
, socket_address local
) {
366 // note that if the 'local' address was not set by the client, it was created as an undefined address
367 if (local
.is_unspecified()) {
368 local
= socket_address
{unix_domain_addr
{std::string
{}}};
// The descriptor is created with protocol 0.
371 _fd
= engine().make_pollable_fd(sa
, 0);
372 return engine().posix_connect(_fd
, sa
, local
).then(
373 [fd
= _fd
, allocator
= _allocator
](){
374 // a problem with 'private' interaction with 'unique_ptr'
375 std::unique_ptr
<connected_socket_impl
> csi
;
376 csi
.reset(new posix_connected_socket_impl
{AF_UNIX
, 0, std::move(fd
), allocator
});
377 return make_ready_future
<connected_socket
>(connected_socket(std::move(csi
)));
// Stores the allocator that is passed on to the connected sockets created by
// connect().
383 explicit posix_socket_impl(compat::polymorphic_allocator
<char>* allocator
=memory::malloc_allocator
) : _allocator(allocator
) {}
// Connects to `sa`. Unix-domain addresses are handled by
// connect_unix_domain(); everything else goes through find_port_and_connect().
385 virtual future
<connected_socket
> connect(socket_address sa
, socket_address local
, transport proto
= transport::TCP
) override
{
386 if (sa
.is_af_unix()) {
387 return connect_unix_domain(sa
, local
);
389 return find_port_and_connect(sa
, local
, proto
).then([this, sa
, proto
, allocator
= _allocator
] () mutable {
390 std::unique_ptr
<connected_socket_impl
> csi
;
391 csi
.reset(new posix_connected_socket_impl(sa
.family(), static_cast<int>(proto
), _fd
, allocator
));
392 return make_ready_future
<connected_socket
>(connected_socket(std::move(csi
)));
// Remembers the flag and sets SO_REUSEADDR on the descriptor.
// NOTE(review): any guard around the setsockopt call (e.g. for a descriptor
// that does not exist yet) is not visible in this view — confirm.
396 void set_reuseaddr(bool reuseaddr
) override
{
397 _reuseaddr
= reuseaddr
;
399 _fd
->get_file_desc().setsockopt(SOL_SOCKET
, SO_REUSEADDR
, int(reuseaddr
));
// Reads SO_REUSEADDR from the descriptor.
403 bool get_reuseaddr() const override
{
405 return _fd
->get_file_desc().getsockopt
<int>(SOL_SOCKET
, SO_REUSEADDR
);
// Shuts the descriptor down in both directions; the visible handler inspects
// std::system_error and treats ENOTCONN differently from other codes.
411 virtual void shutdown() override
{
414 _fd
->shutdown(SHUT_RDWR
);
415 } catch (std::system_error
& e
) {
416 if (e
.code().value() != ENOTCONN
) {
// Accepts one connection on the listening descriptor.
// A conntrack handle is obtained according to the load-balancing algorithm
// (connection_distribution, port — peer port modulo smp::count — or fixed), and
// the handle determines the target cpu. If that is the current cpu the
// connection is wrapped right here; otherwise the raw file descriptor is
// shipped to the target cpu with smp::submit_to(), where
// posix_ap_server_socket_impl::move_connected_socket() takes it over.
424 future
<accept_result
>
425 posix_server_socket_impl::accept() {
426 return _lfd
.accept().then([this] (std::tuple
<pollable_fd
, socket_address
> fd_sa
) {
427 auto& fd
= std::get
<0>(fd_sa
);
428 auto& sa
= std::get
<1>(fd_sa
);
// Immediately-invoked lambda selecting the conntrack handle.
429 auto cth
= [this, &sa
] {
431 case server_socket::load_balancing_algorithm::connection_distribution
:
432 return _conntrack
.get_handle();
433 case server_socket::load_balancing_algorithm::port
:
434 return _conntrack
.get_handle(ntoh(sa
.as_posix_sockaddr_in().sin_port
) % smp::count
);
435 case server_socket::load_balancing_algorithm::fixed
:
436 return _conntrack
.get_handle(_fixed_cpu
);
440 auto cpu
= cth
.cpu();
// Local case: build the connected socket on this cpu.
441 if (cpu
== engine().cpu_id()) {
442 std::unique_ptr
<connected_socket_impl
> csi(
443 new posix_connected_socket_impl(sa
.family(), _protocol
, make_lw_shared(std::move(fd
)), std::move(cth
), _allocator
));
444 return make_ready_future
<accept_result
>(
445 accept_result
{connected_socket(std::move(csi
)), sa
});
// Remote case: only the file_desc is moved across; a pollable_fd is rebuilt
// from it on the target cpu.
447 // FIXME: future is discarded
448 (void)smp::submit_to(cpu
, [protocol
= _protocol
, ssa
= _sa
, fd
= std::move(fd
.get_file_desc()), sa
, cth
= std::move(cth
), allocator
= _allocator
] () mutable {
449 posix_ap_server_socket_impl::move_connected_socket(protocol
, ssa
, pollable_fd(std::move(fd
)), sa
, std::move(cth
), allocator
);
// NOTE(review): only the signature of abort_accept() is visible in this view;
// presumably it cancels a pending accept() on the listening descriptor — confirm.
457 posix_server_socket_impl::abort_accept() {
461 socket_address
posix_server_socket_impl::local_address() const {
462 return _lfd
.get_file_desc().get_address();
// Accepts a connection that was handed over to this thread.
// The key is the (protocol, address) tuple. If conn_q already holds a
// connection for the key, it is wrapped in a connected socket and returned;
// otherwise an entry is emplaced in `sockets` and its future is returned.
// Exceptions are converted into exceptional futures via
// std::current_exception().
// NOTE(review): the removal of the consumed conn_q entry and the surrounding
// try/catch structure are not visible in this view — confirm.
465 future
<accept_result
> posix_ap_server_socket_impl::accept() {
466 auto t_sa
= std::make_tuple(_protocol
, _sa
);
467 auto conni
= conn_q
.find(t_sa
);
// A connection is already queued for this key.
468 if (conni
!= conn_q
.end()) {
469 connection c
= std::move(conni
->second
);
472 std::unique_ptr
<connected_socket_impl
> csi(
473 new posix_connected_socket_impl(_sa
.family(), _protocol
, make_lw_shared(std::move(c
.fd
)), std::move(c
.connection_tracking_handle
), _allocator
));
474 return make_ready_future
<accept_result
>(accept_result
{connected_socket(std::move(csi
)), std::move(c
.addr
)});
476 return make_exception_future
<accept_result
>(std::current_exception());
// Nothing queued: register a default-constructed entry and hand out its future.
480 auto i
= sockets
.emplace(std::piecewise_construct
, std::make_tuple(t_sa
), std::make_tuple());
482 return i
.first
->second
.get_future();
484 return make_exception_future
<accept_result
>(std::current_exception());
// Fails a pending accept() for this socket's (protocol, address) key: if an
// entry exists in `sockets`, it is completed with a std::system_error carrying
// ECONNABORTED.
// NOTE(review): any clean-up of conn_q / `sockets` entries is not visible in
// this view — confirm.
490 posix_ap_server_socket_impl::abort_accept() {
491 auto t_sa
= std::make_tuple(_protocol
, _sa
);
493 auto i
= sockets
.find(t_sa
);
494 if (i
!= sockets
.end()) {
495 i
->second
.set_exception(std::system_error(ECONNABORTED
, std::system_category()));
500 future
<accept_result
>
501 posix_reuseport_server_socket_impl::accept() {
502 return _lfd
.accept().then([allocator
= _allocator
, protocol
= _protocol
] (std::tuple
<pollable_fd
, socket_address
> fd_sa
) {
503 auto& fd
= std::get
<0>(fd_sa
);
504 auto& sa
= std::get
<1>(fd_sa
);
505 std::unique_ptr
<connected_socket_impl
> csi(
506 new posix_connected_socket_impl(sa
.family(), protocol
, make_lw_shared(std::move(fd
)), allocator
));
507 return make_ready_future
<accept_result
>(
508 accept_result
{connected_socket(std::move(csi
)), sa
});
// NOTE(review): only the signature of abort_accept() is visible in this view;
// presumably it cancels a pending accept() on the listening descriptor — confirm.
513 posix_reuseport_server_socket_impl::abort_accept() {
517 socket_address
posix_reuseport_server_socket_impl::local_address() const {
518 return _lfd
.get_file_desc().get_address();
// Receives a connection that was accepted elsewhere.
// If `sockets` holds an entry for (protocol, sa), that entry is completed with
// a connected socket built from `fd` (or with the current exception if that
// fails). The connection is otherwise stored in conn_q under the same key.
// NOTE(review): the else/try/catch skeleton and any erase of the `sockets`
// entry are not visible in this view — confirm.
522 posix_ap_server_socket_impl::move_connected_socket(int protocol
, socket_address sa
, pollable_fd fd
, socket_address addr
, conntrack::handle cth
, compat::polymorphic_allocator
<char>* allocator
) {
523 auto t_sa
= std::make_tuple(protocol
, sa
);
524 auto i
= sockets
.find(t_sa
);
// An accept() is already waiting for this key.
525 if (i
!= sockets
.end()) {
527 std::unique_ptr
<connected_socket_impl
> csi(new posix_connected_socket_impl(sa
.family(), protocol
, make_lw_shared(std::move(fd
)), std::move(cth
), allocator
));
528 i
->second
.set_value(accept_result
{connected_socket(std::move(csi
)), std::move(addr
)});
530 i
->second
.set_exception(std::current_exception());
// Queue the connection (descriptor, peer address, conntrack handle).
534 conn_q
.emplace(std::piecewise_construct
, std::make_tuple(t_sa
), std::make_tuple(std::move(fd
), std::move(addr
), std::move(cth
)));
// Reads up to _buf_size bytes from the descriptor into _buf. When the read
// completes, the filled buffer is moved out and returned, and _buf is replaced
// by a fresh buffer of _buf_size bytes obtained from _buffer_allocator.
// NOTE(review): how the returned buffer is cut down to `size` is not visible in
// this view — confirm.
538 future
<temporary_buffer
<char>>
539 posix_data_source_impl::get() {
540 return _fd
->read_some(_buf
.get_write(), _buf_size
).then([this] (size_t size
) {
542 auto ret
= std::move(_buf
);
543 _buf
= make_temporary_buffer
<char>(_buffer_allocator
, _buf_size
);
544 return make_ready_future
<temporary_buffer
<char>>(std::move(ret
));
548 future
<> posix_data_source_impl::close() {
549 _fd
->shutdown(SHUT_RD
);
550 return make_ready_future
<>();
553 std::vector
<struct iovec
> to_iovec(const packet
& p
) {
554 std::vector
<struct iovec
> v
;
555 v
.reserve(p
.nr_frags());
556 for (auto&& f
: p
.fragments()) {
557 v
.push_back({.iov_base
= f
.base
, .iov_len
= f
.size
});
562 std::vector
<iovec
> to_iovec(std::vector
<temporary_buffer
<char>>& buf_vec
) {
563 std::vector
<iovec
> v
;
564 v
.reserve(buf_vec
.size());
565 for (auto& buf
: buf_vec
) {
566 v
.push_back({.iov_base
= buf
.get_write(), .iov_len
= buf
.size()});
// Writes the whole buffer to the descriptor. The buffer's storage is released
// into the continuation's capture (`d`), so it is held by the continuation
// until the continuation itself is destroyed.
572 posix_data_sink_impl::put(temporary_buffer
<char> buf
) {
573 return _fd
->write_all(buf
.get(), buf
.size()).then([d
= buf
.release()] {});
// Writes the member packet _p to the descriptor and resets _p once the write
// has completed.
// NOTE(review): the statement that stores `p` into _p is not visible in this
// view — confirm.
577 posix_data_sink_impl::put(packet p
) {
579 return _fd
->write_all(_p
).then([this] { _p
.reset(); });
// Shuts down the write side of the descriptor and returns an already resolved
// future.
583 posix_data_sink_impl::close() {
584 _fd
->shutdown(SHUT_WR
);
585 return make_ready_future
<>();
// Creates a listening server socket for `sa`.
// An unspecified address is replaced by the IPv4 wildcard. Unix-domain
// addresses always get a posix_server_socket_impl with protocol 0. For other
// addresses either a posix_reuseport_server_socket_impl or a
// posix_server_socket_impl is created with the protocol taken from opt.proto.
// NOTE(review): the condition that chooses between the two implementations is
// not visible in this view — confirm.
589 posix_network_stack::listen(socket_address sa
, listen_options opt
) {
590 using server_socket
= seastar::api_v2::server_socket
;
591 // allow unspecified bind address -> default to ipv4 wildcard
592 if (sa
.is_unspecified()) {
593 sa
= inet_address(inet_address::family::INET
);
595 if (sa
.is_af_unix()) {
596 return server_socket(std::make_unique
<posix_server_socket_impl
>(0, sa
, engine().posix_listen(sa
, opt
), opt
.lba
, opt
.fixed_cpu
, _allocator
));
598 auto protocol
= static_cast<int>(opt
.proto
);
600 server_socket(std::make_unique
<posix_reuseport_server_socket_impl
>(protocol
, sa
, engine().posix_listen(sa
, opt
), _allocator
))
602 server_socket(std::make_unique
<posix_server_socket_impl
>(protocol
, sa
, engine().posix_listen(sa
, opt
), opt
.lba
, opt
.fixed_cpu
, _allocator
));
605 ::seastar::socket
posix_network_stack::socket() {
606 return ::seastar::socket(std::make_unique
<posix_socket_impl
>(_allocator
));
// Creates a listening server socket for `sa` on this stack.
// An unspecified address is replaced by the IPv4 wildcard. Unix-domain
// addresses get a posix_ap_server_socket_impl with protocol 0. For other
// addresses either a posix_reuseport_server_socket_impl (which listens itself
// via engine().posix_listen()) or a posix_ap_server_socket_impl is created.
// NOTE(review): the condition that chooses between the two implementations is
// not visible in this view — confirm.
610 posix_ap_network_stack::listen(socket_address sa
, listen_options opt
) {
611 using server_socket
= seastar::api_v2::server_socket
;
612 // allow unspecified bind address -> default to ipv4 wildcard
613 if (sa
.is_unspecified()) {
614 sa
= inet_address(inet_address::family::INET
);
616 if (sa
.is_af_unix()) {
617 return server_socket(std::make_unique
<posix_ap_server_socket_impl
>(0, sa
, _allocator
));
619 auto protocol
= static_cast<int>(opt
.proto
);
621 server_socket(std::make_unique
<posix_reuseport_server_socket_impl
>(protocol
, sa
, engine().posix_listen(sa
, opt
), _allocator
))
623 server_socket(std::make_unique
<posix_ap_server_socket_impl
>(protocol
, sa
, _allocator
));
// Storage for the ancillary (control) data of a received datagram; it can hold
// either an IPv4 in_pktinfo or an IPv6 in6_pktinfo.
// NOTE(review): the remaining members and the way the two pktinfo fields are
// grouped are not visible in this view — confirm.
626 struct cmsg_with_pktinfo
{
629 struct in_pktinfo pktinfo
;
630 struct in6_pktinfo pkt6info
;
// UDP channel implemented on a non-blocking datagram socket.
// The class keeps a receive context (message header, a single iovec over a
// MAX_DATAGRAM_SIZE buffer, source address and control-message storage) and a
// send context (message header, destination address and the iovecs of the
// packet being sent).
634 class posix_udp_channel
: public udp_channel_impl
{
// Size of the receive buffer allocated for each datagram.
636 static constexpr int MAX_DATAGRAM_SIZE
= 65507;
640 socket_address _src_addr
;
642 cmsg_with_pktinfo _cmsg
;
// Receive-side header set-up: the sender's address is written to _src_addr and
// control messages are written to _cmsg.
645 memset(&_hdr
, 0, sizeof(_hdr
));
646 _hdr
.msg_iov
= &_iov
;
648 _hdr
.msg_name
= &_src_addr
.u
.sa
;
649 _hdr
.msg_namelen
= sizeof(_src_addr
.u
.sas
);
650 memset(&_cmsg
, 0, sizeof(_cmsg
));
651 _hdr
.msg_control
= &_cmsg
;
652 _hdr
.msg_controllen
= sizeof(_cmsg
);
// A new receive buffer is allocated with new[] and installed in the iovec.
656 _buffer
= new char[MAX_DATAGRAM_SIZE
];
657 _iov
.iov_base
= _buffer
;
658 _iov
.iov_len
= MAX_DATAGRAM_SIZE
;
663 std::vector
<struct iovec
> _iovecs
;
// Send-side header set-up: the destination is read from _dst.
668 memset(&_hdr
, 0, sizeof(_hdr
));
669 _hdr
.msg_name
= &_dst
.u
.sa
;
670 _hdr
.msg_namelen
= sizeof(_dst
.u
.sas
);
// Points the send header at the fragments of the packet and resolves the IPv6
// scope id of the destination where applicable.
673 void prepare(const socket_address
& dst
, packet p
) {
676 _iovecs
= to_iovec(_p
);
677 _hdr
.msg_iov
= _iovecs
.data();
678 _hdr
.msg_iovlen
= _iovecs
.size();
679 resolve_outgoing_address(_dst
);
682 std::unique_ptr
<pollable_fd
> _fd
;
683 socket_address _address
;
// Creates and binds the socket. An unspecified bind address is replaced by the
// IPv4 wildcard; IP_PKTINFO is enabled and SO_REUSEPORT is set when the engine
// reports it as available. The bound address is then read back into _address.
688 posix_udp_channel(const socket_address
& bind_address
)
690 auto sa
= bind_address
.is_unspecified() ? socket_address(inet_address(inet_address::family::INET
)) : bind_address
;
691 file_desc fd
= file_desc::socket(sa
.u
.sa
.sa_family
, SOCK_DGRAM
| SOCK_NONBLOCK
| SOCK_CLOEXEC
, 0);
692 fd
.setsockopt(SOL_IP
, IP_PKTINFO
, true);
693 if (engine().posix_reuseport_available()) {
694 fd
.setsockopt(SOL_SOCKET
, SO_REUSEPORT
, 1);
696 fd
.bind(sa
.u
.sa
, sizeof(sa
.u
.sas
));
697 _address
= fd
.get_address();
698 _fd
= std::make_unique
<pollable_fd
>(std::move(fd
));
// Closes the channel on destruction unless it was closed already.
700 virtual ~posix_udp_channel() { if (!_closed
) close(); };
701 virtual future
<udp_datagram
> receive() override
;
702 virtual future
<> send(const socket_address
& dst
, const char *msg
) override
;
703 virtual future
<> send(const socket_address
& dst
, packet p
) override
;
704 virtual void shutdown_input() override
{
707 virtual void shutdown_output() override
{
710 virtual void close() override
{
714 virtual bool is_closed() const override
{ return _closed
; }
// Sanity check: an IPv6 local address must have an address length above 20.
715 socket_address
local_address() const override
{
716 assert(_address
.u
.sas
.ss_family
!= AF_INET6
|| (_address
.addr_length
> 20));
// Sends a NUL-terminated string as one datagram. The destination has its IPv6
// scope id resolved where applicable, and the continuation asserts that the
// number of bytes sent equals the string length.
// NOTE(review): the declaration of `a` is not visible in this view; presumably
// it is a mutable copy of `dst` — confirm.
721 future
<> posix_udp_channel::send(const socket_address
& dst
, const char *message
) {
722 auto len
= strlen(message
);
724 resolve_outgoing_address(a
);
725 return _fd
->sendto(a
, message
, len
)
726 .then([len
] (size_t size
) { assert(size
== len
); });
// Sends a packet as one datagram through the prepared send header, and asserts
// in the continuation that the number of bytes sent equals `len`.
// NOTE(review): the definition of `len` is not visible in this view; presumably
// it is the packet length taken before the packet is moved — confirm.
729 future
<> posix_udp_channel::send(const socket_address
& dst
, packet p
) {
731 _send
.prepare(dst
, std::move(p
));
732 return _fd
->sendmsg(&_send
._hdr
)
733 .then([len
] (size_t size
) { assert(size
== len
); });
// Creates a UDP channel bound to `addr`, backed by a posix_udp_channel.
737 posix_network_stack::make_udp_channel(const socket_address
& addr
) {
738 return udp_channel(std::make_unique
<posix_udp_channel
>(addr
));
// Reports whether IPv6 is usable. The answer is computed once, by an
// immediately-invoked lambda initialising a function-local static; the probe
// constructs a posix_udp_channel on the IPv6 loopback address "::1".
// NOTE(review): how the outcome of that construction is turned into the
// boolean is not visible in this view — confirm.
742 posix_network_stack::supports_ipv6() const {
743 static bool has_ipv6
= [] {
745 posix_udp_channel
c(ipv6_addr
{"::1"});
// udp_datagram_impl holding the source address, the destination address and
// the payload packet of one received datagram.
755 class posix_datagram
: public udp_datagram_impl
{
// Copies both addresses and takes ownership of the packet.
761 posix_datagram(const socket_address
& src
, const socket_address
& dst
, packet p
) : _src(src
), _dst(dst
), _p(std::move(p
)) {}
762 virtual socket_address
get_src() override
{ return _src
; }
763 virtual socket_address
get_dst() override
{ return _dst
; }
764 virtual uint16_t get_dst_port() override
{ return _dst
.port(); }
// Returns a reference to the stored packet (no copy).
765 virtual packet
& get_data() override
{ return _p
; }
// Receives one datagram.
// After recvmsg() completes, the control messages are scanned for IP_PKTINFO
// (IPv4) or IPV6_PKTINFO (IPv6) to learn the destination address of the
// datagram; the port is taken from the channel's own bound address. The
// result wraps the receive buffer in a packet whose deleter frees it with
// delete[]. On failure the exception is passed on as an exceptional future.
// NOTE(review): the preparation of the receive context before recvmsg() and
// the clean-up inside the exception handler are not visible in this view.
769 posix_udp_channel::receive() {
771 return _fd
->recvmsg(&_recv
._hdr
).then([this] (size_t size
) {
773 for (auto* cmsg
= CMSG_FIRSTHDR(&_recv
._hdr
); cmsg
!= nullptr; cmsg
= CMSG_NXTHDR(&_recv
._hdr
, cmsg
)) {
// IPv4 destination address of the datagram.
774 if (cmsg
->cmsg_level
== IPPROTO_IP
&& cmsg
->cmsg_type
== IP_PKTINFO
) {
775 dst
= ipv4_addr(reinterpret_cast<const in_pktinfo
*>(CMSG_DATA(cmsg
))->ipi_addr
, _address
.port());
// IPv6 destination address of the datagram.
777 } else if (cmsg
->cmsg_level
== IPPROTO_IPV6
&& cmsg
->cmsg_type
== IPV6_PKTINFO
) {
778 dst
= ipv6_addr(reinterpret_cast<const in6_pktinfo
*>(CMSG_DATA(cmsg
))->ipi6_addr
, _address
.port());
// Ownership of the receive buffer moves into the packet's deleter.
782 return make_ready_future
<udp_datagram
>(udp_datagram(std::make_unique
<posix_datagram
>(
783 _recv
._src_addr
, dst
, packet(fragment
{_recv
._buffer
, size
}, make_deleter([buf
= _recv
._buffer
] { delete[] buf
; })))));
784 }).handle_exception([p
= _recv
._buffer
](auto ep
) {
786 return make_exception_future
<udp_datagram
>(std::move(ep
));
// Registers the network stack named "posix" with an empty set of program
// options. The factory creates a posix_network_stack when smp::main_thread()
// is true and a posix_ap_network_stack otherwise.
// NOTE(review): any further arguments passed to register_network_stack() are
// not visible in this view.
790 void register_posix_stack() {
791 register_network_stack("posix", boost::program_options::options_description(),
792 [](boost::program_options::variables_map ops
) {
793 return smp::main_thread() ? posix_network_stack::create(ops
)
794 : posix_ap_network_stack::create(ops
);
799 // nw interface stuff
// Returns the network interfaces known to this stack.
// The list is built once, in the initialiser of a function-local static, by
// querying the kernel over a NETLINK_ROUTE socket: an RTM_GETLINK dump supplies
// index, flags, name, MTU and hardware address of each interface, and an
// RTM_GETADDR dump then attaches the addresses to the interface with the
// matching index. A thread_local vector of shared pointers to copies of those
// entries is created on first use and converted to the returned
// std::vector<network_interface> on every call.
801 std::vector
<network_interface
> posix_network_stack::network_interfaces() {
// Plain data holder implementing network_interface_impl.
802 class posix_network_interface_impl final
: public network_interface_impl
{
804 uint32_t _index
= 0, _mtu
= 0;
805 sstring _name
, _display_name
;
806 std::vector
<net::inet_address
> _addresses
;
807 std::vector
<uint8_t> _hardware_address
;
808 bool _loopback
= false, _virtual
= false, _up
= false;
810 uint32_t index() const override
{
813 uint32_t mtu() const override
{
816 const sstring
& name() const override
{
// Falls back to name() when no display name is set.
819 const sstring
& display_name() const override
{
820 return _display_name
.empty() ? name() : _display_name
;
822 const std::vector
<net::inet_address
>& addresses() const override
{
825 const std::vector
<uint8_t> hardware_address() const override
{
826 return _hardware_address
;
828 bool is_loopback() const override
{
831 bool is_virtual() const override
{
834 bool is_up() const override
{
835 // TODO: should be checked on query?
// True when at least one of the interface's addresses is an IPv6 address.
838 bool supports_ipv6() const override
{
839 // TODO: this is not 100% correct.
840 return std::any_of(_addresses
.begin(), _addresses
.end(), std::mem_fn(&inet_address::is_ipv6
));
844 // For now, keep an immutable set of interfaces created on start, shared across
846 static const std::vector
<posix_network_interface_impl
> global_interfaces
= [] {
847 auto fd
= ::socket(AF_NETLINK
, SOCK_RAW
, NETLINK_ROUTE
);
848 throw_system_error_on(fd
< 0, "could not open netlink socket");
// Closes the netlink socket when the lambda is left.
850 std::unique_ptr
<int, void(*)(int*)> fd_guard(&fd
, [](int* p
) { ::close(*p
); });
852 auto pid
= ::getpid();
854 sockaddr_nl local
= { 0, };
855 local
.nl_family
= AF_NETLINK
;
857 local
.nl_groups
= RTMGRP_IPV6_IFADDR
|RTMGRP_IPV4_IFADDR
;
859 throw_system_error_on(bind(fd
, (struct sockaddr
*) &local
, sizeof(local
)) < 0, "could not bind netlink socket");
861 /* RTNL socket is ready for use, prepare and send requests */
863 std::vector
<posix_network_interface_impl
> res
;
// Two passes: links first, then addresses (which refer to links by index).
865 for (auto msg
: { RTM_GETLINK
, RTM_GETADDR
}) {
874 sockaddr_nl kernel
= { 0, };
875 msghdr rtnl_msg
= { 0, };
877 kernel
.nl_family
= AF_NETLINK
; /* fill-in kernel address (destination of our message) */
879 req
.hdr
.nlmsg_len
= NLMSG_LENGTH(sizeof(struct rtgenmsg
));
880 req
.hdr
.nlmsg_type
= msg
;
881 req
.hdr
.nlmsg_flags
= NLM_F_REQUEST
| NLM_F_ROOT
;
882 req
.hdr
.nlmsg_seq
= 1;
883 req
.hdr
.nlmsg_pid
= pid
;
885 if (msg
== RTM_GETLINK
) {
886 req
.gen
.rtgen_family
= AF_PACKET
; /* no preferred AF, we will get *all* interfaces */
888 req
.addr
.ifa_family
= AF_UNSPEC
;
894 io
.iov_len
= req
.hdr
.nlmsg_len
;
896 rtnl_msg
.msg_iov
= &io
;
897 rtnl_msg
.msg_iovlen
= 1;
898 rtnl_msg
.msg_name
= &kernel
;
899 rtnl_msg
.msg_namelen
= sizeof(kernel
);
901 throw_system_error_on(::sendmsg(fd
, (struct msghdr
*) &rtnl_msg
, 0) < 0, "could not send netlink request");
// Replies are read into a fixed 8192-byte stack buffer.
904 constexpr size_t reply_buffer_size
= 8192;
905 char reply
[reply_buffer_size
];
910 msghdr rtnl_reply
= { 0, };
911 iovec io_reply
= { 0, };
913 io_reply
.iov_base
= reply
;
914 io_reply
.iov_len
= reply_buffer_size
;
915 rtnl_reply
.msg_iov
= &io_reply
;
916 rtnl_reply
.msg_iovlen
= 1;
917 rtnl_reply
.msg_name
= &kernel
;
918 rtnl_reply
.msg_namelen
= sizeof(kernel
);
920 auto len
= ::recvmsg(fd
, &rtnl_reply
, 0); /* read as much data as fits in the receive buffer */
// Walk every netlink message contained in the received chunk.
925 for (auto* msg_ptr
= (struct nlmsghdr
*) reply
; NLMSG_OK(msg_ptr
, len
); msg_ptr
= NLMSG_NEXT(msg_ptr
, len
)) {
926 switch(msg_ptr
->nlmsg_type
) {
927 case NLMSG_DONE
: // that is all
// Link message: create one interface entry from the ifinfomsg and its
// attributes.
932 auto* iface
= reinterpret_cast<const ifinfomsg
*>(NLMSG_DATA(msg_ptr
));
933 auto ilen
= msg_ptr
->nlmsg_len
- NLMSG_LENGTH(sizeof(ifinfomsg
));
935 // todo: filter any non-network interfaces (family)
937 posix_network_interface_impl nwif
;
939 nwif
._index
= iface
->ifi_index
;
940 nwif
._loopback
= (iface
->ifi_flags
& IFF_LOOPBACK
) != 0;
941 nwif
._up
= (iface
->ifi_flags
& IFF_UP
) != 0;
942 #if defined(IFF_802_1Q_VLAN) && defined(IFF_EBRIDGE) && defined(IFF_SLAVE_INACTIVE)
943 nwif
._virtual
= (iface
->ifi_flags
& (IFF_802_1Q_VLAN
|IFF_EBRIDGE
|IFF_SLAVE_INACTIVE
)) != 0;
945 for (auto* attribute
= IFLA_RTA(iface
); RTA_OK(attribute
, ilen
); attribute
= RTA_NEXT(attribute
, ilen
)) {
946 switch(attribute
->rta_type
) {
// Attribute payload interpreted as a C string (interface name).
948 nwif
._name
= reinterpret_cast<const char *>(RTA_DATA(attribute
));
// Attribute payload interpreted as a 32-bit MTU value.
951 nwif
._mtu
= *reinterpret_cast<const uint32_t *>(RTA_DATA(attribute
));
// Attribute payload copied byte-for-byte as the hardware address.
954 nwif
._hardware_address
.assign(reinterpret_cast<const uint8_t *>(RTA_DATA(attribute
)), reinterpret_cast<const uint8_t *>(RTA_DATA(attribute
)) + RTA_PAYLOAD(attribute
));
961 res
.emplace_back(std::move(nwif
));
// Address message: find the interface with the same index and add the address
// unless it is already present.
967 auto* addr
= reinterpret_cast<const ifaddrmsg
*>(NLMSG_DATA(msg_ptr
));
968 auto ilen
= msg_ptr
->nlmsg_len
- NLMSG_LENGTH(sizeof(ifaddrmsg
));
970 for (auto& nwif
: res
) {
971 if (nwif
._index
== addr
->ifa_index
) {
972 for (auto* attribute
= IFA_RTA(addr
); RTA_OK(attribute
, ilen
); attribute
= RTA_NEXT(attribute
, ilen
)) {
973 compat::optional
<inet_address
> ia
;
975 switch(attribute
->rta_type
) {
977 case IFA_ADDRESS
: // ipv6 addresses are reported only as "ADDRESS"
// The payload size tells IPv4 (in_addr) from IPv6 (in6_addr); IPv6 addresses
// are created with the interface index as second argument.
979 if (RTA_PAYLOAD(attribute
) == sizeof(::in_addr
)) {
980 ia
.emplace(*reinterpret_cast<const ::in_addr
*>(RTA_DATA(attribute
)));
981 } else if (RTA_PAYLOAD(attribute
) == sizeof(::in6_addr
)) {
982 ia
.emplace(*reinterpret_cast<const ::in6_addr
*>(RTA_DATA(attribute
)), nwif
.index());
985 if (ia
&& std::find(nwif
._addresses
.begin(), nwif
._addresses
.end(), *ia
) == nwif
._addresses
.end()) {
986 nwif
._addresses
.emplace_back(*ia
);
1009 // And a similarly immutable set of shared_ptr to network_interface_impl per shard, ready
1010 // to be handed out to callers with minimal overhead
1011 static const thread_local
std::vector
<shared_ptr
<posix_network_interface_impl
>> thread_local_interfaces
= [] {
1012 std::vector
<shared_ptr
<posix_network_interface_impl
>> res
;
1013 res
.reserve(global_interfaces
.size());
// Each thread gets its own copies of the global entries.
1014 std::transform(global_interfaces
.begin(), global_interfaces
.end(), std::back_inserter(res
), [](const posix_network_interface_impl
& impl
) {
1015 return make_shared
<posix_network_interface_impl
>(impl
);
1020 return std::vector
<network_interface
>(thread_local_interfaces
.begin(), thread_local_interfaces
.end());