]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/msg/async/PosixStack.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / msg / async / PosixStack.cc
index 0fc344c2ff4c8ae23b0a2cc37197d380c8b50538..a38e82cf39ccf37921f16eb4b6212279c122f3ed 100644 (file)
 #define dout_prefix *_dout << "PosixStack "
 
 class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
-  NetHandler &handler;
+  ceph::NetHandler &handler;
   int _fd;
   entity_addr_t sa;
   bool connected;
 
  public:
-  explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
+  explicit PosixConnectedSocketImpl(ceph::NetHandler &h, const entity_addr_t &sa,
+                                   int f, bool connected)
       : handler(h), _fd(f), sa(sa), connected(connected) {}
 
   int is_connected() override {
@@ -63,14 +64,19 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
   }
 
   ssize_t read(char *buf, size_t len) override {
+    #ifdef _WIN32
+    ssize_t r = ::recv(_fd, buf, len, 0);
+    #else
     ssize_t r = ::read(_fd, buf, len);
+    #endif
     if (r < 0)
-      r = -errno;
+      r = -ceph_sock_errno();
     return r;
   }
 
   // return the sent length
   // < 0 means error occurred
+  #ifndef _WIN32
   static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
   {
     size_t sent = 0;
@@ -79,12 +85,13 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
       ssize_t r;
       r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
       if (r < 0) {
-        if (errno == EINTR) {
+        int err = ceph_sock_errno();
+        if (err == EINTR) {
           continue;
-        } else if (errno == EAGAIN) {
+        } else if (err == EAGAIN) {
           break;
         }
-        return -errno;
+        return -err;
       }
 
       sent += r;
@@ -106,7 +113,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
     return (ssize_t)sent;
   }
 
-  ssize_t send(bufferlist &bl, bool more) override {
+  ssize_t send(ceph::buffer::list &bl, bool more) override {
     size_t sent_bytes = 0;
     auto pb = std::cbegin(bl.buffers());
     uint64_t left_pbrs = bl.get_num_buffers();
@@ -138,7 +145,7 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
     }
 
     if (sent_bytes) {
-      bufferlist swapped;
+      ceph::buffer::list swapped;
       if (sent_bytes < bl.length()) {
         bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
         bl.swap(swapped);
@@ -149,11 +156,55 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
 
     return static_cast<ssize_t>(sent_bytes);
   }
+  #else
+  ssize_t send(bufferlist &bl, bool more) override
+  {
+    size_t total_sent_bytes = 0;
+    auto pb = std::cbegin(bl.buffers());
+    uint64_t left_pbrs = bl.get_num_buffers();
+    while (left_pbrs) {
+      WSABUF msgvec[IOV_MAX];
+      uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
+      left_pbrs -= size;
+      unsigned msglen = 0;
+      for (auto iov = msgvec; iov != msgvec + size; iov++) {
+        iov->buf = const_cast<char*>(pb->c_str());
+        iov->len = pb->length();
+        msglen += pb->length();
+        ++pb;
+      }
+      DWORD sent_bytes = 0;
+      DWORD flags = 0;
+      if (more)
+        flags |= MSG_PARTIAL;
+
+      int ret_val = WSASend(_fd, msgvec, size, &sent_bytes, flags, NULL, NULL);
+      if (ret_val)
+        return -ret_val;
+
+      total_sent_bytes += sent_bytes;
+      if (static_cast<unsigned>(sent_bytes) < msglen)
+        break;
+    }
+
+    if (total_sent_bytes) {
+      bufferlist swapped;
+      if (total_sent_bytes < bl.length()) {
+        bl.splice(total_sent_bytes, bl.length()-total_sent_bytes, &swapped);
+        bl.swap(swapped);
+      } else {
+        bl.clear();
+      }
+    }
+
+    return static_cast<ssize_t>(total_sent_bytes);
+  }
+  #endif
   void shutdown() override {
     ::shutdown(_fd, SHUT_RDWR);
   }
   void close() override {
-    ::close(_fd);
+    compat_closesocket(_fd);
   }
   int fd() const override {
     return _fd;
@@ -163,11 +214,11 @@ class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
 };
 
 class PosixServerSocketImpl : public ServerSocketImpl {
-  NetHandler &handler;
+  ceph::NetHandler &handler;
   int _fd;
 
  public:
-  explicit PosixServerSocketImpl(NetHandler &h, int f,
+  explicit PosixServerSocketImpl(ceph::NetHandler &h, int f,
                                 const entity_addr_t& listen_addr, unsigned slot)
     : ServerSocketImpl(listen_addr.get_type(), slot),
       handler(h), _fd(f) {}
@@ -187,19 +238,19 @@ int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &op
   socklen_t slen = sizeof(ss);
   int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
   if (sd < 0) {
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   int r = handler.set_nonblock(sd);
   if (r < 0) {
     ::close(sd);
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
   if (r < 0) {
     ::close(sd);
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   ceph_assert(NULL != out); //out should not be NULL in accept connection
@@ -224,24 +275,24 @@ int PosixWorker::listen(entity_addr_t &sa,
 {
   int listen_sd = net.create_socket(sa.get_family(), true);
   if (listen_sd < 0) {
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   int r = net.set_nonblock(listen_sd);
   if (r < 0) {
     ::close(listen_sd);
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
   if (r < 0) {
     ::close(listen_sd);
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
   if (r < 0) {
-    r = -errno;
+    r = -ceph_sock_errno();
     ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
                    << ": " << cpp_strerror(r) << dendl;
     ::close(listen_sd);
@@ -250,7 +301,7 @@ int PosixWorker::listen(entity_addr_t &sa,
 
   r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
   if (r < 0) {
-    r = -errno;
+    r = -ceph_sock_errno();
     lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
     ::close(listen_sd);
     return r;
@@ -272,7 +323,7 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C
   }
 
   if (sd < 0) {
-    return -errno;
+    return -ceph_sock_errno();
   }
 
   net.set_priority(sd, opts.priority, addr.get_family());
@@ -281,7 +332,7 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C
   return 0;
 }
 
-PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
-    : NetworkStack(c, t)
+PosixNetworkStack::PosixNetworkStack(CephContext *c)
+    : NetworkStack(c)
 {
 }