]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | module thrift.async.libevent; | |
20 | ||
21 | import core.atomic; | |
22 | import core.time : Duration, dur; | |
23 | import core.exception : onOutOfMemoryError; | |
24 | import core.memory : GC; | |
25 | import core.thread : Fiber, Thread; | |
26 | import core.sync.condition; | |
27 | import core.sync.mutex; | |
28 | import core.stdc.stdlib : free, malloc; | |
29 | import deimos.event2.event; | |
30 | import std.array : empty, front, popFront; | |
31 | import std.conv : text, to; | |
32 | import std.exception : enforce; | |
33 | import std.socket : Socket, socketPair; | |
34 | import thrift.base; | |
35 | import thrift.async.base; | |
36 | import thrift.internal.socket; | |
37 | import thrift.internal.traits; | |
38 | import thrift.util.cancellation; | |
39 | ||
40 | // To avoid DMD @@BUG6395@@. | |
41 | import thrift.internal.algorithm; | |
42 | ||
/**
 * A TAsyncManager implementation based on libevent.
 *
 * The libevent loop for handling non-blocking sockets is run in a background
 * thread, which is lazily spawned. The thread is not daemonized to avoid
 * crashes on program shutdown; it is only stopped when the manager instance
 * is destroyed. So, to ensure a clean program teardown, either make sure this
 * instance gets destroyed (e.g. by using scope), or manually call stop() at
 * the end.
 */
class TLibeventAsyncManager : TAsyncSocketManager {
  /**
   * Creates the libevent event base and the socket pair used to pass control
   * messages to the worker thread. The worker thread itself is only started
   * on first use (see ensureWorkerThreadRunning()).
   */
  this() {
    eventBase_ = event_base_new();

    // Set up the socket pair for transferring control messages to the event
    // loop.
    auto pair = socketPair();
    controlSendSocket_ = pair[0];
    controlReceiveSocket_ = pair[1];
    controlReceiveSocket_.blocking = false;

    // Register an event for receiving control messages.
    controlReceiveEvent_ = event_new(eventBase_, controlReceiveSocket_.handle,
      EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
      cast(void*)this);
    event_add(controlReceiveEvent_, null);

    queuedCountMutex_ = new Mutex;
    zeroQueuedCondition_ = new Condition(queuedCountMutex_);
  }

  ~this() {
    // stop() should be safe to call, because either we don't have a worker
    // thread running and it is a no-op anyway, or it is guaranteed to be
    // still running (blocked in event_base_loop), and thus guaranteed not to
    // be garbage collected yet.
    stop(dur!"hnsecs"(0));

    event_free(controlReceiveEvent_);
    event_base_free(eventBase_);
    eventBase_ = null;
  }

  /**
   * Enqueues work for the given transport. Work items for the same transport
   * are executed one after another on the worker thread.
   *
   * If cancellation is already triggered, nothing is enqueued. If it is
   * triggered later, the item is removed from the queue unless it has already
   * started executing.
   *
   * Throws: TException if the control message could not be sent; the item is
   *   then not enqueued.
   */
  override void execute(TAsyncTransport transport, Work work,
    TCancellation cancellation = null
  ) {
    if (cancellation && cancellation.triggered) return;

    // Keep track that there is a new work item to be processed.
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    // We should be able to send the control message as a whole – we currently
    // assume to be able to receive it at once as well. If this proves to be
    // unstable (e.g. send could possibly return early if the receiving buffer
    // is full and the blocking call gets interrupted by a signal), it could
    // be changed to a more sophisticated scheme.

    // Make sure the delegate context doesn't get GCd while the work item is
    // on the wire.
    GC.addRoot(work.ptr);

    {
      // If the work message cannot be sent, the worker thread will never see
      // (and thus never finish) this item. Undo the bookkeeping so stop()
      // does not wait for it, and drop the extra GC root again.
      scope (failure) {
        GC.removeRoot(work.ptr);
        decrementQueuedCount();
      }

      // Send work message.
      sendControlMsg(ControlMsg(MsgType.WORK, work, transport));
    }

    if (cancellation) {
      cancellation.triggering.addCallback({
        sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
      });
    }
  }

  /**
   * Runs work on the worker thread once duration has elapsed. The item counts
   * as queued until it has run.
   */
  override void delay(Duration duration, void delegate() work) {
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    const tv = toTimeval(duration);

    // DMD @@BUG@@: Cannot deduce T to void delegate() here.
    registerOneshotEvent!(void delegate())(
      -1, 0, assumeNothrow(&delayCallback), &tv,
      {
        work();
        decrementQueuedCount();
      }
    );
  }

  /**
   * Stops the worker thread, if it is running.
   *
   * A positive waitFinishTimeout waits up to that long for queued items to
   * finish. A negative value waits until all of them are done. Zero stops
   * immediately. Any items still queued afterwards are discarded.
   *
   * Returns: true if no items were left queued when the loop was stopped.
   */
  override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
    bool cleanExit = true;

    synchronized (this) {
      if (workerThread_) {
        synchronized (queuedCountMutex_) {
          if (waitFinishTimeout > dur!"hnsecs"(0)) {
            if (queuedCount_ > 0) {
              zeroQueuedCondition_.wait(waitFinishTimeout);
            }
          } else if (waitFinishTimeout < dur!"hnsecs"(0)) {
            while (queuedCount_ > 0) zeroQueuedCondition_.wait();
          } else {
            // waitFinishTimeout is zero, immediately exit in all cases.
          }
          cleanExit = (queuedCount_ == 0);
        }

        event_base_loopbreak(eventBase_);
        // Also send a SHUTDOWN message so the worker thread wakes up.
        sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
        workerThread_.join();
        workQueues_ = null;
        // We have nuked all currently enqueued items, so set the count to
        // zero. This is safe to do without locking, since the worker thread
        // is down.
        queuedCount_ = 0;
        atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
      }
    }

    return cleanExit;
  }

  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    TSocketEventListener listener
  ) {
    addOneshotListenerImpl(socket, eventType, null, listener);
  }

  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener
  ) {
    if (timeout <= dur!"hnsecs"(0)) {
      addOneshotListenerImpl(socket, eventType, null, listener);
    } else {
      // This is not really documented well, but libevent does not require to
      // keep the timeval around after the event was added.
      auto tv = toTimeval(timeout);
      addOneshotListenerImpl(socket, eventType, &tv, listener);
    }
  }

private:
  alias void delegate() Work;

  void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
    const(timeval)* timeout, TSocketEventListener listener
  ) {
    registerOneshotEvent(socket.handle, libeventEventType(eventType),
      assumeNothrow(&socketCallback), timeout, listener);
  }

  /**
   * Registers a one-shot libevent event. A copy of payload is passed to
   * callback as its argument.
   *
   * The copy lives in malloc'd memory that is registered with the GC, so any
   * GC references inside it (e.g. a delegate context) stay alive. The
   * callback is responsible for releasing it via releasePayload().
   */
  void registerOneshotEvent(T)(evutil_socket_t fd, short type,
    event_callback_fn callback, const(timeval)* timeout, T payload
  ) {
    // Create a copy of the payload on the C heap.
    auto payloadMem = malloc(payload.sizeof);
    if (!payloadMem) onOutOfMemoryError();
    (cast(T*)payloadMem)[0 .. 1] = payload;
    GC.addRange(payloadMem, payload.sizeof);

    auto result = event_base_once(eventBase_, fd, type, callback,
      payloadMem, timeout);

    // Assuming that we didn't get our arguments wrong above, the only other
    // situation in which event_base_once can fail is when it can't allocate
    // memory. The callback will never run in that case, so release the
    // payload copy here instead of leaking it.
    if (result != 0) {
      GC.removeRange(payloadMem);
      free(payloadMem);
      onOutOfMemoryError();
    }
  }

  /**
   * Releases a payload copy created by registerOneshotEvent().
   *
   * Destroys the pointed-to value, unregisters the memory from the GC and
   * frees it. Note that destroy() must be applied to the pointee: applied to
   * the pointer itself, it would only reset the pointer to null, and the
   * subsequent free() would then leak the allocation.
   */
  static void releasePayload(T)(T* payload) {
    destroy(*payload);
    GC.removeRange(payload);
    free(payload);
  }

  enum MsgType : ubyte {
    SHUTDOWN,
    WORK,
    CANCEL
  }

  struct ControlMsg {
    MsgType type;
    Work work;
    TAsyncTransport transport;
  }

  /**
   * Starts the worker thread if it is not already running.
   */
  void ensureWorkerThreadRunning() {
    // Technically, only half barriers would be required here, but adding the
    // argument seems to trigger a DMD template argument deduction @@BUG@@.
    if (!atomicLoad(*(cast(shared)&workerThread_))) {
      synchronized (this) {
        if (!workerThread_) {
          auto thread = new Thread({ event_base_loop(eventBase_, 0); });
          thread.start();
          atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
        }
      }
    }
  }

  /**
   * Sends a control message to the worker thread.
   *
   * Throws: TException if the message was not transmitted in full.
   */
  void sendControlMsg(const(ControlMsg) msg) {
    auto result = controlSendSocket_.send((&msg)[0 .. 1]);
    enum size = msg.sizeof;
    enforce(result == size, new TException(text(
      "Sending control message of type ", msg.type, " failed (", result,
      " bytes instead of ", size, " transmitted).")));
  }

  /**
   * Receives messages from the control message socket and acts on them. Called
   * from the worker thread.
   */
  void receiveControlMsg() {
    // Read as many new work items off the socket as possible (at least one
    // should be available, as we got notified by libevent).
    ControlMsg msg;
    ptrdiff_t bytesRead;
    while (true) {
      bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));

      if (bytesRead < 0) {
        auto errno = getSocketErrno();
        if (errno != WOULD_BLOCK_ERRNO) {
          logError("Reading control message, some work item will possibly " ~
            "never be executed: %s", socketErrnoString(errno));
        }
      }
      if (bytesRead != msg.sizeof) break;

      // Everything went fine, we received a new control message.
      final switch (msg.type) {
        case MsgType.SHUTDOWN:
          // The message was just intended to wake us up for shutdown.
          break;

        case MsgType.CANCEL:
          // stop() discards all queues, so a cancellation can refer to a
          // transport that is no longer known. There is nothing to cancel
          // then (and indexing the AA directly would throw inside a C
          // callback).
          auto cancelQueue = msg.transport in workQueues_;
          if (cancelQueue !is null && (*cancelQueue).length > 1) {
            // When processing a cancellation, we must not touch the first
            // item, since it is already being processed.
            auto pending = (*cancelQueue)[1 .. $];
            auto remaining = removeEqual(pending, msg.work);
            *cancelQueue = [(*cancelQueue)[0]] ~ remaining;

            // Cancelled items will never be executed, so they must no longer
            // count as queued – otherwise stop() would wait for them forever.
            decrementQueuedCount(cast(uint)(pending.length - remaining.length));
          }
          break;

        case MsgType.WORK:
          // Now that the work item is back in the D world, we don't need the
          // extra GC root for the context pointer anymore (see execute()).
          GC.removeRoot(msg.work.ptr);

          // Add the work item to the queue and execute it.
          auto queue = msg.transport in workQueues_;
          if (queue is null || (*queue).empty) {
            // If the queue is empty, add the new work item to the queue as well,
            // but immediately start executing it.
            workQueues_[msg.transport] = [msg.work];
            executeWork(msg.transport, msg.work);
          } else {
            (*queue) ~= msg.work;
          }
          break;
      }
    }

    // If the last read was successful, but didn't read enough bytes, we got
    // a problem.
    if (bytesRead > 0) {
      logError("Unexpected partial control message read (%s byte(s) " ~
        "instead of %s), some work item will possibly never be executed.",
        bytesRead, msg.sizeof);
    }
  }

  /**
   * Executes the given work item and all others enqueued for the same
   * transport in a new fiber. Called from the worker thread.
   */
  void executeWork(TAsyncTransport transport, Work work) {
    (new Fiber({
      auto item = work;
      while (true) {
        try {
          // Execute the actual work. It will possibly add listeners to the
          // event loop and yield away if it has to wait for blocking
          // operations. It is quite possible that another fiber will modify
          // the work queue for the current transport.
          item();
        } catch (Exception e) {
          // This should never happen, just to be sure the worker thread
          // doesn't stop working in mysterious ways because of an unhandled
          // exception.
          logError("Exception thrown by work item: %s", e);
        }

        // Remove the item from the work queue.
        // Note: Due to the value semantics of array slices, we have to
        // re-lookup this on every iteration. This could be solved, but I'd
        // rather replace this directly with a queue type once one becomes
        // available in Phobos.
        auto queue = workQueues_[transport];
        assert(queue.front == item);
        queue.popFront();
        workQueues_[transport] = queue;

        // Now that the work item is done, no longer count it as queued.
        decrementQueuedCount();

        if (queue.empty) break;

        // If the queue is not empty, execute the next waiting item.
        item = queue.front;
      }
    })).call();
  }

  /**
   * Increments the amount of queued items.
   */
  void incrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      ++queuedCount_;
    }
  }

  /**
   * Decrements the amount of queued items by amount (default: one) and wakes
   * up any stop() call waiting for the count to reach zero.
   */
  void decrementQueuedCount(uint amount = 1) {
    if (amount == 0) return;
    synchronized (queuedCountMutex_) {
      assert(queuedCount_ >= amount);
      queuedCount_ -= amount;
      if (queuedCount_ == 0) {
        zeroQueuedCondition_.notifyAll();
      }
    }
  }

  static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
    void *managerThis
  ) {
    (cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
  }

  /// One-shot socket event callback; arg is the TSocketEventListener payload
  /// copy created by registerOneshotEvent().
  static extern(C) void socketCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    auto listener = cast(TSocketEventListener*)arg;
    auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
      TAsyncEventReason.NORMAL;
    (*listener)(reason);
    releasePayload(listener);
  }

  /// One-shot timer callback used by delay(); arg is the delegate payload
  /// copy created by registerOneshotEvent().
  static extern(C) void delayCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    assert(flags & EV_TIMEOUT);
    auto work = cast(void delegate()*)arg;
    (*work)();
    releasePayload(work);
  }

  Thread workerThread_;

  event_base* eventBase_;

  /// The socket used for receiving control messages in the event loop. Paired
  /// with controlSendSocket_. SHUTDOWN messages carry no work and only serve
  /// to wake up the worker thread.
  Socket controlReceiveSocket_;
  event* controlReceiveEvent_;

  /// The socket used to send control messages to the event loop. It is
  /// expected that messages can always be read at once from it, i.e. that
  /// there will never be short reads.
  Socket controlSendSocket_;

  /// Queued up work delegates for async transports. This also includes
  /// currently active ones, they are removed from the queue on completion,
  /// which is relied on by the control message receive fiber (the main one)
  /// to decide whether to immediately start executing items or not.
  // TODO: This should really be of some queue type, not an array slice, but
  // std.container doesn't have anything.
  Work[][TAsyncTransport] workQueues_;

  /// The total number of work items not yet finished (queued and currently
  /// executed) and delays not yet executed.
  uint queuedCount_;

  /// Protects queuedCount_.
  Mutex queuedCountMutex_;

  /// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
  Condition zeroQueuedCondition_;
}
442 | ||
private {
  /**
   * Converts dur into a timeval holding the whole seconds and the remaining
   * microseconds.
   */
  timeval toTimeval(const(Duration) dur) {
    timeval result;
    dur.split!("seconds", "usecs")(result.tv_sec, result.tv_usec);
    return result;
  }

  /**
   * Maps a TAsyncEventType to the libevent flags for it. The direction flag
   * (read or write) is always combined with edge-triggered mode (EV_ET).
   */
  short libeventEventType(TAsyncEventType type) {
    short direction;
    final switch (type) {
      case TAsyncEventType.READ:
        direction = EV_READ;
        break;
      case TAsyncEventType.WRITE:
        direction = EV_WRITE;
        break;
    }
    return cast(short)(direction | EV_ET);
  }
}