git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/server/THsHaServer.java
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / server / THsHaServer.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20
21 package org.apache.thrift.server;
22
23 import java.util.concurrent.ExecutorService;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.RejectedExecutionException;
26 import java.util.concurrent.ThreadPoolExecutor;
27 import java.util.concurrent.TimeUnit;
28
29 import org.apache.thrift.transport.TNonblockingServerTransport;
30
31 /**
32 * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
33 * Like TNonblockingServer, it relies on the use of TFramedTransport.
34 */
35 public class THsHaServer extends TNonblockingServer {
36
37 public static class Args extends AbstractNonblockingServerArgs<Args> {
38 public int minWorkerThreads = 5;
39 public int maxWorkerThreads = Integer.MAX_VALUE;
40 private int stopTimeoutVal = 60;
41 private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
42 private ExecutorService executorService = null;
43
44 public Args(TNonblockingServerTransport transport) {
45 super(transport);
46 }
47
48
49 /**
50 * Sets the min and max threads.
51 *
52 * @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)} instead.
53 */
54 @Deprecated
55 public Args workerThreads(int n) {
56 minWorkerThreads = n;
57 maxWorkerThreads = n;
58 return this;
59 }
60
61 /**
62 * @return what the min threads was set to.
63 * @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
64 */
65 @Deprecated
66 public int getWorkerThreads() {
67 return minWorkerThreads;
68 }
69
70 public Args minWorkerThreads(int n) {
71 minWorkerThreads = n;
72 return this;
73 }
74
75 public Args maxWorkerThreads(int n) {
76 maxWorkerThreads = n;
77 return this;
78 }
79
80 public int getMinWorkerThreads() {
81 return minWorkerThreads;
82 }
83
84 public int getMaxWorkerThreads() {
85 return maxWorkerThreads;
86 }
87
88 public int getStopTimeoutVal() {
89 return stopTimeoutVal;
90 }
91
92 public Args stopTimeoutVal(int stopTimeoutVal) {
93 this.stopTimeoutVal = stopTimeoutVal;
94 return this;
95 }
96
97 public TimeUnit getStopTimeoutUnit() {
98 return stopTimeoutUnit;
99 }
100
101 public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
102 this.stopTimeoutUnit = stopTimeoutUnit;
103 return this;
104 }
105
106 public ExecutorService getExecutorService() {
107 return executorService;
108 }
109
110 public Args executorService(ExecutorService executorService) {
111 this.executorService = executorService;
112 return this;
113 }
114 }
115
116
117 // This wraps all the functionality of queueing and thread pool management
118 // for the passing of Invocations from the Selector to workers.
119 private final ExecutorService invoker;
120
121 private final Args args;
122
123 /**
124 * Create the server with the specified Args configuration
125 */
126 public THsHaServer(Args args) {
127 super(args);
128
129 invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
130 this.args = args;
131 }
132
133 /**
134 * {@inheritDoc}
135 */
136 @Override
137 protected void waitForShutdown() {
138 joinSelector();
139 gracefullyShutdownInvokerPool();
140 }
141
142 /**
143 * Helper to create an invoker pool
144 */
145 protected static ExecutorService createInvokerPool(Args options) {
146 int minWorkerThreads = options.minWorkerThreads;
147 int maxWorkerThreads = options.maxWorkerThreads;
148 int stopTimeoutVal = options.stopTimeoutVal;
149 TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
150
151 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
152 ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
153 maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
154
155 return invoker;
156 }
157
158 protected ExecutorService getInvoker() {
159 return invoker;
160 }
161
162 protected void gracefullyShutdownInvokerPool() {
163 // try to gracefully shut down the executor service
164 invoker.shutdown();
165
166 // Loop until awaitTermination finally does return without a interrupted
167 // exception. If we don't do this, then we'll shut down prematurely. We want
168 // to let the executorService clear it's task queue, closing client sockets
169 // appropriately.
170 long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
171 long now = System.currentTimeMillis();
172 while (timeoutMS >= 0) {
173 try {
174 invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
175 break;
176 } catch (InterruptedException ix) {
177 long newnow = System.currentTimeMillis();
178 timeoutMS -= (newnow - now);
179 now = newnow;
180 }
181 }
182 }
183
184 /**
185 * We override the standard invoke method here to queue the invocation for
186 * invoker service instead of immediately invoking. The thread pool takes care
187 * of the rest.
188 */
189 @Override
190 protected boolean requestInvoke(FrameBuffer frameBuffer) {
191 try {
192 Runnable invocation = getRunnable(frameBuffer);
193 invoker.execute(invocation);
194 return true;
195 } catch (RejectedExecutionException rx) {
196 LOGGER.warn("ExecutorService rejected execution!", rx);
197 return false;
198 }
199 }
200
201 protected Runnable getRunnable(FrameBuffer frameBuffer){
202 return new Invocation(frameBuffer);
203 }
204 }