git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / test / org / apache / thrift / async / TestTAsyncClientManager.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.thrift.async;
20
21 import java.io.IOException;
22 import java.io.PrintWriter;
23 import java.io.StringWriter;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.TimeoutException;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicReference;
32
33 import junit.framework.TestCase;
34
35 import org.apache.thrift.TException;
36 import org.apache.thrift.protocol.TBinaryProtocol;
37 import org.apache.thrift.server.ServerTestBase;
38 import org.apache.thrift.server.THsHaServer;
39 import org.apache.thrift.server.THsHaServer.Args;
40 import org.apache.thrift.transport.TNonblockingServerSocket;
41 import org.apache.thrift.transport.TNonblockingSocket;
42
43 import thrift.test.CompactProtoTestStruct;
44 import thrift.test.ExceptionWithAMap;
45 import thrift.test.Srv;
46 import thrift.test.Srv.Iface;
47
48 public class TestTAsyncClientManager extends TestCase {
49
50 private THsHaServer server_;
51 private Thread serverThread_;
52 private TAsyncClientManager clientManager_;
53
54 public void setUp() throws Exception {
55 server_ = new THsHaServer(new Args(new TNonblockingServerSocket(
56 new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(ServerTestBase.PORT))).
57 processor(new Srv.Processor(new SrvHandler())));
58 serverThread_ = new Thread(new Runnable() {
59 public void run() {
60 server_.serve();
61 }
62 });
63 serverThread_.start();
64 clientManager_ = new TAsyncClientManager();
65 Thread.sleep(500);
66 }
67
68 public void tearDown() throws Exception {
69 server_.stop();
70 clientManager_.stop();
71 serverThread_.join();
72 }
73
74 public void testBasicCall() throws Exception {
75 Srv.AsyncClient client = getClient();
76 basicCall(client);
77 }
78
79 public void testBasicCallWithTimeout() throws Exception {
80 Srv.AsyncClient client = getClient();
81 client.setTimeout(5000);
82 basicCall(client);
83 }
84
85 private static abstract class ErrorCallTest<C extends TAsyncClient, R> {
86 final void runTest() throws Exception {
87 final CountDownLatch latch = new CountDownLatch(1);
88 final AtomicReference<Exception> error = new AtomicReference<Exception>();
89 C client = executeErroringCall(new AsyncMethodCallback<R>() {
90 @Override
91 public void onComplete(R response) {
92 latch.countDown();
93 }
94
95 @Override
96 public void onError(Exception exception) {
97 error.set(exception);
98 latch.countDown();
99 }
100 });
101 latch.await(2, TimeUnit.SECONDS);
102 assertTrue(client.hasError());
103 Exception exception = error.get();
104 assertNotNull(exception);
105 assertSame(exception, client.getError());
106 validateError(client, exception);
107 }
108
109 /**
110 * Executes a call that is expected to raise an exception.
111 *
112 * @param callback The testing callback that should be installed.
113 * @return The client the call was made against.
114 * @throws Exception if there was a problem setting up the client or making the call.
115 */
116 abstract C executeErroringCall(AsyncMethodCallback<R> callback) throws Exception;
117
118 /**
119 * Further validates the properties of the error raised in the remote call and the state of the
120 * client after that call.
121 *
122 * @param client The client returned from {@link #executeErroringCall(AsyncMethodCallback)}.
123 * @param error The exception raised by the remote call.
124 */
125 abstract void validateError(C client, Exception error);
126 }
127
128 public void testUnexpectedRemoteExceptionCall() throws Exception {
129 new ErrorCallTest<Srv.AsyncClient, Boolean>() {
130 @Override
131 Srv.AsyncClient executeErroringCall(AsyncMethodCallback<Boolean> callback) throws Exception {
132 Srv.AsyncClient client = getClient();
133 client.declaredExceptionMethod(false, callback);
134 return client;
135 }
136
137 @Override
138 void validateError(Srv.AsyncClient client, Exception error) {
139 assertFalse(client.hasTimeout());
140 assertTrue(error instanceof TException);
141 }
142 }.runTest();
143 }
144
145 public void testDeclaredRemoteExceptionCall() throws Exception {
146 new ErrorCallTest<Srv.AsyncClient, Boolean>() {
147 @Override
148 Srv.AsyncClient executeErroringCall(AsyncMethodCallback<Boolean> callback) throws Exception {
149 Srv.AsyncClient client = getClient();
150 client.declaredExceptionMethod(true, callback);
151 return client;
152 }
153
154 @Override
155 void validateError(Srv.AsyncClient client, Exception error) {
156 assertFalse(client.hasTimeout());
157 assertEquals(ExceptionWithAMap.class, error.getClass());
158 ExceptionWithAMap exceptionWithAMap = (ExceptionWithAMap) error;
159 assertEquals("blah", exceptionWithAMap.getBlah());
160 assertEquals(new HashMap<String, String>(), exceptionWithAMap.getMap_field());
161 }
162 }.runTest();
163 }
164
165 public void testTimeoutCall() throws Exception {
166 new ErrorCallTest<Srv.AsyncClient, Integer>() {
167 @Override
168 Srv.AsyncClient executeErroringCall(AsyncMethodCallback<Integer> callback) throws Exception {
169 Srv.AsyncClient client = getClient();
170 client.setTimeout(100);
171 client.primitiveMethod(callback);
172 return client;
173 }
174
175 @Override
176 void validateError(Srv.AsyncClient client, Exception error) {
177 assertTrue(client.hasTimeout());
178 assertTrue(error instanceof TimeoutException);
179 }
180 }.runTest();
181 }
182
183 public void testVoidCall() throws Exception {
184 final CountDownLatch latch = new CountDownLatch(1);
185 final AtomicBoolean returned = new AtomicBoolean(false);
186 Srv.AsyncClient client = getClient();
187 client.voidMethod(new FailureLessCallback<Void>() {
188 @Override
189 public void onComplete(Void response) {
190 returned.set(true);
191 latch.countDown();
192 }
193 });
194 latch.await(1, TimeUnit.SECONDS);
195 assertTrue(returned.get());
196 }
197
198 public void testOnewayCall() throws Exception {
199 final CountDownLatch latch = new CountDownLatch(1);
200 final AtomicBoolean returned = new AtomicBoolean(false);
201 Srv.AsyncClient client = getClient();
202 client.onewayMethod(new FailureLessCallback<Void>() {
203 @Override
204 public void onComplete(Void response) {
205 returned.set(true);
206 latch.countDown();
207 }
208 });
209 latch.await(1, TimeUnit.SECONDS);
210 assertTrue(returned.get());
211 }
212
213 public void testParallelCalls() throws Exception {
214 // make multiple calls with deserialization in the selector thread (repro Eric's issue)
215 int numThreads = 50;
216 int numCallsPerThread = 100;
217 List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
218 List<Thread> threads = new ArrayList<Thread>();
219 for (int i = 0; i < numThreads; i++) {
220 JankyRunnable runnable = new JankyRunnable(numCallsPerThread);
221 Thread thread = new Thread(runnable);
222 thread.start();
223 threads.add(thread);
224 runnables.add(runnable);
225 }
226 for (Thread thread : threads) {
227 thread.join();
228 }
229 int numSuccesses = 0;
230 for (JankyRunnable runnable : runnables) {
231 numSuccesses += runnable.getNumSuccesses();
232 }
233 assertEquals(numThreads * numCallsPerThread, numSuccesses);
234 }
235
236 private Srv.AsyncClient getClient() throws IOException {
237 TNonblockingSocket clientSocket = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
238 return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_, clientSocket);
239 }
240
241 private void basicCall(Srv.AsyncClient client) throws Exception {
242 final CountDownLatch latch = new CountDownLatch(1);
243 final AtomicBoolean returned = new AtomicBoolean(false);
244 client.Janky(1, new FailureLessCallback<Integer>() {
245 @Override
246 public void onComplete(Integer response) {
247 assertEquals(3, response.intValue());
248 returned.set(true);
249 latch.countDown();
250 }
251
252 @Override
253 public void onError(Exception exception) {
254 try {
255 StringWriter sink = new StringWriter();
256 exception.printStackTrace(new PrintWriter(sink, true));
257 fail("unexpected onError with exception " + sink.toString());
258 } finally {
259 latch.countDown();
260 }
261 }
262 });
263 latch.await(100, TimeUnit.SECONDS);
264 assertTrue(returned.get());
265 }
266
267 public class SrvHandler implements Iface {
268 // Use this method for a standard call testing
269 @Override
270 public int Janky(int arg) throws TException {
271 assertEquals(1, arg);
272 return 3;
273 }
274
275 // Using this method for timeout testing - sleeps for 1 second before returning
276 @Override
277 public int primitiveMethod() throws TException {
278 try {
279 Thread.sleep(1000);
280 } catch (InterruptedException e) {
281 e.printStackTrace();
282 }
283 return 0;
284 }
285
286 @Override
287 public void methodWithDefaultArgs(int something) throws TException { }
288
289 @Override
290 public CompactProtoTestStruct structMethod() throws TException {
291 return null;
292 }
293
294 @Override
295 public void voidMethod() throws TException {
296 }
297
298 @Override
299 public void onewayMethod() throws TException {
300 }
301
302 @Override
303 public boolean declaredExceptionMethod(boolean shouldThrowDeclared) throws TException {
304 if (shouldThrowDeclared) {
305 throw new ExceptionWithAMap("blah", new HashMap<String, String>());
306 } else {
307 throw new TException("Unexpected!");
308 }
309 }
310 }
311
312 private static abstract class FailureLessCallback<T> implements AsyncMethodCallback<T> {
313 @Override
314 public void onError(Exception exception) {
315 fail(exception);
316 }
317 }
318
319 private static void fail(Exception exception) {
320 StringWriter sink = new StringWriter();
321 exception.printStackTrace(new PrintWriter(sink, true));
322 fail("unexpected error " + sink.toString());
323 }
324
325 private class JankyRunnable implements Runnable {
326 private int numCalls_;
327 private int numSuccesses_ = 0;
328 private Srv.AsyncClient client_;
329
330 public JankyRunnable(int numCalls) throws Exception {
331 numCalls_ = numCalls;
332 client_ = getClient();
333 client_.setTimeout(20000);
334 }
335
336 public int getNumSuccesses() {
337 return numSuccesses_;
338 }
339
340 public void run() {
341 for (int i = 0; i < numCalls_ && !client_.hasError(); i++) {
342 final int iteration = i;
343 try {
344 // connect an async client
345 final CountDownLatch latch = new CountDownLatch(1);
346 final AtomicBoolean returned = new AtomicBoolean(false);
347 client_.Janky(1, new AsyncMethodCallback<Integer>() {
348
349 @Override
350 public void onComplete(Integer result) {
351 assertEquals(3, result.intValue());
352 returned.set(true);
353 latch.countDown();
354 }
355
356 @Override
357 public void onError(Exception exception) {
358 try {
359 StringWriter sink = new StringWriter();
360 exception.printStackTrace(new PrintWriter(sink, true));
361 fail("unexpected onError on iteration " + iteration + ": " + sink.toString());
362 } finally {
363 latch.countDown();
364 }
365 }
366 });
367
368 boolean calledBack = latch.await(30, TimeUnit.SECONDS);
369 assertTrue("wasn't called back in time on iteration " + iteration, calledBack);
370 assertTrue("onComplete not called on iteration " + iteration, returned.get());
371 this.numSuccesses_++;
372 } catch (Exception e) {
373 fail(e);
374 }
375 }
376 }
377 }
378 }