git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / async / TAsyncClientManager.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.thrift.async;
20
21 import java.io.IOException;
22 import java.io.Serializable;
23 import java.nio.channels.ClosedSelectorException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.spi.SelectorProvider;
27 import java.util.Comparator;
28 import java.util.Iterator;
29 import java.util.TreeSet;
30 import java.util.concurrent.ConcurrentLinkedQueue;
31 import java.util.concurrent.TimeoutException;
32
33 import org.apache.thrift.TException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38 * Contains selector thread which transitions method call objects
39 */
40 public class TAsyncClientManager {
41 private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
42
43 private final SelectThread selectThread;
44 private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>();
45
46 public TAsyncClientManager() throws IOException {
47 this.selectThread = new SelectThread();
48 selectThread.start();
49 }
50
51 public void call(TAsyncMethodCall method) throws TException {
52 if (!isRunning()) {
53 throw new TException("SelectThread is not running");
54 }
55 method.prepareMethodCall();
56 pendingCalls.add(method);
57 selectThread.getSelector().wakeup();
58 }
59
60 public void stop() {
61 selectThread.finish();
62 }
63
64 public boolean isRunning() {
65 return selectThread.isAlive();
66 }
67
68 private class SelectThread extends Thread {
69 private final Selector selector;
70 private volatile boolean running;
71 private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());
72
73 public SelectThread() throws IOException {
74 this.selector = SelectorProvider.provider().openSelector();
75 this.running = true;
76 this.setName("TAsyncClientManager#SelectorThread " + this.getId());
77
78 // We don't want to hold up the JVM when shutting down
79 setDaemon(true);
80 }
81
82 public Selector getSelector() {
83 return selector;
84 }
85
86 public void finish() {
87 running = false;
88 selector.wakeup();
89 }
90
91 public void run() {
92 while (running) {
93 try {
94 try {
95 if (timeoutWatchSet.size() == 0) {
96 // No timeouts, so select indefinitely
97 selector.select();
98 } else {
99 // We have a timeout pending, so calculate the time until then and select appropriately
100 long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
101 long selectTime = nextTimeout - System.currentTimeMillis();
102 if (selectTime > 0) {
103 // Next timeout is in the future, select and wake up then
104 selector.select(selectTime);
105 } else {
106 // Next timeout is now or in past, select immediately so we can time out
107 selector.selectNow();
108 }
109 }
110 } catch (IOException e) {
111 LOGGER.error("Caught IOException in TAsyncClientManager!", e);
112 }
113 transitionMethods();
114 timeoutMethods();
115 startPendingMethods();
116 } catch (Exception exception) {
117 LOGGER.error("Ignoring uncaught exception in SelectThread", exception);
118 }
119 }
120
121 try {
122 selector.close();
123 } catch (IOException ex) {
124 LOGGER.warn("Could not close selector. This may result in leaked resources!", ex);
125 }
126 }
127
128 // Transition methods for ready keys
129 private void transitionMethods() {
130 try {
131 Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
132 while (keys.hasNext()) {
133 SelectionKey key = keys.next();
134 keys.remove();
135 if (!key.isValid()) {
136 // this can happen if the method call experienced an error and the
137 // key was cancelled. can also happen if we timeout a method, which
138 // results in a channel close.
139 // just skip
140 continue;
141 }
142 TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
143 methodCall.transition(key);
144
145 // If done or error occurred, remove from timeout watch set
146 if (methodCall.isFinished() || methodCall.getClient().hasError()) {
147 timeoutWatchSet.remove(methodCall);
148 }
149 }
150 } catch (ClosedSelectorException e) {
151 LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
152 }
153 }
154
155 // Timeout any existing method calls
156 private void timeoutMethods() {
157 Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
158 long currentTime = System.currentTimeMillis();
159 while (iterator.hasNext()) {
160 TAsyncMethodCall methodCall = iterator.next();
161 if (currentTime >= methodCall.getTimeoutTimestamp()) {
162 iterator.remove();
163 methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms."));
164 } else {
165 break;
166 }
167 }
168 }
169
170 // Start any new calls
171 private void startPendingMethods() {
172 TAsyncMethodCall methodCall;
173 while ((methodCall = pendingCalls.poll()) != null) {
174 // Catch registration errors. method will catch transition errors and cleanup.
175 try {
176 methodCall.start(selector);
177
178 // If timeout specified and first transition went smoothly, add to timeout watch set
179 TAsyncClient client = methodCall.getClient();
180 if (client.hasTimeout() && !client.hasError()) {
181 timeoutWatchSet.add(methodCall);
182 }
183 } catch (Exception exception) {
184 LOGGER.warn("Caught exception in TAsyncClientManager!", exception);
185 methodCall.onError(exception);
186 }
187 }
188 }
189 }
190
191 /** Comparator used in TreeSet */
192 private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall>, Serializable {
193 public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
194 if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
195 return (int)(left.getSequenceId() - right.getSequenceId());
196 } else {
197 return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
198 }
199 }
200 }
201 }