]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | module thrift.server.transport.socket; | |
20 | ||
21 | import core.thread : dur, Duration, Thread; | |
22 | import core.stdc.string : strerror; | |
23 | import std.array : empty; | |
24 | import std.conv : text, to; | |
25 | import std.exception : enforce; | |
26 | import std.socket; | |
27 | import thrift.base; | |
28 | import thrift.internal.socket; | |
29 | import thrift.server.transport.base; | |
30 | import thrift.transport.base; | |
31 | import thrift.transport.socket; | |
32 | import thrift.util.awaitable; | |
33 | import thrift.util.cancellation; | |
34 | ||
35 | private alias TServerTransportException STE; | |
36 | ||
37 | /** | |
38 | * Server socket implementation of TServerTransport. | |
39 | * | |
40 | * Maps to std.socket listen()/accept(); only provides TCP/IP sockets (i.e. no | |
41 | * Unix sockets) for now, because they are not supported in std.socket. | |
42 | */ | |
43 | class TServerSocket : TServerTransport { | |
44 | /** | |
45 | * Constructs a new instance. | |
46 | * | |
47 | * Params: | |
48 | * port = The TCP port to listen at (host is always 0.0.0.0). | |
49 | * sendTimeout = The socket sending timeout. | |
50 | * recvTimout = The socket receiving timeout. | |
51 | */ | |
52 | this(ushort port, Duration sendTimeout = dur!"hnsecs"(0), | |
53 | Duration recvTimeout = dur!"hnsecs"(0)) | |
54 | { | |
55 | port_ = port; | |
56 | sendTimeout_ = sendTimeout; | |
57 | recvTimeout_ = recvTimeout; | |
58 | ||
59 | cancellationNotifier_ = new TSocketNotifier; | |
60 | ||
61 | socketSet_ = new SocketSet; | |
62 | } | |
63 | ||
64 | /// The port the server socket listens at. | |
65 | ushort port() const @property { | |
66 | return port_; | |
67 | } | |
68 | ||
69 | /// The socket sending timeout, zero to block infinitely. | |
70 | void sendTimeout(Duration sendTimeout) @property { | |
71 | sendTimeout_ = sendTimeout; | |
72 | } | |
73 | ||
74 | /// The socket receiving timeout, zero to block infinitely. | |
75 | void recvTimeout(Duration recvTimeout) @property { | |
76 | recvTimeout_ = recvTimeout; | |
77 | } | |
78 | ||
79 | /// The maximum number of listening retries if it fails. | |
80 | void retryLimit(ushort retryLimit) @property { | |
81 | retryLimit_ = retryLimit; | |
82 | } | |
83 | ||
84 | /// The delay between a listening attempt failing and retrying it. | |
85 | void retryDelay(Duration retryDelay) @property { | |
86 | retryDelay_ = retryDelay; | |
87 | } | |
88 | ||
89 | /// The size of the TCP send buffer, in bytes. | |
90 | void tcpSendBuffer(int tcpSendBuffer) @property { | |
91 | tcpSendBuffer_ = tcpSendBuffer; | |
92 | } | |
93 | ||
94 | /// The size of the TCP receiving buffer, in bytes. | |
95 | void tcpRecvBuffer(int tcpRecvBuffer) @property { | |
96 | tcpRecvBuffer_ = tcpRecvBuffer; | |
97 | } | |
98 | ||
99 | /// Whether to listen on IPv6 only, if IPv6 support is detected | |
100 | /// (default: false). | |
101 | void ipv6Only(bool value) @property { | |
102 | ipv6Only_ = value; | |
103 | } | |
104 | ||
105 | override void listen() { | |
106 | enforce(!isListening, new STE(STE.Type.ALREADY_LISTENING)); | |
107 | ||
108 | serverSocket_ = makeSocketAndListen(port_, ACCEPT_BACKLOG, retryLimit_, | |
109 | retryDelay_, tcpSendBuffer_, tcpRecvBuffer_, ipv6Only_); | |
110 | } | |
111 | ||
112 | override void close() { | |
113 | enforce(isListening, new STE(STE.Type.NOT_LISTENING)); | |
114 | ||
115 | serverSocket_.shutdown(SocketShutdown.BOTH); | |
116 | serverSocket_.close(); | |
117 | serverSocket_ = null; | |
118 | } | |
119 | ||
120 | override bool isListening() @property { | |
121 | return serverSocket_ !is null; | |
122 | } | |
123 | ||
124 | /// Number of connections listen() backlogs. | |
125 | enum ACCEPT_BACKLOG = 1024; | |
126 | ||
127 | override TTransport accept(TCancellation cancellation = null) { | |
128 | enforce(isListening, new STE(STE.Type.NOT_LISTENING)); | |
129 | ||
130 | if (cancellation) cancellationNotifier_.attach(cancellation.triggering); | |
131 | scope (exit) if (cancellation) cancellationNotifier_.detach(); | |
132 | ||
133 | ||
134 | // Too many EINTRs is a fault condition and would need to be handled | |
135 | // manually by our caller, but we can tolerate a certain number. | |
136 | enum MAX_EINTRS = 10; | |
137 | uint numEintrs; | |
138 | ||
139 | while (true) { | |
140 | socketSet_.reset(); | |
141 | socketSet_.add(serverSocket_); | |
142 | socketSet_.add(cancellationNotifier_.socket); | |
143 | ||
144 | auto ret = Socket.select(socketSet_, null, null); | |
145 | enforce(ret != 0, new STE("Socket.select() returned 0.", | |
146 | STE.Type.RESOURCE_FAILED)); | |
147 | ||
148 | if (ret < 0) { | |
149 | // Select itself failed, check if it was just due to an interrupted | |
150 | // syscall. | |
151 | if (getSocketErrno() == INTERRUPTED_ERRNO) { | |
152 | if (numEintrs++ < MAX_EINTRS) { | |
153 | continue; | |
154 | } else { | |
155 | throw new STE("Socket.select() was interrupted by a signal (EINTR) " ~ | |
156 | "more than " ~ to!string(MAX_EINTRS) ~ " times.", | |
157 | STE.Type.RESOURCE_FAILED | |
158 | ); | |
159 | } | |
160 | } | |
161 | throw new STE("Unknown error on Socket.select(): " ~ | |
162 | socketErrnoString(getSocketErrno()), STE.Type.RESOURCE_FAILED); | |
163 | } else { | |
164 | // Check for a ping on the interrupt socket. | |
165 | if (socketSet_.isSet(cancellationNotifier_.socket)) { | |
166 | cancellation.throwIfTriggered(); | |
167 | } | |
168 | ||
169 | // Check for the actual server socket having a connection waiting. | |
170 | if (socketSet_.isSet(serverSocket_)) { | |
171 | break; | |
172 | } | |
173 | } | |
174 | } | |
175 | ||
176 | try { | |
177 | auto client = createTSocket(serverSocket_.accept()); | |
178 | client.sendTimeout = sendTimeout_; | |
179 | client.recvTimeout = recvTimeout_; | |
180 | return client; | |
181 | } catch (SocketException e) { | |
182 | throw new STE("Unknown error on accepting: " ~ to!string(e), | |
183 | STE.Type.RESOURCE_FAILED); | |
184 | } | |
185 | } | |
186 | ||
187 | protected: | |
188 | /** | |
189 | * Allows derived classes to create a different TSocket type. | |
190 | */ | |
191 | TSocket createTSocket(Socket socket) { | |
192 | return new TSocket(socket); | |
193 | } | |
194 | ||
195 | private: | |
196 | ushort port_; | |
197 | Duration sendTimeout_; | |
198 | Duration recvTimeout_; | |
199 | ushort retryLimit_; | |
200 | Duration retryDelay_; | |
201 | uint tcpSendBuffer_; | |
202 | uint tcpRecvBuffer_; | |
203 | bool ipv6Only_; | |
204 | ||
205 | Socket serverSocket_; | |
206 | TSocketNotifier cancellationNotifier_; | |
207 | ||
208 | // Keep socket set between accept() calls to avoid reallocating. | |
209 | SocketSet socketSet_; | |
210 | } | |
211 | ||
212 | Socket makeSocketAndListen(ushort port, int backlog, ushort retryLimit, | |
213 | Duration retryDelay, uint tcpSendBuffer = 0, uint tcpRecvBuffer = 0, | |
214 | bool ipv6Only = false | |
215 | ) { | |
216 | Address localAddr; | |
217 | try { | |
218 | // null represents the wildcard address. | |
219 | auto addrInfos = getAddressInfo(null, to!string(port), | |
220 | AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP); | |
221 | foreach (i, ai; addrInfos) { | |
222 | // Prefer to bind to IPv6 addresses, because then IPv4 is listened to as | |
223 | // well, but not the other way round. | |
224 | if (ai.family == AddressFamily.INET6 || i == (addrInfos.length - 1)) { | |
225 | localAddr = ai.address; | |
226 | break; | |
227 | } | |
228 | } | |
229 | } catch (Exception e) { | |
230 | throw new STE("Could not determine local address to listen on.", | |
231 | STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e); | |
232 | } | |
233 | ||
234 | Socket socket; | |
235 | try { | |
236 | socket = new Socket(localAddr.addressFamily, SocketType.STREAM, | |
237 | ProtocolType.TCP); | |
238 | } catch (SocketException e) { | |
239 | throw new STE("Could not create accepting socket: " ~ to!string(e), | |
240 | STE.Type.RESOURCE_FAILED); | |
241 | } | |
242 | ||
243 | try { | |
244 | socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, ipv6Only); | |
245 | } catch (SocketException e) { | |
246 | // This is somewhat expected on older systems (e.g. pre-Vista Windows), | |
247 | // which do not support the IPV6_V6ONLY flag yet. Racy flag just to avoid | |
248 | // log spew in unit tests. | |
249 | shared static warned = false; | |
250 | if (!warned) { | |
251 | logError("Could not set IPV6_V6ONLY socket option: %s", e); | |
252 | warned = true; | |
253 | } | |
254 | } | |
255 | ||
256 | alias SocketOptionLevel.SOCKET lvlSock; | |
257 | ||
258 | // Prevent 2 maximum segement lifetime delay on accept. | |
259 | try { | |
260 | socket.setOption(lvlSock, SocketOption.REUSEADDR, true); | |
261 | } catch (SocketException e) { | |
262 | throw new STE("Could not set REUSEADDR socket option: " ~ to!string(e), | |
263 | STE.Type.RESOURCE_FAILED); | |
264 | } | |
265 | ||
266 | // Set TCP buffer sizes. | |
267 | if (tcpSendBuffer > 0) { | |
268 | try { | |
269 | socket.setOption(lvlSock, SocketOption.SNDBUF, tcpSendBuffer); | |
270 | } catch (SocketException e) { | |
271 | throw new STE("Could not set socket send buffer size: " ~ to!string(e), | |
272 | STE.Type.RESOURCE_FAILED); | |
273 | } | |
274 | } | |
275 | ||
276 | if (tcpRecvBuffer > 0) { | |
277 | try { | |
278 | socket.setOption(lvlSock, SocketOption.RCVBUF, tcpRecvBuffer); | |
279 | } catch (SocketException e) { | |
280 | throw new STE("Could not set receive send buffer size: " ~ to!string(e), | |
281 | STE.Type.RESOURCE_FAILED); | |
282 | } | |
283 | } | |
284 | ||
285 | // Turn linger off to avoid blocking on socket close. | |
286 | try { | |
287 | Linger l; | |
288 | l.on = 0; | |
289 | l.time = 0; | |
290 | socket.setOption(lvlSock, SocketOption.LINGER, l); | |
291 | } catch (SocketException e) { | |
292 | throw new STE("Could not disable socket linger: " ~ to!string(e), | |
293 | STE.Type.RESOURCE_FAILED); | |
294 | } | |
295 | ||
296 | // Set TCP_NODELAY. | |
297 | try { | |
298 | socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true); | |
299 | } catch (SocketException e) { | |
300 | throw new STE("Could not disable Nagle's algorithm: " ~ to!string(e), | |
301 | STE.Type.RESOURCE_FAILED); | |
302 | } | |
303 | ||
304 | ushort retries; | |
305 | while (true) { | |
306 | try { | |
307 | socket.bind(localAddr); | |
308 | break; | |
309 | } catch (SocketException) {} | |
310 | ||
311 | // If bind() worked, we breaked outside the loop above. | |
312 | retries++; | |
313 | if (retries < retryLimit) { | |
314 | Thread.sleep(retryDelay); | |
315 | } else { | |
316 | throw new STE(text("Could not bind to address: ", localAddr), | |
317 | STE.Type.RESOURCE_FAILED); | |
318 | } | |
319 | } | |
320 | ||
321 | socket.listen(backlog); | |
322 | return socket; | |
323 | } | |
324 | ||
325 | unittest { | |
326 | // Test interrupt(). | |
327 | { | |
328 | auto sock = new TServerSocket(0); | |
329 | sock.listen(); | |
330 | scope (exit) sock.close(); | |
331 | ||
332 | auto cancellation = new TCancellationOrigin; | |
333 | ||
334 | auto intThread = new Thread({ | |
335 | // Sleep for a bit until the socket is accepting. | |
336 | Thread.sleep(dur!"msecs"(50)); | |
337 | cancellation.trigger(); | |
338 | }); | |
339 | intThread.start(); | |
340 | ||
341 | import std.exception; | |
342 | assertThrown!TCancelledException(sock.accept(cancellation)); | |
343 | } | |
344 | ||
345 | // Test receive() timeout on accepted client sockets. | |
346 | { | |
347 | immutable port = 11122; | |
348 | auto timeout = dur!"msecs"(500); | |
349 | auto serverSock = new TServerSocket(port, timeout, timeout); | |
350 | serverSock.listen(); | |
351 | scope (exit) serverSock.close(); | |
352 | ||
353 | auto clientSock = new TSocket("127.0.0.1", port); | |
354 | clientSock.open(); | |
355 | scope (exit) clientSock.close(); | |
356 | ||
357 | shared bool hasTimedOut; | |
358 | auto recvThread = new Thread({ | |
359 | auto sock = serverSock.accept(); | |
360 | ubyte[1] data; | |
361 | try { | |
362 | sock.read(data); | |
363 | } catch (TTransportException e) { | |
364 | if (e.type == TTransportException.Type.TIMED_OUT) { | |
365 | hasTimedOut = true; | |
366 | } else { | |
367 | import std.stdio; | |
368 | stderr.writeln(e); | |
369 | } | |
370 | } | |
371 | }); | |
372 | recvThread.isDaemon = true; | |
373 | recvThread.start(); | |
374 | ||
375 | // Wait for the timeout, with a little bit of spare time. | |
376 | Thread.sleep(timeout + dur!"msecs"(50)); | |
377 | enforce(hasTimedOut, | |
378 | "Client socket receive() blocked for longer than recvTimeout."); | |
379 | } | |
380 | } |