1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include <sys/socket.h>
18 #include <netinet/tcp.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
25 #include "PosixStack.h"
27 #include "include/buffer.h"
28 #include "include/str_list.h"
29 #include "common/errno.h"
30 #include "common/strtol.h"
31 #include "common/dout.h"
32 #include "msg/Messenger.h"
33 #include "include/compat.h"
34 #include "include/sock_compat.h"
36 #define dout_subsys ceph_subsys_ms
38 #define dout_prefix *_dout << "PosixStack "
40 class PosixConnectedSocketImpl final
: public ConnectedSocketImpl
{
47 explicit PosixConnectedSocketImpl(NetHandler
&h
, const entity_addr_t
&sa
, int f
, bool connected
)
48 : handler(h
), _fd(f
), sa(sa
), connected(connected
) {}
50 int is_connected() override
{
54 int r
= handler
.reconnect(sa
, _fd
);
65 ssize_t
read(char *buf
, size_t len
) override
{
66 ssize_t r
= ::read(_fd
, buf
, len
);
72 // return the sent length
73 // < 0 means error occurred
74 static ssize_t
do_sendmsg(int fd
, struct msghdr
&msg
, unsigned len
, bool more
)
80 r
= ::sendmsg(fd
, &msg
, MSG_NOSIGNAL
| (more
? MSG_MORE
: 0));
84 } else if (errno
== EAGAIN
) {
91 if (len
== sent
) break;
94 if (msg
.msg_iov
[0].iov_len
<= (size_t)r
) {
95 // drain this whole item
96 r
-= msg
.msg_iov
[0].iov_len
;
100 msg
.msg_iov
[0].iov_base
= (char *)msg
.msg_iov
[0].iov_base
+ r
;
101 msg
.msg_iov
[0].iov_len
-= r
;
106 return (ssize_t
)sent
;
109 ssize_t
send(bufferlist
&bl
, bool more
) override
{
110 size_t sent_bytes
= 0;
111 auto pb
= std::cbegin(bl
.buffers());
112 uint64_t left_pbrs
= bl
.get_num_buffers();
115 struct iovec msgvec
[IOV_MAX
];
116 uint64_t size
= std::min
<uint64_t>(left_pbrs
, IOV_MAX
);
118 // FIPS zeroization audit 20191115: this memset is not security related.
119 memset(&msg
, 0, sizeof(msg
));
120 msg
.msg_iovlen
= size
;
121 msg
.msg_iov
= msgvec
;
123 for (auto iov
= msgvec
; iov
!= msgvec
+ size
; iov
++) {
124 iov
->iov_base
= (void*)(pb
->c_str());
125 iov
->iov_len
= pb
->length();
126 msglen
+= pb
->length();
129 ssize_t r
= do_sendmsg(_fd
, msg
, msglen
, left_pbrs
|| more
);
133 // "r" is the number of bytes sent (do_sendmsg returns the sent length)
135 if (static_cast<unsigned>(r
) < msglen
)
137 // NOTE(review): presumably reached only when the whole batch was sent ("r" == msglen) -- confirm
142 if (sent_bytes
< bl
.length()) {
143 bl
.splice(sent_bytes
, bl
.length()-sent_bytes
, &swapped
);
150 return static_cast<ssize_t
>(sent_bytes
);
152 void shutdown() override
{
153 ::shutdown(_fd
, SHUT_RDWR
);
155 void close() override
{
158 int fd() const override
{
161 friend class PosixServerSocketImpl
;
162 friend class PosixNetworkStack
;
165 class PosixServerSocketImpl
: public ServerSocketImpl
{
170 explicit PosixServerSocketImpl(NetHandler
&h
, int f
,
171 const entity_addr_t
& listen_addr
, unsigned slot
)
172 : ServerSocketImpl(listen_addr
.get_type(), slot
),
173 handler(h
), _fd(f
) {}
174 int accept(ConnectedSocket
*sock
, const SocketOptions
&opts
, entity_addr_t
*out
, Worker
*w
) override
;
175 void abort_accept() override
{
179 int fd() const override
{
184 int PosixServerSocketImpl::accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) {
187 socklen_t slen
= sizeof(ss
);
188 int sd
= accept_cloexec(_fd
, (sockaddr
*)&ss
, &slen
);
193 int r
= handler
.set_nonblock(sd
);
199 r
= handler
.set_socket_options(sd
, opt
.nodelay
, opt
.rcbuf_size
);
205 ceph_assert(NULL
!= out
); //out should not be NULL in accept connection
207 out
->set_type(addr_type
);
208 out
->set_sockaddr((sockaddr
*)&ss
);
209 handler
.set_priority(sd
, opt
.priority
, out
->get_family());
211 std::unique_ptr
<PosixConnectedSocketImpl
> csi(new PosixConnectedSocketImpl(handler
, *out
, sd
, true));
212 *sock
= ConnectedSocket(std::move(csi
));
216 void PosixWorker::initialize()
220 int PosixWorker::listen(entity_addr_t
&sa
,
222 const SocketOptions
&opt
,
225 int listen_sd
= net
.create_socket(sa
.get_family(), true);
230 int r
= net
.set_nonblock(listen_sd
);
236 r
= net
.set_socket_options(listen_sd
, opt
.nodelay
, opt
.rcbuf_size
);
242 r
= ::bind(listen_sd
, sa
.get_sockaddr(), sa
.get_sockaddr_len());
245 ldout(cct
, 10) << __func__
<< " unable to bind to " << sa
.get_sockaddr()
246 << ": " << cpp_strerror(r
) << dendl
;
251 r
= ::listen(listen_sd
, cct
->_conf
->ms_tcp_listen_backlog
);
254 lderr(cct
) << __func__
<< " unable to listen on " << sa
<< ": " << cpp_strerror(r
) << dendl
;
259 *sock
= ServerSocket(
260 std::unique_ptr
<PosixServerSocketImpl
>(
261 new PosixServerSocketImpl(net
, listen_sd
, sa
, addr_slot
)));
265 int PosixWorker::connect(const entity_addr_t
&addr
, const SocketOptions
&opts
, ConnectedSocket
*socket
) {
269 sd
= net
.nonblock_connect(addr
, opts
.connect_bind_addr
);
271 sd
= net
.connect(addr
, opts
.connect_bind_addr
);
278 net
.set_priority(sd
, opts
.priority
, addr
.get_family());
279 *socket
= ConnectedSocket(
280 std::unique_ptr
<PosixConnectedSocketImpl
>(new PosixConnectedSocketImpl(net
, addr
, sd
, !opts
.nonblock
)));
284 PosixNetworkStack::PosixNetworkStack(CephContext
*c
, const string
&t
)