]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/seastar/src/net/posix-stack.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / net / posix-stack.cc
index 192eb3a81869ff1cccac0b6b7b7a515e6082ee8d..c1d818b1dc3f8d46283f7e317bc3e33a9a647fed 100644 (file)
 
 #include <random>
 
+#include <sys/socket.h>
 #include <linux/if.h>
 #include <linux/netlink.h>
 #include <linux/rtnetlink.h>
 #include <net/route.h>
 
+#include <seastar/core/loop.hh>
+#include <seastar/core/reactor.hh>
 #include <seastar/net/posix-stack.hh>
 #include <seastar/net/net.hh>
 #include <seastar/net/packet.hh>
@@ -48,6 +51,23 @@ struct hash<seastar::net::posix_ap_server_socket_impl::protocol_and_socket_addre
 
 }
 
+
+namespace {
+
+// reinterpret_cast<foo*>() on a pointer that the compiler knows points to an
+// object with a different type is disliked by the compiler as it violates
+// strict aliasing rules. This safe version does the same thing but keeps the
+// compiler happy.
+template <typename T>
+T
+copy_reinterpret_cast(const void* ptr) {
+    T tmp;
+    std::memcpy(&tmp, ptr, sizeof(T));
+    return tmp;
+}
+
+}
+
 namespace seastar {
 
 namespace net {
@@ -63,6 +83,12 @@ public:
     virtual bool get_keepalive(file_desc& _fd) const = 0;
     virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) const = 0;
     virtual keepalive_params get_keepalive_parameters(file_desc& _fd) const = 0;
+    virtual void set_sockopt(file_desc& _fd, int level, int optname, const void* data, size_t len) const {
+        _fd.setsockopt(level, optname, data, socklen_t(len));
+    }
+    virtual int get_sockopt(file_desc& _fd, int level, int optname, void* data, size_t len) const {
+        return _fd.getsockopt(level, optname, reinterpret_cast<char*>(data), socklen_t(len));
+    }
 };
 
 thread_local posix_ap_server_socket_impl::sockets_map_t posix_ap_server_socket_impl::sockets{};
@@ -83,7 +109,7 @@ public:
         return _fd.getsockopt<int>(SOL_SOCKET, SO_KEEPALIVE);
     }
     virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) const override {
-        const tcp_keepalive_params& pms = compat::get<tcp_keepalive_params>(params);
+        const tcp_keepalive_params& pms = std::get<tcp_keepalive_params>(params);
         _fd.setsockopt(IPPROTO_TCP, TCP_KEEPCNT, pms.count);
         _fd.setsockopt(IPPROTO_TCP, TCP_KEEPIDLE, int(pms.idle.count()));
         _fd.setsockopt(IPPROTO_TCP, TCP_KEEPINTVL, int(pms.interval.count()));
@@ -118,7 +144,7 @@ public:
         return _fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS).spp_flags & SPP_HB_ENABLE;
     }
     virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& kpms) const override {
-        const sctp_keepalive_params& pms = compat::get<sctp_keepalive_params>(kpms);
+        const sctp_keepalive_params& pms = std::get<sctp_keepalive_params>(kpms);
         auto params = _fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS);
         params.spp_hbinterval = pms.interval.count() * 1000; // in milliseconds
         params.spp_pathmaxrxt = pms.count;
@@ -172,46 +198,55 @@ get_posix_connected_socket_ops(sa_family_t family, int protocol) {
 }
 
 class posix_connected_socket_impl final : public connected_socket_impl {
-    lw_shared_ptr<pollable_fd> _fd;
+    pollable_fd _fd;
     const posix_connected_socket_operations* _ops;
     conntrack::handle _handle;
-    compat::polymorphic_allocator<char>* _allocator;
+    std::pmr::polymorphic_allocator<char>* _allocator;
 private:
-    explicit posix_connected_socket_impl(sa_family_t family, int protocol, lw_shared_ptr<pollable_fd> fd, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) :
+    explicit posix_connected_socket_impl(sa_family_t family, int protocol, pollable_fd fd, std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) :
         _fd(std::move(fd)), _ops(get_posix_connected_socket_ops(family, protocol)), _allocator(allocator) {}
-    explicit posix_connected_socket_impl(sa_family_t family, int protocol, lw_shared_ptr<pollable_fd> fd, conntrack::handle&& handle,
-        compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd))
+    explicit posix_connected_socket_impl(sa_family_t family, int protocol, pollable_fd fd, conntrack::handle&& handle,
+        std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd))
                 , _ops(get_posix_connected_socket_ops(family, protocol)), _handle(std::move(handle)), _allocator(allocator) {}
 public:
     virtual data_source source() override {
-        return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator));
+        return source(connected_socket_input_stream_config());
+    }
+    virtual data_source source(connected_socket_input_stream_config csisc) override {
+        return data_source(std::make_unique<posix_data_source_impl>(_fd, csisc, _allocator));
     }
     virtual data_sink sink() override {
         return data_sink(std::make_unique< posix_data_sink_impl>(_fd));
     }
     virtual void shutdown_input() override {
-        _fd->shutdown(SHUT_RD);
+        _fd.shutdown(SHUT_RD);
     }
     virtual void shutdown_output() override {
-        _fd->shutdown(SHUT_WR);
+        _fd.shutdown(SHUT_WR);
     }
     virtual void set_nodelay(bool nodelay) override {
-        return _ops->set_nodelay(_fd->get_file_desc(), nodelay);
+        return _ops->set_nodelay(_fd.get_file_desc(), nodelay);
     }
     virtual bool get_nodelay() const override {
-        return _ops->get_nodelay(_fd->get_file_desc());
+        return _ops->get_nodelay(_fd.get_file_desc());
     }
     void set_keepalive(bool keepalive) override {
-        return _ops->set_keepalive(_fd->get_file_desc(), keepalive);
+        return _ops->set_keepalive(_fd.get_file_desc(), keepalive);
     }
     bool get_keepalive() const override {
-        return _ops->get_keepalive(_fd->get_file_desc());
+        return _ops->get_keepalive(_fd.get_file_desc());
     }
     void set_keepalive_parameters(const keepalive_params& p) override {
-        return _ops->set_keepalive_parameters(_fd->get_file_desc(), p);
+        return _ops->set_keepalive_parameters(_fd.get_file_desc(), p);
     }
     keepalive_params get_keepalive_parameters() const override {
-        return _ops->get_keepalive_parameters(_fd->get_file_desc());
+        return _ops->get_keepalive_parameters(_fd.get_file_desc());
+    }
+    void set_sockopt(int level, int optname, const void* data, size_t len) override {
+        return _ops->set_sockopt(_fd.get_file_desc(), level, optname, data, len);
+    }
+    int get_sockopt(int level, int optname, void* data, size_t len) const override {
+        return _ops->get_sockopt(_fd.get_file_desc(), level, optname, data, len);
     }
     friend class posix_server_socket_impl;
     friend class posix_ap_server_socket_impl;
@@ -330,8 +365,8 @@ static void resolve_outgoing_address(socket_address& a) {
 }
 
 class posix_socket_impl final : public socket_impl {
-    lw_shared_ptr<pollable_fd> _fd;
-    compat::polymorphic_allocator<char>* _allocator;
+    pollable_fd _fd;
+    std::pmr::polymorphic_allocator<char>* _allocator;
     bool _reuseaddr = false;
 
     future<> find_port_and_connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
@@ -344,10 +379,10 @@ class posix_socket_impl final : public socket_impl {
         resolve_outgoing_address(sa);
         return repeat([this, sa, local, proto, attempts = 0, requested_port = ntoh(local.as_posix_sockaddr_in().sin_port)] () mutable {
             _fd = engine().make_pollable_fd(sa, int(proto));
-            _fd->get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(_reuseaddr));
-            uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + engine().cpu_id() : requested_port;
+            _fd.get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(_reuseaddr));
+            uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + this_shard_id() : requested_port;
             local.as_posix_sockaddr_in().sin_port = hton(port);
-            return futurize_apply([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([port, requested_port] (future<> f) {
+            return futurize_invoke([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([port, requested_port] (future<> f) {
                 try {
                     f.get();
                     return stop_iteration::yes;
@@ -380,7 +415,7 @@ class posix_socket_impl final : public socket_impl {
     }
 
 public:
-    explicit posix_socket_impl(compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _allocator(allocator) {}
+    explicit posix_socket_impl(std::pmr::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _allocator(allocator) {}
 
     virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
         if (sa.is_af_unix()) {
@@ -396,13 +431,13 @@ public:
     void set_reuseaddr(bool reuseaddr) override {
         _reuseaddr = reuseaddr;
         if (_fd) {
-            _fd->get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(reuseaddr));
+            _fd.get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(reuseaddr));
         }
     }
 
     bool get_reuseaddr() const override {
         if(_fd) {
-            return _fd->get_file_desc().getsockopt<int>(SOL_SOCKET, SO_REUSEADDR);
+            return _fd.get_file_desc().getsockopt<int>(SOL_SOCKET, SO_REUSEADDR);
         } else {
             return _reuseaddr;
         }
@@ -411,7 +446,7 @@ public:
     virtual void shutdown() override {
         if (_fd) {
             try {
-                _fd->shutdown(SHUT_RDWR);
+                _fd.shutdown(SHUT_RDWR);
             } catch (std::system_error& e) {
                 if (e.code().value() != ENOTCONN) {
                     throw;
@@ -438,9 +473,9 @@ posix_server_socket_impl::accept() {
             }
         } ();
         auto cpu = cth.cpu();
-        if (cpu == engine().cpu_id()) {
+        if (cpu == this_shard_id()) {
             std::unique_ptr<connected_socket_impl> csi(
-                    new posix_connected_socket_impl(sa.family(), _protocol, make_lw_shared(std::move(fd)), std::move(cth), _allocator));
+                    new posix_connected_socket_impl(sa.family(), _protocol, std::move(fd), std::move(cth), _allocator));
             return make_ready_future<accept_result>(
                     accept_result{connected_socket(std::move(csi)), sa});
         } else {
@@ -470,7 +505,7 @@ future<accept_result> posix_ap_server_socket_impl::accept() {
         conn_q.erase(conni);
         try {
             std::unique_ptr<connected_socket_impl> csi(
-                    new posix_connected_socket_impl(_sa.family(), _protocol, make_lw_shared(std::move(c.fd)), std::move(c.connection_tracking_handle), _allocator));
+                    new posix_connected_socket_impl(_sa.family(), _protocol, std::move(c.fd), std::move(c.connection_tracking_handle), _allocator));
             return make_ready_future<accept_result>(accept_result{connected_socket(std::move(csi)), std::move(c.addr)});
         } catch (...) {
             return make_exception_future<accept_result>(std::current_exception());
@@ -503,7 +538,7 @@ posix_reuseport_server_socket_impl::accept() {
         auto& fd = std::get<0>(fd_sa);
         auto& sa = std::get<1>(fd_sa);
         std::unique_ptr<connected_socket_impl> csi(
-                new posix_connected_socket_impl(sa.family(), protocol, make_lw_shared(std::move(fd)), allocator));
+                new posix_connected_socket_impl(sa.family(), protocol, std::move(fd), allocator));
         return make_ready_future<accept_result>(
             accept_result{connected_socket(std::move(csi)), sa});
     });
@@ -519,12 +554,12 @@ socket_address posix_reuseport_server_socket_impl::local_address() const {
 }
 
 void
-posix_ap_server_socket_impl::move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle cth, compat::polymorphic_allocator<char>* allocator) {
+posix_ap_server_socket_impl::move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle cth, std::pmr::polymorphic_allocator<char>* allocator) {
     auto t_sa = std::make_tuple(protocol, sa);
     auto i = sockets.find(t_sa);
     if (i != sockets.end()) {
         try {
-            std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(sa.family(), protocol, make_lw_shared(std::move(fd)), std::move(cth), allocator));
+            std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(sa.family(), protocol, std::move(fd), std::move(cth), allocator));
             i->second.set_value(accept_result{connected_socket(std::move(csi)), std::move(addr)});
         } catch (...) {
             i->second.set_exception(std::current_exception());
@@ -537,16 +572,25 @@ posix_ap_server_socket_impl::move_connected_socket(int protocol, socket_address
 
 future<temporary_buffer<char>>
 posix_data_source_impl::get() {
-    return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
-        _buf.trim(size);
-        auto ret = std::move(_buf);
-        _buf = make_temporary_buffer<char>(_buffer_allocator, _buf_size);
-        return make_ready_future<temporary_buffer<char>>(std::move(ret));
+    return _fd.read_some(static_cast<internal::buffer_allocator*>(this)).then([this] (temporary_buffer<char> b) {
+        if (b.size() >= _config.buffer_size) {
+            _config.buffer_size *= 2;
+            _config.buffer_size = std::min(_config.buffer_size, _config.max_buffer_size);
+        } else if (b.size() <= _config.buffer_size / 4) {
+            _config.buffer_size /= 2;
+            _config.buffer_size = std::max(_config.buffer_size, _config.min_buffer_size);
+        }
+        return b;
     });
 }
 
+temporary_buffer<char>
+posix_data_source_impl::allocate_buffer() {
+    return make_temporary_buffer<char>(_buffer_allocator, _config.buffer_size);
+}
+
 future<> posix_data_source_impl::close() {
-    _fd->shutdown(SHUT_RD);
+    _fd.shutdown(SHUT_RD);
     return make_ready_future<>();
 }
 
@@ -570,24 +614,28 @@ std::vector<iovec> to_iovec(std::vector<temporary_buffer<char>>& buf_vec) {
 
 future<>
 posix_data_sink_impl::put(temporary_buffer<char> buf) {
-    return _fd->write_all(buf.get(), buf.size()).then([d = buf.release()] {});
+    return _fd.write_all(buf.get(), buf.size()).then([d = buf.release()] {});
 }
 
 future<>
 posix_data_sink_impl::put(packet p) {
     _p = std::move(p);
-    return _fd->write_all(_p).then([this] { _p.reset(); });
+    return _fd.write_all(_p).then([this] { _p.reset(); });
 }
 
 future<>
 posix_data_sink_impl::close() {
-    _fd->shutdown(SHUT_WR);
+    _fd.shutdown(SHUT_WR);
     return make_ready_future<>();
 }
 
+posix_network_stack::posix_network_stack(boost::program_options::variables_map opts, std::pmr::polymorphic_allocator<char>* allocator)
+        : _reuseport(engine().posix_reuseport_available()), _allocator(allocator) {
+}
+
 server_socket
 posix_network_stack::listen(socket_address sa, listen_options opt) {
-    using server_socket = seastar::api_v2::server_socket;
+    using server_socket = seastar::server_socket;
     // allow unspecified bind address -> default to ipv4 wildcard
     if (sa.is_unspecified()) {
         sa = inet_address(inet_address::family::INET);
@@ -606,9 +654,13 @@ posix_network_stack::listen(socket_address sa, listen_options opt) {
     return ::seastar::socket(std::make_unique<posix_socket_impl>(_allocator));
 }
 
+posix_ap_network_stack::posix_ap_network_stack(boost::program_options::variables_map opts, std::pmr::polymorphic_allocator<char>* allocator)
+        : posix_network_stack(std::move(opts), allocator), _reuseport(engine().posix_reuseport_available()) {
+}
+
 server_socket
 posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
-    using server_socket = seastar::api_v2::server_socket;
+    using server_socket = seastar::server_socket;
     // allow unspecified bind address -> default to ipv4 wildcard
     if (sa.is_unspecified()) {
         sa = inet_address(inet_address::family::INET);
@@ -667,11 +719,12 @@ private:
         send_ctx() {
             memset(&_hdr, 0, sizeof(_hdr));
             _hdr.msg_name = &_dst.u.sa;
-            _hdr.msg_namelen = sizeof(_dst.u.sas);
+            _hdr.msg_namelen = _dst.addr_length;
         }
 
         void prepare(const socket_address& dst, packet p) {
             _dst = dst;
+            _hdr.msg_namelen = _dst.addr_length;
             _p = std::move(p);
             _iovecs = to_iovec(_p);
             _hdr.msg_iov = _iovecs.data();
@@ -679,7 +732,7 @@ private:
             resolve_outgoing_address(_dst);
         }
     };
-    std::unique_ptr<pollable_fd> _fd;
+    pollable_fd _fd;
     socket_address _address;
     recv_ctx _recv;
     send_ctx _send;
@@ -695,21 +748,21 @@ public:
         }
         fd.bind(sa.u.sa, sizeof(sa.u.sas));
         _address = fd.get_address();
-        _fd = std::make_unique<pollable_fd>(std::move(fd));
+        _fd = std::move(fd);
     }
     virtual ~posix_udp_channel() { if (!_closed) close(); };
     virtual future<udp_datagram> receive() override;
     virtual future<> send(const socket_address& dst, const char *msg) override;
     virtual future<> send(const socket_address& dst, packet p) override;
     virtual void shutdown_input() override {
-        _fd->abort_reader();
+        _fd.abort_reader();
     }
     virtual void shutdown_output() override {
-        _fd->abort_writer();
+        _fd.abort_writer();
     }
     virtual void close() override {
         _closed = true;
-        _fd.reset();
+        _fd = {};
     }
     virtual bool is_closed() const override { return _closed; }
     socket_address local_address() const override {
@@ -722,14 +775,14 @@ future<> posix_udp_channel::send(const socket_address& dst, const char *message)
     auto len = strlen(message);
     auto a = dst;
     resolve_outgoing_address(a);
-    return _fd->sendto(a, message, len)
+    return _fd.sendto(a, message, len)
             .then([len] (size_t size) { assert(size == len); });
 }
 
 future<> posix_udp_channel::send(const socket_address& dst, packet p) {
     auto len = p.len();
     _send.prepare(dst, std::move(p));
-    return _fd->sendmsg(&_send._hdr)
+    return _fd.sendmsg(&_send._hdr)
             .then([len] (size_t size) { assert(size == len); });
 }
 
@@ -768,14 +821,14 @@ public:
 future<udp_datagram>
 posix_udp_channel::receive() {
     _recv.prepare();
-    return _fd->recvmsg(&_recv._hdr).then([this] (size_t size) {
+    return _fd.recvmsg(&_recv._hdr).then([this] (size_t size) {
         socket_address dst;
         for (auto* cmsg = CMSG_FIRSTHDR(&_recv._hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&_recv._hdr, cmsg)) {
             if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
-                dst = ipv4_addr(reinterpret_cast<const in_pktinfo*>(CMSG_DATA(cmsg))->ipi_addr, _address.port());
+                dst = ipv4_addr(copy_reinterpret_cast<in_pktinfo>(CMSG_DATA(cmsg)).ipi_addr, _address.port());
                 break;
             } else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
-                dst = ipv6_addr(reinterpret_cast<const in6_pktinfo*>(CMSG_DATA(cmsg))->ipi6_addr, _address.port());
+                dst = ipv6_addr(copy_reinterpret_cast<in6_pktinfo>(CMSG_DATA(cmsg)).ipi6_addr, _address.port());
                 break;
             }
         }
@@ -970,7 +1023,7 @@ std::vector<network_interface> posix_network_stack::network_interfaces() {
                         for (auto& nwif : res) {
                             if (nwif._index == addr->ifa_index) {
                                 for (auto* attribute = IFA_RTA(addr); RTA_OK(attribute, ilen); attribute = RTA_NEXT(attribute, ilen)) {
-                                    compat::optional<inet_address> ia;
+                                    std::optional<inet_address> ia;
                                     
                                     switch(attribute->rta_type) {
                                     case IFA_LOCAL: