2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
21 package org
.apache
.thrift
.transport
;
23 import java
.io
.IOException
;
24 import java
.net
.InetSocketAddress
;
25 import java
.net
.Socket
;
26 import java
.net
.SocketAddress
;
27 import java
.net
.SocketException
;
28 import java
.nio
.ByteBuffer
;
29 import java
.nio
.channels
.SelectionKey
;
30 import java
.nio
.channels
.Selector
;
31 import java
.nio
.channels
.SocketChannel
;
33 import org
.slf4j
.Logger
;
34 import org
.slf4j
.LoggerFactory
;
37 * Transport for use with async client.
39 public class TNonblockingSocket
extends TNonblockingTransport
{
41 private static final Logger LOGGER
= LoggerFactory
.getLogger(TNonblockingSocket
.class.getName());
44 * Host and port if passed in, used for lazy non-blocking connect.
46 private final SocketAddress socketAddress_
;
48 private final SocketChannel socketChannel_
;
50 public TNonblockingSocket(String host
, int port
) throws IOException
{
55 * Create a new nonblocking socket transport that will be connected to host:port.
60 public TNonblockingSocket(String host
, int port
, int timeout
) throws IOException
{
61 this(SocketChannel
.open(), timeout
, new InetSocketAddress(host
, port
));
65 * Constructor that takes an already created socket.
67 * @param socketChannel Already created SocketChannel object
68 * @throws IOException if there is an error setting up the streams
70 public TNonblockingSocket(SocketChannel socketChannel
) throws IOException
{
71 this(socketChannel
, 0, null);
72 if (!socketChannel
.isConnected()) throw new IOException("Socket must already be connected");
75 private TNonblockingSocket(SocketChannel socketChannel
, int timeout
, SocketAddress socketAddress
)
77 socketChannel_
= socketChannel
;
78 socketAddress_
= socketAddress
;
80 // make it a nonblocking channel
81 socketChannel
.configureBlocking(false);
84 Socket socket
= socketChannel
.socket();
85 socket
.setSoLinger(false, 0);
86 socket
.setTcpNoDelay(true);
87 socket
.setKeepAlive(true);
92 * Register the new SocketChannel with our Selector, indicating
93 * we'd like to be notified when it's ready for I/O.
96 * @return the selection key for this socket.
98 public SelectionKey
registerSelector(Selector selector
, int interests
) throws IOException
{
99 return socketChannel_
.register(selector
, interests
);
103 * Sets the socket timeout, although this implementation never uses blocking operations so it is unused.
105 * @param timeout Milliseconds timeout
107 public void setTimeout(int timeout
) {
109 socketChannel_
.socket().setSoTimeout(timeout
);
110 } catch (SocketException sx
) {
111 LOGGER
.warn("Could not set socket timeout.", sx
);
116 * Returns a reference to the underlying SocketChannel.
118 public SocketChannel
getSocketChannel() {
119 return socketChannel_
;
123 * Checks whether the socket is connected.
125 public boolean isOpen() {
126 // isConnected() does not return false after close(), but isOpen() does
127 return socketChannel_
.isOpen() && socketChannel_
.isConnected();
131 * Do not call, the implementation provides its own lazy non-blocking connect.
133 public void open() throws TTransportException
{
134 throw new RuntimeException("open() is not implemented for TNonblockingSocket");
138 * Perform a nonblocking read into buffer.
140 public int read(ByteBuffer buffer
) throws IOException
{
141 return socketChannel_
.read(buffer
);
146 * Reads from the underlying input stream if not null.
148 public int read(byte[] buf
, int off
, int len
) throws TTransportException
{
149 if ((socketChannel_
.validOps() & SelectionKey
.OP_READ
) != SelectionKey
.OP_READ
) {
150 throw new TTransportException(TTransportException
.NOT_OPEN
,
151 "Cannot read from write-only socket channel");
154 return socketChannel_
.read(ByteBuffer
.wrap(buf
, off
, len
));
155 } catch (IOException iox
) {
156 throw new TTransportException(TTransportException
.UNKNOWN
, iox
);
161 * Perform a nonblocking write of the data in buffer;
163 public int write(ByteBuffer buffer
) throws IOException
{
164 return socketChannel_
.write(buffer
);
168 * Writes to the underlying output stream if not null.
170 public void write(byte[] buf
, int off
, int len
) throws TTransportException
{
171 if ((socketChannel_
.validOps() & SelectionKey
.OP_WRITE
) != SelectionKey
.OP_WRITE
) {
172 throw new TTransportException(TTransportException
.NOT_OPEN
,
173 "Cannot write to write-only socket channel");
176 socketChannel_
.write(ByteBuffer
.wrap(buf
, off
, len
));
177 } catch (IOException iox
) {
178 throw new TTransportException(TTransportException
.UNKNOWN
, iox
);
185 public void flush() throws TTransportException
{
186 // Not supported by SocketChannel.
192 public void close() {
194 socketChannel_
.close();
195 } catch (IOException iox
) {
196 LOGGER
.warn("Could not close socket.", iox
);
201 public boolean startConnect() throws IOException
{
202 return socketChannel_
.connect(socketAddress_
);
206 public boolean finishConnect() throws IOException
{
207 return socketChannel_
.finishConnect();