2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 package org
.apache
.thrift
.server
;
22 import java
.util
.Arrays
;
23 import java
.util
.List
;
24 import java
.util
.Random
;
25 import java
.util
.concurrent
.ExecutorService
;
26 import java
.util
.concurrent
.RejectedExecutionException
;
27 import java
.util
.concurrent
.SynchronousQueue
;
28 import java
.util
.concurrent
.ThreadPoolExecutor
;
29 import java
.util
.concurrent
.TimeUnit
;
31 import org
.apache
.thrift
.TException
;
32 import org
.apache
.thrift
.TProcessor
;
33 import org
.apache
.thrift
.protocol
.TProtocol
;
34 import org
.apache
.thrift
.transport
.TServerTransport
;
35 import org
.apache
.thrift
.transport
.TTransport
;
36 import org
.apache
.thrift
.transport
.TTransportException
;
37 import org
.slf4j
.Logger
;
38 import org
.slf4j
.LoggerFactory
;
41 * Server which uses Java's built in ThreadPool management to spawn off
45 public class TThreadPoolServer
extends TServer
{
46 private static final Logger LOGGER
= LoggerFactory
.getLogger(TThreadPoolServer
.class.getName());
48 public static class Args
extends AbstractServerArgs
<Args
> {
49 public int minWorkerThreads
= 5;
50 public int maxWorkerThreads
= Integer
.MAX_VALUE
;
51 public ExecutorService executorService
;
52 public int stopTimeoutVal
= 60;
53 public TimeUnit stopTimeoutUnit
= TimeUnit
.SECONDS
;
54 public int requestTimeout
= 20;
55 public TimeUnit requestTimeoutUnit
= TimeUnit
.SECONDS
;
56 public int beBackoffSlotLength
= 100;
57 public TimeUnit beBackoffSlotLengthUnit
= TimeUnit
.MILLISECONDS
;
59 public Args(TServerTransport transport
) {
63 public Args
minWorkerThreads(int n
) {
68 public Args
maxWorkerThreads(int n
) {
73 public Args
stopTimeoutVal(int n
) {
78 public Args
stopTimeoutUnit(TimeUnit tu
) {
83 public Args
requestTimeout(int n
) {
88 public Args
requestTimeoutUnit(TimeUnit tu
) {
89 requestTimeoutUnit
= tu
;
92 //Binary exponential backoff slot length
93 public Args
beBackoffSlotLength(int n
) {
94 beBackoffSlotLength
= n
;
98 //Binary exponential backoff slot time unit
99 public Args
beBackoffSlotLengthUnit(TimeUnit tu
) {
100 beBackoffSlotLengthUnit
= tu
;
104 public Args
executorService(ExecutorService executorService
) {
105 this.executorService
= executorService
;
110 // Executor service for handling client connections
111 private ExecutorService executorService_
;
113 private final TimeUnit stopTimeoutUnit
;
115 private final long stopTimeoutVal
;
117 private final TimeUnit requestTimeoutUnit
;
119 private final long requestTimeout
;
121 private final long beBackoffSlotInMillis
;
123 private Random random
= new Random(System
.currentTimeMillis());
125 public TThreadPoolServer(Args args
) {
128 stopTimeoutUnit
= args
.stopTimeoutUnit
;
129 stopTimeoutVal
= args
.stopTimeoutVal
;
130 requestTimeoutUnit
= args
.requestTimeoutUnit
;
131 requestTimeout
= args
.requestTimeout
;
132 beBackoffSlotInMillis
= args
.beBackoffSlotLengthUnit
.toMillis(args
.beBackoffSlotLength
);
134 executorService_
= args
.executorService
!= null ?
135 args
.executorService
: createDefaultExecutorService(args
);
138 private static ExecutorService
createDefaultExecutorService(Args args
) {
139 SynchronousQueue
<Runnable
> executorQueue
=
140 new SynchronousQueue
<Runnable
>();
141 return new ThreadPoolExecutor(args
.minWorkerThreads
,
142 args
.maxWorkerThreads
,
144 args
.stopTimeoutUnit
,
148 protected ExecutorService
getExecutorService() {
149 return executorService_
;
152 protected boolean preServe() {
154 serverTransport_
.listen();
155 } catch (TTransportException ttx
) {
156 LOGGER
.error("Error occurred during listening.", ttx
);
160 // Run the preServe event
161 if (eventHandler_
!= null) {
162 eventHandler_
.preServe();
170 public void serve() {
181 protected void execute() {
182 int failureCount
= 0;
185 TTransport client
= serverTransport_
.accept();
186 WorkerProcess wp
= new WorkerProcess(client
);
189 long remainTimeInMillis
= requestTimeoutUnit
.toMillis(requestTimeout
);
192 executorService_
.execute(wp
);
194 } catch(Throwable t
) {
195 if (t
instanceof RejectedExecutionException
) {
198 if (remainTimeInMillis
> 0) {
199 //do a truncated 20 binary exponential backoff sleep
200 long sleepTimeInMillis
= ((long) (random
.nextDouble() *
201 (1L << Math
.min(retryCount
, 20)))) * beBackoffSlotInMillis
;
202 sleepTimeInMillis
= Math
.min(sleepTimeInMillis
, remainTimeInMillis
);
203 TimeUnit
.MILLISECONDS
.sleep(sleepTimeInMillis
);
204 remainTimeInMillis
= remainTimeInMillis
- sleepTimeInMillis
;
208 LOGGER
.warn("Task has been rejected by ExecutorService " + retryCount
209 + " times till timedout, reason: " + t
);
212 } catch (InterruptedException e
) {
213 LOGGER
.warn("Interrupted while waiting to place client on executor queue.");
214 Thread
.currentThread().interrupt();
217 } else if (t
instanceof Error
) {
218 LOGGER
.error("ExecutorService threw error: " + t
, t
);
221 //for other possible runtime errors from ExecutorService, should also not kill serve
222 LOGGER
.warn("ExecutorService threw error: " + t
, t
);
227 } catch (TTransportException ttx
) {
230 LOGGER
.warn("Transport error occurred during acceptance of message.", ttx
);
236 protected void waitForShutdown() {
237 executorService_
.shutdown();
239 // Loop until awaitTermination finally does return without a interrupted
240 // exception. If we don't do this, then we'll shut down prematurely. We want
241 // to let the executorService clear it's task queue, closing client sockets
243 long timeoutMS
= stopTimeoutUnit
.toMillis(stopTimeoutVal
);
244 long now
= System
.currentTimeMillis();
245 while (timeoutMS
>= 0) {
247 executorService_
.awaitTermination(timeoutMS
, TimeUnit
.MILLISECONDS
);
249 } catch (InterruptedException ix
) {
250 long newnow
= System
.currentTimeMillis();
251 timeoutMS
-= (newnow
- now
);
259 serverTransport_
.interrupt();
262 private class WorkerProcess
implements Runnable
{
265 * Client that this services.
267 private TTransport client_
;
270 * Default constructor.
272 * @param client Transport to process
274 private WorkerProcess(TTransport client
) {
279 * Loops on processing a client forever
282 TProcessor processor
= null;
283 TTransport inputTransport
= null;
284 TTransport outputTransport
= null;
285 TProtocol inputProtocol
= null;
286 TProtocol outputProtocol
= null;
288 TServerEventHandler eventHandler
= null;
289 ServerContext connectionContext
= null;
292 processor
= processorFactory_
.getProcessor(client_
);
293 inputTransport
= inputTransportFactory_
.getTransport(client_
);
294 outputTransport
= outputTransportFactory_
.getTransport(client_
);
295 inputProtocol
= inputProtocolFactory_
.getProtocol(inputTransport
);
296 outputProtocol
= outputProtocolFactory_
.getProtocol(outputTransport
);
298 eventHandler
= getEventHandler();
299 if (eventHandler
!= null) {
300 connectionContext
= eventHandler
.createContext(inputProtocol
, outputProtocol
);
302 // we check stopped_ first to make sure we're not supposed to be shutting
303 // down. this is necessary for graceful shutdown.
306 if (eventHandler
!= null) {
307 eventHandler
.processContext(connectionContext
, inputTransport
, outputTransport
);
313 processor
.process(inputProtocol
, outputProtocol
);
315 } catch (Exception x
) {
316 // We'll usually receive RuntimeException types here
317 // Need to unwrap to ascertain real causing exception before we choose to ignore
318 // Ignore err-logging all transport-level/type exceptions
319 if (!isIgnorableException(x
)) {
320 // Log the exception at error level and continue
321 LOGGER
.error((x
instanceof TException?
"Thrift " : "") + "Error occurred during processing of message.", x
);
324 if (eventHandler
!= null) {
325 eventHandler
.deleteContext(connectionContext
, inputProtocol
, outputProtocol
);
327 if (inputTransport
!= null) {
328 inputTransport
.close();
330 if (outputTransport
!= null) {
331 outputTransport
.close();
333 if (client_
.isOpen()) {
339 private boolean isIgnorableException(Exception x
) {
340 TTransportException tTransportException
= null;
342 if (x
instanceof TTransportException
) {
343 tTransportException
= (TTransportException
)x
;
345 else if (x
.getCause() instanceof TTransportException
) {
346 tTransportException
= (TTransportException
)x
.getCause();
349 if (tTransportException
!= null) {
350 switch(tTransportException
.getType()) {
351 case TTransportException
.END_OF_FILE
:
352 case TTransportException
.TIMED_OUT
: