]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2016 XSKY <haomai@xsky.com> | |
7 | * | |
8 | * Author: Haomai Wang <haomaiwang@gmail.com> | |
9 | * | |
10 | * This is free software; you can redistribute it and/or | |
11 | * modify it under the terms of the GNU Lesser General Public | |
12 | * License version 2.1, as published by the Free Software | |
13 | * Foundation. See file COPYING. | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <sys/socket.h> | |
18 | #include <netinet/tcp.h> | |
19 | #include <netinet/in.h> | |
20 | #include <arpa/inet.h> | |
21 | #include <errno.h> | |
22 | ||
23 | #include <algorithm> | |
24 | ||
25 | #include "PosixStack.h" | |
26 | ||
27 | #include "include/buffer.h" | |
28 | #include "include/str_list.h" | |
7c673cae FG |
29 | #include "common/errno.h" |
30 | #include "common/strtol.h" | |
31 | #include "common/dout.h" | |
7c673cae | 32 | #include "common/simple_spin.h" |
3efd9988 | 33 | #include "msg/Messenger.h" |
91327a77 | 34 | #include "include/compat.h" |
3efd9988 | 35 | #include "include/sock_compat.h" |
7c673cae FG |
36 | |
37 | #define dout_subsys ceph_subsys_ms | |
38 | #undef dout_prefix | |
39 | #define dout_prefix *_dout << "PosixStack " | |
40 | ||
41 | class PosixConnectedSocketImpl final : public ConnectedSocketImpl { | |
42 | NetHandler &handler; | |
43 | int _fd; | |
44 | entity_addr_t sa; | |
45 | bool connected; | |
7c673cae FG |
46 | |
47 | public: | |
48 | explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected) | |
49 | : handler(h), _fd(f), sa(sa), connected(connected) {} | |
50 | ||
51 | int is_connected() override { | |
52 | if (connected) | |
53 | return 1; | |
54 | ||
55 | int r = handler.reconnect(sa, _fd); | |
56 | if (r == 0) { | |
57 | connected = true; | |
58 | return 1; | |
59 | } else if (r < 0) { | |
60 | return r; | |
61 | } else { | |
62 | return 0; | |
63 | } | |
64 | } | |
65 | ||
66 | ssize_t zero_copy_read(bufferptr&) override { | |
67 | return -EOPNOTSUPP; | |
68 | } | |
69 | ||
70 | ssize_t read(char *buf, size_t len) override { | |
71 | ssize_t r = ::read(_fd, buf, len); | |
72 | if (r < 0) | |
73 | r = -errno; | |
74 | return r; | |
75 | } | |
76 | ||
7c673cae FG |
77 | // return the sent length |
78 | // < 0 means error occured | |
79 | static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) | |
80 | { | |
7c673cae FG |
81 | size_t sent = 0; |
82 | while (1) { | |
3efd9988 | 83 | MSGR_SIGPIPE_STOPPER; |
7c673cae | 84 | ssize_t r; |
7c673cae | 85 | r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); |
7c673cae FG |
86 | if (r < 0) { |
87 | if (errno == EINTR) { | |
88 | continue; | |
89 | } else if (errno == EAGAIN) { | |
90 | break; | |
91 | } | |
92 | return -errno; | |
93 | } | |
94 | ||
95 | sent += r; | |
96 | if (len == sent) break; | |
97 | ||
98 | while (r > 0) { | |
99 | if (msg.msg_iov[0].iov_len <= (size_t)r) { | |
100 | // drain this whole item | |
101 | r -= msg.msg_iov[0].iov_len; | |
102 | msg.msg_iov++; | |
103 | msg.msg_iovlen--; | |
104 | } else { | |
105 | msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; | |
106 | msg.msg_iov[0].iov_len -= r; | |
107 | break; | |
108 | } | |
109 | } | |
110 | } | |
7c673cae FG |
111 | return (ssize_t)sent; |
112 | } | |
113 | ||
114 | ssize_t send(bufferlist &bl, bool more) override { | |
115 | size_t sent_bytes = 0; | |
116 | std::list<bufferptr>::const_iterator pb = bl.buffers().begin(); | |
117 | uint64_t left_pbrs = bl.buffers().size(); | |
118 | while (left_pbrs) { | |
119 | struct msghdr msg; | |
120 | struct iovec msgvec[IOV_MAX]; | |
121 | uint64_t size = MIN(left_pbrs, IOV_MAX); | |
122 | left_pbrs -= size; | |
123 | memset(&msg, 0, sizeof(msg)); | |
124 | msg.msg_iovlen = 0; | |
125 | msg.msg_iov = msgvec; | |
126 | unsigned msglen = 0; | |
127 | while (size > 0) { | |
128 | msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); | |
129 | msgvec[msg.msg_iovlen].iov_len = pb->length(); | |
130 | msg.msg_iovlen++; | |
131 | msglen += pb->length(); | |
132 | ++pb; | |
133 | size--; | |
134 | } | |
135 | ||
136 | ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more); | |
137 | if (r < 0) | |
138 | return r; | |
139 | ||
140 | // "r" is the remaining length | |
141 | sent_bytes += r; | |
142 | if (static_cast<unsigned>(r) < msglen) | |
143 | break; | |
144 | // only "r" == 0 continue | |
145 | } | |
146 | ||
147 | if (sent_bytes) { | |
148 | bufferlist swapped; | |
149 | if (sent_bytes < bl.length()) { | |
150 | bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); | |
151 | bl.swap(swapped); | |
152 | } else { | |
153 | bl.clear(); | |
154 | } | |
155 | } | |
156 | ||
157 | return static_cast<ssize_t>(sent_bytes); | |
158 | } | |
159 | void shutdown() override { | |
160 | ::shutdown(_fd, SHUT_RDWR); | |
161 | } | |
162 | void close() override { | |
163 | ::close(_fd); | |
164 | } | |
165 | int fd() const override { | |
166 | return _fd; | |
167 | } | |
168 | friend class PosixServerSocketImpl; | |
169 | friend class PosixNetworkStack; | |
170 | }; | |
171 | ||
172 | class PosixServerSocketImpl : public ServerSocketImpl { | |
173 | NetHandler &handler; | |
174 | int _fd; | |
175 | ||
176 | public: | |
177 | explicit PosixServerSocketImpl(NetHandler &h, int f): handler(h), _fd(f) {} | |
178 | int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override; | |
179 | void abort_accept() override { | |
180 | ::close(_fd); | |
181 | } | |
182 | int fd() const override { | |
183 | return _fd; | |
184 | } | |
185 | }; | |
186 | ||
187 | int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) { | |
188 | assert(sock); | |
189 | sockaddr_storage ss; | |
190 | socklen_t slen = sizeof(ss); | |
91327a77 | 191 | int sd = accept_cloexec(_fd, (sockaddr*)&ss, &slen); |
7c673cae FG |
192 | if (sd < 0) { |
193 | return -errno; | |
194 | } | |
195 | ||
7c673cae FG |
196 | int r = handler.set_nonblock(sd); |
197 | if (r < 0) { | |
198 | ::close(sd); | |
199 | return -errno; | |
200 | } | |
201 | ||
202 | r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); | |
203 | if (r < 0) { | |
204 | ::close(sd); | |
205 | return -errno; | |
206 | } | |
207 | ||
208 | assert(NULL != out); //out should not be NULL in accept connection | |
209 | ||
210 | out->set_sockaddr((sockaddr*)&ss); | |
211 | handler.set_priority(sd, opt.priority, out->get_family()); | |
212 | ||
213 | std::unique_ptr<PosixConnectedSocketImpl> csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); | |
214 | *sock = ConnectedSocket(std::move(csi)); | |
215 | return 0; | |
216 | } | |
217 | ||
218 | void PosixWorker::initialize() | |
219 | { | |
220 | } | |
221 | ||
222 | int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, | |
223 | ServerSocket *sock) | |
224 | { | |
225 | int listen_sd = net.create_socket(sa.get_family(), true); | |
226 | if (listen_sd < 0) { | |
227 | return -errno; | |
228 | } | |
229 | ||
230 | int r = net.set_nonblock(listen_sd); | |
231 | if (r < 0) { | |
232 | ::close(listen_sd); | |
233 | return -errno; | |
234 | } | |
235 | ||
7c673cae FG |
236 | r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size); |
237 | if (r < 0) { | |
238 | ::close(listen_sd); | |
239 | return -errno; | |
240 | } | |
241 | ||
242 | r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); | |
243 | if (r < 0) { | |
244 | r = -errno; | |
245 | ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() | |
246 | << ": " << cpp_strerror(r) << dendl; | |
247 | ::close(listen_sd); | |
248 | return r; | |
249 | } | |
250 | ||
224ce89b | 251 | r = ::listen(listen_sd, cct->_conf->ms_tcp_listen_backlog); |
7c673cae FG |
252 | if (r < 0) { |
253 | r = -errno; | |
254 | lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl; | |
255 | ::close(listen_sd); | |
256 | return r; | |
257 | } | |
258 | ||
259 | *sock = ServerSocket( | |
260 | std::unique_ptr<PosixServerSocketImpl>( | |
261 | new PosixServerSocketImpl(net, listen_sd))); | |
262 | return 0; | |
263 | } | |
264 | ||
265 | int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { | |
266 | int sd; | |
267 | ||
268 | if (opts.nonblock) { | |
269 | sd = net.nonblock_connect(addr, opts.connect_bind_addr); | |
270 | } else { | |
271 | sd = net.connect(addr, opts.connect_bind_addr); | |
272 | } | |
273 | ||
274 | if (sd < 0) { | |
275 | return -errno; | |
276 | } | |
277 | ||
278 | net.set_priority(sd, opts.priority, addr.get_family()); | |
279 | *socket = ConnectedSocket( | |
280 | std::unique_ptr<PosixConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); | |
281 | return 0; | |
282 | } | |
283 | ||
284 | PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t) | |
285 | : NetworkStack(c, t) | |
286 | { | |
287 | vector<string> corestrs; | |
288 | get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); | |
289 | for (auto & corestr : corestrs) { | |
290 | string err; | |
291 | int coreid = strict_strtol(corestr.c_str(), 10, &err); | |
292 | if (err == "") | |
293 | coreids.push_back(coreid); | |
294 | else | |
295 | lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl; | |
296 | } | |
297 | } |