]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/PosixStack.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / msg / async / PosixStack.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17#include <sys/socket.h>
18#include <netinet/tcp.h>
19#include <netinet/in.h>
20#include <arpa/inet.h>
21#include <errno.h>
22
23#include <algorithm>
24
25#include "PosixStack.h"
26
27#include "include/buffer.h"
28#include "include/str_list.h"
7c673cae
FG
29#include "common/errno.h"
30#include "common/strtol.h"
31#include "common/dout.h"
3efd9988 32#include "msg/Messenger.h"
91327a77 33#include "include/compat.h"
3efd9988 34#include "include/sock_compat.h"
7c673cae
FG
35
36#define dout_subsys ceph_subsys_ms
37#undef dout_prefix
38#define dout_prefix *_dout << "PosixStack "
39
40class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
f67539c2 41 ceph::NetHandler &handler;
7c673cae
FG
42 int _fd;
43 entity_addr_t sa;
44 bool connected;
7c673cae
FG
45
46 public:
f67539c2
TL
47 explicit PosixConnectedSocketImpl(ceph::NetHandler &h, const entity_addr_t &sa,
48 int f, bool connected)
7c673cae
FG
49 : handler(h), _fd(f), sa(sa), connected(connected) {}
50
51 int is_connected() override {
52 if (connected)
53 return 1;
54
55 int r = handler.reconnect(sa, _fd);
56 if (r == 0) {
57 connected = true;
58 return 1;
59 } else if (r < 0) {
60 return r;
61 } else {
62 return 0;
63 }
64 }
65
7c673cae 66 ssize_t read(char *buf, size_t len) override {
f67539c2
TL
67 #ifdef _WIN32
68 ssize_t r = ::recv(_fd, buf, len, 0);
69 #else
7c673cae 70 ssize_t r = ::read(_fd, buf, len);
f67539c2 71 #endif
7c673cae 72 if (r < 0)
f67539c2 73 r = -ceph_sock_errno();
7c673cae
FG
74 return r;
75 }
76
7c673cae 77 // return the sent length
11fdf7f2 78 // < 0 means error occurred
f67539c2 79 #ifndef _WIN32
7c673cae
FG
80 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
81 {
7c673cae
FG
82 size_t sent = 0;
83 while (1) {
3efd9988 84 MSGR_SIGPIPE_STOPPER;
7c673cae 85 ssize_t r;
7c673cae 86 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
7c673cae 87 if (r < 0) {
f67539c2
TL
88 int err = ceph_sock_errno();
89 if (err == EINTR) {
7c673cae 90 continue;
f67539c2 91 } else if (err == EAGAIN) {
7c673cae
FG
92 break;
93 }
f67539c2 94 return -err;
7c673cae
FG
95 }
96
97 sent += r;
98 if (len == sent) break;
99
100 while (r > 0) {
101 if (msg.msg_iov[0].iov_len <= (size_t)r) {
102 // drain this whole item
103 r -= msg.msg_iov[0].iov_len;
104 msg.msg_iov++;
105 msg.msg_iovlen--;
106 } else {
107 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
108 msg.msg_iov[0].iov_len -= r;
109 break;
110 }
111 }
112 }
7c673cae
FG
113 return (ssize_t)sent;
114 }
115
f67539c2 116 ssize_t send(ceph::buffer::list &bl, bool more) override {
7c673cae 117 size_t sent_bytes = 0;
11fdf7f2 118 auto pb = std::cbegin(bl.buffers());
9f95a23c 119 uint64_t left_pbrs = bl.get_num_buffers();
7c673cae
FG
120 while (left_pbrs) {
121 struct msghdr msg;
122 struct iovec msgvec[IOV_MAX];
11fdf7f2 123 uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
7c673cae 124 left_pbrs -= size;
92f5a8d4 125 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae 126 memset(&msg, 0, sizeof(msg));
11fdf7f2 127 msg.msg_iovlen = size;
7c673cae
FG
128 msg.msg_iov = msgvec;
129 unsigned msglen = 0;
11fdf7f2
TL
130 for (auto iov = msgvec; iov != msgvec + size; iov++) {
131 iov->iov_base = (void*)(pb->c_str());
132 iov->iov_len = pb->length();
133 msglen += pb->length();
134 ++pb;
7c673cae 135 }
7c673cae
FG
136 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
137 if (r < 0)
138 return r;
139
140 // "r" is the remaining length
141 sent_bytes += r;
142 if (static_cast<unsigned>(r) < msglen)
143 break;
144 // only "r" == 0 continue
145 }
146
147 if (sent_bytes) {
f67539c2 148 ceph::buffer::list swapped;
7c673cae
FG
149 if (sent_bytes < bl.length()) {
150 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
151 bl.swap(swapped);
152 } else {
153 bl.clear();
154 }
155 }
156
157 return static_cast<ssize_t>(sent_bytes);
158 }
f67539c2
TL
159 #else
160 ssize_t send(bufferlist &bl, bool more) override
161 {
162 size_t total_sent_bytes = 0;
163 auto pb = std::cbegin(bl.buffers());
164 uint64_t left_pbrs = bl.get_num_buffers();
165 while (left_pbrs) {
166 WSABUF msgvec[IOV_MAX];
167 uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
168 left_pbrs -= size;
169 unsigned msglen = 0;
170 for (auto iov = msgvec; iov != msgvec + size; iov++) {
171 iov->buf = const_cast<char*>(pb->c_str());
172 iov->len = pb->length();
173 msglen += pb->length();
174 ++pb;
175 }
176 DWORD sent_bytes = 0;
177 DWORD flags = 0;
178 if (more)
179 flags |= MSG_PARTIAL;
180
181 int ret_val = WSASend(_fd, msgvec, size, &sent_bytes, flags, NULL, NULL);
182 if (ret_val)
183 return -ret_val;
184
185 total_sent_bytes += sent_bytes;
186 if (static_cast<unsigned>(sent_bytes) < msglen)
187 break;
188 }
189
190 if (total_sent_bytes) {
191 bufferlist swapped;
192 if (total_sent_bytes < bl.length()) {
193 bl.splice(total_sent_bytes, bl.length()-total_sent_bytes, &swapped);
194 bl.swap(swapped);
195 } else {
196 bl.clear();
197 }
198 }
199
200 return static_cast<ssize_t>(total_sent_bytes);
201 }
202 #endif
7c673cae
FG
203 void shutdown() override {
204 ::shutdown(_fd, SHUT_RDWR);
205 }
206 void close() override {
f67539c2 207 compat_closesocket(_fd);
7c673cae 208 }
1e59de90
TL
209 void set_priority(int sd, int prio, int domain) override {
210 handler.set_priority(sd, prio, domain);
211 }
7c673cae
FG
212 int fd() const override {
213 return _fd;
214 }
215 friend class PosixServerSocketImpl;
216 friend class PosixNetworkStack;
217};
218
219class PosixServerSocketImpl : public ServerSocketImpl {
f67539c2 220 ceph::NetHandler &handler;
7c673cae
FG
221 int _fd;
222
223 public:
f67539c2 224 explicit PosixServerSocketImpl(ceph::NetHandler &h, int f,
11fdf7f2
TL
225 const entity_addr_t& listen_addr, unsigned slot)
226 : ServerSocketImpl(listen_addr.get_type(), slot),
227 handler(h), _fd(f) {}
7c673cae
FG
228 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
229 void abort_accept() override {
230 ::close(_fd);
9f95a23c 231 _fd = -1;
7c673cae
FG
232 }
233 int fd() const override {
234 return _fd;
235 }
236};
237
238int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
11fdf7f2 239 ceph_assert(sock);
7c673cae
FG
240 sockaddr_storage ss;
241 socklen_t slen = sizeof(ss);
91327a77 242 int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
7c673cae 243 if (sd < 0) {
f67539c2 244 return -ceph_sock_errno();
7c673cae
FG
245 }
246
7c673cae
FG
247 int r = handler.set_nonblock(sd);
248 if (r < 0) {
249 ::close(sd);
f67539c2 250 return -ceph_sock_errno();
7c673cae
FG
251 }
252
253 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
254 if (r < 0) {
255 ::close(sd);
f67539c2 256 return -ceph_sock_errno();
7c673cae
FG
257 }
258
11fdf7f2 259 ceph_assert(NULL != out); //out should not be NULL in accept connection
7c673cae 260
11fdf7f2 261 out->set_type(addr_type);
7c673cae
FG
262 out->set_sockaddr((sockaddr*)&ss);
263 handler.set_priority(sd, opt.priority, out->get_family());
264
265 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
266 *sock = ConnectedSocket(std::move(csi));
267 return 0;
268}
269
270void PosixWorker::initialize()
271{
272}
273
11fdf7f2
TL
274int PosixWorker::listen(entity_addr_t &sa,
275 unsigned addr_slot,
276 const SocketOptions &opt,
7c673cae
FG
277 ServerSocket *sock)
278{
279 int listen_sd = net.create_socket(sa.get_family(), true);
280 if (listen_sd < 0) {
f67539c2 281 return -ceph_sock_errno();
7c673cae
FG
282 }
283
284 int r = net.set_nonblock(listen_sd);
285 if (r < 0) {
286 ::close(listen_sd);
f67539c2 287 return -ceph_sock_errno();
7c673cae
FG
288 }
289
7c673cae
FG
290 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
291 if (r < 0) {
292 ::close(listen_sd);
f67539c2 293 return -ceph_sock_errno();
7c673cae
FG
294 }
295
296 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
297 if (r < 0) {
f67539c2 298 r = -ceph_sock_errno();
7c673cae
FG
299 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
300 << ": " << cpp_strerror(r) << dendl;
301 ::close(listen_sd);
302 return r;
303 }
304
224ce89b 305 r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
7c673cae 306 if (r < 0) {
f67539c2 307 r = -ceph_sock_errno();
7c673cae
FG
308 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
309 ::close(listen_sd);
310 return r;
311 }
312
313 *sock = ServerSocket(
314 std::unique_ptr<PosixServerSocketImpl>(
11fdf7f2 315 new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
7c673cae
FG
316 return 0;
317}
318
319int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
320 int sd;
321
322 if (opts.nonblock) {
323 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
324 } else {
325 sd = net.connect(addr, opts.connect_bind_addr);
326 }
327
328 if (sd < 0) {
f67539c2 329 return -ceph_sock_errno();
7c673cae
FG
330 }
331
332 net.set_priority(sd, opts.priority, addr.get_family());
333 *socket = ConnectedSocket(
334 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
335 return 0;
336}
337
f67539c2
TL
338PosixNetworkStack::PosixNetworkStack(CephContext *c)
339 : NetworkStack(c)
7c673cae 340{
7c673cae 341}