2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
19 package org
.apache
.thrift
.async
;
21 import java
.io
.IOException
;
22 import java
.io
.Serializable
;
23 import java
.nio
.channels
.ClosedSelectorException
;
24 import java
.nio
.channels
.SelectionKey
;
25 import java
.nio
.channels
.Selector
;
26 import java
.nio
.channels
.spi
.SelectorProvider
;
27 import java
.util
.Comparator
;
28 import java
.util
.Iterator
;
29 import java
.util
.TreeSet
;
30 import java
.util
.concurrent
.ConcurrentLinkedQueue
;
31 import java
.util
.concurrent
.TimeoutException
;
33 import org
.apache
.thrift
.TException
;
34 import org
.slf4j
.Logger
;
35 import org
.slf4j
.LoggerFactory
;
38 * Contains selector thread which transitions method call objects
40 public class TAsyncClientManager
{
// Class-wide SLF4J logger, looked up by this class's name.
41 private static final Logger LOGGER
= LoggerFactory
.getLogger(TAsyncClientManager
.class.getName());
// The selector thread owned by this manager; assigned once in the constructor.
43 private final SelectThread selectThread
;
// Hand-off queue between callers and the selector thread: call() appends to it and
// startPendingMethods() drains it with poll().
44 private final ConcurrentLinkedQueue
<TAsyncMethodCall
> pendingCalls
= new ConcurrentLinkedQueue
<TAsyncMethodCall
>();
/**
 * Creates a manager together with its own {@code SelectThread}.
 *
 * @throws IOException propagated from the {@code SelectThread} constructor, which opens a
 *     selector
 */
46 public TAsyncClientManager() throws IOException
{
// NOTE(review): the statement that starts the thread is not visible in this excerpt; since
// isRunning() reports selectThread.isAlive(), confirm the thread is started here.
47 this.selectThread
= new SelectThread();
/**
 * Hands a method call over to the selector thread.
 *
 * <p>The call is prepared, appended to the pending-call queue, and the selector is woken up.
 *
 * @param method the method call to enqueue
 * @throws TException with the message "SelectThread is not running"
 */
51 public void call(TAsyncMethodCall method
) throws TException
{
// NOTE(review): the condition guarding this throw is not visible in this excerpt; presumably
// it is only reached when the selector thread is not alive. Confirm.
53 throw new TException("SelectThread is not running");
55 method
.prepareMethodCall();
56 pendingCalls
.add(method
);
// Wake the selector so the selector thread's loop proceeds to startPendingMethods(), which
// drains the queue.
57 selectThread
.getSelector().wakeup();
61 selectThread
.finish();
64 public boolean isRunning() {
65 return selectThread
.isAlive();
68 private class SelectThread
extends Thread
{
// Selector opened in the constructor through SelectorProvider; method calls are started
// against it in startPendingMethods().
69 private final Selector selector
;
// NOTE(review): reads and writes of this flag are not visible in this excerpt; presumably it
// is the select loop's condition and is cleared by finish(). Confirm.
70 private volatile boolean running
;
// Method calls that have a timeout, ordered by TAsyncMethodCallTimeoutComparator (timeout
// timestamp first, then sequence id), so first() is the call whose timeout comes soonest.
71 private final TreeSet
<TAsyncMethodCall
> timeoutWatchSet
= new TreeSet
<TAsyncMethodCall
>(new TAsyncMethodCallTimeoutComparator());
/**
 * Opens the selector used by this thread and names the thread after its id.
 *
 * @throws IOException if the selector cannot be opened
 */
73 public SelectThread() throws IOException
{
74 this.selector
= SelectorProvider
.provider().openSelector();
// The thread id is appended to the name.
76 this.setName("TAsyncClientManager#SelectorThread " + this.getId());
// NOTE(review): the statement the following comment refers to is not visible in this excerpt
// (presumably the thread is marked as a daemon). Confirm.
78 // We don't want to hold up the JVM when shutting down
82 public Selector
getSelector() {
86 public void finish() {
95 if (timeoutWatchSet
.size() == 0) {
96 // No timeouts, so select indefinitely
99 // We have a timeout pending, so calculate the time until then and select appropriately
100 long nextTimeout
= timeoutWatchSet
.first().getTimeoutTimestamp();
101 long selectTime
= nextTimeout
- System
.currentTimeMillis();
102 if (selectTime
> 0) {
103 // Next timeout is in the future, select and wake up then
104 selector
.select(selectTime
);
106 // Next timeout is now or in past, select immediately so we can time out
107 selector
.selectNow();
110 } catch (IOException e
) {
111 LOGGER
.error("Caught IOException in TAsyncClientManager!", e
);
115 startPendingMethods();
116 } catch (Exception exception
) {
117 LOGGER
.error("Ignoring uncaught exception in SelectThread", exception
);
123 } catch (IOException ex
) {
124 LOGGER
.warn("Could not close selector. This may result in leaked resources!", ex
);
/*
 * Walks the selector's selected keys and, for a valid key, lets the method call attached to it
 * process the key via transition(key). A call that reports isFinished(), or whose client
 * reports hasError(), is removed from the timeout watch set. A ClosedSelectorException is
 * logged at error level.
 *
 * NOTE(review): several statements of this method are not visible in this excerpt (the opening
 * of the try block, the handling of invalid keys, and whether processed keys are removed from
 * the selected-key set). Confirm against the full file.
 */
128 // Transition methods for ready keys
129 private void transitionMethods() {
131 Iterator
<SelectionKey
> keys
= selector
.selectedKeys().iterator();
132 while (keys
.hasNext()) {
133 SelectionKey key
= keys
.next();
135 if (!key
.isValid()) {
136 // this can happen if the method call experienced an error and the
137 // key was cancelled. can also happen if we timeout a method, which
138 // results in a channel close.
// NOTE(review): the statements of this branch are not visible in this excerpt.
// The key's attachment is cast to the method call that should handle it.
142 TAsyncMethodCall methodCall
= (TAsyncMethodCall
)key
.attachment();
143 methodCall
.transition(key
);
145 // If done or error occurred, remove from timeout watch set
146 if (methodCall
.isFinished() || methodCall
.getClient().hasError()) {
147 timeoutWatchSet
.remove(methodCall
);
// A closed selector is only logged here.
150 } catch (ClosedSelectorException e
) {
151 LOGGER
.error("Caught ClosedSelectorException in TAsyncClientManager!", e
);
/*
 * Iterates the timeout watch set and, for a call whose timeout timestamp is at or before the
 * current time, reports a TimeoutException to the call through onError(). The current time is
 * sampled once, before the loop.
 *
 * NOTE(review): removal of the timed-out call from the watch set and any early exit from the
 * loop are not visible in this excerpt. Confirm against the full file.
 */
155 // Timeout any existing method calls
156 private void timeoutMethods() {
// The watch set is a TreeSet ordered by the timeout comparator, so iteration starts with the
// earliest timeout.
157 Iterator
<TAsyncMethodCall
> iterator
= timeoutWatchSet
.iterator();
158 long currentTime
= System
.currentTimeMillis();
159 while (iterator
.hasNext()) {
160 TAsyncMethodCall methodCall
= iterator
.next();
161 if (currentTime
>= methodCall
.getTimeoutTimestamp()) {
// The elapsed time in the message is measured from the call's start time.
163 methodCall
.onError(new TimeoutException("Operation " + methodCall
.getClass() + " timed out after " + (currentTime
- methodCall
.getStartTime()) + " ms."));
170 // Start any new calls
171 private void startPendingMethods() {
172 TAsyncMethodCall methodCall
;
173 while ((methodCall
= pendingCalls
.poll()) != null) {
174 // Catch registration errors. method will catch transition errors and cleanup.
176 methodCall
.start(selector
);
178 // If timeout specified and first transition went smoothly, add to timeout watch set
179 TAsyncClient client
= methodCall
.getClient();
180 if (client
.hasTimeout() && !client
.hasError()) {
181 timeoutWatchSet
.add(methodCall
);
183 } catch (Exception exception
) {
184 LOGGER
.warn("Caught exception in TAsyncClientManager!", exception
);
185 methodCall
.onError(exception
);
191 /** Comparator used in TreeSet */
192 private static class TAsyncMethodCallTimeoutComparator
implements Comparator
<TAsyncMethodCall
>, Serializable
{
193 public int compare(TAsyncMethodCall left
, TAsyncMethodCall right
) {
194 if (left
.getTimeoutTimestamp() == right
.getTimeoutTimestamp()) {
195 return (int)(left
.getSequenceId() - right
.getSequenceId());
197 return (int)(left
.getTimeoutTimestamp() - right
.getTimeoutTimestamp());