]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | package org.apache.thrift.server; | |
21 | ||
22 | import java.util.Arrays; | |
23 | import java.util.List; | |
24 | import java.util.Random; | |
25 | import java.util.concurrent.ExecutorService; | |
26 | import java.util.concurrent.RejectedExecutionException; | |
27 | import java.util.concurrent.SynchronousQueue; | |
28 | import java.util.concurrent.ThreadPoolExecutor; | |
29 | import java.util.concurrent.TimeUnit; | |
30 | ||
31 | import org.apache.thrift.TException; | |
32 | import org.apache.thrift.TProcessor; | |
33 | import org.apache.thrift.protocol.TProtocol; | |
34 | import org.apache.thrift.transport.TServerTransport; | |
35 | import org.apache.thrift.transport.TTransport; | |
36 | import org.apache.thrift.transport.TTransportException; | |
37 | import org.slf4j.Logger; | |
38 | import org.slf4j.LoggerFactory; | |
39 | ||
40 | /** | |
41 | * Server which uses Java's built in ThreadPool management to spawn off | |
42 | * a worker pool that | |
43 | * | |
44 | */ | |
45 | public class TThreadPoolServer extends TServer { | |
46 | private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName()); | |
47 | ||
48 | public static class Args extends AbstractServerArgs<Args> { | |
49 | public int minWorkerThreads = 5; | |
50 | public int maxWorkerThreads = Integer.MAX_VALUE; | |
51 | public ExecutorService executorService; | |
52 | public int stopTimeoutVal = 60; | |
53 | public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; | |
54 | public int requestTimeout = 20; | |
55 | public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; | |
56 | public int beBackoffSlotLength = 100; | |
57 | public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS; | |
58 | ||
59 | public Args(TServerTransport transport) { | |
60 | super(transport); | |
61 | } | |
62 | ||
63 | public Args minWorkerThreads(int n) { | |
64 | minWorkerThreads = n; | |
65 | return this; | |
66 | } | |
67 | ||
68 | public Args maxWorkerThreads(int n) { | |
69 | maxWorkerThreads = n; | |
70 | return this; | |
71 | } | |
72 | ||
73 | public Args stopTimeoutVal(int n) { | |
74 | stopTimeoutVal = n; | |
75 | return this; | |
76 | } | |
77 | ||
78 | public Args stopTimeoutUnit(TimeUnit tu) { | |
79 | stopTimeoutUnit = tu; | |
80 | return this; | |
81 | } | |
82 | ||
83 | public Args requestTimeout(int n) { | |
84 | requestTimeout = n; | |
85 | return this; | |
86 | } | |
87 | ||
88 | public Args requestTimeoutUnit(TimeUnit tu) { | |
89 | requestTimeoutUnit = tu; | |
90 | return this; | |
91 | } | |
92 | //Binary exponential backoff slot length | |
93 | public Args beBackoffSlotLength(int n) { | |
94 | beBackoffSlotLength = n; | |
95 | return this; | |
96 | } | |
97 | ||
98 | //Binary exponential backoff slot time unit | |
99 | public Args beBackoffSlotLengthUnit(TimeUnit tu) { | |
100 | beBackoffSlotLengthUnit = tu; | |
101 | return this; | |
102 | } | |
103 | ||
104 | public Args executorService(ExecutorService executorService) { | |
105 | this.executorService = executorService; | |
106 | return this; | |
107 | } | |
108 | } | |
109 | ||
110 | // Executor service for handling client connections | |
111 | private ExecutorService executorService_; | |
112 | ||
113 | private final TimeUnit stopTimeoutUnit; | |
114 | ||
115 | private final long stopTimeoutVal; | |
116 | ||
117 | private final TimeUnit requestTimeoutUnit; | |
118 | ||
119 | private final long requestTimeout; | |
120 | ||
121 | private final long beBackoffSlotInMillis; | |
122 | ||
123 | private Random random = new Random(System.currentTimeMillis()); | |
124 | ||
125 | public TThreadPoolServer(Args args) { | |
126 | super(args); | |
127 | ||
128 | stopTimeoutUnit = args.stopTimeoutUnit; | |
129 | stopTimeoutVal = args.stopTimeoutVal; | |
130 | requestTimeoutUnit = args.requestTimeoutUnit; | |
131 | requestTimeout = args.requestTimeout; | |
132 | beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength); | |
133 | ||
134 | executorService_ = args.executorService != null ? | |
135 | args.executorService : createDefaultExecutorService(args); | |
136 | } | |
137 | ||
138 | private static ExecutorService createDefaultExecutorService(Args args) { | |
139 | SynchronousQueue<Runnable> executorQueue = | |
140 | new SynchronousQueue<Runnable>(); | |
141 | return new ThreadPoolExecutor(args.minWorkerThreads, | |
142 | args.maxWorkerThreads, | |
143 | args.stopTimeoutVal, | |
144 | args.stopTimeoutUnit, | |
145 | executorQueue); | |
146 | } | |
147 | ||
148 | protected ExecutorService getExecutorService() { | |
149 | return executorService_; | |
150 | } | |
151 | ||
152 | protected boolean preServe() { | |
153 | try { | |
154 | serverTransport_.listen(); | |
155 | } catch (TTransportException ttx) { | |
156 | LOGGER.error("Error occurred during listening.", ttx); | |
157 | return false; | |
158 | } | |
159 | ||
160 | // Run the preServe event | |
161 | if (eventHandler_ != null) { | |
162 | eventHandler_.preServe(); | |
163 | } | |
164 | stopped_ = false; | |
165 | setServing(true); | |
166 | ||
167 | return true; | |
168 | } | |
169 | ||
170 | public void serve() { | |
171 | if (!preServe()) { | |
172 | return; | |
173 | } | |
174 | ||
175 | execute(); | |
176 | waitForShutdown(); | |
177 | ||
178 | setServing(false); | |
179 | } | |
180 | ||
181 | protected void execute() { | |
182 | int failureCount = 0; | |
183 | while (!stopped_) { | |
184 | try { | |
185 | TTransport client = serverTransport_.accept(); | |
186 | WorkerProcess wp = new WorkerProcess(client); | |
187 | ||
188 | int retryCount = 0; | |
189 | long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout); | |
190 | while(true) { | |
191 | try { | |
192 | executorService_.execute(wp); | |
193 | break; | |
194 | } catch(Throwable t) { | |
195 | if (t instanceof RejectedExecutionException) { | |
196 | retryCount++; | |
197 | try { | |
198 | if (remainTimeInMillis > 0) { | |
199 | //do a truncated 20 binary exponential backoff sleep | |
200 | long sleepTimeInMillis = ((long) (random.nextDouble() * | |
201 | (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis; | |
202 | sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis); | |
203 | TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis); | |
204 | remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis; | |
205 | } else { | |
206 | client.close(); | |
207 | wp = null; | |
208 | LOGGER.warn("Task has been rejected by ExecutorService " + retryCount | |
209 | + " times till timedout, reason: " + t); | |
210 | break; | |
211 | } | |
212 | } catch (InterruptedException e) { | |
213 | LOGGER.warn("Interrupted while waiting to place client on executor queue."); | |
214 | Thread.currentThread().interrupt(); | |
215 | break; | |
216 | } | |
217 | } else if (t instanceof Error) { | |
218 | LOGGER.error("ExecutorService threw error: " + t, t); | |
219 | throw (Error)t; | |
220 | } else { | |
221 | //for other possible runtime errors from ExecutorService, should also not kill serve | |
222 | LOGGER.warn("ExecutorService threw error: " + t, t); | |
223 | break; | |
224 | } | |
225 | } | |
226 | } | |
227 | } catch (TTransportException ttx) { | |
228 | if (!stopped_) { | |
229 | ++failureCount; | |
230 | LOGGER.warn("Transport error occurred during acceptance of message.", ttx); | |
231 | } | |
232 | } | |
233 | } | |
234 | } | |
235 | ||
236 | protected void waitForShutdown() { | |
237 | executorService_.shutdown(); | |
238 | ||
239 | // Loop until awaitTermination finally does return without a interrupted | |
240 | // exception. If we don't do this, then we'll shut down prematurely. We want | |
241 | // to let the executorService clear it's task queue, closing client sockets | |
242 | // appropriately. | |
243 | long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal); | |
244 | long now = System.currentTimeMillis(); | |
245 | while (timeoutMS >= 0) { | |
246 | try { | |
247 | executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); | |
248 | break; | |
249 | } catch (InterruptedException ix) { | |
250 | long newnow = System.currentTimeMillis(); | |
251 | timeoutMS -= (newnow - now); | |
252 | now = newnow; | |
253 | } | |
254 | } | |
255 | } | |
256 | ||
257 | public void stop() { | |
258 | stopped_ = true; | |
259 | serverTransport_.interrupt(); | |
260 | } | |
261 | ||
262 | private class WorkerProcess implements Runnable { | |
263 | ||
264 | /** | |
265 | * Client that this services. | |
266 | */ | |
267 | private TTransport client_; | |
268 | ||
269 | /** | |
270 | * Default constructor. | |
271 | * | |
272 | * @param client Transport to process | |
273 | */ | |
274 | private WorkerProcess(TTransport client) { | |
275 | client_ = client; | |
276 | } | |
277 | ||
278 | /** | |
279 | * Loops on processing a client forever | |
280 | */ | |
281 | public void run() { | |
282 | TProcessor processor = null; | |
283 | TTransport inputTransport = null; | |
284 | TTransport outputTransport = null; | |
285 | TProtocol inputProtocol = null; | |
286 | TProtocol outputProtocol = null; | |
287 | ||
288 | TServerEventHandler eventHandler = null; | |
289 | ServerContext connectionContext = null; | |
290 | ||
291 | try { | |
292 | processor = processorFactory_.getProcessor(client_); | |
293 | inputTransport = inputTransportFactory_.getTransport(client_); | |
294 | outputTransport = outputTransportFactory_.getTransport(client_); | |
295 | inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); | |
296 | outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); | |
297 | ||
298 | eventHandler = getEventHandler(); | |
299 | if (eventHandler != null) { | |
300 | connectionContext = eventHandler.createContext(inputProtocol, outputProtocol); | |
301 | } | |
302 | // we check stopped_ first to make sure we're not supposed to be shutting | |
303 | // down. this is necessary for graceful shutdown. | |
304 | while (true) { | |
305 | ||
306 | if (eventHandler != null) { | |
307 | eventHandler.processContext(connectionContext, inputTransport, outputTransport); | |
308 | } | |
309 | ||
310 | if (stopped_) { | |
311 | break; | |
312 | } | |
313 | processor.process(inputProtocol, outputProtocol); | |
314 | } | |
315 | } catch (Exception x) { | |
316 | // We'll usually receive RuntimeException types here | |
317 | // Need to unwrap to ascertain real causing exception before we choose to ignore | |
318 | // Ignore err-logging all transport-level/type exceptions | |
319 | if (!isIgnorableException(x)) { | |
320 | // Log the exception at error level and continue | |
321 | LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x); | |
322 | } | |
323 | } finally { | |
324 | if (eventHandler != null) { | |
325 | eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol); | |
326 | } | |
327 | if (inputTransport != null) { | |
328 | inputTransport.close(); | |
329 | } | |
330 | if (outputTransport != null) { | |
331 | outputTransport.close(); | |
332 | } | |
333 | if (client_.isOpen()) { | |
334 | client_.close(); | |
335 | } | |
336 | } | |
337 | } | |
338 | ||
339 | private boolean isIgnorableException(Exception x) { | |
340 | TTransportException tTransportException = null; | |
341 | ||
342 | if (x instanceof TTransportException) { | |
343 | tTransportException = (TTransportException)x; | |
344 | } | |
345 | else if (x.getCause() instanceof TTransportException) { | |
346 | tTransportException = (TTransportException)x.getCause(); | |
347 | } | |
348 | ||
349 | if (tTransportException != null) { | |
350 | switch(tTransportException.getType()) { | |
351 | case TTransportException.END_OF_FILE: | |
352 | case TTransportException.TIMED_OUT: | |
353 | return true; | |
354 | } | |
355 | } | |
356 | return false; | |
357 | } | |
358 | } | |
359 | } |