#include <random>
+#include <sys/socket.h>
#include <linux/if.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <net/route.h>
+#include <seastar/core/loop.hh>
+#include <seastar/core/reactor.hh>
#include <seastar/net/posix-stack.hh>
#include <seastar/net/net.hh>
#include <seastar/net/packet.hh>
}
+
+namespace {
+
+// reinterpret_cast<foo*>() on a pointer that the compiler knows points to an
+// object with a different type is disliked by the compiler as it violates
+// strict aliasing rules. This safe version does the same thing but keeps the
+// compiler happy.
+template <typename T>
+T
+copy_reinterpret_cast(const void* ptr) {
+ T tmp;
+ std::memcpy(&tmp, ptr, sizeof(T));
+ return tmp;
+}
+
+}
+
namespace seastar {
namespace net {
virtual bool get_keepalive(file_desc& _fd) const = 0;
virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) const = 0;
virtual keepalive_params get_keepalive_parameters(file_desc& _fd) const = 0;
+ virtual void set_sockopt(file_desc& _fd, int level, int optname, const void* data, size_t len) const {
+ _fd.setsockopt(level, optname, data, socklen_t(len));
+ }
+ virtual int get_sockopt(file_desc& _fd, int level, int optname, void* data, size_t len) const {
+ return _fd.getsockopt(level, optname, reinterpret_cast<char*>(data), socklen_t(len));
+ }
};
thread_local posix_ap_server_socket_impl::sockets_map_t posix_ap_server_socket_impl::sockets{};
return _fd.getsockopt<int>(SOL_SOCKET, SO_KEEPALIVE);
}
virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) const override {
- const tcp_keepalive_params& pms = compat::get<tcp_keepalive_params>(params);
+ const tcp_keepalive_params& pms = std::get<tcp_keepalive_params>(params);
_fd.setsockopt(IPPROTO_TCP, TCP_KEEPCNT, pms.count);
_fd.setsockopt(IPPROTO_TCP, TCP_KEEPIDLE, int(pms.idle.count()));
_fd.setsockopt(IPPROTO_TCP, TCP_KEEPINTVL, int(pms.interval.count()));
return _fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS).spp_flags & SPP_HB_ENABLE;
}
virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& kpms) const override {
- const sctp_keepalive_params& pms = compat::get<sctp_keepalive_params>(kpms);
+ const sctp_keepalive_params& pms = std::get<sctp_keepalive_params>(kpms);
auto params = _fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS);
params.spp_hbinterval = pms.interval.count() * 1000; // in milliseconds
params.spp_pathmaxrxt = pms.count;
}
class posix_connected_socket_impl final : public connected_socket_impl {
- lw_shared_ptr<pollable_fd> _fd;
+ pollable_fd _fd;
const posix_connected_socket_operations* _ops;
conntrack::handle _handle;
- compat::polymorphic_allocator<char>* _allocator;
+ std::pmr::polymorphic_allocator<char>* _allocator;
private:
- explicit posix_connected_socket_impl(sa_family_t family, int protocol, lw_shared_ptr<pollable_fd> fd, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) :
+ explicit posix_connected_socket_impl(sa_family_t family, int protocol, pollable_fd fd, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) :
_fd(std::move(fd)), _ops(get_posix_connected_socket_ops(family, protocol)), _allocator(allocator) {}
- explicit posix_connected_socket_impl(sa_family_t family, int protocol, lw_shared_ptr<pollable_fd> fd, conntrack::handle&& handle,
- compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd))
+ explicit posix_connected_socket_impl(sa_family_t family, int protocol, pollable_fd fd, conntrack::handle&& handle,
+ std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd))
, _ops(get_posix_connected_socket_ops(family, protocol)), _handle(std::move(handle)), _allocator(allocator) {}
public:
virtual data_source source() override {
- return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator));
+ return source(connected_socket_input_stream_config());
+ }
+ virtual data_source source(connected_socket_input_stream_config csisc) override {
+ return data_source(std::make_unique<posix_data_source_impl>(_fd, csisc, _allocator));
}
virtual data_sink sink() override {
return data_sink(std::make_unique< posix_data_sink_impl>(_fd));
}
virtual void shutdown_input() override {
- _fd->shutdown(SHUT_RD);
+ _fd.shutdown(SHUT_RD);
}
virtual void shutdown_output() override {
- _fd->shutdown(SHUT_WR);
+ _fd.shutdown(SHUT_WR);
}
virtual void set_nodelay(bool nodelay) override {
- return _ops->set_nodelay(_fd->get_file_desc(), nodelay);
+ return _ops->set_nodelay(_fd.get_file_desc(), nodelay);
}
virtual bool get_nodelay() const override {
- return _ops->get_nodelay(_fd->get_file_desc());
+ return _ops->get_nodelay(_fd.get_file_desc());
}
void set_keepalive(bool keepalive) override {
- return _ops->set_keepalive(_fd->get_file_desc(), keepalive);
+ return _ops->set_keepalive(_fd.get_file_desc(), keepalive);
}
bool get_keepalive() const override {
- return _ops->get_keepalive(_fd->get_file_desc());
+ return _ops->get_keepalive(_fd.get_file_desc());
}
void set_keepalive_parameters(const keepalive_params& p) override {
- return _ops->set_keepalive_parameters(_fd->get_file_desc(), p);
+ return _ops->set_keepalive_parameters(_fd.get_file_desc(), p);
}
keepalive_params get_keepalive_parameters() const override {
- return _ops->get_keepalive_parameters(_fd->get_file_desc());
+ return _ops->get_keepalive_parameters(_fd.get_file_desc());
+ }
+ void set_sockopt(int level, int optname, const void* data, size_t len) override {
+ return _ops->set_sockopt(_fd.get_file_desc(), level, optname, data, len);
+ }
+ int get_sockopt(int level, int optname, void* data, size_t len) const override {
+ return _ops->get_sockopt(_fd.get_file_desc(), level, optname, data, len);
}
friend class posix_server_socket_impl;
friend class posix_ap_server_socket_impl;
}
class posix_socket_impl final : public socket_impl {
- lw_shared_ptr<pollable_fd> _fd;
- compat::polymorphic_allocator<char>* _allocator;
+ pollable_fd _fd;
+ std::pmr::polymorphic_allocator<char>* _allocator;
bool _reuseaddr = false;
future<> find_port_and_connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
resolve_outgoing_address(sa);
return repeat([this, sa, local, proto, attempts = 0, requested_port = ntoh(local.as_posix_sockaddr_in().sin_port)] () mutable {
_fd = engine().make_pollable_fd(sa, int(proto));
- _fd->get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(_reuseaddr));
- uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + engine().cpu_id() : requested_port;
+ _fd.get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(_reuseaddr));
+ uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + this_shard_id() : requested_port;
local.as_posix_sockaddr_in().sin_port = hton(port);
- return futurize_apply([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([port, requested_port] (future<> f) {
+ return futurize_invoke([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([port, requested_port] (future<> f) {
try {
f.get();
return stop_iteration::yes;
}
public:
- explicit posix_socket_impl(compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _allocator(allocator) {}
+ explicit posix_socket_impl(std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _allocator(allocator) {}
virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
if (sa.is_af_unix()) {
void set_reuseaddr(bool reuseaddr) override {
_reuseaddr = reuseaddr;
if (_fd) {
- _fd->get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(reuseaddr));
+ _fd.get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(reuseaddr));
}
}
bool get_reuseaddr() const override {
if(_fd) {
- return _fd->get_file_desc().getsockopt<int>(SOL_SOCKET, SO_REUSEADDR);
+ return _fd.get_file_desc().getsockopt<int>(SOL_SOCKET, SO_REUSEADDR);
} else {
return _reuseaddr;
}
virtual void shutdown() override {
if (_fd) {
try {
- _fd->shutdown(SHUT_RDWR);
+ _fd.shutdown(SHUT_RDWR);
} catch (std::system_error& e) {
if (e.code().value() != ENOTCONN) {
throw;
}
} ();
auto cpu = cth.cpu();
- if (cpu == engine().cpu_id()) {
+ if (cpu == this_shard_id()) {
std::unique_ptr<connected_socket_impl> csi(
- new posix_connected_socket_impl(sa.family(), _protocol, make_lw_shared(std::move(fd)), std::move(cth), _allocator));
+ new posix_connected_socket_impl(sa.family(), _protocol, std::move(fd), std::move(cth), _allocator));
return make_ready_future<accept_result>(
accept_result{connected_socket(std::move(csi)), sa});
} else {
conn_q.erase(conni);
try {
std::unique_ptr<connected_socket_impl> csi(
- new posix_connected_socket_impl(_sa.family(), _protocol, make_lw_shared(std::move(c.fd)), std::move(c.connection_tracking_handle), _allocator));
+ new posix_connected_socket_impl(_sa.family(), _protocol, std::move(c.fd), std::move(c.connection_tracking_handle), _allocator));
return make_ready_future<accept_result>(accept_result{connected_socket(std::move(csi)), std::move(c.addr)});
} catch (...) {
return make_exception_future<accept_result>(std::current_exception());
auto& fd = std::get<0>(fd_sa);
auto& sa = std::get<1>(fd_sa);
std::unique_ptr<connected_socket_impl> csi(
- new posix_connected_socket_impl(sa.family(), protocol, make_lw_shared(std::move(fd)), allocator));
+ new posix_connected_socket_impl(sa.family(), protocol, std::move(fd), allocator));
return make_ready_future<accept_result>(
accept_result{connected_socket(std::move(csi)), sa});
});
}
void
-posix_ap_server_socket_impl::move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle cth, compat::polymorphic_allocator<char>* allocator) {
+posix_ap_server_socket_impl::move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle cth, std::pmr::polymorphic_allocator<char>* allocator) {
auto t_sa = std::make_tuple(protocol, sa);
auto i = sockets.find(t_sa);
if (i != sockets.end()) {
try {
- std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(sa.family(), protocol, make_lw_shared(std::move(fd)), std::move(cth), allocator));
+ std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(sa.family(), protocol, std::move(fd), std::move(cth), allocator));
i->second.set_value(accept_result{connected_socket(std::move(csi)), std::move(addr)});
} catch (...) {
i->second.set_exception(std::current_exception());
future<temporary_buffer<char>>
posix_data_source_impl::get() {
- return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
- _buf.trim(size);
- auto ret = std::move(_buf);
- _buf = make_temporary_buffer<char>(_buffer_allocator, _buf_size);
- return make_ready_future<temporary_buffer<char>>(std::move(ret));
+ return _fd.read_some(static_cast<internal::buffer_allocator*>(this)).then([this] (temporary_buffer<char> b) {
+ if (b.size() >= _config.buffer_size) {
+ _config.buffer_size *= 2;
+ _config.buffer_size = std::min(_config.buffer_size, _config.max_buffer_size);
+ } else if (b.size() <= _config.buffer_size / 4) {
+ _config.buffer_size /= 2;
+ _config.buffer_size = std::max(_config.buffer_size, _config.min_buffer_size);
+ }
+ return b;
});
}
+temporary_buffer<char>
+posix_data_source_impl::allocate_buffer() {
+ return make_temporary_buffer<char>(_buffer_allocator, _config.buffer_size);
+}
+
future<> posix_data_source_impl::close() {
- _fd->shutdown(SHUT_RD);
+ _fd.shutdown(SHUT_RD);
return make_ready_future<>();
}
future<>
posix_data_sink_impl::put(temporary_buffer<char> buf) {
- return _fd->write_all(buf.get(), buf.size()).then([d = buf.release()] {});
+ return _fd.write_all(buf.get(), buf.size()).then([d = buf.release()] {});
}
future<>
posix_data_sink_impl::put(packet p) {
_p = std::move(p);
- return _fd->write_all(_p).then([this] { _p.reset(); });
+ return _fd.write_all(_p).then([this] { _p.reset(); });
}
future<>
posix_data_sink_impl::close() {
- _fd->shutdown(SHUT_WR);
+ _fd.shutdown(SHUT_WR);
return make_ready_future<>();
}
+posix_network_stack::posix_network_stack(boost::program_options::variables_map opts, std::pmr::polymorphic_allocator<char>* allocator)
+ : _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
+}
+
server_socket
posix_network_stack::listen(socket_address sa, listen_options opt) {
- using server_socket = seastar::api_v2::server_socket;
+ using server_socket = seastar::server_socket;
// allow unspecified bind address -> default to ipv4 wildcard
if (sa.is_unspecified()) {
sa = inet_address(inet_address::family::INET);
return ::seastar::socket(std::make_unique<posix_socket_impl>(_allocator));
}
+posix_ap_network_stack::posix_ap_network_stack(boost::program_options::variables_map opts, std::pmr::polymorphic_allocator<char>* allocator)
+ : posix_network_stack(std::move(opts), allocator), _reuseport(engine().posix_reuseport_available()) {
+}
+
server_socket
posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
- using server_socket = seastar::api_v2::server_socket;
+ using server_socket = seastar::server_socket;
// allow unspecified bind address -> default to ipv4 wildcard
if (sa.is_unspecified()) {
sa = inet_address(inet_address::family::INET);
send_ctx() {
memset(&_hdr, 0, sizeof(_hdr));
_hdr.msg_name = &_dst.u.sa;
- _hdr.msg_namelen = sizeof(_dst.u.sas);
+ _hdr.msg_namelen = _dst.addr_length;
}
void prepare(const socket_address& dst, packet p) {
_dst = dst;
+ _hdr.msg_namelen = _dst.addr_length;
_p = std::move(p);
_iovecs = to_iovec(_p);
_hdr.msg_iov = _iovecs.data();
resolve_outgoing_address(_dst);
}
};
- std::unique_ptr<pollable_fd> _fd;
+ pollable_fd _fd;
socket_address _address;
recv_ctx _recv;
send_ctx _send;
}
fd.bind(sa.u.sa, sizeof(sa.u.sas));
_address = fd.get_address();
- _fd = std::make_unique<pollable_fd>(std::move(fd));
+ _fd = std::move(fd);
}
virtual ~posix_udp_channel() { if (!_closed) close(); };
virtual future<udp_datagram> receive() override;
virtual future<> send(const socket_address& dst, const char *msg) override;
virtual future<> send(const socket_address& dst, packet p) override;
virtual void shutdown_input() override {
- _fd->abort_reader();
+ _fd.abort_reader();
}
virtual void shutdown_output() override {
- _fd->abort_writer();
+ _fd.abort_writer();
}
virtual void close() override {
_closed = true;
- _fd.reset();
+ _fd = {};
}
virtual bool is_closed() const override { return _closed; }
socket_address local_address() const override {
auto len = strlen(message);
auto a = dst;
resolve_outgoing_address(a);
- return _fd->sendto(a, message, len)
+ return _fd.sendto(a, message, len)
.then([len] (size_t size) { assert(size == len); });
}
future<> posix_udp_channel::send(const socket_address& dst, packet p) {
auto len = p.len();
_send.prepare(dst, std::move(p));
- return _fd->sendmsg(&_send._hdr)
+ return _fd.sendmsg(&_send._hdr)
.then([len] (size_t size) { assert(size == len); });
}
future<udp_datagram>
posix_udp_channel::receive() {
_recv.prepare();
- return _fd->recvmsg(&_recv._hdr).then([this] (size_t size) {
+ return _fd.recvmsg(&_recv._hdr).then([this] (size_t size) {
socket_address dst;
for (auto* cmsg = CMSG_FIRSTHDR(&_recv._hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&_recv._hdr, cmsg)) {
if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
- dst = ipv4_addr(reinterpret_cast<const in_pktinfo*>(CMSG_DATA(cmsg))->ipi_addr, _address.port());
+ dst = ipv4_addr(copy_reinterpret_cast<in_pktinfo>(CMSG_DATA(cmsg)).ipi_addr, _address.port());
break;
} else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
- dst = ipv6_addr(reinterpret_cast<const in6_pktinfo*>(CMSG_DATA(cmsg))->ipi6_addr, _address.port());
+ dst = ipv6_addr(copy_reinterpret_cast<in6_pktinfo>(CMSG_DATA(cmsg)).ipi6_addr, _address.port());
break;
}
}
for (auto& nwif : res) {
if (nwif._index == addr->ifa_index) {
for (auto* attribute = IFA_RTA(addr); RTA_OK(attribute, ilen); attribute = RTA_NEXT(attribute, ilen)) {
- compat::optional<inet_address> ia;
+ std::optional<inet_address> ia;
switch(attribute->rta_type) {
case IFA_LOCAL: