]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/d/src/thrift/server/transport/socket.d
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / d / src / thrift / server / transport / socket.d
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19module thrift.server.transport.socket;
20
21import core.thread : dur, Duration, Thread;
22import core.stdc.string : strerror;
23import std.array : empty;
24import std.conv : text, to;
25import std.exception : enforce;
26import std.socket;
27import thrift.base;
28import thrift.internal.socket;
29import thrift.server.transport.base;
30import thrift.transport.base;
31import thrift.transport.socket;
32import thrift.util.awaitable;
33import thrift.util.cancellation;
34
35private alias TServerTransportException STE;
36
37/**
38 * Server socket implementation of TServerTransport.
39 *
40 * Maps to std.socket listen()/accept(); only provides TCP/IP sockets (i.e. no
41 * Unix sockets) for now, because they are not supported in std.socket.
42 */
class TServerSocket : TServerTransport {
  /**
   * Constructs a new instance.
   *
   * Only stores the configuration and allocates the helper objects used by
   * accept(); the listening socket itself is created by listen().
   *
   * Params:
   *   port = The TCP port to listen at (the host is always the wildcard
   *     address).
   *   sendTimeout = The sending timeout applied to accepted client sockets.
   *   recvTimeout = The receiving timeout applied to accepted client sockets.
   */
  this(ushort port, Duration sendTimeout = dur!"hnsecs"(0),
    Duration recvTimeout = dur!"hnsecs"(0))
  {
    port_ = port;
    sendTimeout_ = sendTimeout;
    recvTimeout_ = recvTimeout;

    cancellationNotifier_ = new TSocketNotifier;

    socketSet_ = new SocketSet;
  }

  /// The port the server socket listens at.
  ushort port() const @property {
    return port_;
  }

  /// The socket sending timeout, zero to block infinitely.
  void sendTimeout(Duration sendTimeout) @property {
    sendTimeout_ = sendTimeout;
  }

  /// The socket receiving timeout, zero to block infinitely.
  void recvTimeout(Duration recvTimeout) @property {
    recvTimeout_ = recvTimeout;
  }

  /// The maximum number of listening retries if it fails.
  void retryLimit(ushort retryLimit) @property {
    retryLimit_ = retryLimit;
  }

  /// The delay between a listening attempt failing and retrying it.
  void retryDelay(Duration retryDelay) @property {
    retryDelay_ = retryDelay;
  }

  /// The size of the TCP send buffer, in bytes.
  void tcpSendBuffer(int tcpSendBuffer) @property {
    tcpSendBuffer_ = tcpSendBuffer;
  }

  /// The size of the TCP receiving buffer, in bytes.
  void tcpRecvBuffer(int tcpRecvBuffer) @property {
    tcpRecvBuffer_ = tcpRecvBuffer;
  }

  /// Whether to listen on IPv6 only, if IPv6 support is detected
  /// (default: false).
  void ipv6Only(bool value) @property {
    ipv6Only_ = value;
  }

  /**
   * Creates the server socket from the current settings and starts
   * listening.
   *
   * Throws: TServerTransportException (ALREADY_LISTENING) if the socket is
   *   already listening, or whatever makeSocketAndListen() throws.
   */
  override void listen() {
    enforce(!isListening, new STE(STE.Type.ALREADY_LISTENING));

    serverSocket_ = makeSocketAndListen(port_, ACCEPT_BACKLOG, retryLimit_,
      retryDelay_, tcpSendBuffer_, tcpRecvBuffer_, ipv6Only_);
  }

  /**
   * Shuts down and closes the server socket.
   *
   * Throws: TServerTransportException (NOT_LISTENING) if listen() has not
   *   been called, or the socket has already been closed.
   */
  override void close() {
    enforce(isListening, new STE(STE.Type.NOT_LISTENING));

    serverSocket_.shutdown(SocketShutdown.BOTH);
    serverSocket_.close();
    serverSocket_ = null;
  }

  /// Whether a server socket currently exists, i.e. listen() succeeded and
  /// close() has not been called since.
  override bool isListening() @property {
    return serverSocket_ !is null;
  }

  /// Number of connections listen() backlogs.
  enum ACCEPT_BACKLOG = 1024;

  /**
   * Waits for an incoming connection and returns a transport for it.
   *
   * The configured send/receive timeouts are applied to the returned client
   * socket.
   *
   * Params:
   *   cancellation = If not null, hooked up to the internal notifier socket
   *     for the duration of the call so that triggering it wakes up the
   *     wait; cancellation.throwIfTriggered() is then invoked.
   *
   * Throws: TServerTransportException if the socket is not listening
   *   (NOT_LISTENING), or if waiting for/accepting the connection fails
   *   (RESOURCE_FAILED).
   */
  override TTransport accept(TCancellation cancellation = null) {
    enforce(isListening, new STE(STE.Type.NOT_LISTENING));

    if (cancellation) cancellationNotifier_.attach(cancellation.triggering);
    scope (exit) if (cancellation) cancellationNotifier_.detach();

    // Too many EINTRs is a fault condition and would need to be handled
    // manually by our caller, but we can tolerate a certain number.
    enum MAX_EINTRS = 10;
    uint numEintrs;

    while (true) {
      // Wait (without a timeout) on both the server socket and the
      // cancellation notifier socket.
      socketSet_.reset();
      socketSet_.add(serverSocket_);
      socketSet_.add(cancellationNotifier_.socket);

      auto ret = Socket.select(socketSet_, null, null);
      enforce(ret != 0, new STE("Socket.select() returned 0.",
        STE.Type.RESOURCE_FAILED));

      if (ret < 0) {
        // Select itself failed, check if it was just due to an interrupted
        // syscall.
        if (getSocketErrno() == INTERRUPTED_ERRNO) {
          if (numEintrs++ < MAX_EINTRS) {
            continue;
          } else {
            throw new STE("Socket.select() was interrupted by a signal (EINTR) " ~
              "more than " ~ to!string(MAX_EINTRS) ~ " times.",
              STE.Type.RESOURCE_FAILED
            );
          }
        }
        throw new STE("Unknown error on Socket.select(): " ~
          socketErrnoString(getSocketErrno()), STE.Type.RESOURCE_FAILED);
      } else {
        // Check for a ping on the interrupt socket.
        if (socketSet_.isSet(cancellationNotifier_.socket)) {
          cancellation.throwIfTriggered();
        }

        // Check for the actual server socket having a connection waiting.
        if (socketSet_.isSet(serverSocket_)) {
          break;
        }
      }
    }

    try {
      auto accepted = serverSocket_.accept();
      // Do not leak the just-accepted connection if wrapping or configuring
      // it fails below.
      scope (failure) accepted.close();

      auto client = createTSocket(accepted);
      client.sendTimeout = sendTimeout_;
      client.recvTimeout = recvTimeout_;
      return client;
    } catch (SocketException e) {
      // Keep the original exception chained as the cause, as done for the
      // address lookup failure in makeSocketAndListen().
      throw new STE("Unknown error on accepting: " ~ to!string(e),
        STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
    }
  }

protected:
  /**
   * Allows derived classes to create a different TSocket type.
   */
  TSocket createTSocket(Socket socket) {
    return new TSocket(socket);
  }

private:
  ushort port_;
  Duration sendTimeout_;
  Duration recvTimeout_;
  ushort retryLimit_;
  Duration retryDelay_;
  uint tcpSendBuffer_;
  uint tcpRecvBuffer_;
  bool ipv6Only_;

  // Null whenever the transport is not listening.
  Socket serverSocket_;
  TSocketNotifier cancellationNotifier_;

  // Keep socket set between accept() calls to avoid reallocating.
  SocketSet socketSet_;
}
211
/**
 * Creates a TCP socket bound to the wildcard address on the given port,
 * configures it and puts it into listening mode.
 *
 * If the created socket cannot be fully set up, it is closed again before
 * the exception propagates.
 *
 * Params:
 *   port = The TCP port to bind to.
 *   backlog = The backlog passed to Socket.listen().
 *   retryLimit = The maximum number of times a failed bind() is retried
 *     (0 means a single attempt).
 *   retryDelay = How long to sleep before each bind() retry.
 *   tcpSendBuffer = Send buffer size in bytes; 0 leaves it untouched.
 *   tcpRecvBuffer = Receive buffer size in bytes; 0 leaves it untouched.
 *   ipv6Only = Value for the IPV6_V6ONLY socket option (failure to set it is
 *     only logged).
 *
 * Returns: The listening socket.
 *
 * Throws: TServerTransportException (RESOURCE_FAILED) if the address cannot
 *   be determined, or the socket cannot be created, configured or bound.
 */
Socket makeSocketAndListen(ushort port, int backlog, ushort retryLimit,
  Duration retryDelay, uint tcpSendBuffer = 0, uint tcpRecvBuffer = 0,
  bool ipv6Only = false
) {
  Address localAddr;
  try {
    // null represents the wildcard address.
    auto addrInfos = getAddressInfo(null, to!string(port),
      AddressInfoFlags.PASSIVE, SocketType.STREAM, ProtocolType.TCP);
    foreach (i, ai; addrInfos) {
      // Prefer to bind to IPv6 addresses, because then IPv4 is listened to as
      // well, but not the other way round.
      if (ai.family == AddressFamily.INET6 || i == (addrInfos.length - 1)) {
        localAddr = ai.address;
        break;
      }
    }
  } catch (Exception e) {
    throw new STE("Could not determine local address to listen on.",
      STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
  }

  // An empty lookup result leaves localAddr unset; fail cleanly instead of
  // dereferencing null below.
  if (localAddr is null) {
    throw new STE("Could not determine local address to listen on.",
      STE.Type.RESOURCE_FAILED);
  }

  Socket socket;
  try {
    socket = new Socket(localAddr.addressFamily, SocketType.STREAM,
      ProtocolType.TCP);
  } catch (SocketException e) {
    throw new STE("Could not create accepting socket: " ~ to!string(e),
      STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
  }

  // From here on, every failure path must release the socket handle again.
  scope (failure) socket.close();

  try {
    socket.setOption(SocketOptionLevel.IPV6, SocketOption.IPV6_V6ONLY, ipv6Only);
  } catch (SocketException e) {
    // This is somewhat expected on older systems (e.g. pre-Vista Windows),
    // which do not support the IPV6_V6ONLY flag yet. Racy flag just to avoid
    // log spew in unit tests.
    shared static warned = false;
    if (!warned) {
      logError("Could not set IPV6_V6ONLY socket option: %s", e);
      warned = true;
    }
  }

  alias SocketOptionLevel.SOCKET lvlSock;

  // Prevent 2 maximum segment lifetime delay on accept.
  try {
    socket.setOption(lvlSock, SocketOption.REUSEADDR, true);
  } catch (SocketException e) {
    throw new STE("Could not set REUSEADDR socket option: " ~ to!string(e),
      STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
  }

  // Set TCP buffer sizes.
  if (tcpSendBuffer > 0) {
    try {
      socket.setOption(lvlSock, SocketOption.SNDBUF, tcpSendBuffer);
    } catch (SocketException e) {
      throw new STE("Could not set socket send buffer size: " ~ to!string(e),
        STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
    }
  }

  if (tcpRecvBuffer > 0) {
    try {
      socket.setOption(lvlSock, SocketOption.RCVBUF, tcpRecvBuffer);
    } catch (SocketException e) {
      throw new STE("Could not set socket receive buffer size: " ~ to!string(e),
        STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
    }
  }

  // Turn linger off to avoid blocking on socket close.
  try {
    Linger l;
    l.on = 0;
    l.time = 0;
    socket.setOption(lvlSock, SocketOption.LINGER, l);
  } catch (SocketException e) {
    throw new STE("Could not disable socket linger: " ~ to!string(e),
      STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
  }

  // Set TCP_NODELAY.
  try {
    socket.setOption(SocketOptionLevel.TCP, SocketOption.TCP_NODELAY, true);
  } catch (SocketException e) {
    throw new STE("Could not disable Nagle's algorithm: " ~ to!string(e),
      STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, e);
  }

  // Number of retries performed so far (the initial attempt is not counted).
  ushort retries;
  while (true) {
    SocketException bindError;
    try {
      socket.bind(localAddr);
      break;
    } catch (SocketException e) {
      // Remember the failure so it can be reported as the cause if we run
      // out of retries.
      bindError = e;
    }

    // If bind() worked, we broke out of the loop above.
    if (retries >= retryLimit) {
      throw new STE(text("Could not bind to address: ", localAddr),
        STE.Type.RESOURCE_FAILED, __FILE__, __LINE__, bindError);
    }
    retries++;
    Thread.sleep(retryDelay);
  }

  socket.listen(backlog);
  return socket;
}
324
unittest {
  // accept() must bail out with TCancelledException once the cancellation
  // passed to it is triggered from another thread.
  {
    import std.exception;

    auto origin = new TCancellationOrigin;

    auto listener = new TServerSocket(0);
    listener.listen();
    scope (exit) listener.close();

    auto triggerThread = new Thread({
      // Give the main thread some time to enter accept() first.
      Thread.sleep(dur!"msecs"(50));
      origin.trigger();
    });
    triggerThread.start();

    assertThrown!TCancelledException(listener.accept(origin));
  }

  // Client sockets handed out by accept() must honor the receive timeout
  // configured on the server socket.
  {
    immutable listenPort = 11122;
    auto ioTimeout = dur!"msecs"(500);

    auto listener = new TServerSocket(listenPort, ioTimeout, ioTimeout);
    listener.listen();
    scope (exit) listener.close();

    // Connect, but never send anything, so the server-side read must time out.
    auto silentClient = new TSocket("127.0.0.1", listenPort);
    silentClient.open();
    scope (exit) silentClient.close();

    shared bool hasTimedOut;
    auto readerThread = new Thread({
      auto accepted = listener.accept();
      ubyte[1] buffer;
      try {
        accepted.read(buffer);
      } catch (TTransportException e) {
        if (e.type != TTransportException.Type.TIMED_OUT) {
          // Anything but a timeout is unexpected; print it for diagnosis.
          import std.stdio;
          stderr.writeln(e);
        } else {
          hasTimedOut = true;
        }
      }
    });
    // Daemon thread, so a read that blocks forever cannot hang the test run.
    readerThread.isDaemon = true;
    readerThread.start();

    // Wait for the timeout, with a little bit of spare time.
    Thread.sleep(ioTimeout + dur!"msecs"(50));
    enforce(hasTimedOut,
      "Client socket receive() blocked for longer than recvTimeout.");
  }
}