2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 module client_pool_test;
21 import core.sync.semaphore : Semaphore;
22 import core.time : Duration, dur;
23 import core.thread : Thread;
32 import std.variant : Variant;
34 import thrift.async.libevent;
35 import thrift.async.socket;
36 import thrift.codegen.base;
37 import thrift.codegen.async_client;
38 import thrift.codegen.async_client_pool;
39 import thrift.codegen.client;
40 import thrift.codegen.client_pool;
41 import thrift.codegen.processor;
42 import thrift.protocol.base;
43 import thrift.protocol.binary;
44 import thrift.server.base;
45 import thrift.server.simple;
46 import thrift.server.transport.socket;
47 import thrift.transport.base;
48 import thrift.transport.buffered;
49 import thrift.transport.socket;
50 import thrift.util.cancellation;
51 import thrift.util.future;
53 // We use this as our RPC-layer exception here to make sure socket/… problems
54 // (that would usually considered to be RPC layer faults) cause the tests to
55 // fail, even though we are testing the RPC exception handling.
// Service-level exception type (see comment above): derives from TException so
// it travels over the RPC layer; body elided in this view.
56 class TestServiceException : TException {
// Base test service. Hand-written equivalent of what the Thrift compiler
// would generate from a .thrift service definition.
60 interface TestService {
// Re-export the module-level exception type into the service's scope, as
// Thrift-generated code does.
62 alias .TestServiceException TestServiceException;
// Method metadata: getPort() takes no parameters and may throw
// TestServiceException, declared as exception field 1 named "a".
63 enum methodMeta = [TMethodMeta("getPort", [],
64 [TExceptionMeta("a", 1, "TestServiceException")])];
67 // Use some derived service, just to check that the pools handle inheritance
// Derived service (see comment above): inherits TestService so the pool tests
// exercise service inheritance as well.
69 interface ExTestService : TestService {
70 int[] getPortInArray();
// Metadata for the added method: getPortInArray() may throw
// TestServiceException as exception field 1 ("a").
71 enum methodMeta = [TMethodMeta("getPortInArray", [],
72 [TExceptionMeta("a", 1, "TestServiceException")])];
// Server-side handler for the test service. Each instance is parameterized
// with the port it serves on, an artificial response delay, whether its
// methods should fail with TestServiceException, and whether to trace calls.
75 class ExTestHandler : ExTestService {
76 this(ushort port, Duration delay, bool failing, bool trace) {
79 this.failing = failing;
// Returns this handler's port (return elided in this view); when tracing is
// enabled, logs the call to stderr first.
83 override int getPort() {
85 stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
93 override int[] getPortInArray() {
// Simulate a slow server: sleep for the configured delay, if any.
104 if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
// Throws a TestServiceException when this handler is in failing mode; no-op
// otherwise. Presumably invoked from the RPC methods above (lines elided).
107 void failIfEnabled() {
108 if (!failing) return;
110 auto e = new TestServiceException;
// Server event handler whose job is to signal readiness: main() waits on the
// semaphore until every test server has started serving (see the waits there).
116 class ServerPreServeHandler : TServerEventHandler {
117 this(Semaphore sem) {
// Invoked by the server just before it starts accepting connections;
// presumably notifies the semaphore (body elided in this view).
121 override void preServe() {
// The remaining TServerEventHandler callbacks are not needed by the tests.
125 Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
126 void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
127 void preProcess(Variant serverContext, TTransport transport) {}
// One thread per test server: runs a TSimpleServer around the given handler
// until the shared cancellation token is triggered.
133 class ServerThread : Thread {
134 this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
137 cancellation_ = cancellation;
138 serverHandler_ = serverHandler;
// Thread body (declaration elided): assemble the full server stack —
// binary protocol, ExTestService processor, listening socket on the
// handler's port, buffered transports — then serve until cancelled.
143 auto protocolFactory = new TBinaryProtocolFactory!();
144 auto processor = new TServiceProcessor!ExTestService(handler_);
145 auto serverTransport = new TServerSocket(handler_.port);
// Bounded receive timeout — presumably so the accept loop wakes up
// periodically rather than blocking forever; TODO confirm.
146 serverTransport.recvTimeout = dur!"seconds"(3);
147 auto transportFactory = new TBufferedTransportFactory;
149 auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
150 server.eventHandler = serverHandler_;
// Blocks until cancellation_ fires (triggered from main()'s scope(exit)).
151 server.serve(cancellation_);
152 } catch (Exception e) {
// A server-side failure is only logged here; the client-side tests would
// then fail on their own.
153 writefln("Server thread on port %s failed: %s", handler_.port, e);
157 ExTestHandler handler_;
158 ServerPreServeHandler serverHandler_;
159 TCancellation cancellation_;
// Test driver: spins up six ExTestService servers on consecutive ports —
// three working ones with increasing delays (1/10/100 ms) and three failing
// ones with the same delays — then runs the sync/async pool test suites.
162 void main(string[] args) {
// Base port and trace flag are configurable from the command line.
165 getopt(args, "port", &port, "trace", &trace);
167 auto serverCancellation = new TCancellationOrigin;
// Shut all server threads down when main() exits, whatever the outcome.
168 scope (exit) serverCancellation.trigger();
// The six consecutive ports the test servers listen on.
170 immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
172 // semaphore that will be incremented whenever each server thread has bound and started listening
173 Semaphore sem = new Semaphore(0);
176 // Cannot use this due to multiple DMD @@BUG@@s:
177 // 1. »function D main is a nested function and cannot be accessed from array«
178 //    when calling array() on the result of the outer map() – would have to
179 //    manually do the eager evaluation/array conversion.
180 // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
181 //    can be worked around by calling array() on the map result first.
182 // 3. Even when using the workarounds for the last two points, the DMD-built
183 //    executable crashes when building without (sic!) inlining enabled,
184 //    the backtrace points into the first delegate literal.
185 auto handlers = array(map!((args){
186 return new ExTestHandler(args._0, args._1, args._2, trace);
189 map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
190 [false, false, false, true, true, true]
// Manually spelled-out handler list (workaround for the DMD bugs above):
// ports +0..+2 succeed, ports +3..+5 fail, delays cycle 1/10/100 ms.
194 new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
195 new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
196 new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
197 new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
198 new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
199 new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
203 // Fire up the server threads.
204 foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
206 // wait until all the handlers signal that they're ready to serve
// One-second timeout per handler guards against a server that never binds.
207 foreach (h; handlers) (sem.wait(dur!`seconds`(1)));
// Run the four test suites against the live servers.
209 syncClientPoolTest(ports, handlers);
210 asyncClientPoolTest(ports, handlers);
211 asyncFastestClientPoolTest(ports, handlers);
212 asyncAggregatorTest(ports, handlers);
// Exercises the synchronous client pool (tClientPool): fallback order,
// aggregated failure reporting, and temporary deactivation of faulty clients.
216 void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
// One binary-protocol TCP client per server port.
217 auto clients = array(map!((a){
218 return cast(TClientBase!ExTestService)tClient!ExTestService(
219 tBinaryProtocol(new TSocket("127.0.0.1", a))
223 scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
225 // Try the case where the first client succeeds.
// permuteClients is false (see makePool below), so the first client in the
// list — the first working server — must answer.
227 enforce(makePool(clients).getPort() == ports[0]);
230 // Try the case where all clients fail.
// clients[3 .. $] are the failing servers; the pool must surface every
// per-client TestServiceException inside a TCompoundOperationException.
232 auto pool = makePool(clients[3 .. $]);
233 auto e = cast(TCompoundOperationException)collectException(pool.getPort());
235 enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
239 // Try the case where the first clients fail, but a later one succeeds.
241 auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
242 enforce(pool.getPortInArray() == [ports[0]]);
245 // Make sure a client is properly deactivated when it has failed too often.
// A single fault disables a client for 50 ms.
247 auto pool = makePool(clients);
248 pool.faultDisableCount = 1;
249 pool.faultDisableDuration = dur!"msecs"(50);
// First server fails once → pool falls through to the second server.
251 handlers[0].failing = true;
252 enforce(pool.getPort() == ports[1]);
// Even after the fault is cleared, the first client stays disabled …
254 handlers[0].failing = false;
255 enforce(pool.getPort() == ports[1]);
// … until the disable duration has elapsed, after which it is retried.
257 Thread.sleep(dur!"msecs"(50));
258 enforce(pool.getPort() == ports[0]);
// Builds a pool over the given clients with deterministic (non-permuted)
// ordering; only TestServiceException counts as an RPC-level fault, so
// transport problems still abort the test (see module comment).
262 auto makePool(TClientBase!ExTestService[] clients) {
263 auto p = tClientPool(clients);
264 p.permuteClients = false;
265 p.rpcFaultFilter = (Exception e) {
266 return (cast(TestServiceException)e !is null);
// Same scenarios as syncClientPoolTest, but against the libevent-based
// asynchronous client pool (tAsyncClientPool).
272 void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
273 auto manager = new TLibeventAsyncManager;
// Stop the event loop immediately (zero grace period) when the test ends.
274 scope (exit) manager.stop(dur!"hnsecs"(0));
276 auto clients = makeAsyncClients(manager, ports);
277 scope(exit) foreach (c; clients) c.transport.close();
279 // Try the case where the first client succeeds.
281 enforce(makeAsyncPool(clients).getPort() == ports[0]);
284 // Try the case where all clients fail.
// Async calls return futures; waitGet() blocks until the result/exception.
286 auto pool = makeAsyncPool(clients[3 .. $]);
287 auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
289 enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
293 // Try the case where the first clients fail, but a later one succeeds.
295 auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
296 enforce(pool.getPortInArray() == [ports[0]]);
299 // Make sure a client is properly deactivated when it has failed too often.
// Mirrors the sync test: one fault disables a client for 50 ms.
301 auto pool = makeAsyncPool(clients);
302 pool.faultDisableCount = 1;
303 pool.faultDisableDuration = dur!"msecs"(50);
304
305 handlers[0].failing = true;
306 enforce(pool.getPort() == ports[1]);
308 handlers[0].failing = false;
309 enforce(pool.getPort() == ports[1]);
311 Thread.sleep(dur!"msecs"(50));
312 enforce(pool.getPort() == ports[0]);
// Async counterpart of makePool: deterministic client order, with only
// TestServiceException treated as an RPC-level fault.
316 auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
317 auto p = tAsyncClientPool(clients);
318 p.permuteClients = false;
319 p.rpcFaultFilter = (Exception e) {
320 return (cast(TestServiceException)e !is null);
// Creates one TAsyncClient per port, all driven by the given async manager,
// using buffered transports over async sockets with the binary protocol.
325 auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
326 // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
327 // to »function D main is a nested function and cannot be accessed from array«.
328 // Thus, we manually do the array conversion.
329 auto lazyClients = map!((a){
330 return new TAsyncClient!ExTestService(
331 new TAsyncSocket(manager, "127.0.0.1", a),
332 new TBufferedTransportFactory,
333 new TBinaryProtocolFactory!(TBufferedTransport)
// Eagerly evaluate the lazy range into a concrete array (see bug note above).
336 TAsyncClientBase!ExTestService[] clients;
337 foreach (c; lazyClients) clients ~= c;
// Exercises tAsyncFastestClientPool, which races all clients and takes the
// first successful answer rather than trying them in order.
342 void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
343 auto manager = new TLibeventAsyncManager;
344 scope (exit) manager.stop(dur!"hnsecs"(0));
346 auto clients = makeAsyncClients(manager, ports);
347 scope(exit) foreach (c; clients) c.transport.close();
349 // Make sure the fastest client wins, even if they are called in some other
// Clients are passed in reverse order; the 1 ms server (ports[0]) must
// still win the race.
352 auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
353 enforce(result == ports[0]);
356 // Try the case where all clients fail.
358 auto pool = makeAsyncFastestPool(clients[3 .. $]);
359 auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
361 enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
365 // Try the case where the first clients fail, but a later one succeeds.
// With the 1 ms working server dropped, the 10 ms one (ports[1]) wins.
367 auto pool = makeAsyncFastestPool(clients[1 .. $]);
368 enforce(pool.getPortInArray() == [ports[1]]);
// Pool factory; unlike makeAsyncPool there is no permuteClients setting —
// ordering is irrelevant when racing — but the fault filter is the same.
372 auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
373 auto p = tAsyncFastestClientPool(clients);
374 p.rpcFaultFilter = (Exception e) {
375 return (cast(TestServiceException)e !is null);
// Exercises tAsyncAggregator, which calls ALL clients and exposes the results
// as a range or folds them with an accumulator function.
381 void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
382 auto manager = new TLibeventAsyncManager;
383 scope (exit) manager.stop(dur!"hnsecs"(0));
385 auto clients = makeAsyncClients(manager, ports);
386 scope(exit) foreach (c; clients) c.transport.close();
388 auto aggregator = tAsyncAggregator(
389 cast(TAsyncClientBase!ExTestService[])clients);
391 // Test aggregator range interface.
// Within a 50 ms window only the 1 ms and 10 ms working servers answer;
// range.exceptions collects the failures, completedCount the finished calls.
393 auto range = aggregator.getPort().range(dur!"msecs"(50));
394 enforce(equal(range, ports[0 .. 2][]));
395 enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
397 enforce(range.completedCount == 4);
400 // Test default accumulator for scalars.
// waitGet() waits for all clients → all three working ports are collected.
402 auto fullResult = aggregator.getPort().accumulate();
403 enforce(fullResult.waitGet() == ports[0 .. 3]);
// finishGet() after 20 ms returns only what has completed so far — the
// 1 ms and 10 ms servers, not the 100 ms one.
405 auto partialResult = aggregator.getPort().accumulate();
406 Thread.sleep(dur!"msecs"(20));
407 enforce(partialResult.finishGet() == ports[0 .. 2]);
411 // Test default accumulator for arrays.
// Same expectations as the scalar case; the array results are concatenated.
413 auto fullResult = aggregator.getPortInArray().accumulate();
414 enforce(fullResult.waitGet() == ports[0 .. 3]);
416 auto partialResult = aggregator.getPortInArray().accumulate();
417 Thread.sleep(dur!"msecs"(20));
418 enforce(partialResult.finishGet() == ports[0 .. 2]);
421 // Test custom accumulator.
// Fold all successful results into their sum.
423 auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
424 return reduce!"a + b"(results);
426 enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
// Two-argument accumulator also receives the exceptions gathered so far.
428 auto partialResult = aggregator.getPort().accumulate!(
429 function(int[] results, Exception[] exceptions) {
430 // Return a tuple of the parameters so we can check them outside of
431 // this function (to verify the values, we need access to »ports«, but
432 // due to DMD @@BUG5710@@, we can't use a delegate literal).
436 Thread.sleep(dur!"msecs"(20));
437 auto resultTuple = partialResult.finishGet();
// After 20 ms: results from the two fast working servers, exceptions from
// the fast failing ones.
438 enforce(resultTuple[0] == ports[0 .. 2]);
439 enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]),