]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/async/PosixStack.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / msg / async / PosixStack.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
7 *
8 * Author: Haomai Wang <haomaiwang@gmail.com>
9 *
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
14 *
15 */
16
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <signal.h>
22
23#include <algorithm>
24
25#include "PosixStack.h"
26
27#include "include/buffer.h"
28#include "include/str_list.h"
29#include "include/sock_compat.h"
30#include "common/errno.h"
31#include "common/strtol.h"
32#include "common/dout.h"
33#include "include/assert.h"
34#include "common/simple_spin.h"
35
36#define dout_subsys ceph_subsys_ms
37#undef dout_prefix
38#define dout_prefix *_dout << "PosixStack "
39
40class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
41 NetHandler &handler;
42 int _fd;
43 entity_addr_t sa;
44 bool connected;
45#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
46 sigset_t sigpipe_mask;
47 bool sigpipe_pending;
48 bool sigpipe_unblock;
49#endif
50
51 public:
52 explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
53 : handler(h), _fd(f), sa(sa), connected(connected) {}
54
55 int is_connected() override {
56 if (connected)
57 return 1;
58
59 int r = handler.reconnect(sa, _fd);
60 if (r == 0) {
61 connected = true;
62 return 1;
63 } else if (r < 0) {
64 return r;
65 } else {
66 return 0;
67 }
68 }
69
70 ssize_t zero_copy_read(bufferptr&) override {
71 return -EOPNOTSUPP;
72 }
73
74 ssize_t read(char *buf, size_t len) override {
75 ssize_t r = ::read(_fd, buf, len);
76 if (r < 0)
77 r = -errno;
78 return r;
79 }
80
81 /*
82 SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
83 http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
84 http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
85 */
86 static void suppress_sigpipe()
87 {
88 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
89 /*
90 We want to ignore possible SIGPIPE that we can generate on write.
91 SIGPIPE is delivered *synchronously* and *only* to the thread
92 doing the write. So if it is reported as already pending (which
93 means the thread blocks it), then we do nothing: if we generate
94 SIGPIPE, it will be merged with the pending one (there's no
95 queuing), and that suits us well. If it is not pending, we block
96 it in this thread (and we avoid changing signal action, because it
97 is per-process).
98 */
99 sigset_t pending;
100 sigemptyset(&pending);
101 sigpending(&pending);
102 sigpipe_pending = sigismember(&pending, SIGPIPE);
103 if (!sigpipe_pending) {
104 sigset_t blocked;
105 sigemptyset(&blocked);
106 pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
107
108 /* Maybe is was blocked already? */
109 sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
110 }
111 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
112 }
113
114 static void restore_sigpipe()
115 {
116 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
117 /*
118 If SIGPIPE was pending already we do nothing. Otherwise, if it
119 become pending (i.e., we generated it), then we sigwait() it (thus
120 clearing pending status). Then we unblock SIGPIPE, but only if it
121 were us who blocked it.
122 */
123 if (!sigpipe_pending) {
124 sigset_t pending;
125 sigemptyset(&pending);
126 sigpending(&pending);
127 if (sigismember(&pending, SIGPIPE)) {
128 /*
129 Protect ourselves from a situation when SIGPIPE was sent
130 by the user to the whole process, and was delivered to
131 other thread before we had a chance to wait for it.
132 */
133 static const struct timespec nowait = { 0, 0 };
134 TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
135 }
136
137 if (sigpipe_unblock)
138 pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
139 }
140 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
141 }
142
143 // return the sent length
144 // < 0 means error occured
145 static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more)
146 {
147 suppress_sigpipe();
148
149 size_t sent = 0;
150 while (1) {
151 ssize_t r;
152 #if defined(MSG_NOSIGNAL)
153 r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
154 #else
155 r = ::sendmsg(fd, &msg, (more ? MSG_MORE : 0));
156 #endif /* defined(MSG_NOSIGNAL) */
157
158 if (r < 0) {
159 if (errno == EINTR) {
160 continue;
161 } else if (errno == EAGAIN) {
162 break;
163 }
164 return -errno;
165 }
166
167 sent += r;
168 if (len == sent) break;
169
170 while (r > 0) {
171 if (msg.msg_iov[0].iov_len <= (size_t)r) {
172 // drain this whole item
173 r -= msg.msg_iov[0].iov_len;
174 msg.msg_iov++;
175 msg.msg_iovlen--;
176 } else {
177 msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
178 msg.msg_iov[0].iov_len -= r;
179 break;
180 }
181 }
182 }
183 restore_sigpipe();
184 return (ssize_t)sent;
185 }
186
187 ssize_t send(bufferlist &bl, bool more) override {
188 size_t sent_bytes = 0;
189 std::list<bufferptr>::const_iterator pb = bl.buffers().begin();
190 uint64_t left_pbrs = bl.buffers().size();
191 while (left_pbrs) {
192 struct msghdr msg;
193 struct iovec msgvec[IOV_MAX];
194 uint64_t size = MIN(left_pbrs, IOV_MAX);
195 left_pbrs -= size;
196 memset(&msg, 0, sizeof(msg));
197 msg.msg_iovlen = 0;
198 msg.msg_iov = msgvec;
199 unsigned msglen = 0;
200 while (size > 0) {
201 msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
202 msgvec[msg.msg_iovlen].iov_len = pb->length();
203 msg.msg_iovlen++;
204 msglen += pb->length();
205 ++pb;
206 size--;
207 }
208
209 ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more);
210 if (r < 0)
211 return r;
212
213 // "r" is the remaining length
214 sent_bytes += r;
215 if (static_cast<unsigned>(r) < msglen)
216 break;
217 // only "r" == 0 continue
218 }
219
220 if (sent_bytes) {
221 bufferlist swapped;
222 if (sent_bytes < bl.length()) {
223 bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped);
224 bl.swap(swapped);
225 } else {
226 bl.clear();
227 }
228 }
229
230 return static_cast<ssize_t>(sent_bytes);
231 }
232 void shutdown() override {
233 ::shutdown(_fd, SHUT_RDWR);
234 }
235 void close() override {
236 ::close(_fd);
237 }
238 int fd() const override {
239 return _fd;
240 }
241 friend class PosixServerSocketImpl;
242 friend class PosixNetworkStack;
243};
244
245class PosixServerSocketImpl : public ServerSocketImpl {
246 NetHandler &handler;
247 int _fd;
248
249 public:
250 explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {}
251 int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
252 void abort_accept() override {
253 ::close(_fd);
254 }
255 int fd() const override {
256 return _fd;
257 }
258};
259
260int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
261 assert(sock);
262 sockaddr_storage ss;
263 socklen_t slen = sizeof(ss);
264 int sd = ::accept(_fd, (sockaddr*)&ss, &slen);
265 if (sd < 0) {
266 return -errno;
267 }
268
269 handler.set_close_on_exec(sd);
270 int r = handler.set_nonblock(sd);
271 if (r < 0) {
272 ::close(sd);
273 return -errno;
274 }
275
276 r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size);
277 if (r < 0) {
278 ::close(sd);
279 return -errno;
280 }
281
282 assert(NULL != out); //out should not be NULL in accept connection
283
284 out->set_sockaddr((sockaddr*)&ss);
285 handler.set_priority(sd, opt.priority, out->get_family());
286
287 std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true));
288 *sock = ConnectedSocket(std::move(csi));
289 return 0;
290}
291
// Intentionally empty: this worker performs no setup in initialize().
void PosixWorker::initialize()
{
}
295
296int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt,
297 ServerSocket *sock)
298{
299 int listen_sd = net.create_socket(sa.get_family(), true);
300 if (listen_sd < 0) {
301 return -errno;
302 }
303
304 int r = net.set_nonblock(listen_sd);
305 if (r < 0) {
306 ::close(listen_sd);
307 return -errno;
308 }
309
310 net.set_close_on_exec(listen_sd);
311 r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size);
312 if (r < 0) {
313 ::close(listen_sd);
314 return -errno;
315 }
316
317 r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len());
318 if (r < 0) {
319 r = -errno;
320 ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr()
321 << ": " << cpp_strerror(r) << dendl;
322 ::close(listen_sd);
323 return r;
324 }
325
326 r = ::listen(listen_sd, 128);
327 if (r < 0) {
328 r = -errno;
329 lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl;
330 ::close(listen_sd);
331 return r;
332 }
333
334 *sock = ServerSocket(
335 std::unique_ptr<PosixServerSocketImpl>(
336 new PosixServerSocketImpl(net, listen_sd)));
337 return 0;
338}
339
340int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) {
341 int sd;
342
343 if (opts.nonblock) {
344 sd = net.nonblock_connect(addr, opts.connect_bind_addr);
345 } else {
346 sd = net.connect(addr, opts.connect_bind_addr);
347 }
348
349 if (sd < 0) {
350 return -errno;
351 }
352
353 net.set_priority(sd, opts.priority, addr.get_family());
354 *socket = ConnectedSocket(
355 std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock)));
356 return 0;
357}
358
359PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t)
360 : NetworkStack(c, t)
361{
362 vector<string> corestrs;
363 get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
364 for (auto & corestr : corestrs) {
365 string err;
366 int coreid = strict_strtol(corestr.c_str(), 10, &err);
367 if (err == "")
368 coreids.push_back(coreid);
369 else
370 lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl;
371 }
372}