2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
21 package org
.apache
.thrift
.server
;
23 import java
.util
.concurrent
.ExecutorService
;
24 import java
.util
.concurrent
.LinkedBlockingQueue
;
25 import java
.util
.concurrent
.RejectedExecutionException
;
26 import java
.util
.concurrent
.ThreadPoolExecutor
;
27 import java
.util
.concurrent
.TimeUnit
;
29 import org
.apache
.thrift
.transport
.TNonblockingServerTransport
;
32 * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
33 * Like TNonblockingServer, it relies on the use of TFramedTransport.
35 public class THsHaServer
extends TNonblockingServer
{
37 public static class Args
extends AbstractNonblockingServerArgs
<Args
> {
38 public int minWorkerThreads
= 5;
39 public int maxWorkerThreads
= Integer
.MAX_VALUE
;
40 private int stopTimeoutVal
= 60;
41 private TimeUnit stopTimeoutUnit
= TimeUnit
.SECONDS
;
42 private ExecutorService executorService
= null;
44 public Args(TNonblockingServerTransport transport
) {
50 * Sets the min and max threads.
52 * @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)} instead.
55 public Args
workerThreads(int n
) {
62 * @return what the min threads was set to.
63 * @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
66 public int getWorkerThreads() {
67 return minWorkerThreads
;
70 public Args
minWorkerThreads(int n
) {
75 public Args
maxWorkerThreads(int n
) {
80 public int getMinWorkerThreads() {
81 return minWorkerThreads
;
84 public int getMaxWorkerThreads() {
85 return maxWorkerThreads
;
88 public int getStopTimeoutVal() {
89 return stopTimeoutVal
;
92 public Args
stopTimeoutVal(int stopTimeoutVal
) {
93 this.stopTimeoutVal
= stopTimeoutVal
;
97 public TimeUnit
getStopTimeoutUnit() {
98 return stopTimeoutUnit
;
101 public Args
stopTimeoutUnit(TimeUnit stopTimeoutUnit
) {
102 this.stopTimeoutUnit
= stopTimeoutUnit
;
106 public ExecutorService
getExecutorService() {
107 return executorService
;
110 public Args
executorService(ExecutorService executorService
) {
111 this.executorService
= executorService
;
117 // This wraps all the functionality of queueing and thread pool management
118 // for the passing of Invocations from the Selector to workers.
119 private final ExecutorService invoker
;
121 private final Args args
;
124 * Create the server with the specified Args configuration
126 public THsHaServer(Args args
) {
129 invoker
= args
.executorService
== null ?
createInvokerPool(args
) : args
.executorService
;
137 protected void waitForShutdown() {
139 gracefullyShutdownInvokerPool();
143 * Helper to create an invoker pool
145 protected static ExecutorService
createInvokerPool(Args options
) {
146 int minWorkerThreads
= options
.minWorkerThreads
;
147 int maxWorkerThreads
= options
.maxWorkerThreads
;
148 int stopTimeoutVal
= options
.stopTimeoutVal
;
149 TimeUnit stopTimeoutUnit
= options
.stopTimeoutUnit
;
151 LinkedBlockingQueue
<Runnable
> queue
= new LinkedBlockingQueue
<Runnable
>();
152 ExecutorService invoker
= new ThreadPoolExecutor(minWorkerThreads
,
153 maxWorkerThreads
, stopTimeoutVal
, stopTimeoutUnit
, queue
);
158 protected ExecutorService
getInvoker() {
162 protected void gracefullyShutdownInvokerPool() {
163 // try to gracefully shut down the executor service
166 // Loop until awaitTermination finally does return without a interrupted
167 // exception. If we don't do this, then we'll shut down prematurely. We want
168 // to let the executorService clear it's task queue, closing client sockets
170 long timeoutMS
= args
.stopTimeoutUnit
.toMillis(args
.stopTimeoutVal
);
171 long now
= System
.currentTimeMillis();
172 while (timeoutMS
>= 0) {
174 invoker
.awaitTermination(timeoutMS
, TimeUnit
.MILLISECONDS
);
176 } catch (InterruptedException ix
) {
177 long newnow
= System
.currentTimeMillis();
178 timeoutMS
-= (newnow
- now
);
185 * We override the standard invoke method here to queue the invocation for
186 * invoker service instead of immediately invoking. The thread pool takes care
190 protected boolean requestInvoke(FrameBuffer frameBuffer
) {
192 Runnable invocation
= getRunnable(frameBuffer
);
193 invoker
.execute(invocation
);
195 } catch (RejectedExecutionException rx
) {
196 LOGGER
.warn("ExecutorService rejected execution!", rx
);
201 protected Runnable
getRunnable(FrameBuffer frameBuffer
){
202 return new Invocation(frameBuffer
);