]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | ||
21 | package org.apache.thrift.transport; | |
22 | ||
23 | import java.io.IOException; | |
24 | import java.net.InetSocketAddress; | |
25 | import java.net.ServerSocket; | |
26 | import java.net.SocketException; | |
27 | import java.nio.channels.ClosedChannelException; | |
28 | import java.nio.channels.SelectionKey; | |
29 | import java.nio.channels.Selector; | |
30 | import java.nio.channels.ServerSocketChannel; | |
31 | import java.nio.channels.SocketChannel; | |
32 | ||
33 | import org.slf4j.Logger; | |
34 | import org.slf4j.LoggerFactory; | |
35 | ||
36 | /** | |
37 | * Wrapper around ServerSocketChannel | |
38 | */ | |
39 | public class TNonblockingServerSocket extends TNonblockingServerTransport { | |
40 | private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerSocket.class.getName()); | |
41 | ||
42 | /** | |
43 | * This channel is where all the nonblocking magic happens. | |
44 | */ | |
45 | private ServerSocketChannel serverSocketChannel = null; | |
46 | ||
47 | /** | |
48 | * Underlying ServerSocket object | |
49 | */ | |
50 | private ServerSocket serverSocket_ = null; | |
51 | ||
52 | /** | |
53 | * Timeout for client sockets from accept | |
54 | */ | |
55 | private int clientTimeout_ = 0; | |
56 | ||
57 | public static class NonblockingAbstractServerSocketArgs extends | |
58 | AbstractServerTransportArgs<NonblockingAbstractServerSocketArgs> {} | |
59 | ||
60 | /** | |
61 | * Creates just a port listening server socket | |
62 | */ | |
63 | public TNonblockingServerSocket(int port) throws TTransportException { | |
64 | this(port, 0); | |
65 | } | |
66 | ||
67 | /** | |
68 | * Creates just a port listening server socket | |
69 | */ | |
70 | public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException { | |
71 | this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout)); | |
72 | } | |
73 | ||
74 | public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException { | |
75 | this(bindAddr, 0); | |
76 | } | |
77 | ||
78 | public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { | |
79 | this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout)); | |
80 | } | |
81 | ||
82 | public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException { | |
83 | clientTimeout_ = args.clientTimeout; | |
84 | try { | |
85 | serverSocketChannel = ServerSocketChannel.open(); | |
86 | serverSocketChannel.configureBlocking(false); | |
87 | ||
88 | // Make server socket | |
89 | serverSocket_ = serverSocketChannel.socket(); | |
90 | // Prevent 2MSL delay problem on server restarts | |
91 | serverSocket_.setReuseAddress(true); | |
92 | // Bind to listening port | |
93 | serverSocket_.bind(args.bindAddr, args.backlog); | |
94 | } catch (IOException ioe) { | |
95 | serverSocket_ = null; | |
96 | throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".", ioe); | |
97 | } | |
98 | } | |
99 | ||
100 | public void listen() throws TTransportException { | |
101 | // Make sure not to block on accept | |
102 | if (serverSocket_ != null) { | |
103 | try { | |
104 | serverSocket_.setSoTimeout(0); | |
105 | } catch (SocketException sx) { | |
106 | LOGGER.error("Socket exception while setting socket timeout", sx); | |
107 | } | |
108 | } | |
109 | } | |
110 | ||
111 | protected TNonblockingSocket acceptImpl() throws TTransportException { | |
112 | if (serverSocket_ == null) { | |
113 | throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); | |
114 | } | |
115 | try { | |
116 | SocketChannel socketChannel = serverSocketChannel.accept(); | |
117 | if (socketChannel == null) { | |
118 | return null; | |
119 | } | |
120 | ||
121 | TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); | |
122 | tsocket.setTimeout(clientTimeout_); | |
123 | return tsocket; | |
124 | } catch (IOException iox) { | |
125 | throw new TTransportException(iox); | |
126 | } | |
127 | } | |
128 | ||
129 | public void registerSelector(Selector selector) { | |
130 | try { | |
131 | // Register the server socket channel, indicating an interest in | |
132 | // accepting new connections | |
133 | serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); | |
134 | } catch (ClosedChannelException e) { | |
135 | // this shouldn't happen, ideally... | |
136 | // TODO: decide what to do with this. | |
137 | } | |
138 | } | |
139 | ||
140 | public void close() { | |
141 | if (serverSocket_ != null) { | |
142 | try { | |
143 | serverSocket_.close(); | |
144 | } catch (IOException iox) { | |
145 | LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage()); | |
146 | } | |
147 | serverSocket_ = null; | |
148 | } | |
149 | } | |
150 | ||
151 | public void interrupt() { | |
152 | // The thread-safeness of this is dubious, but Java documentation suggests | |
153 | // that it is safe to do this from a different thread context | |
154 | close(); | |
155 | } | |
156 | ||
157 | public int getPort() { | |
158 | if (serverSocket_ == null) | |
159 | return -1; | |
160 | return serverSocket_.getLocalPort(); | |
161 | } | |
162 | ||
163 | } |