git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / async / TAsyncMethodCall.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.thrift.async;
20
21 import java.io.IOException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.thrift.TException;
28 import org.apache.thrift.protocol.TProtocol;
29 import org.apache.thrift.protocol.TProtocolFactory;
30 import org.apache.thrift.transport.TFramedTransport;
31 import org.apache.thrift.transport.TMemoryBuffer;
32 import org.apache.thrift.transport.TNonblockingTransport;
33 import org.apache.thrift.transport.TTransportException;
34
35 /**
36 * Encapsulates an async method call.
37 * <p>
38 * Need to generate:
39 * <ul>
40 * <li>protected abstract void write_args(TProtocol protocol)</li>
41 * <li>protected abstract T getResult() throws &lt;Exception_1&gt;, &lt;Exception_2&gt;, ...</li>
42 * </ul>
43 *
44 * @param <T> The return type of the encapsulated method call.
45 */
46 public abstract class TAsyncMethodCall<T> {
47
48 private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
49 private static AtomicLong sequenceIdCounter = new AtomicLong(0);
50
51 public static enum State {
52 CONNECTING,
53 WRITING_REQUEST_SIZE,
54 WRITING_REQUEST_BODY,
55 READING_RESPONSE_SIZE,
56 READING_RESPONSE_BODY,
57 RESPONSE_READ,
58 ERROR;
59 }
60
61 /**
62 * Next step in the call, initialized by start()
63 */
64 private State state = null;
65
66 protected final TNonblockingTransport transport;
67 private final TProtocolFactory protocolFactory;
68 protected final TAsyncClient client;
69 private final AsyncMethodCallback<T> callback;
70 private final boolean isOneway;
71 private long sequenceId;
72 private final long timeout;
73
74 private ByteBuffer sizeBuffer;
75 private final byte[] sizeBufferArray = new byte[4];
76 private ByteBuffer frameBuffer;
77
78 private long startTime = System.currentTimeMillis();
79
80 protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
81 this.transport = transport;
82 this.callback = callback;
83 this.protocolFactory = protocolFactory;
84 this.client = client;
85 this.isOneway = isOneway;
86 this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
87 this.timeout = client.getTimeout();
88 }
89
90 protected State getState() {
91 return state;
92 }
93
94 protected boolean isFinished() {
95 return state == State.RESPONSE_READ;
96 }
97
98 protected long getStartTime() {
99 return startTime;
100 }
101
102 protected long getSequenceId() {
103 return sequenceId;
104 }
105
106 public TAsyncClient getClient() {
107 return client;
108 }
109
110 public boolean hasTimeout() {
111 return timeout > 0;
112 }
113
114 public long getTimeoutTimestamp() {
115 return timeout + startTime;
116 }
117
118 protected abstract void write_args(TProtocol protocol) throws TException;
119
120 protected abstract T getResult() throws Exception;
121
122 /**
123 * Initialize buffers.
124 * @throws TException if buffer initialization fails
125 */
126 protected void prepareMethodCall() throws TException {
127 TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
128 TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);
129 write_args(protocol);
130
131 int length = memoryBuffer.length();
132 frameBuffer = ByteBuffer.wrap(memoryBuffer.getArray(), 0, length);
133
134 TFramedTransport.encodeFrameSize(length, sizeBufferArray);
135 sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
136 }
137
138 /**
139 * Register with selector and start first state, which could be either connecting or writing.
140 * @throws IOException if register or starting fails
141 */
142 void start(Selector sel) throws IOException {
143 SelectionKey key;
144 if (transport.isOpen()) {
145 state = State.WRITING_REQUEST_SIZE;
146 key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
147 } else {
148 state = State.CONNECTING;
149 key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);
150
151 // non-blocking connect can complete immediately,
152 // in which case we should not expect the OP_CONNECT
153 if (transport.startConnect()) {
154 registerForFirstWrite(key);
155 }
156 }
157
158 key.attach(this);
159 }
160
161 void registerForFirstWrite(SelectionKey key) throws IOException {
162 state = State.WRITING_REQUEST_SIZE;
163 key.interestOps(SelectionKey.OP_WRITE);
164 }
165
166 protected ByteBuffer getFrameBuffer() {
167 return frameBuffer;
168 }
169
170 /**
171 * Transition to next state, doing whatever work is required. Since this
172 * method is only called by the selector thread, we can make changes to our
173 * select interests without worrying about concurrency.
174 * @param key
175 */
176 void transition(SelectionKey key) {
177 // Ensure key is valid
178 if (!key.isValid()) {
179 key.cancel();
180 Exception e = new TTransportException("Selection key not valid!");
181 onError(e);
182 return;
183 }
184
185 // Transition function
186 try {
187 switch (state) {
188 case CONNECTING:
189 doConnecting(key);
190 break;
191 case WRITING_REQUEST_SIZE:
192 doWritingRequestSize();
193 break;
194 case WRITING_REQUEST_BODY:
195 doWritingRequestBody(key);
196 break;
197 case READING_RESPONSE_SIZE:
198 doReadingResponseSize();
199 break;
200 case READING_RESPONSE_BODY:
201 doReadingResponseBody(key);
202 break;
203 default: // RESPONSE_READ, ERROR, or bug
204 throw new IllegalStateException("Method call in state " + state
205 + " but selector called transition method. Seems like a bug...");
206 }
207 } catch (Exception e) {
208 key.cancel();
209 key.attach(null);
210 onError(e);
211 }
212 }
213
214 protected void onError(Exception e) {
215 client.onError(e);
216 callback.onError(e);
217 state = State.ERROR;
218 }
219
220 private void doReadingResponseBody(SelectionKey key) throws IOException {
221 if (transport.read(frameBuffer) < 0) {
222 throw new IOException("Read call frame failed");
223 }
224 if (frameBuffer.remaining() == 0) {
225 cleanUpAndFireCallback(key);
226 }
227 }
228
229 private void cleanUpAndFireCallback(SelectionKey key) {
230 state = State.RESPONSE_READ;
231 key.interestOps(0);
232 // this ensures that the TAsyncMethod instance doesn't hang around
233 key.attach(null);
234 try {
235 T result = this.getResult();
236 client.onComplete();
237 callback.onComplete(result);
238 } catch (Exception e) {
239 key.cancel();
240 onError(e);
241 }
242 }
243
244 private void doReadingResponseSize() throws IOException {
245 if (transport.read(sizeBuffer) < 0) {
246 throw new IOException("Read call frame size failed");
247 }
248 if (sizeBuffer.remaining() == 0) {
249 state = State.READING_RESPONSE_BODY;
250 frameBuffer = ByteBuffer.allocate(TFramedTransport.decodeFrameSize(sizeBufferArray));
251 }
252 }
253
254 private void doWritingRequestBody(SelectionKey key) throws IOException {
255 if (transport.write(frameBuffer) < 0) {
256 throw new IOException("Write call frame failed");
257 }
258 if (frameBuffer.remaining() == 0) {
259 if (isOneway) {
260 cleanUpAndFireCallback(key);
261 } else {
262 state = State.READING_RESPONSE_SIZE;
263 sizeBuffer.rewind(); // Prepare to read incoming frame size
264 key.interestOps(SelectionKey.OP_READ);
265 }
266 }
267 }
268
269 private void doWritingRequestSize() throws IOException {
270 if (transport.write(sizeBuffer) < 0) {
271 throw new IOException("Write call frame size failed");
272 }
273 if (sizeBuffer.remaining() == 0) {
274 state = State.WRITING_REQUEST_BODY;
275 }
276 }
277
278 private void doConnecting(SelectionKey key) throws IOException {
279 if (!key.isConnectable() || !transport.finishConnect()) {
280 throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
281 }
282 registerForFirstWrite(key);
283 }
284 }