]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/PosixStack.cc
cf52db9a775cb73e83bbcbc94090a080a123747a
[ceph.git] / ceph / src / msg / async / PosixStack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
21 #include <errno.h>
22
23 #include <algorithm>
24
25 #include "PosixStack.h"
26
27 #include "include/buffer.h"
28 #include "include/str_list.h"
29 #include "include/sock_compat.h"
30 #include "common/errno.h"
31 #include "common/strtol.h"
32 #include "common/dout.h"
33 #include "common/simple_spin.h"
34
35 #define dout_subsys ceph_subsys_ms
36 #undef dout_prefix
37 #define dout_prefix *_dout << "PosixStack "
38
39 class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
40 NetHandler &handler;
41 int _fd;
42 entity_addr_t sa;
43 bool connected;
44 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
45 sigset_t sigpipe_mask;
46 bool sigpipe_pending;
47 bool sigpipe_unblock;
48 #endif
49
50 public:
51 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
52 : handler(h), _fd(f), sa(sa), connected(connected) {}
53
54 int is_connected() override {
55 if (connected)
56 return 1;
57
58 int r = handler.reconnect(sa, _fd);
59 if (r == 0) {
60 connected = true;
61 return 1;
62 } else if (r < 0) {
63 return r;
64 } else {
65 return 0;
66 }
67 }
68
69 ssize_t zero_copy_read(bufferptr&) override {
70 return -EOPNOTSUPP;
71 }
72
73 ssize_t read(char *buf, size_t len) override {
74 ssize_t r = ::read(_fd, buf, len);
75 if (r < 0)
76 r = -errno;
77 return r;
78 }
79
80 /*
81 SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
82 http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
83 http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
84 */
85 static void suppress_sigpipe()
86 {
87 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
88 /*
89 We want to ignore possible SIGPIPE that we can generate on write.
90 SIGPIPE is delivered *synchronously* and *only* to the thread
91 doing the write. So if it is reported as already pending (which
92 means the thread blocks it), then we do nothing: if we generate
93 SIGPIPE, it will be merged with the pending one (there's no
94 queuing), and that suits us well. If it is not pending, we block
95 it in this thread (and we avoid changing signal action, because it
96 is per-process).
97 */
98 sigset_t pending;
99 sigemptyset(&pending);
100 sigpending(&pending);
101 sigpipe_pending = sigismember(&pending, SIGPIPE);
102 if (!sigpipe_pending) {
103 sigset_t blocked;
104 sigemptyset(&blocked);
105 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
106
107 /* Maybe is was blocked already? */
108 sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
109 }
110 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
111 }
112
113 static void restore_sigpipe()
114 {
115 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
116 /*
117 If SIGPIPE was pending already we do nothing. Otherwise, if it
118 become pending (i.e., we generated it), then we sigwait() it (thus
119 clearing pending status). Then we unblock SIGPIPE, but only if it
120 were us who blocked it.
121 */
122 if (!sigpipe_pending) {
123 sigset_t pending;
124 sigemptyset(&pending);
125 sigpending(&pending);
126 if (sigismember(&pending, SIGPIPE)) {
127 /*
128 Protect ourselves from a situation when SIGPIPE was sent
129 by the user to the whole process, and was delivered to
130 other thread before we had a chance to wait for it.
131 */
132 static const struct timespec nowait = { 0, 0 };
133 TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
134 }
135
136 if (sigpipe_unblock)
137 pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
138 }
139 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
140 }
141
142 // return the sent length
143 // < 0 means error occured
144 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
145 {
146 suppress_sigpipe();
147
148 size_t sent = 0;
149 while (1) {
150 ssize_t r;
151 #if defined(MSG_NOSIGNAL)
152 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
153 #else
154 r = ::sendmsg(fd, &msg, (more ? MSG_MORE : 0));
155 #endif /* defined(MSG_NOSIGNAL) */
156
157 if (r < 0) {
158 if (errno == EINTR) {
159 continue;
160 } else if (errno == EAGAIN) {
161 break;
162 }
163 return -errno;
164 }
165
166 sent += r;
167 if (len == sent) break;
168
169 while (r > 0) {
170 if (msg.msg_iov[0].iov_len <= (size_t)r) {
171 // drain this whole item
172 r -= msg.msg_iov[0].iov_len;
173 msg.msg_iov++;
174 msg.msg_iovlen--;
175 } else {
176 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
177 msg.msg_iov[0].iov_len -= r;
178 break;
179 }
180 }
181 }
182 restore_sigpipe();
183 return (ssize_t)sent;
184 }
185
186 ssize_t send(bufferlist &bl, bool more) override {
187 size_t sent_bytes = 0;
188 std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
189 uint64_t left_pbrs = bl.buffers().size();
190 while (left_pbrs) {
191 struct msghdr msg;
192 struct iovec msgvec[IOV_MAX];
193 uint64_t size = MIN(left_pbrs, IOV_MAX);
194 left_pbrs -= size;
195 memset(&msg, 0, sizeof(msg));
196 msg.msg_iovlen = 0;
197 msg.msg_iov = msgvec;
198 unsigned msglen = 0;
199 while (size > 0) {
200 msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
201 msgvec[msg.msg_iovlen].iov_len = pb->length();
202 msg.msg_iovlen++;
203 msglen += pb->length();
204 ++pb;
205 size--;
206 }
207
208 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
209 if (r < 0)
210 return r;
211
212 // "r" is the remaining length
213 sent_bytes += r;
214 if (static_cast<unsigned>(r) < msglen)
215 break;
216 // only "r" == 0 continue
217 }
218
219 if (sent_bytes) {
220 bufferlist swapped;
221 if (sent_bytes < bl.length()) {
222 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
223 bl.swap(swapped);
224 } else {
225 bl.clear();
226 }
227 }
228
229 return static_cast<ssize_t>(sent_bytes);
230 }
231 void shutdown() override {
232 ::shutdown(_fd, SHUT_RDWR);
233 }
234 void close() override {
235 ::close(_fd);
236 }
237 int fd() const override {
238 return _fd;
239 }
240 friend class PosixServerSocketImpl;
241 friend class PosixNetworkStack;
242 };
243
244 class PosixServerSocketImpl : public ServerSocketImpl {
245 NetHandler &handler;
246 int _fd;
247
248 public:
249 explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {}
250 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
251 void abort_accept() override {
252 ::close(_fd);
253 }
254 int fd() const override {
255 return _fd;
256 }
257 };
258
259 int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
260 assert(sock);
261 sockaddr_storage ss;
262 socklen_t slen = sizeof(ss);
263 int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
264 if (sd < 0) {
265 return -errno;
266 }
267
268 handler.set_close_on_exec(sd);
269 int r = handler.set_nonblock(sd);
270 if (r < 0) {
271 ::close(sd);
272 return -errno;
273 }
274
275 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
276 if (r < 0) {
277 ::close(sd);
278 return -errno;
279 }
280
281 assert(NULL != out); //out should not be NULL in accept connection
282
283 out->set_sockaddr((sockaddr*)&ss);
284 handler.set_priority(sd, opt.priority, out->get_family());
285
286 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
287 *sock = ConnectedSocket(std::move(csi));
288 return 0;
289 }
290
291 void PosixWorker::initialize()
292 {
293 }
294
295 int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
296 ServerSocket *sock)
297 {
298 int listen_sd = net.create_socket(sa.get_family(), true);
299 if (listen_sd < 0) {
300 return -errno;
301 }
302
303 int r = net.set_nonblock(listen_sd);
304 if (r < 0) {
305 ::close(listen_sd);
306 return -errno;
307 }
308
309 net.set_close_on_exec(listen_sd);
310 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
311 if (r < 0) {
312 ::close(listen_sd);
313 return -errno;
314 }
315
316 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
317 if (r < 0) {
318 r = -errno;
319 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
320 << ": " << cpp_strerror(r) << dendl;
321 ::close(listen_sd);
322 return r;
323 }
324
325 r = ::listen(listen_sd, 128);
326 if (r < 0) {
327 r = -errno;
328 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
329 ::close(listen_sd);
330 return r;
331 }
332
333 *sock = ServerSocket(
334 std::unique_ptr<PosixServerSocketImpl>(
335 new PosixServerSocketImpl(net, listen_sd)));
336 return 0;
337 }
338
339 int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
340 int sd;
341
342 if (opts.nonblock) {
343 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
344 } else {
345 sd = net.connect(addr, opts.connect_bind_addr);
346 }
347
348 if (sd < 0) {
349 return -errno;
350 }
351
352 net.set_priority(sd, opts.priority, addr.get_family());
353 *socket = ConnectedSocket(
354 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
355 return 0;
356 }
357
358 PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
359 : NetworkStack(c, t)
360 {
361 vector<string> corestrs;
362 get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
363 for (auto & corestr : corestrs) {
364 string err;
365 int coreid = strict_strtol(corestr.c_str(), 10, &err);
366 if (err == "")
367 coreids.push_back(coreid);
368 else
369 lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl;
370 }
371 }