]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | module thrift.async.socket; | |
20 | ||
21 | import core.stdc.errno: ECONNRESET; | |
22 | import core.thread : Fiber; | |
23 | import core.time : dur, Duration; | |
24 | import std.array : empty; | |
25 | import std.conv : to; | |
26 | import std.exception : enforce; | |
27 | import std.socket; | |
28 | import thrift.base; | |
29 | import thrift.async.base; | |
30 | import thrift.transport.base; | |
31 | import thrift.transport.socket : TSocketBase; | |
32 | import thrift.internal.endian; | |
33 | import thrift.internal.socket; | |
34 | ||
35 | version (Windows) { | |
36 | import std.c.windows.winsock : connect; | |
37 | } else version (Posix) { | |
38 | import core.sys.posix.sys.socket : connect; | |
39 | } else static assert(0, "Don't know connect on this platform."); | |
40 | ||
41 | /** | |
42 | * Non-blocking socket implementation of the TTransport interface. | |
43 | * | |
44 | * Whenever a socket operation would block, TAsyncSocket registers a callback | |
45 | * with the specified TAsyncSocketManager and yields. | |
46 | * | |
47 | * As for thrift.transport.socket, due to the limitations of std.socket, | |
48 | * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are | |
49 | * not). | |
50 | */ | |
51 | class TAsyncSocket : TSocketBase, TAsyncTransport { | |
52 | /** | |
53 | * Constructor that takes an already created, connected (!) socket. | |
54 | * | |
55 | * Params: | |
56 | * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. | |
57 | * socket = Already created, connected socket object. Will be switched to | |
58 | * non-blocking mode if it isn't already. | |
59 | */ | |
60 | this(TAsyncSocketManager asyncManager, Socket socket) { | |
61 | asyncManager_ = asyncManager; | |
62 | socket.blocking = false; | |
63 | super(socket); | |
64 | } | |
65 | ||
66 | /** | |
67 | * Creates a new unconnected socket that will connect to the given host | |
68 | * on the given port. | |
69 | * | |
70 | * Params: | |
71 | * asyncManager = The TAsyncSocketManager to use for non-blocking I/O. | |
72 | * host = Remote host. | |
73 | * port = Remote port. | |
74 | */ | |
75 | this(TAsyncSocketManager asyncManager, string host, ushort port) { | |
76 | asyncManager_ = asyncManager; | |
77 | super(host, port); | |
78 | } | |
79 | ||
80 | override TAsyncManager asyncManager() @property { | |
81 | return asyncManager_; | |
82 | } | |
83 | ||
84 | /** | |
85 | * Asynchronously connects the socket. | |
86 | * | |
87 | * Completes without blocking and defers further operations on the socket | |
88 | * until the connection is established. If connecting fails, this is | |
89 | * currently not indicated in any way other than every call to read/write | |
90 | * failing. | |
91 | */ | |
92 | override void open() { | |
93 | if (isOpen) return; | |
94 | ||
95 | enforce(!host_.empty, new TTransportException( | |
96 | "Cannot open null host.", TTransportException.Type.NOT_OPEN)); | |
97 | enforce(port_ != 0, new TTransportException( | |
98 | "Cannot open with null port.", TTransportException.Type.NOT_OPEN)); | |
99 | ||
100 | ||
101 | // Cannot use std.socket.Socket.connect here because it hides away | |
102 | // EINPROGRESS/WSAWOULDBLOCK. | |
103 | Address addr; | |
104 | try { | |
105 | // Currently, we just go with the first address returned, could be made | |
106 | // more intelligent though – IPv6? | |
107 | addr = getAddress(host_, port_)[0]; | |
108 | } catch (Exception e) { | |
109 | throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`, | |
110 | TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e); | |
111 | } | |
112 | ||
113 | socket_ = new TcpSocket(addr.addressFamily); | |
114 | socket_.blocking = false; | |
115 | setSocketOpts(); | |
116 | ||
117 | auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen()); | |
118 | if (errorCode == 0) { | |
119 | // If the connection could be established immediately, just return. I | |
120 | // don't know if this ever happens. | |
121 | return; | |
122 | } | |
123 | ||
124 | auto errno = getSocketErrno(); | |
125 | if (errno != CONNECT_INPROGRESS_ERRNO) { | |
126 | throw new TTransportException(`Could not establish connection to "` ~ | |
127 | host_ ~ `": ` ~ socketErrnoString(errno), | |
128 | TTransportException.Type.NOT_OPEN); | |
129 | } | |
130 | ||
131 | // This is the expected case: connect() signalled that the connection | |
132 | // is being established in the background. Queue up a work item with the | |
133 | // async manager which just defers any other operations on this | |
134 | // TAsyncSocket instance until the socket is ready. | |
135 | asyncManager_.execute(this, | |
136 | { | |
137 | auto fiber = Fiber.getThis(); | |
138 | TAsyncEventReason reason = void; | |
139 | asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE, | |
140 | connectTimeout, | |
141 | scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); }) | |
142 | ); | |
143 | Fiber.yield(); | |
144 | ||
145 | if (reason == TAsyncEventReason.TIMED_OUT) { | |
146 | // Close the connection, so that subsequent work items fail immediately. | |
147 | closeImmediately(); | |
148 | return; | |
149 | } | |
150 | ||
151 | int errorCode = void; | |
152 | socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR, | |
153 | errorCode); | |
154 | ||
155 | if (errorCode) { | |
156 | logInfo("Could not connect TAsyncSocket: %s", | |
157 | socketErrnoString(errorCode)); | |
158 | ||
159 | // Close the connection, so that subsequent work items fail immediately. | |
160 | closeImmediately(); | |
161 | return; | |
162 | } | |
163 | ||
164 | } | |
165 | ); | |
166 | } | |
167 | ||
168 | /** | |
169 | * Closes the socket. | |
170 | * | |
171 | * Will block until all currently active operations are finished before the | |
172 | * socket is closed. | |
173 | */ | |
174 | override void close() { | |
175 | if (!isOpen) return; | |
176 | ||
177 | import core.sync.condition; | |
178 | import core.sync.mutex; | |
179 | ||
180 | auto doneMutex = new Mutex; | |
181 | auto doneCond = new Condition(doneMutex); | |
182 | synchronized (doneMutex) { | |
183 | asyncManager_.execute(this, | |
184 | scopedDelegate( | |
185 | { | |
186 | closeImmediately(); | |
187 | synchronized (doneMutex) doneCond.notifyAll(); | |
188 | } | |
189 | ) | |
190 | ); | |
191 | doneCond.wait(); | |
192 | } | |
193 | } | |
194 | ||
195 | override bool peek() { | |
196 | if (!isOpen) return false; | |
197 | ||
198 | ubyte buf; | |
199 | auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK); | |
200 | if (r == Socket.ERROR) { | |
201 | auto lastErrno = getSocketErrno(); | |
202 | static if (connresetOnPeerShutdown) { | |
203 | if (lastErrno == ECONNRESET) { | |
204 | closeImmediately(); | |
205 | return false; | |
206 | } | |
207 | } | |
208 | throw new TTransportException("Peeking into socket failed: " ~ | |
209 | socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); | |
210 | } | |
211 | return (r > 0); | |
212 | } | |
213 | ||
214 | override size_t read(ubyte[] buf) { | |
215 | enforce(isOpen, new TTransportException( | |
216 | "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN)); | |
217 | ||
218 | typeof(getSocketErrno()) lastErrno; | |
219 | ||
220 | auto r = yieldOnBlock(socket_.receive(cast(void[])buf), | |
221 | TAsyncEventType.READ); | |
222 | ||
223 | // If recv went fine, immediately return. | |
224 | if (r >= 0) return r; | |
225 | ||
226 | // Something went wrong, find out how to handle it. | |
227 | lastErrno = getSocketErrno(); | |
228 | ||
229 | static if (connresetOnPeerShutdown) { | |
230 | // See top comment. | |
231 | if (lastErrno == ECONNRESET) { | |
232 | return 0; | |
233 | } | |
234 | } | |
235 | ||
236 | throw new TTransportException("Receiving from socket failed: " ~ | |
237 | socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN); | |
238 | } | |
239 | ||
240 | override void write(in ubyte[] buf) { | |
241 | size_t sent; | |
242 | while (sent < buf.length) { | |
243 | sent += writeSome(buf[sent .. $]); | |
244 | } | |
245 | assert(sent == buf.length); | |
246 | } | |
247 | ||
248 | override size_t writeSome(in ubyte[] buf) { | |
249 | enforce(isOpen, new TTransportException( | |
250 | "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN)); | |
251 | ||
252 | auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE); | |
253 | ||
254 | // Everything went well, just return the number of bytes written. | |
255 | if (r > 0) return r; | |
256 | ||
257 | // Handle error conditions. | |
258 | if (r < 0) { | |
259 | auto lastErrno = getSocketErrno(); | |
260 | ||
261 | auto type = TTransportException.Type.UNKNOWN; | |
262 | if (isSocketCloseErrno(lastErrno)) { | |
263 | type = TTransportException.Type.NOT_OPEN; | |
264 | closeImmediately(); | |
265 | } | |
266 | ||
267 | throw new TTransportException("Sending to socket failed: " ~ | |
268 | socketErrnoString(lastErrno), type); | |
269 | } | |
270 | ||
271 | // send() should never return 0. | |
272 | throw new TTransportException("Sending to socket failed (0 bytes written).", | |
273 | TTransportException.Type.UNKNOWN); | |
274 | } | |
275 | ||
276 | /// The amount of time in which a conncetion must be established before the | |
277 | /// open() call times out. | |
278 | Duration connectTimeout = dur!"seconds"(5); | |
279 | ||
280 | private: | |
281 | void closeImmediately() { | |
282 | socket_.close(); | |
283 | socket_ = null; | |
284 | } | |
285 | ||
286 | T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) { | |
287 | while (true) { | |
288 | auto result = call(); | |
289 | if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result; | |
290 | ||
291 | // We got an EAGAIN result, register a callback to return here once some | |
292 | // event happens and yield. | |
293 | ||
294 | Duration timeout = void; | |
295 | final switch (eventType) { | |
296 | case TAsyncEventType.READ: | |
297 | timeout = recvTimeout_; | |
298 | break; | |
299 | case TAsyncEventType.WRITE: | |
300 | timeout = sendTimeout_; | |
301 | break; | |
302 | } | |
303 | ||
304 | auto fiber = Fiber.getThis(); | |
305 | assert(fiber, "Current fiber null – not running in TAsyncManager?"); | |
306 | TAsyncEventReason eventReason = void; | |
307 | asyncManager_.addOneshotListener(socket_, eventType, timeout, | |
308 | scopedDelegate((TAsyncEventReason reason) { | |
309 | eventReason = reason; | |
310 | fiber.call(); | |
311 | }) | |
312 | ); | |
313 | ||
314 | // Yields execution back to the async manager, will return back here once | |
315 | // the above listener is called. | |
316 | Fiber.yield(); | |
317 | ||
318 | if (eventReason == TAsyncEventReason.TIMED_OUT) { | |
319 | // If we are cancelling the request due to a timed out operation, the | |
320 | // connection is in an undefined state, because the server could decide | |
321 | // to send the requested data later, or we could have already been half- | |
322 | // way into writing a request. Thus, we close the connection to make any | |
323 | // possibly queued up work items fail immediately. Besides, the server | |
324 | // is not very likely to immediately recover after a socket-level | |
325 | // timeout has expired anyway. | |
326 | closeImmediately(); | |
327 | ||
328 | throw new TTransportException("Timed out while waiting for socket " ~ | |
329 | "to get ready to " ~ to!string(eventType) ~ ".", | |
330 | TTransportException.Type.TIMED_OUT); | |
331 | } | |
332 | } | |
333 | } | |
334 | ||
335 | /// The TAsyncSocketManager to use for non-blocking I/O. | |
336 | TAsyncSocketManager asyncManager_; | |
337 | } | |
338 | ||
339 | private { | |
340 | // std.socket doesn't include SO_ERROR for reasons unknown. | |
341 | version (linux) { | |
342 | enum SO_ERROR = 4; | |
343 | } else version (OSX) { | |
344 | enum SO_ERROR = 0x1007; | |
345 | } else version (FreeBSD) { | |
346 | enum SO_ERROR = 0x1007; | |
347 | } else version (Win32) { | |
348 | import std.c.windows.winsock : SO_ERROR; | |
349 | } else static assert(false, "Don't know SO_ERROR on this platform."); | |
350 | ||
351 | // This hack forces a delegate literal to be scoped, even if it is passed to | |
352 | // a function accepting normal delegates as well. DMD likes to allocate the | |
353 | // context on the heap anyway, but it seems to work for LDC. | |
354 | import std.traits : isDelegate; | |
355 | auto scopedDelegate(D)(scope D d) if (isDelegate!D) { | |
356 | return d; | |
357 | } | |
358 | } |