2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 package org
.apache
.thrift
.async
;
21 import java
.io
.IOException
;
22 import java
.io
.PrintWriter
;
23 import java
.io
.StringWriter
;
24 import java
.util
.ArrayList
;
25 import java
.util
.HashMap
;
26 import java
.util
.List
;
27 import java
.util
.concurrent
.CountDownLatch
;
28 import java
.util
.concurrent
.TimeUnit
;
29 import java
.util
.concurrent
.TimeoutException
;
30 import java
.util
.concurrent
.atomic
.AtomicBoolean
;
31 import java
.util
.concurrent
.atomic
.AtomicReference
;
33 import junit
.framework
.TestCase
;
35 import org
.apache
.thrift
.TException
;
36 import org
.apache
.thrift
.protocol
.TBinaryProtocol
;
37 import org
.apache
.thrift
.server
.ServerTestBase
;
38 import org
.apache
.thrift
.server
.THsHaServer
;
39 import org
.apache
.thrift
.server
.THsHaServer
.Args
;
40 import org
.apache
.thrift
.transport
.TNonblockingServerSocket
;
41 import org
.apache
.thrift
.transport
.TNonblockingSocket
;
43 import thrift
.test
.CompactProtoTestStruct
;
44 import thrift
.test
.ExceptionWithAMap
;
45 import thrift
.test
.Srv
;
46 import thrift
.test
.Srv
.Iface
;
48 public class TestTAsyncClientManager
extends TestCase
{
// Server under test; assigned in setUp().
50 private THsHaServer server_
;
// Thread object created and started in setUp().
51 private Thread serverThread_
;
// Client manager shared by every client built through getClient(); created in setUp()
// and stopped in tearDown().
52 private TAsyncClientManager clientManager_
;
// Test fixture setup: builds the server, starts a thread, and creates the async client manager.
54 public void setUp() throws Exception
{
// The server listens on a non-blocking server socket configured with ServerTestBase.PORT and
// dispatches requests to a Srv.Processor wrapping a fresh SrvHandler.
55 server_
= new THsHaServer(new Args(new TNonblockingServerSocket(
56 new TNonblockingServerSocket
.NonblockingAbstractServerSocketArgs().port(ServerTestBase
.PORT
))).
57 processor(new Srv
.Processor(new SrvHandler())));
// NOTE(review): the Runnable's body is not visible in this view; presumably it runs the
// server loop - confirm against the full file.
58 serverThread_
= new Thread(new Runnable() {
63 serverThread_
.start();
// Manager handed to every Srv.AsyncClient created by getClient().
64 clientManager_
= new TAsyncClientManager();
// Test fixture teardown: stops the async client manager.
// NOTE(review): any other teardown steps (e.g. stopping the server) are not visible in this view.
68 public void tearDown() throws Exception
{
70 clientManager_
.stop();
// Obtains a fresh async client via getClient().
// NOTE(review): the statement that exercises the client is not visible in this view;
// presumably it is handed to basicCall(...) - confirm.
74 public void testBasicCall() throws Exception
{
75 Srv
.AsyncClient client
= getClient();
// Obtains a fresh async client and sets its timeout to 5000 before use.
// NOTE(review): the timeout unit (presumably milliseconds) and the statement that exercises
// the client are not visible in this view - confirm.
79 public void testBasicCallWithTimeout() throws Exception
{
80 Srv
.AsyncClient client
= getClient();
81 client
.setTimeout(5000);
// Template for tests whose remote call is expected to fail.
// C is the async client type (a TAsyncClient subtype); R is the response type of the callback.
// Subclasses supply the failing call (executeErroringCall) and extra checks (validateError).
85 private static abstract class ErrorCallTest
<C
extends TAsyncClient
, R
> {
// Drives the scenario: issue the call, wait, then check the client's and callback's error state.
86 final void runTest() throws Exception
{
// Single-count latch, awaited below.
87 final CountDownLatch latch
= new CountDownLatch(1);
// Holder read back below as the exception seen by the callback.
88 final AtomicReference
<Exception
> error
= new AtomicReference
<Exception
>();
// NOTE(review): the bodies of onComplete/onError are not visible in this view; presumably
// onError stores the exception in `error` and counts the latch down - confirm.
89 C client
= executeErroringCall(new AsyncMethodCallback
<R
>() {
91 public void onComplete(R response
) {
96 public void onError(Exception exception
) {
// Waits at most 2 seconds; the boolean result of await() is not inspected, so a missed
// callback only shows up through the assertions that follow.
101 latch
.await(2, TimeUnit
.SECONDS
);
// The client must report an error.
102 assertTrue(client
.hasError());
103 Exception exception
= error
.get();
// An exception must have been recorded in the holder ...
104 assertNotNull(exception
);
// ... and it must be the very same instance the client exposes.
105 assertSame(exception
, client
.getError());
// Scenario-specific checks supplied by the subclass.
106 validateError(client
, exception
);
110 * Executes a call that is expected to raise an exception.
112 * @param callback The testing callback that should be installed.
113 * @return The client the call was made against.
114 * @throws Exception if there was a problem setting up the client or making the call.
116 abstract C
executeErroringCall(AsyncMethodCallback
<R
> callback
) throws Exception
;
119 * Further validates the properties of the error raised in the remote call and the state of the
120 * client after that call.
122 * @param client The client returned from {@link #executeErroringCall(AsyncMethodCallback)}.
123 * @param error The exception raised by the remote call.
125 abstract void validateError(C client
, Exception error
);
// Error scenario: calls declaredExceptionMethod with the flag set to false and checks that
// the client saw a non-timeout error that is a TException.
// NOTE(review): the trailing runTest() invocation is not visible in this view.
128 public void testUnexpectedRemoteExceptionCall() throws Exception
{
129 new ErrorCallTest
<Srv
.AsyncClient
, Boolean
>() {
// Issues the failing call on a fresh client.
131 Srv
.AsyncClient
executeErroringCall(AsyncMethodCallback
<Boolean
> callback
) throws Exception
{
132 Srv
.AsyncClient client
= getClient();
// false => the handler in this file does not take its ExceptionWithAMap branch.
133 client
.declaredExceptionMethod(false, callback
);
// The failure must not be flagged as a timeout and must be a TException.
138 void validateError(Srv
.AsyncClient client
, Exception error
) {
139 assertFalse(client
.hasTimeout());
140 assertTrue(error
instanceof TException
);
// Error scenario: calls declaredExceptionMethod with the flag set to true and checks that the
// declared ExceptionWithAMap reaches the callback with its fields intact.
// NOTE(review): the trailing runTest() invocation is not visible in this view.
145 public void testDeclaredRemoteExceptionCall() throws Exception
{
146 new ErrorCallTest
<Srv
.AsyncClient
, Boolean
>() {
// Issues the failing call on a fresh client.
148 Srv
.AsyncClient
executeErroringCall(AsyncMethodCallback
<Boolean
> callback
) throws Exception
{
149 Srv
.AsyncClient client
= getClient();
// true => the handler in this file throws ExceptionWithAMap("blah", empty map).
150 client
.declaredExceptionMethod(true, callback
);
155 void validateError(Srv
.AsyncClient client
, Exception error
) {
// Not a timeout.
156 assertFalse(client
.hasTimeout());
// Exact class match (not just instanceof), which makes the cast below safe.
157 assertEquals(ExceptionWithAMap
.class, error
.getClass());
158 ExceptionWithAMap exceptionWithAMap
= (ExceptionWithAMap
) error
;
// Field values must match what the handler constructed.
159 assertEquals("blah", exceptionWithAMap
.getBlah());
160 assertEquals(new HashMap
<String
, String
>(), exceptionWithAMap
.getMap_field());
// Error scenario: sets a client timeout of 100 and calls primitiveMethod, then checks that
// the failure is reported as a timeout carrying a TimeoutException.
// NOTE(review): the timeout unit is presumably milliseconds - confirm. The handler's own
// comment says primitiveMethod sleeps for 1 second. The trailing runTest() invocation is not
// visible in this view.
165 public void testTimeoutCall() throws Exception
{
166 new ErrorCallTest
<Srv
.AsyncClient
, Integer
>() {
// Issues the call on a fresh client with the short timeout applied first.
168 Srv
.AsyncClient
executeErroringCall(AsyncMethodCallback
<Integer
> callback
) throws Exception
{
169 Srv
.AsyncClient client
= getClient();
170 client
.setTimeout(100);
171 client
.primitiveMethod(callback
);
// The client must flag a timeout and the delivered error must be a TimeoutException.
176 void validateError(Srv
.AsyncClient client
, Exception error
) {
177 assertTrue(client
.hasTimeout());
178 assertTrue(error
instanceof TimeoutException
);
// Calls voidMethod asynchronously and asserts that the `returned` flag is set afterwards.
183 public void testVoidCall() throws Exception
{
184 final CountDownLatch latch
= new CountDownLatch(1);
// Starts false; asserted true at the end.
185 final AtomicBoolean returned
= new AtomicBoolean(false);
186 Srv
.AsyncClient client
= getClient();
// NOTE(review): the onComplete body is not visible in this view; presumably it sets
// `returned` and counts the latch down - confirm.
187 client
.voidMethod(new FailureLessCallback
<Void
>() {
189 public void onComplete(Void response
) {
// Waits at most 1 second; the result of await() is not inspected.
194 latch
.await(1, TimeUnit
.SECONDS
);
195 assertTrue(returned
.get());
// Calls onewayMethod asynchronously and asserts that the `returned` flag is set afterwards.
198 public void testOnewayCall() throws Exception
{
199 final CountDownLatch latch
= new CountDownLatch(1);
// Starts false; asserted true at the end.
200 final AtomicBoolean returned
= new AtomicBoolean(false);
201 Srv
.AsyncClient client
= getClient();
// NOTE(review): the onComplete body is not visible in this view; presumably it sets
// `returned` and counts the latch down - confirm.
202 client
.onewayMethod(new FailureLessCallback
<Void
>() {
204 public void onComplete(Void response
) {
// Waits at most 1 second; the result of await() is not inspected.
209 latch
.await(1, TimeUnit
.SECONDS
);
210 assertTrue(returned
.get());
// Creates numThreads JankyRunnable workers, each wrapped in its own Thread, then sums the
// workers' success counts and expects numThreads * numCallsPerThread in total.
// NOTE(review): the declaration of numThreads, the statements that start the threads, and
// the body of the loop over `threads` (presumably a join) are not visible in this view.
213 public void testParallelCalls() throws Exception
{
214 // make multiple calls with deserialization in the selector thread (repro Eric's issue)
// Value passed to each JankyRunnable's constructor.
216 int numCallsPerThread
= 100;
217 List
<JankyRunnable
> runnables
= new ArrayList
<JankyRunnable
>();
218 List
<Thread
> threads
= new ArrayList
<Thread
>();
// One worker and one Thread per iteration; the worker is kept so its count can be read later.
219 for (int i
= 0; i
< numThreads
; i
++) {
220 JankyRunnable runnable
= new JankyRunnable(numCallsPerThread
);
221 Thread thread
= new Thread(runnable
);
224 runnables
.add(runnable
);
226 for (Thread thread
: threads
) {
// Aggregate the per-worker success counts.
229 int numSuccesses
= 0;
230 for (JankyRunnable runnable
: runnables
) {
231 numSuccesses
+= runnable
.getNumSuccesses();
// Expected total is the product of worker count and calls per worker.
233 assertEquals(numThreads
* numCallsPerThread
, numSuccesses
);
// Builds a new Srv.AsyncClient over a fresh non-blocking socket to
// ServerTestBase.HOST:ServerTestBase.PORT, using a TBinaryProtocol factory and the shared
// clientManager_. Declared to throw IOException.
236 private Srv
.AsyncClient
getClient() throws IOException
{
237 TNonblockingSocket clientSocket
= new TNonblockingSocket(ServerTestBase
.HOST
, ServerTestBase
.PORT
);
238 return new Srv
.AsyncClient(new TBinaryProtocol
.Factory(), clientManager_
, clientSocket
);
// Shared helper: invokes Janky(1) on the given client and expects a response of 3.
// An error callback fails the test with the exception's stack trace in the message.
241 private void basicCall(Srv
.AsyncClient client
) throws Exception
{
242 final CountDownLatch latch
= new CountDownLatch(1);
// Starts false; asserted true at the end.
243 final AtomicBoolean returned
= new AtomicBoolean(false);
244 client
.Janky(1, new FailureLessCallback
<Integer
>() {
// NOTE(review): the rest of onComplete is not visible in this view; presumably it sets
// `returned` and counts the latch down - confirm.
246 public void onComplete(Integer response
) {
247 assertEquals(3, response
.intValue());
253 public void onError(Exception exception
) {
// Render the stack trace into a string so it can be embedded in the failure message.
255 StringWriter sink
= new StringWriter();
256 exception
.printStackTrace(new PrintWriter(sink
, true));
257 fail("unexpected onError with exception " + sink
.toString());
// Waits at most 100 seconds; the result of await() is not inspected.
263 latch
.await(100, TimeUnit
.SECONDS
);
264 assertTrue(returned
.get());
// Server-side implementation of the Srv Iface; setUp() wraps an instance in the Srv.Processor.
// NOTE(review): most method bodies are not visible in this view, so return values are not
// documented here.
267 public class SrvHandler
implements Iface
{
268 // Use this method for a standard call testing
270 public int Janky(int arg
) throws TException
{
// Asserts the argument is 1, which is the value the callers in this file pass.
271 assertEquals(1, arg
);
275 // Using this method for timeout testing - sleeps for 1 second before returning
277 public int primitiveMethod() throws TException
{
280 } catch (InterruptedException e
) {
// Explicitly empty body.
287 public void methodWithDefaultArgs(int something
) throws TException
{ }
290 public CompactProtoTestStruct
structMethod() throws TException
{
295 public void voidMethod() throws TException
{
299 public void onewayMethod() throws TException
{
// Throws in both visible paths: ExceptionWithAMap("blah", empty map) when the flag is true,
// otherwise a plain TException with the message "Unexpected!".
303 public boolean declaredExceptionMethod(boolean shouldThrowDeclared
) throws TException
{
304 if (shouldThrowDeclared
) {
305 throw new ExceptionWithAMap("blah", new HashMap
<String
, String
>());
307 throw new TException("Unexpected!");
// Base callback for calls that are expected to succeed: it supplies onError so that
// subclasses in this file only need to provide onComplete.
312 private static abstract class FailureLessCallback
<T
> implements AsyncMethodCallback
<T
> {
// NOTE(review): body not visible in this view; presumably it delegates to fail(exception).
314 public void onError(Exception exception
) {
// Fails the test with a message containing the exception's full stack trace.
319 private static void fail(Exception exception
) {
// Capture the stack trace as text.
320 StringWriter sink
= new StringWriter();
321 exception
.printStackTrace(new PrintWriter(sink
, true));
// Calls the String overload of fail (not this method, which takes an Exception).
322 fail("unexpected error " + sink
.toString());
325 private class JankyRunnable
implements Runnable
{
326 private int numCalls_
;
327 private int numSuccesses_
= 0;
328 private Srv
.AsyncClient client_
;
// Stores the number of calls to make, creates this worker's own client via getClient(),
// and sets that client's timeout to 20000.
// NOTE(review): the timeout unit is presumably milliseconds - confirm.
330 public JankyRunnable(int numCalls
) throws Exception
{
331 numCalls_
= numCalls
;
332 client_
= getClient();
333 client_
.setTimeout(20000);
// Returns the current value of the numSuccesses_ counter.
336 public int getNumSuccesses() {
337 return numSuccesses_
;
341 for (int i
= 0; i
< numCalls_
&& !client_
.hasError(); i
++) {
342 final int iteration
= i
;
344 // connect an async client
345 final CountDownLatch latch
= new CountDownLatch(1);
346 final AtomicBoolean returned
= new AtomicBoolean(false);
347 client_
.Janky(1, new AsyncMethodCallback
<Integer
>() {
350 public void onComplete(Integer result
) {
351 assertEquals(3, result
.intValue());
357 public void onError(Exception exception
) {
359 StringWriter sink
= new StringWriter();
360 exception
.printStackTrace(new PrintWriter(sink
, true));
361 fail("unexpected onError on iteration " + iteration
+ ": " + sink
.toString());
368 boolean calledBack
= latch
.await(30, TimeUnit
.SECONDS
);
369 assertTrue("wasn't called back in time on iteration " + iteration
, calledBack
);
370 assertTrue("onComplete not called on iteration " + iteration
, returned
.get());
371 this.numSuccesses_
++;
372 } catch (Exception e
) {