#define dout_prefix *_dout << "PosixStack "
class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
- NetHandler &handler;
+ ceph::NetHandler &handler;
int _fd;
entity_addr_t sa;
bool connected;
public:
- explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
+ explicit PosixConnectedSocketImpl(ceph::NetHandler &h, const entity_addr_t &sa,
+ int f, bool connected)
: handler(h), _fd(f), sa(sa), connected(connected) {}
int is_connected() override {
}
ssize_t read(char *buf, size_t len) override {
+ #ifdef _WIN32
+ ssize_t r = ::recv(_fd, buf, len, 0);
+ #else
ssize_t r = ::read(_fd, buf, len);
+ #endif
if (r < 0)
- r = -errno;
+ r = -ceph_sock_errno();
return r;
}
// return the sent length
// < 0 means error occurred
+ #ifndef _WIN32
static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
{
size_t sent = 0;
ssize_t r;
r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
if (r < 0) {
- if (errno == EINTR) {
+ int err = ceph_sock_errno();
+ if (err == EINTR) {
continue;
- } else if (errno == EAGAIN) {
+ } else if (err == EAGAIN) {
break;
}
- return -errno;
+ return -err;
}
sent += r;
return (ssize_t)sent;
}
- ssize_t send(bufferlist &bl, bool more) override {
+ ssize_t send(ceph::buffer::list &bl, bool more) override {
size_t sent_bytes = 0;
auto pb = std::cbegin(bl.buffers());
uint64_t left_pbrs = bl.get_num_buffers();
}
if (sent_bytes) {
- bufferlist swapped;
+ ceph::buffer::list swapped;
if (sent_bytes < bl.length()) {
bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
bl.swap(swapped);
return static_cast<ssize_t>(sent_bytes);
}
+ #else
+ ssize_t send(bufferlist &bl, bool more) override
+ {
+ size_t total_sent_bytes = 0;
+ auto pb = std::cbegin(bl.buffers());
+ uint64_t left_pbrs = bl.get_num_buffers();
+ while (left_pbrs) {
+ WSABUF msgvec[IOV_MAX];
+ uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
+ left_pbrs -= size;
+ unsigned msglen = 0;
+ for (auto iov = msgvec; iov != msgvec + size; iov++) {
+ iov->buf = const_cast<char*>(pb->c_str());
+ iov->len = pb->length();
+ msglen += pb->length();
+ ++pb;
+ }
+ DWORD sent_bytes = 0;
+ DWORD flags = 0;
+ if (more)
+ flags |= MSG_PARTIAL;
+
+ int ret_val = WSASend(_fd, msgvec, size, &sent_bytes, flags, NULL, NULL);
+ if (ret_val)
+ return -ret_val;
+
+ total_sent_bytes += sent_bytes;
+ if (static_cast<unsigned>(sent_bytes) < msglen)
+ break;
+ }
+
+ if (total_sent_bytes) {
+ bufferlist swapped;
+ if (total_sent_bytes < bl.length()) {
+ bl.splice(total_sent_bytes, bl.length()-total_sent_bytes, &swapped);
+ bl.swap(swapped);
+ } else {
+ bl.clear();
+ }
+ }
+
+ return static_cast<ssize_t>(total_sent_bytes);
+ }
+ #endif
void shutdown() override {
::shutdown(_fd, SHUT_RDWR);
}
void close() override {
- ::close(_fd);
+ compat_closesocket(_fd);
}
int fd() const override {
return _fd;
};
class PosixServerSocketImpl : public ServerSocketImpl {
- NetHandler &handler;
+ ceph::NetHandler &handler;
int _fd;
public:
- explicit PosixServerSocketImpl(NetHandler &h, int f,
+ explicit PosixServerSocketImpl(ceph::NetHandler &h, int f,
const entity_addr_t& listen_addr, unsigned slot)
: ServerSocketImpl(listen_addr.get_type(), slot),
handler(h), _fd(f) {}
socklen_t slen = sizeof(ss);
int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
if (sd < 0) {
- return -errno;
+ return -ceph_sock_errno();
}
int r = handler.set_nonblock(sd);
if (r < 0) {
::close(sd);
- return -errno;
+ return -ceph_sock_errno();
}
r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
if (r < 0) {
::close(sd);
- return -errno;
+ return -ceph_sock_errno();
}
ceph_assert(NULL != out); //out should not be NULL in accept connection
{
int listen_sd = net.create_socket(sa.get_family(), true);
if (listen_sd < 0) {
- return -errno;
+ return -ceph_sock_errno();
}
int r = net.set_nonblock(listen_sd);
if (r < 0) {
::close(listen_sd);
- return -errno;
+ return -ceph_sock_errno();
}
r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
if (r < 0) {
::close(listen_sd);
- return -errno;
+ return -ceph_sock_errno();
}
r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
if (r < 0) {
- r = -errno;
+ r = -ceph_sock_errno();
ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
<< ": " << cpp_strerror(r) << dendl;
::close(listen_sd);
r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
if (r < 0) {
- r = -errno;
+ r = -ceph_sock_errno();
lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
::close(listen_sd);
return r;
}
if (sd < 0) {
- return -errno;
+ return -ceph_sock_errno();
}
net.set_priority(sd, opts.priority, addr.get_family());
return 0;
}
-PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
- : NetworkStack(c, t)
+PosixNetworkStack::PosixNetworkStack(CephContext *c)
+ : NetworkStack(c)
{
}