]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/PosixStack.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / msg / async / PosixStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #include <errno.h>
22
23 #include <algorithm>
24
25 #include "PosixStack.h"
26
27 #include "include/buffer.h"
28 #include "include/str_list.h"
29 #include "common/errno.h"
30 #include "common/strtol.h"
31 #include "common/dout.h"
32 #include "msg/Messenger.h"
33 #include "include/compat.h"
34 #include "include/sock_compat.h"
35
36 #define dout_subsys ceph_subsys_ms
37 #undef dout_prefix
38 #define dout_prefix *_dout << "PosixStack "
39
40 class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
41 NetHandler &handler;
42 int _fd;
43 entity_addr_t sa;
44 bool connected;
45
46 public:
47 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
48 : handler(h), _fd(f), sa(sa), connected(connected) {}
49
50 int is_connected() override {
51 if (connected)
52 return 1;
53
54 int r = handler.reconnect(sa, _fd);
55 if (r == 0) {
56 connected = true;
57 return 1;
58 } else if (r < 0) {
59 return r;
60 } else {
61 return 0;
62 }
63 }
64
65 ssize_t read(char *buf, size_t len) override {
66 ssize_t r = ::read(_fd, buf, len);
67 if (r < 0)
68 r = -errno;
69 return r;
70 }
71
72 // return the sent length
73 // < 0 means error occurred
74 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
75 {
76 size_t sent = 0;
77 while (1) {
78 MSGR_SIGPIPE_STOPPER;
79 ssize_t r;
80 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
81 if (r < 0) {
82 if (errno == EINTR) {
83 continue;
84 } else if (errno == EAGAIN) {
85 break;
86 }
87 return -errno;
88 }
89
90 sent += r;
91 if (len == sent) break;
92
93 while (r > 0) {
94 if (msg.msg_iov[0].iov_len <= (size_t)r) {
95 // drain this whole item
96 r -= msg.msg_iov[0].iov_len;
97 msg.msg_iov++;
98 msg.msg_iovlen--;
99 } else {
100 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
101 msg.msg_iov[0].iov_len -= r;
102 break;
103 }
104 }
105 }
106 return (ssize_t)sent;
107 }
108
109 ssize_t send(bufferlist &bl, bool more) override {
110 size_t sent_bytes = 0;
111 auto pb = std::cbegin(bl.buffers());
112 uint64_t left_pbrs = bl.get_num_buffers();
113 while (left_pbrs) {
114 struct msghdr msg;
115 struct iovec msgvec[IOV_MAX];
116 uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX);
117 left_pbrs -= size;
118 // FIPS zeroization audit 20191115: this memset is not security related.
119 memset(&msg, 0, sizeof(msg));
120 msg.msg_iovlen = size;
121 msg.msg_iov = msgvec;
122 unsigned msglen = 0;
123 for (auto iov = msgvec; iov != msgvec + size; iov++) {
124 iov->iov_base = (void*)(pb->c_str());
125 iov->iov_len = pb->length();
126 msglen += pb->length();
127 ++pb;
128 }
129 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
130 if (r < 0)
131 return r;
132
133 // "r" is the remaining length
134 sent_bytes += r;
135 if (static_cast<unsigned>(r) < msglen)
136 break;
137 // only "r" == 0 continue
138 }
139
140 if (sent_bytes) {
141 bufferlist swapped;
142 if (sent_bytes < bl.length()) {
143 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
144 bl.swap(swapped);
145 } else {
146 bl.clear();
147 }
148 }
149
150 return static_cast<ssize_t>(sent_bytes);
151 }
152 void shutdown() override {
153 ::shutdown(_fd, SHUT_RDWR);
154 }
155 void close() override {
156 ::close(_fd);
157 }
158 int fd() const override {
159 return _fd;
160 }
161 friend class PosixServerSocketImpl;
162 friend class PosixNetworkStack;
163 };
164
165 class PosixServerSocketImpl : public ServerSocketImpl {
166 NetHandler &handler;
167 int _fd;
168
169 public:
170 explicit PosixServerSocketImpl(NetHandler &h, int f,
171 const entity_addr_t& listen_addr, unsigned slot)
172 : ServerSocketImpl(listen_addr.get_type(), slot),
173 handler(h), _fd(f) {}
174 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
175 void abort_accept() override {
176 ::close(_fd);
177 _fd = -1;
178 }
179 int fd() const override {
180 return _fd;
181 }
182 };
183
184 int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
185 ceph_assert(sock);
186 sockaddr_storage ss;
187 socklen_t slen = sizeof(ss);
188 int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen);
189 if (sd < 0) {
190 return -errno;
191 }
192
193 int r = handler.set_nonblock(sd);
194 if (r < 0) {
195 ::close(sd);
196 return -errno;
197 }
198
199 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
200 if (r < 0) {
201 ::close(sd);
202 return -errno;
203 }
204
205 ceph_assert(NULL != out); //out should not be NULL in accept connection
206
207 out->set_type(addr_type);
208 out->set_sockaddr((sockaddr*)&ss);
209 handler.set_priority(sd, opt.priority, out->get_family());
210
211 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
212 *sock = ConnectedSocket(std::move(csi));
213 return 0;
214 }
215
216 void PosixWorker::initialize()
217 {
218 }
219
220 int PosixWorker::listen(entity_addr_t &sa,
221 unsigned addr_slot,
222 const SocketOptions &opt,
223 ServerSocket *sock)
224 {
225 int listen_sd = net.create_socket(sa.get_family(), true);
226 if (listen_sd < 0) {
227 return -errno;
228 }
229
230 int r = net.set_nonblock(listen_sd);
231 if (r < 0) {
232 ::close(listen_sd);
233 return -errno;
234 }
235
236 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
237 if (r < 0) {
238 ::close(listen_sd);
239 return -errno;
240 }
241
242 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
243 if (r < 0) {
244 r = -errno;
245 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
246 << ": " << cpp_strerror(r) << dendl;
247 ::close(listen_sd);
248 return r;
249 }
250
251 r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog);
252 if (r < 0) {
253 r = -errno;
254 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
255 ::close(listen_sd);
256 return r;
257 }
258
259 *sock = ServerSocket(
260 std::unique_ptr<PosixServerSocketImpl>(
261 new PosixServerSocketImpl(net, listen_sd, sa, addr_slot)));
262 return 0;
263 }
264
265 int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
266 int sd;
267
268 if (opts.nonblock) {
269 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
270 } else {
271 sd = net.connect(addr, opts.connect_bind_addr);
272 }
273
274 if (sd < 0) {
275 return -errno;
276 }
277
278 net.set_priority(sd, opts.priority, addr.get_family());
279 *socket = ConnectedSocket(
280 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
281 return 0;
282 }
283
284 PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
285 : NetworkStack(c, t)
286 {
287 }