/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
module thrift.async.libevent;

import core.atomic;
import core.time : Duration, dur;
import core.exception : onOutOfMemoryError;
import core.memory : GC;
import core.thread : Fiber, Thread;
import core.sync.condition;
import core.sync.mutex;
import core.stdc.stdlib : free, malloc;
import deimos.event2.event;
import std.array : empty, front, popFront;
import std.conv : text, to;
import std.exception : enforce;
import std.socket : Socket, socketPair;
import thrift.base;
import thrift.async.base;
import thrift.internal.socket;
import thrift.internal.traits;
import thrift.util.cancellation;

// To avoid DMD @@BUG6395@@.
import thrift.internal.algorithm;

/**
 * A TAsyncManager implementation based on libevent.
 *
 * The libevent loop for handling non-blocking sockets is run in a background
 * thread, which is lazily spawned. The thread is not daemonized to avoid
 * crashes on program shutdown; it is only stopped when the manager instance
 * is destroyed. So, to ensure a clean program teardown, either make sure this
 * instance gets destroyed (e.g. by using scope), or manually call stop() at
 * the end.
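 *
 * Example (a minimal lifecycle sketch; the transports and clients that would
 * actually use the manager are omitted):
 * ---
 * // scope ensures the destructor runs when the enclosing scope is left,
 * // which also stops the worker thread.
 * scope asyncManager = new TLibeventAsyncManager;
 *
 * // ... set up TAsyncTransports/clients backed by asyncManager here ...
 *
 * // Without scope, call asyncManager.stop() manually before shutdown.
 * ---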
 */
class TLibeventAsyncManager : TAsyncSocketManager {
  this() {
    eventBase_ = event_base_new();

    // Set up the socket pair for transferring control messages to the event
    // loop.
    auto pair = socketPair();
    controlSendSocket_ = pair[0];
    controlReceiveSocket_ = pair[1];
    controlReceiveSocket_.blocking = false;

    // Register an event for receiving control messages.
    controlReceiveEvent_ = event_new(eventBase_, controlReceiveSocket_.handle,
      EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
      cast(void*)this);
    event_add(controlReceiveEvent_, null);

    queuedCountMutex_ = new Mutex;
    zeroQueuedCondition_ = new Condition(queuedCountMutex_);
  }

  ~this() {
    // stop() should be safe to call, because either we don't have a worker
    // thread running and it is a no-op anyway, or it is guaranteed to be
    // still running (blocked in event_base_loop), and thus guaranteed not to
    // be garbage collected yet.
    stop(dur!"hnsecs"(0));

    event_free(controlReceiveEvent_);
    event_base_free(eventBase_);
    eventBase_ = null;
  }

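  /**
   * Enqueues a work item to be executed on the given transport's work queue
   * in the worker thread. If a cancellation is passed and triggered before
   * the item has started executing, it is removed from the queue again.
   */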
  override void execute(TAsyncTransport transport, Work work,
    TCancellation cancellation = null
  ) {
    if (cancellation && cancellation.triggered) return;

    // Keep track that there is a new work item to be processed.
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    // We should be able to send the control message as a whole, and we
    // currently assume that it can be received at once as well. If this
    // proves to be unstable (e.g. send could possibly return early if the
    // receiving buffer is full and the blocking call gets interrupted by a
    // signal), it could be changed to a more sophisticated scheme.

    // Make sure the delegate context doesn't get GCd while the work item is
    // on the wire.
    GC.addRoot(work.ptr);

    // Send work message.
    sendControlMsg(ControlMsg(MsgType.WORK, work, transport));

    if (cancellation) {
      cancellation.triggering.addCallback({
        sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
      });
    }
  }

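  /**
   * Executes the given delegate in the worker thread once the given duration
   * has elapsed (implemented as a libevent one-shot timeout without an
   * associated file descriptor).
   */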
  override void delay(Duration duration, void delegate() work) {
    incrementQueuedCount();

    ensureWorkerThreadRunning();

    const tv = toTimeval(duration);

    // DMD @@BUG@@: Cannot deduce T to void delegate() here.
    registerOneshotEvent!(void delegate())(
      -1, 0, assumeNothrow(&delayCallback), &tv,
      {
        work();
        decrementQueuedCount();
      }
    );
  }

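  /**
   * Stops the worker thread, waiting for still queued work items as requested
   * by waitFinishTimeout: a negative value waits until all items are done, a
   * positive value waits at most that long, and zero does not wait at all.
   *
   * Returns: true if all queued items had been processed when the event loop
   *   was shut down.
   */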
  override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
    bool cleanExit = true;

    synchronized (this) {
      if (workerThread_) {
        synchronized (queuedCountMutex_) {
          if (waitFinishTimeout > dur!"hnsecs"(0)) {
            if (queuedCount_ > 0) {
              zeroQueuedCondition_.wait(waitFinishTimeout);
            }
          } else if (waitFinishTimeout < dur!"hnsecs"(0)) {
            while (queuedCount_ > 0) zeroQueuedCondition_.wait();
          } else {
            // waitFinishTimeout is zero; exit immediately in all cases.
          }
          cleanExit = (queuedCount_ == 0);
        }

        event_base_loopbreak(eventBase_);
        sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
        workerThread_.join();
        workQueues_ = null;
        // We have dropped all currently enqueued items, so set the count to
        // zero. This is safe to do without locking, since the worker thread
        // is down.
        queuedCount_ = 0;
        atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
      }
    }

    return cleanExit;
  }

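  /**
   * Registers a listener that is invoked once when the given event type
   * occurs on the socket. The overload taking a timeout invokes the listener
   * with TAsyncEventReason.TIMED_OUT if the event has not occurred within the
   * given (positive) duration.
   */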
  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    TSocketEventListener listener
  ) {
    addOneshotListenerImpl(socket, eventType, null, listener);
  }

  override void addOneshotListener(Socket socket, TAsyncEventType eventType,
    Duration timeout, TSocketEventListener listener
  ) {
    if (timeout <= dur!"hnsecs"(0)) {
      addOneshotListenerImpl(socket, eventType, null, listener);
    } else {
      // This is not documented very well, but libevent does not require the
      // timeval to be kept around after the event has been added.
      auto tv = toTimeval(timeout);
      addOneshotListenerImpl(socket, eventType, &tv, listener);
    }
  }

private:
  alias void delegate() Work;

  void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
    const(timeval)* timeout, TSocketEventListener listener
  ) {
    registerOneshotEvent(socket.handle, libeventEventType(eventType),
      assumeNothrow(&socketCallback), timeout, listener);
  }

  void registerOneshotEvent(T)(evutil_socket_t fd, short type,
    event_callback_fn callback, const(timeval)* timeout, T payload
  ) {
    // Create a copy of the payload on the C heap.
    auto payloadMem = malloc(payload.sizeof);
    if (!payloadMem) onOutOfMemoryError();
    (cast(T*)payloadMem)[0 .. 1] = payload;
    GC.addRange(payloadMem, payload.sizeof);

    auto result = event_base_once(eventBase_, fd, type, callback,
      payloadMem, timeout);

    // Assuming that we didn't get our arguments wrong above, the only other
    // situation in which event_base_once can fail is when it can't allocate
    // memory.
    if (result != 0) onOutOfMemoryError();
  }

  enum MsgType : ubyte {
    SHUTDOWN,
    WORK,
    CANCEL
  }

  struct ControlMsg {
    MsgType type;
    Work work;
    TAsyncTransport transport;
  }

  /**
   * Starts the worker thread if it is not already running.
   */
  void ensureWorkerThreadRunning() {
    // Technically, only half barriers would be required here, but adding the
    // argument seems to trigger a DMD template argument deduction @@BUG@@.
    if (!atomicLoad(*(cast(shared)&workerThread_))) {
      synchronized (this) {
        if (!workerThread_) {
          auto thread = new Thread({ event_base_loop(eventBase_, 0); });
          thread.start();
          atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
        }
      }
    }
  }

  /**
   * Sends a control message to the worker thread.
   */
  void sendControlMsg(const(ControlMsg) msg) {
    auto result = controlSendSocket_.send((&msg)[0 .. 1]);
    enum size = msg.sizeof;
    enforce(result == size, new TException(text(
      "Sending control message of type ", msg.type, " failed (", result,
      " bytes instead of ", size, " transmitted).")));
  }

  /**
   * Receives messages from the control message socket and acts on them. Called
   * from the worker thread.
   */
  void receiveControlMsg() {
    // Read as many new work items off the socket as possible (at least one
    // should be available, as we got notified by libevent).
    ControlMsg msg;
    ptrdiff_t bytesRead;
    while (true) {
      bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));

      if (bytesRead < 0) {
        auto errno = getSocketErrno();
        if (errno != WOULD_BLOCK_ERRNO) {
          logError("Reading control message, some work item will possibly " ~
            "never be executed: %s", socketErrnoString(errno));
        }
      }
      if (bytesRead != msg.sizeof) break;

      // Everything went fine, we received a new control message.
      final switch (msg.type) {
        case MsgType.SHUTDOWN:
          // The message was just intended to wake us up for shutdown.
          break;

        case MsgType.CANCEL:
          // When processing a cancellation, we must not touch the first item,
          // since it is already being processed.
          auto queue = workQueues_[msg.transport];
          if (queue.length > 0) {
            workQueues_[msg.transport] = [queue[0]] ~
              removeEqual(queue[1 .. $], msg.work);
          }
          break;

        case MsgType.WORK:
          // Now that the work item is back in the D world, we don't need the
          // extra GC root for the context pointer anymore (see execute()).
          GC.removeRoot(msg.work.ptr);

          // Add the work item to the queue and execute it.
          auto queue = msg.transport in workQueues_;
          if (queue is null || (*queue).empty) {
            // If the queue is empty, add the new work item to the queue as well,
            // but immediately start executing it.
            workQueues_[msg.transport] = [msg.work];
            executeWork(msg.transport, msg.work);
          } else {
            (*queue) ~= msg.work;
          }
          break;
      }
    }

    // If the last read was successful, but didn't read enough bytes, we have
    // a problem.
    if (bytesRead > 0) {
      logError("Unexpected partial control message read (%s byte(s) " ~
        "instead of %s), some work item will possibly never be executed.",
        bytesRead, msg.sizeof);
    }
  }

  /**
   * Executes the given work item and all others enqueued for the same
   * transport in a new fiber. Called from the worker thread.
   */
  void executeWork(TAsyncTransport transport, Work work) {
    (new Fiber({
      auto item = work;
      while (true) {
        try {
          // Execute the actual work. It will possibly add listeners to the
          // event loop and yield away if it has to wait for blocking
          // operations. It is quite possible that another fiber will modify
          // the work queue for the current transport.
          item();
        } catch (Exception e) {
          // This should never happen; the catch is just to make sure the
          // worker thread doesn't stop working in mysterious ways because of
          // an unhandled exception.
          logError("Exception thrown by work item: %s", e);
        }

        // Remove the item from the work queue.
        // Note: Due to the value semantics of array slices, we have to
        // re-lookup this on every iteration. This could be solved, but I'd
        // rather replace this directly with a queue type once one becomes
        // available in Phobos.
        auto queue = workQueues_[transport];
        assert(queue.front == item);
        queue.popFront();
        workQueues_[transport] = queue;

        // Now that the work item is done, no longer count it as queued.
        decrementQueuedCount();

        if (queue.empty) break;

        // If the queue is not empty, execute the next waiting item.
        item = queue.front;
      }
    })).call();
  }

  /**
   * Increments the number of queued items.
   */
  void incrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      ++queuedCount_;
    }
  }

  /**
   * Decrements the number of queued items.
   */
  void decrementQueuedCount() {
    synchronized (queuedCountMutex_) {
      assert(queuedCount_ > 0);
      --queuedCount_;
      if (queuedCount_ == 0) {
        zeroQueuedCondition_.notifyAll();
      }
    }
  }

  static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
    void *managerThis
  ) {
    (cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
  }

  static extern(C) void socketCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
      TAsyncEventReason.NORMAL;
    (*(cast(TSocketEventListener*)arg))(reason);
    GC.removeRange(arg);
    destroy(arg);
    free(arg);
  }

  static extern(C) void delayCallback(evutil_socket_t, short flags,
    void *arg
  ) {
    assert(flags & EV_TIMEOUT);
    (*(cast(void delegate()*)arg))();
    GC.removeRange(arg);
    destroy(arg);
    free(arg);
  }

  Thread workerThread_;

  event_base* eventBase_;

  /// The socket used for receiving new work items in the event loop. Paired
  /// with controlSendSocket_. Invalid (i.e. TAsyncWorkItem.init) items are
  /// ignored and can be used to wake up the worker thread.
  Socket controlReceiveSocket_;
  event* controlReceiveEvent_;

  /// The socket used to send new work items to the event loop. It is
  /// expected that work items can always be read at once from it, i.e. that
  /// there will never be short reads.
  Socket controlSendSocket_;

  /// Queued up work delegates for async transports. This also includes
  /// currently active ones; they are removed from the queue on completion,
  /// which is relied on by the control message receive fiber (the main one)
  /// to decide whether to immediately start executing items or not.
  // TODO: This should really be of some queue type, not an array slice, but
  // std.container doesn't have anything.
  Work[][TAsyncTransport] workQueues_;

  /// The total number of work items not yet finished (queued and currently
  /// executed) and delays not yet executed.
  uint queuedCount_;

  /// Protects queuedCount_.
  Mutex queuedCountMutex_;

  /// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
  Condition zeroQueuedCondition_;
}

private {
  timeval toTimeval(const(Duration) dur) {
    timeval tv;
    dur.split!("seconds", "usecs")(tv.tv_sec, tv.tv_usec);
    return tv;
  }

  /**
   * Returns the libevent flags combination to represent a given TAsyncEventType.
   */
  short libeventEventType(TAsyncEventType type) {
    final switch (type) {
      case TAsyncEventType.READ:
        return EV_READ | EV_ET;
      case TAsyncEventType.WRITE:
        return EV_WRITE | EV_ET;
    }
  }
}