/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19 package org
.apache
.thrift
.async
;
21 import java
.io
.IOException
;
22 import java
.nio
.ByteBuffer
;
23 import java
.nio
.channels
.SelectionKey
;
24 import java
.nio
.channels
.Selector
;
25 import java
.util
.concurrent
.atomic
.AtomicLong
;
27 import org
.apache
.thrift
.TException
;
28 import org
.apache
.thrift
.protocol
.TProtocol
;
29 import org
.apache
.thrift
.protocol
.TProtocolFactory
;
30 import org
.apache
.thrift
.transport
.TFramedTransport
;
31 import org
.apache
.thrift
.transport
.TMemoryBuffer
;
32 import org
.apache
.thrift
.transport
.TNonblockingTransport
;
33 import org
.apache
.thrift
.transport
.TTransportException
;
/**
 * Encapsulates an async method call.
 * <p>
 * Subclasses must implement:
 * <ul>
 *   <li>protected abstract void write_args(TProtocol protocol)</li>
 *   <li>protected abstract T getResult() throws &lt;Exception_1&gt;, &lt;Exception_2&gt;, ...</li>
 * </ul>
 *
 * @param <T> The return type of the encapsulated method call.
 */
46 public abstract class TAsyncMethodCall
<T
> {
48 private static final int INITIAL_MEMORY_BUFFER_SIZE
= 128;
49 private static AtomicLong sequenceIdCounter
= new AtomicLong(0);
/**
 * Steps of an async call, listed in the order the call moves through them, followed by the
 * error state.
 */
// NOTE(review): only READING_RESPONSE_SIZE and READING_RESPONSE_BODY survived in this copy of
// the declaration. The other constants were restored from their uses elsewhere in this class
// (ERROR is named only in a comment in transition()) - confirm names and order upstream.
public enum State {
  CONNECTING,
  WRITING_REQUEST_SIZE,
  WRITING_REQUEST_BODY,
  READING_RESPONSE_SIZE,
  READING_RESPONSE_BODY,
  RESPONSE_READ,
  ERROR;
}
62 * Next step in the call, initialized by start()
64 private State state
= null;
66 protected final TNonblockingTransport transport
;
67 private final TProtocolFactory protocolFactory
;
68 protected final TAsyncClient client
;
69 private final AsyncMethodCallback
<T
> callback
;
70 private final boolean isOneway
;
71 private long sequenceId
;
72 private final long timeout
;
74 private ByteBuffer sizeBuffer
;
75 private final byte[] sizeBufferArray
= new byte[4];
76 private ByteBuffer frameBuffer
;
78 private long startTime
= System
.currentTimeMillis();
/**
 * Creates a call bound to the given client and transport, assigns it the next sequence id and
 * captures the client's timeout.
 *
 * @param client client this call belongs to; its getTimeout() value is stored as the timeout
 * @param protocolFactory factory for the protocol used to serialize the arguments
 * @param transport non-blocking transport used for the request and the response
 * @param callback receives the result through onComplete()
 * @param isOneway one-way flag stored on the call
 */
protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
  this.transport = transport;
  this.callback = callback;
  this.protocolFactory = protocolFactory;
  // The final field must be assigned here; it is also what getTimeout() is read from below.
  this.client = client;
  this.isOneway = isOneway;
  this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
  this.timeout = client.getTimeout();
}
// NOTE(review): the body and closing brace of this accessor are missing from this copy of the
// file; presumably it returns the `state` field - confirm against the upstream source.
90 protected State
getState() {
94 protected boolean isFinished() {
95 return state
== State
.RESPONSE_READ
;
// NOTE(review): the body and closing brace are missing from this copy of the file; presumably
// returns the `startTime` field - confirm against the upstream source.
98 protected long getStartTime() {
// NOTE(review): the body and closing brace are missing from this copy of the file; presumably
// returns the `sequenceId` field - confirm against the upstream source.
102 protected long getSequenceId() {
// NOTE(review): the body and closing brace are missing from this copy of the file; presumably
// returns the `client` field - confirm against the upstream source.
106 public TAsyncClient
getClient() {
// NOTE(review): the body and closing brace are missing from this copy of the file; presumably
// reports whether `timeout` is set, but the exact test is not visible - confirm upstream.
110 public boolean hasTimeout() {
114 public long getTimeoutTimestamp() {
115 return timeout
+ startTime
;
118 protected abstract void write_args(TProtocol protocol
) throws TException
;
120 protected abstract T
getResult() throws Exception
;
123 * Initialize buffers.
124 * @throws TException if buffer initialization fails
126 protected void prepareMethodCall() throws TException
{
127 TMemoryBuffer memoryBuffer
= new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE
);
128 TProtocol protocol
= protocolFactory
.getProtocol(memoryBuffer
);
129 write_args(protocol
);
131 int length
= memoryBuffer
.length();
132 frameBuffer
= ByteBuffer
.wrap(memoryBuffer
.getArray(), 0, length
);
134 TFramedTransport
.encodeFrameSize(length
, sizeBufferArray
);
135 sizeBuffer
= ByteBuffer
.wrap(sizeBufferArray
);
/**
 * Register with selector and start first state, which could be either connecting or writing.
 * @throws IOException if register or starting fails
 */
142 void start(Selector sel
) throws IOException
{
// NOTE(review): `key` is assigned below but its declaration is not present in this copy of the
// file; a local SelectionKey declaration appears to have been dropped here.
144 if (transport
.isOpen()) {
// Transport already open: skip connecting and register for writing straight away.
145 state
= State
.WRITING_REQUEST_SIZE
;
146 key
= transport
.registerSelector(sel
, SelectionKey
.OP_WRITE
);
// NOTE(review): a branch boundary (presumably `} else {`) is missing at this point - the
// open-transport path above and the connect path below set conflicting states.
148 state
= State
.CONNECTING
;
149 key
= transport
.registerSelector(sel
, SelectionKey
.OP_CONNECT
);
151 // non-blocking connect can complete immediately,
152 // in which case we should not expect the OP_CONNECT
153 if (transport
.startConnect()) {
154 registerForFirstWrite(key
);
// NOTE(review): the rest of the method (closing braces and any trailing statements) is missing
// from this copy - confirm against the upstream source.
161 void registerForFirstWrite(SelectionKey key
) throws IOException
{
162 state
= State
.WRITING_REQUEST_SIZE
;
163 key
.interestOps(SelectionKey
.OP_WRITE
);
// NOTE(review): the body and closing brace are missing from this copy of the file; presumably
// returns the `frameBuffer` field - confirm against the upstream source.
166 protected ByteBuffer
getFrameBuffer() {
/**
 * Transition to next state, doing whatever work is required. Since this
 * method is only called by the selector thread, we can make changes to our
 * select interests without worrying about concurrency.
 */
176 void transition(SelectionKey key
) {
177 // Ensure key is valid
178 if (!key
.isValid()) {
180 Exception e
= new TTransportException("Selection key not valid!");
// NOTE(review): what happens to `e` and the end of this guard are missing from this copy of
// the file; presumably the error is reported and the method returns - confirm upstream.
185 // Transition function
// NOTE(review): the `try`/`switch` header is missing here (presumably a switch on `state`,
// which is what the default branch's message prints). No case dispatches to doConnecting(),
// although that method exists in this class, so a leading case appears to have been dropped.
// The statements that terminate each case are missing as well.
191 case WRITING_REQUEST_SIZE
:
192 doWritingRequestSize();
194 case WRITING_REQUEST_BODY
:
195 doWritingRequestBody(key
);
197 case READING_RESPONSE_SIZE
:
198 doReadingResponseSize();
200 case READING_RESPONSE_BODY
:
201 doReadingResponseBody(key
);
203 default: // RESPONSE_READ, ERROR, or bug
204 throw new IllegalStateException("Method call in state " + state
205 + " but selector called transition method. Seems like a bug...");
// Any exception raised by a step above, including the IllegalStateException, lands here.
207 } catch (Exception e
) {
// NOTE(review): the handler body and the closing braces are missing from this copy.
// NOTE(review): the body and closing brace are missing from this copy of the file, so what is
// done with `e` cannot be documented from here - confirm against the upstream source.
214 protected void onError(Exception e
) {
220 private void doReadingResponseBody(SelectionKey key
) throws IOException
{
221 if (transport
.read(frameBuffer
) < 0) {
222 throw new IOException("Read call frame failed");
224 if (frameBuffer
.remaining() == 0) {
225 cleanUpAndFireCallback(key
);
229 private void cleanUpAndFireCallback(SelectionKey key
) {
// Mark the call finished before any callback runs; isFinished() tests for this state.
230 state
= State
.RESPONSE_READ
;
// NOTE(review): one or more statements that operate on `key` are missing around the comment
// below (nothing in the visible lines uses `key`), as is the opening `try {` - confirm upstream.
232 // this ensures that the TAsyncMethod instance doesn't hang around
235 T result
= this.getResult();
// NOTE(review): a statement between obtaining the result and invoking the callback appears to
// be missing from this copy.
237 callback
.onComplete(result
);
// Failures from getResult() or from the callback itself are caught here.
238 } catch (Exception e
) {
// NOTE(review): the handler body and the closing braces are missing from this copy.
244 private void doReadingResponseSize() throws IOException
{
245 if (transport
.read(sizeBuffer
) < 0) {
246 throw new IOException("Read call frame size failed");
248 if (sizeBuffer
.remaining() == 0) {
249 state
= State
.READING_RESPONSE_BODY
;
250 frameBuffer
= ByteBuffer
.allocate(TFramedTransport
.decodeFrameSize(sizeBufferArray
));
254 private void doWritingRequestBody(SelectionKey key
) throws IOException
{
255 if (transport
.write(frameBuffer
) < 0) {
256 throw new IOException("Write call frame failed");
258 if (frameBuffer
.remaining() == 0) {
260 cleanUpAndFireCallback(key
);
262 state
= State
.READING_RESPONSE_SIZE
;
263 sizeBuffer
.rewind(); // Prepare to read incoming frame size
264 key
.interestOps(SelectionKey
.OP_READ
);
269 private void doWritingRequestSize() throws IOException
{
270 if (transport
.write(sizeBuffer
) < 0) {
271 throw new IOException("Write call frame size failed");
273 if (sizeBuffer
.remaining() == 0) {
274 state
= State
.WRITING_REQUEST_BODY
;
278 private void doConnecting(SelectionKey key
) throws IOException
{
279 if (!key
.isConnectable() || !transport
.finishConnect()) {
280 throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
282 registerForFirstWrite(key
);