]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | ||
21 | package org.apache.thrift.transport; | |
22 | ||
23 | import java.io.IOException; | |
24 | import java.net.InetSocketAddress; | |
25 | import java.net.Socket; | |
26 | import java.net.SocketAddress; | |
27 | import java.net.SocketException; | |
28 | import java.nio.ByteBuffer; | |
29 | import java.nio.channels.SelectionKey; | |
30 | import java.nio.channels.Selector; | |
31 | import java.nio.channels.SocketChannel; | |
32 | ||
33 | import org.slf4j.Logger; | |
34 | import org.slf4j.LoggerFactory; | |
35 | ||
36 | /** | |
37 | * Transport for use with async client. | |
38 | */ | |
39 | public class TNonblockingSocket extends TNonblockingTransport { | |
40 | ||
41 | private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName()); | |
42 | ||
43 | /** | |
44 | * Host and port if passed in, used for lazy non-blocking connect. | |
45 | */ | |
46 | private final SocketAddress socketAddress_; | |
47 | ||
48 | private final SocketChannel socketChannel_; | |
49 | ||
50 | public TNonblockingSocket(String host, int port) throws IOException { | |
51 | this(host, port, 0); | |
52 | } | |
53 | ||
54 | /** | |
55 | * Create a new nonblocking socket transport that will be connected to host:port. | |
56 | * @param host | |
57 | * @param port | |
58 | * @throws IOException | |
59 | */ | |
60 | public TNonblockingSocket(String host, int port, int timeout) throws IOException { | |
61 | this(SocketChannel.open(), timeout, new InetSocketAddress(host, port)); | |
62 | } | |
63 | ||
64 | /** | |
65 | * Constructor that takes an already created socket. | |
66 | * | |
67 | * @param socketChannel Already created SocketChannel object | |
68 | * @throws IOException if there is an error setting up the streams | |
69 | */ | |
70 | public TNonblockingSocket(SocketChannel socketChannel) throws IOException { | |
71 | this(socketChannel, 0, null); | |
72 | if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected"); | |
73 | } | |
74 | ||
75 | private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress) | |
76 | throws IOException { | |
77 | socketChannel_ = socketChannel; | |
78 | socketAddress_ = socketAddress; | |
79 | ||
80 | // make it a nonblocking channel | |
81 | socketChannel.configureBlocking(false); | |
82 | ||
83 | // set options | |
84 | Socket socket = socketChannel.socket(); | |
85 | socket.setSoLinger(false, 0); | |
86 | socket.setTcpNoDelay(true); | |
87 | socket.setKeepAlive(true); | |
88 | setTimeout(timeout); | |
89 | } | |
90 | ||
91 | /** | |
92 | * Register the new SocketChannel with our Selector, indicating | |
93 | * we'd like to be notified when it's ready for I/O. | |
94 | * | |
95 | * @param selector | |
96 | * @return the selection key for this socket. | |
97 | */ | |
98 | public SelectionKey registerSelector(Selector selector, int interests) throws IOException { | |
99 | return socketChannel_.register(selector, interests); | |
100 | } | |
101 | ||
102 | /** | |
103 | * Sets the socket timeout, although this implementation never uses blocking operations so it is unused. | |
104 | * | |
105 | * @param timeout Milliseconds timeout | |
106 | */ | |
107 | public void setTimeout(int timeout) { | |
108 | try { | |
109 | socketChannel_.socket().setSoTimeout(timeout); | |
110 | } catch (SocketException sx) { | |
111 | LOGGER.warn("Could not set socket timeout.", sx); | |
112 | } | |
113 | } | |
114 | ||
115 | /** | |
116 | * Returns a reference to the underlying SocketChannel. | |
117 | */ | |
118 | public SocketChannel getSocketChannel() { | |
119 | return socketChannel_; | |
120 | } | |
121 | ||
122 | /** | |
123 | * Checks whether the socket is connected. | |
124 | */ | |
125 | public boolean isOpen() { | |
126 | // isConnected() does not return false after close(), but isOpen() does | |
127 | return socketChannel_.isOpen() && socketChannel_.isConnected(); | |
128 | } | |
129 | ||
130 | /** | |
131 | * Do not call, the implementation provides its own lazy non-blocking connect. | |
132 | */ | |
133 | public void open() throws TTransportException { | |
134 | throw new RuntimeException("open() is not implemented for TNonblockingSocket"); | |
135 | } | |
136 | ||
137 | /** | |
138 | * Perform a nonblocking read into buffer. | |
139 | */ | |
140 | public int read(ByteBuffer buffer) throws IOException { | |
141 | return socketChannel_.read(buffer); | |
142 | } | |
143 | ||
144 | ||
145 | /** | |
146 | * Reads from the underlying input stream if not null. | |
147 | */ | |
148 | public int read(byte[] buf, int off, int len) throws TTransportException { | |
149 | if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) { | |
150 | throw new TTransportException(TTransportException.NOT_OPEN, | |
151 | "Cannot read from write-only socket channel"); | |
152 | } | |
153 | try { | |
154 | return socketChannel_.read(ByteBuffer.wrap(buf, off, len)); | |
155 | } catch (IOException iox) { | |
156 | throw new TTransportException(TTransportException.UNKNOWN, iox); | |
157 | } | |
158 | } | |
159 | ||
160 | /** | |
161 | * Perform a nonblocking write of the data in buffer; | |
162 | */ | |
163 | public int write(ByteBuffer buffer) throws IOException { | |
164 | return socketChannel_.write(buffer); | |
165 | } | |
166 | ||
167 | /** | |
168 | * Writes to the underlying output stream if not null. | |
169 | */ | |
170 | public void write(byte[] buf, int off, int len) throws TTransportException { | |
171 | if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) { | |
172 | throw new TTransportException(TTransportException.NOT_OPEN, | |
173 | "Cannot write to write-only socket channel"); | |
174 | } | |
175 | try { | |
176 | socketChannel_.write(ByteBuffer.wrap(buf, off, len)); | |
177 | } catch (IOException iox) { | |
178 | throw new TTransportException(TTransportException.UNKNOWN, iox); | |
179 | } | |
180 | } | |
181 | ||
182 | /** | |
183 | * Noop. | |
184 | */ | |
185 | public void flush() throws TTransportException { | |
186 | // Not supported by SocketChannel. | |
187 | } | |
188 | ||
189 | /** | |
190 | * Closes the socket. | |
191 | */ | |
192 | public void close() { | |
193 | try { | |
194 | socketChannel_.close(); | |
195 | } catch (IOException iox) { | |
196 | LOGGER.warn("Could not close socket.", iox); | |
197 | } | |
198 | } | |
199 | ||
200 | /** {@inheritDoc} */ | |
201 | public boolean startConnect() throws IOException { | |
202 | return socketChannel_.connect(socketAddress_); | |
203 | } | |
204 | ||
205 | /** {@inheritDoc} */ | |
206 | public boolean finishConnect() throws IOException { | |
207 | return socketChannel_.finishConnect(); | |
208 | } | |
209 | ||
210 | } |