]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | module client_pool_test; | |
20 | ||
21 | import core.sync.semaphore : Semaphore; | |
22 | import core.time : Duration, dur; | |
23 | import core.thread : Thread; | |
24 | import std.algorithm; | |
25 | import std.array; | |
26 | import std.conv; | |
27 | import std.exception; | |
28 | import std.getopt; | |
29 | import std.range; | |
30 | import std.stdio; | |
31 | import std.typecons; | |
32 | import std.variant : Variant; | |
33 | import thrift.base; | |
34 | import thrift.async.libevent; | |
35 | import thrift.async.socket; | |
36 | import thrift.codegen.base; | |
37 | import thrift.codegen.async_client; | |
38 | import thrift.codegen.async_client_pool; | |
39 | import thrift.codegen.client; | |
40 | import thrift.codegen.client_pool; | |
41 | import thrift.codegen.processor; | |
42 | import thrift.protocol.base; | |
43 | import thrift.protocol.binary; | |
44 | import thrift.server.base; | |
45 | import thrift.server.simple; | |
46 | import thrift.server.transport.socket; | |
47 | import thrift.transport.base; | |
48 | import thrift.transport.buffered; | |
49 | import thrift.transport.socket; | |
50 | import thrift.util.cancellation; | |
51 | import thrift.util.future; | |
52 | ||
53 | // We use this as our RPC-layer exception here to make sure socket/… problems | |
54 | // (that would usually considered to be RPC layer faults) cause the tests to | |
55 | // fail, even though we are testing the RPC exception handling. | |
56 | class TestServiceException : TException { | |
57 | int port; | |
58 | } | |
59 | ||
60 | interface TestService { | |
61 | int getPort(); | |
62 | alias .TestServiceException TestServiceException; | |
63 | enum methodMeta = [TMethodMeta("getPort", [], | |
64 | [TExceptionMeta("a", 1, "TestServiceException")])]; | |
65 | } | |
66 | ||
67 | // Use some derived service, just to check that the pools handle inheritance | |
68 | // correctly. | |
69 | interface ExTestService : TestService { | |
70 | int[] getPortInArray(); | |
71 | enum methodMeta = [TMethodMeta("getPortInArray", [], | |
72 | [TExceptionMeta("a", 1, "TestServiceException")])]; | |
73 | } | |
74 | ||
75 | class ExTestHandler : ExTestService { | |
76 | this(ushort port, Duration delay, bool failing, bool trace) { | |
77 | this.port = port; | |
78 | this.delay = delay; | |
79 | this.failing = failing; | |
80 | this.trace = trace; | |
81 | } | |
82 | ||
83 | override int getPort() { | |
84 | if (trace) { | |
85 | stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port, | |
86 | delay, failing); | |
87 | } | |
88 | sleep(); | |
89 | failIfEnabled(); | |
90 | return port; | |
91 | } | |
92 | ||
93 | override int[] getPortInArray() { | |
94 | return [getPort()]; | |
95 | } | |
96 | ||
97 | ushort port; | |
98 | Duration delay; | |
99 | bool failing; | |
100 | bool trace; | |
101 | ||
102 | private: | |
103 | void sleep() { | |
104 | if (delay > dur!"hnsecs"(0)) Thread.sleep(delay); | |
105 | } | |
106 | ||
107 | void failIfEnabled() { | |
108 | if (!failing) return; | |
109 | ||
110 | auto e = new TestServiceException; | |
111 | e.port = port; | |
112 | throw e; | |
113 | } | |
114 | } | |
115 | ||
116 | class ServerPreServeHandler : TServerEventHandler { | |
117 | this(Semaphore sem) { | |
118 | sem_ = sem; | |
119 | } | |
120 | ||
121 | override void preServe() { | |
122 | sem_.notify(); | |
123 | } | |
124 | ||
125 | Variant createContext(TProtocol input, TProtocol output) { return Variant.init; } | |
126 | void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {} | |
127 | void preProcess(Variant serverContext, TTransport transport) {} | |
128 | ||
129 | private: | |
130 | Semaphore sem_; | |
131 | } | |
132 | ||
133 | class ServerThread : Thread { | |
134 | this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) { | |
135 | super(&run); | |
136 | handler_ = handler; | |
137 | cancellation_ = cancellation; | |
138 | serverHandler_ = serverHandler; | |
139 | } | |
140 | private: | |
141 | void run() { | |
142 | try { | |
143 | auto protocolFactory = new TBinaryProtocolFactory!(); | |
144 | auto processor = new TServiceProcessor!ExTestService(handler_); | |
145 | auto serverTransport = new TServerSocket(handler_.port); | |
146 | serverTransport.recvTimeout = dur!"seconds"(3); | |
147 | auto transportFactory = new TBufferedTransportFactory; | |
148 | ||
149 | auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory); | |
150 | server.eventHandler = serverHandler_; | |
151 | server.serve(cancellation_); | |
152 | } catch (Exception e) { | |
153 | writefln("Server thread on port %s failed: %s", handler_.port, e); | |
154 | } | |
155 | } | |
156 | ||
157 | ExTestHandler handler_; | |
158 | ServerPreServeHandler serverHandler_; | |
159 | TCancellation cancellation_; | |
160 | } | |
161 | ||
162 | void main(string[] args) { | |
163 | bool trace; | |
164 | ushort port = 9090; | |
165 | getopt(args, "port", &port, "trace", &trace); | |
166 | ||
167 | auto serverCancellation = new TCancellationOrigin; | |
168 | scope (exit) serverCancellation.trigger(); | |
169 | ||
170 | immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6))); | |
171 | ||
172 | // semaphore that will be incremented whenever each server thread has bound and started listening | |
173 | Semaphore sem = new Semaphore(0); | |
174 | ||
175 | version (none) { | |
176 | // Cannot use this due to multiple DMD @@BUG@@s: | |
177 | // 1. »function D main is a nested function and cannot be accessed from array« | |
178 | // when calling array() on the result of the outer map() – would have to | |
179 | // manually do the eager evaluation/array conversion. | |
180 | // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument, | |
181 | // can be worked around by calling array() on the map result first. | |
182 | // 3. Even when using the workarounds for the last two points, the DMD-built | |
183 | // executable crashes when building without (sic!) inlining enabled, | |
184 | // the backtrace points into the first delegate literal. | |
185 | auto handlers = array(map!((args){ | |
186 | return new ExTestHandler(args._0, args._1, args._2, trace); | |
187 | })(zip( | |
188 | ports, | |
189 | map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]), | |
190 | [false, false, false, true, true, true] | |
191 | ))); | |
192 | } else { | |
193 | auto handlers = [ | |
194 | new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace), | |
195 | new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace), | |
196 | new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace), | |
197 | new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace), | |
198 | new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace), | |
199 | new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace) | |
200 | ]; | |
201 | } | |
202 | ||
203 | // Fire up the server threads. | |
204 | foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start(); | |
205 | ||
206 | // wait until all the handlers signal that they're ready to serve | |
207 | foreach (h; handlers) (sem.wait(dur!`seconds`(1))); | |
208 | ||
209 | syncClientPoolTest(ports, handlers); | |
210 | asyncClientPoolTest(ports, handlers); | |
211 | asyncFastestClientPoolTest(ports, handlers); | |
212 | asyncAggregatorTest(ports, handlers); | |
213 | } | |
214 | ||
215 | ||
216 | void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { | |
217 | auto clients = array(map!((a){ | |
218 | return cast(TClientBase!ExTestService)tClient!ExTestService( | |
219 | tBinaryProtocol(new TSocket("127.0.0.1", a)) | |
220 | ); | |
221 | })(ports)); | |
222 | ||
223 | scope(exit) foreach (c; clients) c.outputProtocol.transport.close(); | |
224 | ||
225 | // Try the case where the first client succeeds. | |
226 | { | |
227 | enforce(makePool(clients).getPort() == ports[0]); | |
228 | } | |
229 | ||
230 | // Try the case where all clients fail. | |
231 | { | |
232 | auto pool = makePool(clients[3 .. $]); | |
233 | auto e = cast(TCompoundOperationException)collectException(pool.getPort()); | |
234 | enforce(e); | |
235 | enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), | |
236 | ports[3 .. $])); | |
237 | } | |
238 | ||
239 | // Try the case where the first clients fail, but a later one succeeds. | |
240 | { | |
241 | auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]); | |
242 | enforce(pool.getPortInArray() == [ports[0]]); | |
243 | } | |
244 | ||
245 | // Make sure a client is properly deactivated when it has failed too often. | |
246 | { | |
247 | auto pool = makePool(clients); | |
248 | pool.faultDisableCount = 1; | |
249 | pool.faultDisableDuration = dur!"msecs"(50); | |
250 | ||
251 | handlers[0].failing = true; | |
252 | enforce(pool.getPort() == ports[1]); | |
253 | ||
254 | handlers[0].failing = false; | |
255 | enforce(pool.getPort() == ports[1]); | |
256 | ||
257 | Thread.sleep(dur!"msecs"(50)); | |
258 | enforce(pool.getPort() == ports[0]); | |
259 | } | |
260 | } | |
261 | ||
262 | auto makePool(TClientBase!ExTestService[] clients) { | |
263 | auto p = tClientPool(clients); | |
264 | p.permuteClients = false; | |
265 | p.rpcFaultFilter = (Exception e) { | |
266 | return (cast(TestServiceException)e !is null); | |
267 | }; | |
268 | return p; | |
269 | } | |
270 | ||
271 | ||
272 | void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { | |
273 | auto manager = new TLibeventAsyncManager; | |
274 | scope (exit) manager.stop(dur!"hnsecs"(0)); | |
275 | ||
276 | auto clients = makeAsyncClients(manager, ports); | |
277 | scope(exit) foreach (c; clients) c.transport.close(); | |
278 | ||
279 | // Try the case where the first client succeeds. | |
280 | { | |
281 | enforce(makeAsyncPool(clients).getPort() == ports[0]); | |
282 | } | |
283 | ||
284 | // Try the case where all clients fail. | |
285 | { | |
286 | auto pool = makeAsyncPool(clients[3 .. $]); | |
287 | auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); | |
288 | enforce(e); | |
289 | enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), | |
290 | ports[3 .. $])); | |
291 | } | |
292 | ||
293 | // Try the case where the first clients fail, but a later one succeeds. | |
294 | { | |
295 | auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]); | |
296 | enforce(pool.getPortInArray() == [ports[0]]); | |
297 | } | |
298 | ||
299 | // Make sure a client is properly deactivated when it has failed too often. | |
300 | { | |
301 | auto pool = makeAsyncPool(clients); | |
302 | pool.faultDisableCount = 1; | |
303 | pool.faultDisableDuration = dur!"msecs"(50); | |
304 | ||
305 | handlers[0].failing = true; | |
306 | enforce(pool.getPort() == ports[1]); | |
307 | ||
308 | handlers[0].failing = false; | |
309 | enforce(pool.getPort() == ports[1]); | |
310 | ||
311 | Thread.sleep(dur!"msecs"(50)); | |
312 | enforce(pool.getPort() == ports[0]); | |
313 | } | |
314 | } | |
315 | ||
316 | auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) { | |
317 | auto p = tAsyncClientPool(clients); | |
318 | p.permuteClients = false; | |
319 | p.rpcFaultFilter = (Exception e) { | |
320 | return (cast(TestServiceException)e !is null); | |
321 | }; | |
322 | return p; | |
323 | } | |
324 | ||
325 | auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) { | |
326 | // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads | |
327 | // to »function D main is a nested function and cannot be accessed from array«. | |
328 | // Thus, we manually do the array conversion. | |
329 | auto lazyClients = map!((a){ | |
330 | return new TAsyncClient!ExTestService( | |
331 | new TAsyncSocket(manager, "127.0.0.1", a), | |
332 | new TBufferedTransportFactory, | |
333 | new TBinaryProtocolFactory!(TBufferedTransport) | |
334 | ); | |
335 | })(ports); | |
336 | TAsyncClientBase!ExTestService[] clients; | |
337 | foreach (c; lazyClients) clients ~= c; | |
338 | return clients; | |
339 | } | |
340 | ||
341 | ||
342 | void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { | |
343 | auto manager = new TLibeventAsyncManager; | |
344 | scope (exit) manager.stop(dur!"hnsecs"(0)); | |
345 | ||
346 | auto clients = makeAsyncClients(manager, ports); | |
347 | scope(exit) foreach (c; clients) c.transport.close(); | |
348 | ||
349 | // Make sure the fastest client wins, even if they are called in some other | |
350 | // order. | |
351 | { | |
352 | auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet(); | |
353 | enforce(result == ports[0]); | |
354 | } | |
355 | ||
356 | // Try the case where all clients fail. | |
357 | { | |
358 | auto pool = makeAsyncFastestPool(clients[3 .. $]); | |
359 | auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); | |
360 | enforce(e); | |
361 | enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), | |
362 | ports[3 .. $])); | |
363 | } | |
364 | ||
365 | // Try the case where the first clients fail, but a later one succeeds. | |
366 | { | |
367 | auto pool = makeAsyncFastestPool(clients[1 .. $]); | |
368 | enforce(pool.getPortInArray() == [ports[1]]); | |
369 | } | |
370 | } | |
371 | ||
372 | auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) { | |
373 | auto p = tAsyncFastestClientPool(clients); | |
374 | p.rpcFaultFilter = (Exception e) { | |
375 | return (cast(TestServiceException)e !is null); | |
376 | }; | |
377 | return p; | |
378 | } | |
379 | ||
380 | ||
381 | void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) { | |
382 | auto manager = new TLibeventAsyncManager; | |
383 | scope (exit) manager.stop(dur!"hnsecs"(0)); | |
384 | ||
385 | auto clients = makeAsyncClients(manager, ports); | |
386 | scope(exit) foreach (c; clients) c.transport.close(); | |
387 | ||
388 | auto aggregator = tAsyncAggregator( | |
389 | cast(TAsyncClientBase!ExTestService[])clients); | |
390 | ||
391 | // Test aggregator range interface. | |
392 | { | |
393 | auto range = aggregator.getPort().range(dur!"msecs"(50)); | |
394 | enforce(equal(range, ports[0 .. 2][])); | |
395 | enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions), | |
396 | ports[3 .. $ - 1])); | |
397 | enforce(range.completedCount == 4); | |
398 | } | |
399 | ||
400 | // Test default accumulator for scalars. | |
401 | { | |
402 | auto fullResult = aggregator.getPort().accumulate(); | |
403 | enforce(fullResult.waitGet() == ports[0 .. 3]); | |
404 | ||
405 | auto partialResult = aggregator.getPort().accumulate(); | |
406 | Thread.sleep(dur!"msecs"(20)); | |
407 | enforce(partialResult.finishGet() == ports[0 .. 2]); | |
408 | ||
409 | } | |
410 | ||
411 | // Test default accumulator for arrays. | |
412 | { | |
413 | auto fullResult = aggregator.getPortInArray().accumulate(); | |
414 | enforce(fullResult.waitGet() == ports[0 .. 3]); | |
415 | ||
416 | auto partialResult = aggregator.getPortInArray().accumulate(); | |
417 | Thread.sleep(dur!"msecs"(20)); | |
418 | enforce(partialResult.finishGet() == ports[0 .. 2]); | |
419 | } | |
420 | ||
421 | // Test custom accumulator. | |
422 | { | |
423 | auto fullResult = aggregator.getPort().accumulate!(function(int[] results){ | |
424 | return reduce!"a + b"(results); | |
425 | })(); | |
426 | enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]); | |
427 | ||
428 | auto partialResult = aggregator.getPort().accumulate!( | |
429 | function(int[] results, Exception[] exceptions) { | |
430 | // Return a tuple of the parameters so we can check them outside of | |
431 | // this function (to verify the values, we need access to »ports«, but | |
432 | // due to DMD @@BUG5710@@, we can't use a delegate literal).f | |
433 | return tuple(results, exceptions); | |
434 | } | |
435 | )(); | |
436 | Thread.sleep(dur!"msecs"(20)); | |
437 | auto resultTuple = partialResult.finishGet(); | |
438 | enforce(resultTuple[0] == ports[0 .. 2]); | |
439 | enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]), | |
440 | ports[3 .. $ - 1])); | |
441 | } | |
442 | } |