]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <sys/socket.h> | |
18 | #include <netinet/tcp.h> | |
19 | #include <netinet/in.h> | |
20 | #include <arpa/inet.h> | |
21 | #include <errno.h> | |
22 | ||
23 | #include <algorithm> | |
24 | ||
25 | #include "PosixStack.h" | |
26 | ||
27 | #include "include/buffer.h" | |
28 | #include "include/str_list.h" | |
7c673cae FG |
29 | #include "common/errno.h" |
30 | #include "common/strtol.h" | |
31 | #include "common/dout.h" | |
3efd9988 | 32 | #include "msg/Messenger.h" |
91327a77 | 33 | #include "include/compat.h" |
3efd9988 | 34 | #include "include/sock_compat.h" |
7c673cae FG |
35 | |
36 | #define dout_subsys ceph_subsys_ms | |
37 | #undef dout_prefix | |
38 | #define dout_prefix *_dout << "PosixStack " | |
39 | ||
40 | class PosixConnectedSocketImpl final : public ConnectedSocketImpl { | |
41 | NetHandler &handler; | |
42 | int _fd; | |
43 | entity_addr_t sa; | |
44 | bool connected; | |
7c673cae FG |
45 | |
46 | public: | |
47 | explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected) | |
48 | : handler(h), _fd(f), sa(sa), connected(connected) {} | |
49 | ||
50 | int is_connected() override { | |
51 | if (connected) | |
52 | return 1; | |
53 | ||
54 | int r = handler.reconnect(sa, _fd); | |
55 | if (r == 0) { | |
56 | connected = true; | |
57 | return 1; | |
58 | } else if (r < 0) { | |
59 | return r; | |
60 | } else { | |
61 | return 0; | |
62 | } | |
63 | } | |
64 | ||
65 | ssize_t zero_copy_read(bufferptr&) override { | |
66 | return -EOPNOTSUPP; | |
67 | } | |
68 | ||
69 | ssize_t read(char *buf, size_t len) override { | |
70 | ssize_t r = ::read(_fd, buf, len); | |
71 | if (r < 0) | |
72 | r = -errno; | |
73 | return r; | |
74 | } | |
75 | ||
7c673cae | 76 | // return the sent length |
11fdf7f2 | 77 | // < 0 means error occurred |
7c673cae FG |
78 | static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) |
79 | { | |
7c673cae FG |
80 | size_t sent = 0; |
81 | while (1) { | |
3efd9988 | 82 | MSGR_SIGPIPE_STOPPER; |
7c673cae | 83 | ssize_t r; |
7c673cae | 84 | r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); |
7c673cae FG |
85 | if (r < 0) { |
86 | if (errno == EINTR) { | |
87 | continue; | |
88 | } else if (errno == EAGAIN) { | |
89 | break; | |
90 | } | |
91 | return -errno; | |
92 | } | |
93 | ||
94 | sent += r; | |
95 | if (len == sent) break; | |
96 | ||
97 | while (r > 0) { | |
98 | if (msg.msg_iov[0].iov_len <= (size_t)r) { | |
99 | // drain this whole item | |
100 | r -= msg.msg_iov[0].iov_len; | |
101 | msg.msg_iov++; | |
102 | msg.msg_iovlen--; | |
103 | } else { | |
104 | msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; | |
105 | msg.msg_iov[0].iov_len -= r; | |
106 | break; | |
107 | } | |
108 | } | |
109 | } | |
7c673cae FG |
110 | return (ssize_t)sent; |
111 | } | |
112 | ||
113 | ssize_t send(bufferlist &bl, bool more) override { | |
114 | size_t sent_bytes = 0; | |
11fdf7f2 TL |
115 | auto pb = std::cbegin(bl.buffers()); |
116 | uint64_t left_pbrs = std::size(bl.buffers()); | |
7c673cae FG |
117 | while (left_pbrs) { |
118 | struct msghdr msg; | |
119 | struct iovec msgvec[IOV_MAX]; | |
11fdf7f2 | 120 | uint64_t size = std::min<uint64_t>(left_pbrs, IOV_MAX); |
7c673cae FG |
121 | left_pbrs -= size; |
122 | memset(&msg, 0, sizeof(msg)); | |
11fdf7f2 | 123 | msg.msg_iovlen = size; |
7c673cae FG |
124 | msg.msg_iov = msgvec; |
125 | unsigned msglen = 0; | |
11fdf7f2 TL |
126 | for (auto iov = msgvec; iov != msgvec + size; iov++) { |
127 | iov->iov_base = (void*)(pb->c_str()); | |
128 | iov->iov_len = pb->length(); | |
129 | msglen += pb->length(); | |
130 | ++pb; | |
7c673cae | 131 | } |
7c673cae FG |
132 | ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more); |
133 | if (r < 0) | |
134 | return r; | |
135 | ||
136 | // "r" is the remaining length | |
137 | sent_bytes += r; | |
138 | if (static_cast<unsigned>(r) < msglen) | |
139 | break; | |
140 | // only "r" == 0 continue | |
141 | } | |
142 | ||
143 | if (sent_bytes) { | |
144 | bufferlist swapped; | |
145 | if (sent_bytes < bl.length()) { | |
146 | bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); | |
147 | bl.swap(swapped); | |
148 | } else { | |
149 | bl.clear(); | |
150 | } | |
151 | } | |
152 | ||
153 | return static_cast<ssize_t>(sent_bytes); | |
154 | } | |
155 | void shutdown() override { | |
156 | ::shutdown(_fd, SHUT_RDWR); | |
157 | } | |
158 | void close() override { | |
159 | ::close(_fd); | |
160 | } | |
161 | int fd() const override { | |
162 | return _fd; | |
163 | } | |
11fdf7f2 TL |
164 | int socket_fd() const override { |
165 | return _fd; | |
166 | } | |
7c673cae FG |
167 | friend class PosixServerSocketImpl; |
168 | friend class PosixNetworkStack; | |
169 | }; | |
170 | ||
171 | class PosixServerSocketImpl : public ServerSocketImpl { | |
172 | NetHandler &handler; | |
173 | int _fd; | |
174 | ||
175 | public: | |
11fdf7f2 TL |
176 | explicit PosixServerSocketImpl(NetHandler &h, int f, |
177 | const entity_addr_t& listen_addr, unsigned slot) | |
178 | : ServerSocketImpl(listen_addr.get_type(), slot), | |
179 | handler(h), _fd(f) {} | |
7c673cae FG |
180 | int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; |
181 | void abort_accept() override { | |
182 | ::close(_fd); | |
183 | } | |
184 | int fd() const override { | |
185 | return _fd; | |
186 | } | |
187 | }; | |
188 | ||
189 | int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { | |
11fdf7f2 | 190 | ceph_assert(sock); |
7c673cae FG |
191 | sockaddr_storage ss; |
192 | socklen_t slen = sizeof(ss); | |
91327a77 | 193 | int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen); |
7c673cae FG |
194 | if (sd < 0) { |
195 | return -errno; | |
196 | } | |
197 | ||
7c673cae FG |
198 | int r = handler.set_nonblock(sd); |
199 | if (r < 0) { | |
200 | ::close(sd); | |
201 | return -errno; | |
202 | } | |
203 | ||
204 | r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); | |
205 | if (r < 0) { | |
206 | ::close(sd); | |
207 | return -errno; | |
208 | } | |
209 | ||
11fdf7f2 | 210 | ceph_assert(NULL != out); //out should not be NULL in accept connection |
7c673cae | 211 | |
11fdf7f2 | 212 | out->set_type(addr_type); |
7c673cae FG |
213 | out->set_sockaddr((sockaddr*)&ss); |
214 | handler.set_priority(sd, opt.priority, out->get_family()); | |
215 | ||
216 | std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); | |
217 | *sock = ConnectedSocket(std::move(csi)); | |
218 | return 0; | |
219 | } | |
220 | ||
221 | void PosixWorker::initialize() | |
222 | { | |
223 | } | |
224 | ||
11fdf7f2 TL |
225 | int PosixWorker::listen(entity_addr_t &sa, |
226 | unsigned addr_slot, | |
227 | const SocketOptions &opt, | |
7c673cae FG |
228 | ServerSocket *sock) |
229 | { | |
230 | int listen_sd = net.create_socket(sa.get_family(), true); | |
231 | if (listen_sd < 0) { | |
232 | return -errno; | |
233 | } | |
234 | ||
235 | int r = net.set_nonblock(listen_sd); | |
236 | if (r < 0) { | |
237 | ::close(listen_sd); | |
238 | return -errno; | |
239 | } | |
240 | ||
7c673cae FG |
241 | r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size); |
242 | if (r < 0) { | |
243 | ::close(listen_sd); | |
244 | return -errno; | |
245 | } | |
246 | ||
247 | r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); | |
248 | if (r < 0) { | |
249 | r = -errno; | |
250 | ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() | |
251 | << ": " << cpp_strerror(r) << dendl; | |
252 | ::close(listen_sd); | |
253 | return r; | |
254 | } | |
255 | ||
224ce89b | 256 | r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog); |
7c673cae FG |
257 | if (r < 0) { |
258 | r = -errno; | |
259 | lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl; | |
260 | ::close(listen_sd); | |
261 | return r; | |
262 | } | |
263 | ||
264 | *sock = ServerSocket( | |
265 | std::unique_ptr<PosixServerSocketImpl>( | |
11fdf7f2 | 266 | new PosixServerSocketImpl(net, listen_sd, sa, addr_slot))); |
7c673cae FG |
267 | return 0; |
268 | } | |
269 | ||
270 | int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { | |
271 | int sd; | |
272 | ||
273 | if (opts.nonblock) { | |
274 | sd = net.nonblock_connect(addr, opts.connect_bind_addr); | |
275 | } else { | |
276 | sd = net.connect(addr, opts.connect_bind_addr); | |
277 | } | |
278 | ||
279 | if (sd < 0) { | |
280 | return -errno; | |
281 | } | |
282 | ||
283 | net.set_priority(sd, opts.priority, addr.get_family()); | |
284 | *socket = ConnectedSocket( | |
285 | std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); | |
286 | return 0; | |
287 | } | |
288 | ||
289 | PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t) | |
290 | : NetworkStack(c, t) | |
291 | { | |
7c673cae | 292 | } |