]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <sys/socket.h> | |
18 | #include <netinet/tcp.h> | |
19 | #include <netinet/in.h> | |
20 | #include <arpa/inet.h> | |
21 | #include <errno.h> | |
22 | ||
23 | #include <algorithm> | |
24 | ||
25 | #include "PosixStack.h" | |
26 | ||
27 | #include "include/buffer.h" | |
28 | #include "include/str_list.h" | |
7c673cae FG |
29 | #include "common/errno.h" |
30 | #include "common/strtol.h" | |
31 | #include "common/dout.h" | |
3efd9988 | 32 | #include "msg/Messenger.h" |
91327a77 | 33 | #include "include/compat.h" |
3efd9988 | 34 | #include "include/sock_compat.h" |
7c673cae FG |
35 | |
36 | #define dout_subsys ceph_subsys_ms | |
37 | #undef dout_prefix | |
38 | #define dout_prefix *_dout << "PosixStack " | |
39 | ||
40 | class PosixConnectedSocketImpl final : public ConnectedSocketImpl { | |
f67539c2 | 41 | ceph::NetHandler &handler; |
7c673cae FG |
42 | int _fd; |
43 | entity_addr_t sa; | |
44 | bool connected; | |
7c673cae FG |
45 | |
46 | public: | |
f67539c2 TL |
47 | explicit PosixConnectedSocketImpl(ceph::NetHandler &h, const entity_addr_t &sa, |
48 | int f, bool connected) | |
7c673cae FG |
49 | : handler(h), _fd(f), sa(sa), connected(connected) {} |
50 | ||
51 | int is_connected() override { | |
52 | if (connected) | |
53 | return 1; | |
54 | ||
55 | int r = handler.reconnect(sa, _fd); | |
56 | if (r == 0) { | |
57 | connected = true; | |
58 | return 1; | |
59 | } else if (r < 0) { | |
60 | return r; | |
61 | } else { | |
62 | return 0; | |
63 | } | |
64 | } | |
65 | ||
7c673cae | 66 | ssize_t read(char *buf, size_t len) override { |
f67539c2 TL |
67 | #ifdef _WIN32 |
68 | ssize_t r = ::recv(_fd, buf, len, 0); | |
69 | #else | |
7c673cae | 70 | ssize_t r = ::read(_fd, buf, len); |
f67539c2 | 71 | #endif |
7c673cae | 72 | if (r < 0) |
f67539c2 | 73 | r = -ceph_sock_errno(); |
7c673cae FG |
74 | return r; |
75 | } | |
76 | ||
7c673cae | 77 | // return the sent length |
11fdf7f2 | 78 | // < 0 means error occurred |
f67539c2 | 79 | #ifndef _WIN32 |
7c673cae FG |
80 | static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) |
81 | { | |
7c673cae FG |
82 | size_t sent = 0; |
83 | while (1) { | |
3efd9988 | 84 | MSGR_SIGPIPE_STOPPER; |
7c673cae | 85 | ssize_t r; |
7c673cae | 86 | r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); |
7c673cae | 87 | if (r < 0) { |
f67539c2 TL |
88 | int err = ceph_sock_errno(); |
89 | if (err == EINTR) { | |
7c673cae | 90 | continue; |
f67539c2 | 91 | } else if (err == EAGAIN) { |
7c673cae FG |
92 | break; |
93 | } | |
f67539c2 | 94 | return -err; |
7c673cae FG |
95 | } |
96 | ||
97 | sent += r; | |
98 | if (len == sent) break; | |
99 | ||
100 | while (r > 0) { | |
101 | if (msg.msg_iov[0].iov_len <= (size_t)r) { | |
102 | // drain this whole item | |
103 | r -= msg.msg_iov[0].iov_len; | |
104 | msg.msg_iov++; | |
105 | msg.msg_iovlen--; | |
106 | } else { | |
107 | msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; | |
108 | msg.msg_iov[0].iov_len -= r; | |
109 | break; | |
110 | } | |
111 | } | |
112 | } | |
7c673cae FG |
113 | return (ssize_t)sent; |
114 | } | |
115 | ||
f67539c2 | 116 | ssize_t send(ceph::buffer::list &bl, bool more) override { |
7c673cae | 117 | size_t sent_bytes = 0; |
11fdf7f2 | 118 | auto pb = std::cbegin(bl.buffers()); |
9f95a23c | 119 | uint64_t left_pbrs = bl.get_num_buffers(); |
7c673cae FG |
120 | while (left_pbrs) { |
121 | struct msghdr msg; | |
122 | struct iovec msgvec[IOV_MAX]; | |
11fdf7f2 | 123 | uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX); |
7c673cae | 124 | left_pbrs -= size; |
92f5a8d4 | 125 | // FIPS zeroization audit 20191115: this memset is not security related. |
7c673cae | 126 | memset(&msg, 0, sizeof(msg)); |
11fdf7f2 | 127 | msg.msg_iovlen = size; |
7c673cae FG |
128 | msg.msg_iov = msgvec; |
129 | unsigned msglen = 0; | |
11fdf7f2 TL |
130 | for (auto iov = msgvec; iov != msgvec + size; iov++) { |
131 | iov->iov_base = (void*)(pb->c_str()); | |
132 | iov->iov_len = pb->length(); | |
133 | msglen += pb->length(); | |
134 | ++pb; | |
7c673cae | 135 | } |
7c673cae FG |
136 | ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more); |
137 | if (r < 0) | |
138 | return r; | |
139 | ||
140 | // "r" is the remaining length | |
141 | sent_bytes += r; | |
142 | if (static_cast<unsigned>(r) < msglen) | |
143 | break; | |
144 | // only "r" == 0 continue | |
145 | } | |
146 | ||
147 | if (sent_bytes) { | |
f67539c2 | 148 | ceph::buffer::list swapped; |
7c673cae FG |
149 | if (sent_bytes < bl.length()) { |
150 | bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); | |
151 | bl.swap(swapped); | |
152 | } else { | |
153 | bl.clear(); | |
154 | } | |
155 | } | |
156 | ||
157 | return static_cast<ssize_t>(sent_bytes); | |
158 | } | |
f67539c2 TL |
159 | #else |
160 | ssize_t send(bufferlist &bl, bool more) override | |
161 | { | |
162 | size_t total_sent_bytes = 0; | |
163 | auto pb = std::cbegin(bl.buffers()); | |
164 | uint64_t left_pbrs = bl.get_num_buffers(); | |
165 | while (left_pbrs) { | |
166 | WSABUF msgvec[IOV_MAX]; | |
167 | uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX); | |
168 | left_pbrs -= size; | |
169 | unsigned msglen = 0; | |
170 | for (auto iov = msgvec; iov != msgvec + size; iov++) { | |
171 | iov->buf = const_cast<char*>(pb->c_str()); | |
172 | iov->len = pb->length(); | |
173 | msglen += pb->length(); | |
174 | ++pb; | |
175 | } | |
176 | DWORD sent_bytes = 0; | |
177 | DWORD flags = 0; | |
178 | if (more) | |
179 | flags |= MSG_PARTIAL; | |
180 | ||
181 | int ret_val = WSASend(_fd, msgvec, size, &sent_bytes, flags, NULL, NULL); | |
182 | if (ret_val) | |
183 | return -ret_val; | |
184 | ||
185 | total_sent_bytes += sent_bytes; | |
186 | if (static_cast<unsigned>(sent_bytes) < msglen) | |
187 | break; | |
188 | } | |
189 | ||
190 | if (total_sent_bytes) { | |
191 | bufferlist swapped; | |
192 | if (total_sent_bytes < bl.length()) { | |
193 | bl.splice(total_sent_bytes, bl.length()-total_sent_bytes, &swapped); | |
194 | bl.swap(swapped); | |
195 | } else { | |
196 | bl.clear(); | |
197 | } | |
198 | } | |
199 | ||
200 | return static_cast<ssize_t>(total_sent_bytes); | |
201 | } | |
202 | #endif | |
7c673cae FG |
203 | void shutdown() override { |
204 | ::shutdown(_fd, SHUT_RDWR); | |
205 | } | |
206 | void close() override { | |
f67539c2 | 207 | compat_closesocket(_fd); |
7c673cae | 208 | } |
1e59de90 TL |
209 | void set_priority(int sd, int prio, int domain) override { |
210 | handler.set_priority(sd, prio, domain); | |
211 | } | |
7c673cae FG |
212 | int fd() const override { |
213 | return _fd; | |
214 | } | |
215 | friend class PosixServerSocketImpl; | |
216 | friend class PosixNetworkStack; | |
217 | }; | |
218 | ||
219 | class PosixServerSocketImpl : public ServerSocketImpl { | |
f67539c2 | 220 | ceph::NetHandler &handler; |
7c673cae FG |
221 | int _fd; |
222 | ||
223 | public: | |
f67539c2 | 224 | explicit PosixServerSocketImpl(ceph::NetHandler &h, int f, |
11fdf7f2 TL |
225 | const entity_addr_t& listen_addr, unsigned slot) |
226 | : ServerSocketImpl(listen_addr.get_type(), slot), | |
227 | handler(h), _fd(f) {} | |
7c673cae FG |
228 | int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; |
229 | void abort_accept() override { | |
230 | ::close(_fd); | |
9f95a23c | 231 | _fd = -1; |
7c673cae FG |
232 | } |
233 | int fd() const override { | |
234 | return _fd; | |
235 | } | |
236 | }; | |
237 | ||
238 | int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { | |
11fdf7f2 | 239 | ceph_assert(sock); |
7c673cae FG |
240 | sockaddr_storage ss; |
241 | socklen_t slen = sizeof(ss); | |
91327a77 | 242 | int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen); |
7c673cae | 243 | if (sd < 0) { |
f67539c2 | 244 | return -ceph_sock_errno(); |
7c673cae FG |
245 | } |
246 | ||
7c673cae FG |
247 | int r = handler.set_nonblock(sd); |
248 | if (r < 0) { | |
249 | ::close(sd); | |
f67539c2 | 250 | return -ceph_sock_errno(); |
7c673cae FG |
251 | } |
252 | ||
253 | r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); | |
254 | if (r < 0) { | |
255 | ::close(sd); | |
f67539c2 | 256 | return -ceph_sock_errno(); |
7c673cae FG |
257 | } |
258 | ||
11fdf7f2 | 259 | ceph_assert(NULL != out); //out should not be NULL in accept connection |
7c673cae | 260 | |
11fdf7f2 | 261 | out->set_type(addr_type); |
7c673cae FG |
262 | out->set_sockaddr((sockaddr*)&ss); |
263 | handler.set_priority(sd, opt.priority, out->get_family()); | |
264 | ||
265 | std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); | |
266 | *sock = ConnectedSocket(std::move(csi)); | |
267 | return 0; | |
268 | } | |
269 | ||
270 | void PosixWorker::initialize() | |
271 | { | |
272 | } | |
273 | ||
11fdf7f2 TL |
274 | int PosixWorker::listen(entity_addr_t &sa, |
275 | unsigned addr_slot, | |
276 | const SocketOptions &opt, | |
7c673cae FG |
277 | ServerSocket *sock) |
278 | { | |
279 | int listen_sd = net.create_socket(sa.get_family(), true); | |
280 | if (listen_sd < 0) { | |
f67539c2 | 281 | return -ceph_sock_errno(); |
7c673cae FG |
282 | } |
283 | ||
284 | int r = net.set_nonblock(listen_sd); | |
285 | if (r < 0) { | |
286 | ::close(listen_sd); | |
f67539c2 | 287 | return -ceph_sock_errno(); |
7c673cae FG |
288 | } |
289 | ||
7c673cae FG |
290 | r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size); |
291 | if (r < 0) { | |
292 | ::close(listen_sd); | |
f67539c2 | 293 | return -ceph_sock_errno(); |
7c673cae FG |
294 | } |
295 | ||
296 | r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); | |
297 | if (r < 0) { | |
f67539c2 | 298 | r = -ceph_sock_errno(); |
7c673cae FG |
299 | ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() |
300 | << ": " << cpp_strerror(r) << dendl; | |
301 | ::close(listen_sd); | |
302 | return r; | |
303 | } | |
304 | ||
224ce89b | 305 | r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog); |
7c673cae | 306 | if (r < 0) { |
f67539c2 | 307 | r = -ceph_sock_errno(); |
7c673cae FG |
308 | lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl; |
309 | ::close(listen_sd); | |
310 | return r; | |
311 | } | |
312 | ||
313 | *sock = ServerSocket( | |
314 | std::unique_ptr<PosixServerSocketImpl>( | |
11fdf7f2 | 315 | new PosixServerSocketImpl(net, listen_sd, sa, addr_slot))); |
7c673cae FG |
316 | return 0; |
317 | } | |
318 | ||
319 | int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { | |
320 | int sd; | |
321 | ||
322 | if (opts.nonblock) { | |
323 | sd = net.nonblock_connect(addr, opts.connect_bind_addr); | |
324 | } else { | |
325 | sd = net.connect(addr, opts.connect_bind_addr); | |
326 | } | |
327 | ||
328 | if (sd < 0) { | |
f67539c2 | 329 | return -ceph_sock_errno(); |
7c673cae FG |
330 | } |
331 | ||
332 | net.set_priority(sd, opts.priority, addr.get_family()); | |
333 | *socket = ConnectedSocket( | |
334 | std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); | |
335 | return 0; | |
336 | } | |
337 | ||
f67539c2 TL |
338 | PosixNetworkStack::PosixNetworkStack(CephContext *c) |
339 | : NetworkStack(c) | |
7c673cae | 340 | { |
7c673cae | 341 | } |