]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/d/src/thrift/async/socket.d
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / d / src / thrift / async / socket.d
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19module thrift.async.socket;
20
21import core.stdc.errno: ECONNRESET;
22import core.thread : Fiber;
23import core.time : dur, Duration;
24import std.array : empty;
25import std.conv : to;
26import std.exception : enforce;
27import std.socket;
28import thrift.base;
29import thrift.async.base;
30import thrift.transport.base;
31import thrift.transport.socket : TSocketBase;
32import thrift.internal.endian;
33import thrift.internal.socket;
34
35version (Windows) {
36 import std.c.windows.winsock : connect;
37} else version (Posix) {
38 import core.sys.posix.sys.socket : connect;
39} else static assert(0, "Don't know connect on this platform.");
40
41/**
42 * Non-blocking socket implementation of the TTransport interface.
43 *
44 * Whenever a socket operation would block, TAsyncSocket registers a callback
45 * with the specified TAsyncSocketManager and yields.
46 *
47 * As for thrift.transport.socket, due to the limitations of std.socket,
48 * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
49 * not).
50 */
51class TAsyncSocket : TSocketBase, TAsyncTransport {
52 /**
53 * Constructor that takes an already created, connected (!) socket.
54 *
55 * Params:
56 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
57 * socket = Already created, connected socket object. Will be switched to
58 * non-blocking mode if it isn't already.
59 */
60 this(TAsyncSocketManager asyncManager, Socket socket) {
61 asyncManager_ = asyncManager;
62 socket.blocking = false;
63 super(socket);
64 }
65
66 /**
67 * Creates a new unconnected socket that will connect to the given host
68 * on the given port.
69 *
70 * Params:
71 * asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
72 * host = Remote host.
73 * port = Remote port.
74 */
75 this(TAsyncSocketManager asyncManager, string host, ushort port) {
76 asyncManager_ = asyncManager;
77 super(host, port);
78 }
79
80 override TAsyncManager asyncManager() @property {
81 return asyncManager_;
82 }
83
84 /**
85 * Asynchronously connects the socket.
86 *
87 * Completes without blocking and defers further operations on the socket
88 * until the connection is established. If connecting fails, this is
89 * currently not indicated in any way other than every call to read/write
90 * failing.
91 */
92 override void open() {
93 if (isOpen) return;
94
95 enforce(!host_.empty, new TTransportException(
96 "Cannot open null host.", TTransportException.Type.NOT_OPEN));
97 enforce(port_ != 0, new TTransportException(
98 "Cannot open with null port.", TTransportException.Type.NOT_OPEN));
99
100
101 // Cannot use std.socket.Socket.connect here because it hides away
102 // EINPROGRESS/WSAWOULDBLOCK.
103 Address addr;
104 try {
105 // Currently, we just go with the first address returned, could be made
106 // more intelligent though – IPv6?
107 addr = getAddress(host_, port_)[0];
108 } catch (Exception e) {
109 throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
110 TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
111 }
112
113 socket_ = new TcpSocket(addr.addressFamily);
114 socket_.blocking = false;
115 setSocketOpts();
116
117 auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
118 if (errorCode == 0) {
119 // If the connection could be established immediately, just return. I
120 // don't know if this ever happens.
121 return;
122 }
123
124 auto errno = getSocketErrno();
125 if (errno != CONNECT_INPROGRESS_ERRNO) {
126 throw new TTransportException(`Could not establish connection to "` ~
127 host_ ~ `": ` ~ socketErrnoString(errno),
128 TTransportException.Type.NOT_OPEN);
129 }
130
131 // This is the expected case: connect() signalled that the connection
132 // is being established in the background. Queue up a work item with the
133 // async manager which just defers any other operations on this
134 // TAsyncSocket instance until the socket is ready.
135 asyncManager_.execute(this,
136 {
137 auto fiber = Fiber.getThis();
138 TAsyncEventReason reason = void;
139 asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
140 connectTimeout,
141 scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
142 );
143 Fiber.yield();
144
145 if (reason == TAsyncEventReason.TIMED_OUT) {
146 // Close the connection, so that subsequent work items fail immediately.
147 closeImmediately();
148 return;
149 }
150
151 int errorCode = void;
152 socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
153 errorCode);
154
155 if (errorCode) {
156 logInfo("Could not connect TAsyncSocket: %s",
157 socketErrnoString(errorCode));
158
159 // Close the connection, so that subsequent work items fail immediately.
160 closeImmediately();
161 return;
162 }
163
164 }
165 );
166 }
167
168 /**
169 * Closes the socket.
170 *
171 * Will block until all currently active operations are finished before the
172 * socket is closed.
173 */
174 override void close() {
175 if (!isOpen) return;
176
177 import core.sync.condition;
178 import core.sync.mutex;
179
180 auto doneMutex = new Mutex;
181 auto doneCond = new Condition(doneMutex);
182 synchronized (doneMutex) {
183 asyncManager_.execute(this,
184 scopedDelegate(
185 {
186 closeImmediately();
187 synchronized (doneMutex) doneCond.notifyAll();
188 }
189 )
190 );
191 doneCond.wait();
192 }
193 }
194
195 override bool peek() {
196 if (!isOpen) return false;
197
198 ubyte buf;
199 auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
200 if (r == Socket.ERROR) {
201 auto lastErrno = getSocketErrno();
202 static if (connresetOnPeerShutdown) {
203 if (lastErrno == ECONNRESET) {
204 closeImmediately();
205 return false;
206 }
207 }
208 throw new TTransportException("Peeking into socket failed: " ~
209 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
210 }
211 return (r > 0);
212 }
213
214 override size_t read(ubyte[] buf) {
215 enforce(isOpen, new TTransportException(
216 "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));
217
218 typeof(getSocketErrno()) lastErrno;
219
220 auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
221 TAsyncEventType.READ);
222
223 // If recv went fine, immediately return.
224 if (r >= 0) return r;
225
226 // Something went wrong, find out how to handle it.
227 lastErrno = getSocketErrno();
228
229 static if (connresetOnPeerShutdown) {
230 // See top comment.
231 if (lastErrno == ECONNRESET) {
232 return 0;
233 }
234 }
235
236 throw new TTransportException("Receiving from socket failed: " ~
237 socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
238 }
239
240 override void write(in ubyte[] buf) {
241 size_t sent;
242 while (sent < buf.length) {
243 sent += writeSome(buf[sent .. $]);
244 }
245 assert(sent == buf.length);
246 }
247
248 override size_t writeSome(in ubyte[] buf) {
249 enforce(isOpen, new TTransportException(
250 "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));
251
252 auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);
253
254 // Everything went well, just return the number of bytes written.
255 if (r > 0) return r;
256
257 // Handle error conditions.
258 if (r < 0) {
259 auto lastErrno = getSocketErrno();
260
261 auto type = TTransportException.Type.UNKNOWN;
262 if (isSocketCloseErrno(lastErrno)) {
263 type = TTransportException.Type.NOT_OPEN;
264 closeImmediately();
265 }
266
267 throw new TTransportException("Sending to socket failed: " ~
268 socketErrnoString(lastErrno), type);
269 }
270
271 // send() should never return 0.
272 throw new TTransportException("Sending to socket failed (0 bytes written).",
273 TTransportException.Type.UNKNOWN);
274 }
275
276 /// The amount of time in which a conncetion must be established before the
277 /// open() call times out.
278 Duration connectTimeout = dur!"seconds"(5);
279
280private:
281 void closeImmediately() {
282 socket_.close();
283 socket_ = null;
284 }
285
286 T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
287 while (true) {
288 auto result = call();
289 if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;
290
291 // We got an EAGAIN result, register a callback to return here once some
292 // event happens and yield.
293
294 Duration timeout = void;
295 final switch (eventType) {
296 case TAsyncEventType.READ:
297 timeout = recvTimeout_;
298 break;
299 case TAsyncEventType.WRITE:
300 timeout = sendTimeout_;
301 break;
302 }
303
304 auto fiber = Fiber.getThis();
305 assert(fiber, "Current fiber null – not running in TAsyncManager?");
306 TAsyncEventReason eventReason = void;
307 asyncManager_.addOneshotListener(socket_, eventType, timeout,
308 scopedDelegate((TAsyncEventReason reason) {
309 eventReason = reason;
310 fiber.call();
311 })
312 );
313
314 // Yields execution back to the async manager, will return back here once
315 // the above listener is called.
316 Fiber.yield();
317
318 if (eventReason == TAsyncEventReason.TIMED_OUT) {
319 // If we are cancelling the request due to a timed out operation, the
320 // connection is in an undefined state, because the server could decide
321 // to send the requested data later, or we could have already been half-
322 // way into writing a request. Thus, we close the connection to make any
323 // possibly queued up work items fail immediately. Besides, the server
324 // is not very likely to immediately recover after a socket-level
325 // timeout has expired anyway.
326 closeImmediately();
327
328 throw new TTransportException("Timed out while waiting for socket " ~
329 "to get ready to " ~ to!string(eventType) ~ ".",
330 TTransportException.Type.TIMED_OUT);
331 }
332 }
333 }
334
335 /// The TAsyncSocketManager to use for non-blocking I/O.
336 TAsyncSocketManager asyncManager_;
337}
338
339private {
340 // std.socket doesn't include SO_ERROR for reasons unknown.
341 version (linux) {
342 enum SO_ERROR = 4;
343 } else version (OSX) {
344 enum SO_ERROR = 0x1007;
345 } else version (FreeBSD) {
346 enum SO_ERROR = 0x1007;
347 } else version (Win32) {
348 import std.c.windows.winsock : SO_ERROR;
349 } else static assert(false, "Don't know SO_ERROR on this platform.");
350
351 // This hack forces a delegate literal to be scoped, even if it is passed to
352 // a function accepting normal delegates as well. DMD likes to allocate the
353 // context on the heap anyway, but it seems to work for LDC.
354 import std.traits : isDelegate;
355 auto scopedDelegate(D)(scope D d) if (isDelegate!D) {
356 return d;
357 }
358}