1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
25 #include "PosixStack.h"
27 #include "include/buffer.h"
28 #include "include/str_list.h"
29 #include "include/sock_compat.h"
30 #include "common/errno.h"
31 #include "common/strtol.h"
32 #include "common/dout.h"
33 #include "common/simple_spin.h"
35 #define dout_subsys ceph_subsys_ms
37 #define dout_prefix *_dout << "PosixStack "
39 class PosixConnectedSocketImpl final
: public ConnectedSocketImpl
{
44 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
45 sigset_t sigpipe_mask
;
51 explicit PosixConnectedSocketImpl(NetHandler
&h
, const entity_addr_t
&sa
, int f
, bool connected
)
52 : handler(h
), _fd(f
), sa(sa
), connected(connected
) {}
54 int is_connected() override
{
58 int r
= handler
.reconnect(sa
, _fd
);
69 ssize_t
zero_copy_read(bufferptr
&) override
{
73 ssize_t
read(char *buf
, size_t len
) override
{
74 ssize_t r
= ::read(_fd
, buf
, len
);
81 SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
82 http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
83 http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
85 static void suppress_sigpipe()
87 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
89 We want to ignore possible SIGPIPE that we can generate on write.
90 SIGPIPE is delivered *synchronously* and *only* to the thread
91 doing the write. So if it is reported as already pending (which
92 means the thread blocks it), then we do nothing: if we generate
93 SIGPIPE, it will be merged with the pending one (there's no
94 queuing), and that suits us well. If it is not pending, we block
95 it in this thread (and we avoid changing signal action, because it
99 sigemptyset(&pending
);
100 sigpending(&pending
);
101 sigpipe_pending
= sigismember(&pending
, SIGPIPE
);
102 if (!sigpipe_pending
) {
104 sigemptyset(&blocked
);
105 pthread_sigmask(SIG_BLOCK
, &sigpipe_mask
, &blocked
);
107 /* Maybe it was blocked already? */
108 sigpipe_unblock
= ! sigismember(&blocked
, SIGPIPE
);
110 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
113 static void restore_sigpipe()
115 #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
117 If SIGPIPE was pending already we do nothing. Otherwise, if it
118 becomes pending (i.e., we generated it), then we sigwait() it (thus
119 clearing pending status). Then we unblock SIGPIPE, but only if it
120 was us who blocked it.
122 if (!sigpipe_pending
) {
124 sigemptyset(&pending
);
125 sigpending(&pending
);
126 if (sigismember(&pending
, SIGPIPE
)) {
128 Protect ourselves from a situation when SIGPIPE was sent
129 by the user to the whole process, and was delivered to
130 another thread before we had a chance to wait for it.
132 static const struct timespec nowait
= { 0, 0 };
133 TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask
, NULL
, &nowait
));
137 pthread_sigmask(SIG_UNBLOCK
, &sigpipe_mask
, NULL
);
139 #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
142 // return the sent length
143 // < 0 means error occurred
144 static ssize_t
do_sendmsg(int fd
, struct msghdr
&msg
, unsigned len
, bool more
)
151 #if defined(MSG_NOSIGNAL)
152 r
= ::sendmsg(fd
, &msg
, MSG_NOSIGNAL
| (more
? MSG_MORE
: 0));
154 r
= ::sendmsg(fd
, &msg
, (more
? MSG_MORE
: 0));
155 #endif /* defined(MSG_NOSIGNAL) */
158 if (errno
== EINTR
) {
160 } else if (errno
== EAGAIN
) {
167 if (len
== sent
) break;
170 if (msg
.msg_iov
[0].iov_len
<= (size_t)r
) {
171 // drain this whole item
172 r
-= msg
.msg_iov
[0].iov_len
;
176 msg
.msg_iov
[0].iov_base
= (char *)msg
.msg_iov
[0].iov_base
+ r
;
177 msg
.msg_iov
[0].iov_len
-= r
;
183 return (ssize_t
)sent
;
186 ssize_t
send(bufferlist
&bl
, bool more
) override
{
187 size_t sent_bytes
= 0;
188 std::list
<bufferptr
>::const_iterator pb
= bl
.buffers().begin();
189 uint64_t left_pbrs
= bl
.buffers().size();
192 struct iovec msgvec
[IOV_MAX
];
193 uint64_t size
= MIN(left_pbrs
, IOV_MAX
);
195 memset(&msg
, 0, sizeof(msg
));
197 msg
.msg_iov
= msgvec
;
200 msgvec
[msg
.msg_iovlen
].iov_base
= (void*)(pb
->c_str());
201 msgvec
[msg
.msg_iovlen
].iov_len
= pb
->length();
203 msglen
+= pb
->length();
208 ssize_t r
= do_sendmsg(_fd
, msg
, msglen
, left_pbrs
|| more
);
212 // "r" is the remaining length
214 if (static_cast<unsigned>(r
) < msglen
)
216 // only "r" == 0 continue
221 if (sent_bytes
< bl
.length()) {
222 bl
.splice(sent_bytes
, bl
.length()-sent_bytes
, &swapped
);
229 return static_cast<ssize_t
>(sent_bytes
);
231 void shutdown() override
{
232 ::shutdown(_fd
, SHUT_RDWR
);
234 void close() override
{
237 int fd() const override
{
240 friend class PosixServerSocketImpl
;
241 friend class PosixNetworkStack
;
244 class PosixServerSocketImpl
: public ServerSocketImpl
{
249 explicit PosixServerSocketImpl(NetHandler
&h
, int f
): handler(h
), _fd(f
) {}
250 int accept(ConnectedSocket
*sock
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
251 void abort_accept() override
{
254 int fd() const override
{
259 int PosixServerSocketImpl::accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) {
262 socklen_t slen
= sizeof(ss
);
263 int sd
= ::accept(_fd
, (sockaddr
*)&ss
, &slen
);
268 handler
.set_close_on_exec(sd
);
269 int r
= handler
.set_nonblock(sd
);
275 r
= handler
.set_socket_options(sd
, opt
.nodelay
, opt
.rcbuf_size
);
281 assert(NULL
!= out
); //out should not be NULL in accept connection
283 out
->set_sockaddr((sockaddr
*)&ss
);
284 handler
.set_priority(sd
, opt
.priority
, out
->get_family());
286 std::unique_ptr
<PosixConnectedSocketImpl
> csi(new PosixConnectedSocketImpl(handler
, *out
, sd
, true));
287 *sock
= ConnectedSocket(std::move(csi
));
291 void PosixWorker::initialize()
295 int PosixWorker::listen(entity_addr_t
&sa
, const SocketOptions
&opt
,
298 int listen_sd
= net
.create_socket(sa
.get_family(), true);
303 int r
= net
.set_nonblock(listen_sd
);
309 net
.set_close_on_exec(listen_sd
);
310 r
= net
.set_socket_options(listen_sd
, opt
.nodelay
, opt
.rcbuf_size
);
316 r
= ::bind(listen_sd
, sa
.get_sockaddr(), sa
.get_sockaddr_len());
319 ldout(cct
, 10) << __func__
<< " unable to bind to " << sa
.get_sockaddr()
320 << ": " << cpp_strerror(r
) << dendl
;
325 r
= ::listen(listen_sd
, cct
->_conf
->ms_tcp_listen_backlog
);
328 lderr(cct
) << __func__
<< " unable to listen on " << sa
<< ": " << cpp_strerror(r
) << dendl
;
333 *sock
= ServerSocket(
334 std::unique_ptr
<PosixServerSocketImpl
>(
335 new PosixServerSocketImpl(net
, listen_sd
)));
339 int PosixWorker::connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
) {
343 sd
= net
.nonblock_connect(addr
, opts
.connect_bind_addr
);
345 sd
= net
.connect(addr
, opts
.connect_bind_addr
);
352 net
.set_priority(sd
, opts
.priority
, addr
.get_family());
353 *socket
= ConnectedSocket(
354 std::unique_ptr
<PosixConnectedSocketImpl
>(new PosixConnectedSocketImpl(net
, addr
, sd
, !opts
.nonblock
)));
358 PosixNetworkStack::PosixNetworkStack(CephContext
*c
, const string
&t
)
361 vector
<string
> corestrs
;
362 get_str_vec(cct
->_conf
->ms_async_affinity_cores
, corestrs
);
363 for (auto & corestr
: corestrs
) {
365 int coreid
= strict_strtol(corestr
.c_str(), 10, &err
);
367 coreids
.push_back(coreid
);
369 lderr(cct
) << __func__
<< " failed to parse " << corestr
<< " in " << cct
->_conf
->ms_async_affinity_cores
<< dendl
;