]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / server / TThreadPoolServer.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.thrift.server;
21
22 import java.util.Arrays;
23 import java.util.List;
24 import java.util.Random;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.RejectedExecutionException;
27 import java.util.concurrent.SynchronousQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31 import org.apache.thrift.TException;
32 import org.apache.thrift.TProcessor;
33 import org.apache.thrift.protocol.TProtocol;
34 import org.apache.thrift.transport.TServerTransport;
35 import org.apache.thrift.transport.TTransport;
36 import org.apache.thrift.transport.TTransportException;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 /**
41 * Server which uses Java's built in ThreadPool management to spawn off
42 * a worker pool that
43 *
44 */
45 public class TThreadPoolServer extends TServer {
46 private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
47
48 public static class Args extends AbstractServerArgs<Args> {
49 public int minWorkerThreads = 5;
50 public int maxWorkerThreads = Integer.MAX_VALUE;
51 public ExecutorService executorService;
52 public int stopTimeoutVal = 60;
53 public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
54 public int requestTimeout = 20;
55 public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
56 public int beBackoffSlotLength = 100;
57 public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
58
59 public Args(TServerTransport transport) {
60 super(transport);
61 }
62
63 public Args minWorkerThreads(int n) {
64 minWorkerThreads = n;
65 return this;
66 }
67
68 public Args maxWorkerThreads(int n) {
69 maxWorkerThreads = n;
70 return this;
71 }
72
73 public Args stopTimeoutVal(int n) {
74 stopTimeoutVal = n;
75 return this;
76 }
77
78 public Args stopTimeoutUnit(TimeUnit tu) {
79 stopTimeoutUnit = tu;
80 return this;
81 }
82
83 public Args requestTimeout(int n) {
84 requestTimeout = n;
85 return this;
86 }
87
88 public Args requestTimeoutUnit(TimeUnit tu) {
89 requestTimeoutUnit = tu;
90 return this;
91 }
92 //Binary exponential backoff slot length
93 public Args beBackoffSlotLength(int n) {
94 beBackoffSlotLength = n;
95 return this;
96 }
97
98 //Binary exponential backoff slot time unit
99 public Args beBackoffSlotLengthUnit(TimeUnit tu) {
100 beBackoffSlotLengthUnit = tu;
101 return this;
102 }
103
104 public Args executorService(ExecutorService executorService) {
105 this.executorService = executorService;
106 return this;
107 }
108 }
109
110 // Executor service for handling client connections
111 private ExecutorService executorService_;
112
113 private final TimeUnit stopTimeoutUnit;
114
115 private final long stopTimeoutVal;
116
117 private final TimeUnit requestTimeoutUnit;
118
119 private final long requestTimeout;
120
121 private final long beBackoffSlotInMillis;
122
123 private Random random = new Random(System.currentTimeMillis());
124
125 public TThreadPoolServer(Args args) {
126 super(args);
127
128 stopTimeoutUnit = args.stopTimeoutUnit;
129 stopTimeoutVal = args.stopTimeoutVal;
130 requestTimeoutUnit = args.requestTimeoutUnit;
131 requestTimeout = args.requestTimeout;
132 beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
133
134 executorService_ = args.executorService != null ?
135 args.executorService : createDefaultExecutorService(args);
136 }
137
138 private static ExecutorService createDefaultExecutorService(Args args) {
139 SynchronousQueue<Runnable> executorQueue =
140 new SynchronousQueue<Runnable>();
141 return new ThreadPoolExecutor(args.minWorkerThreads,
142 args.maxWorkerThreads,
143 args.stopTimeoutVal,
144 args.stopTimeoutUnit,
145 executorQueue);
146 }
147
148 protected ExecutorService getExecutorService() {
149 return executorService_;
150 }
151
152 protected boolean preServe() {
153 try {
154 serverTransport_.listen();
155 } catch (TTransportException ttx) {
156 LOGGER.error("Error occurred during listening.", ttx);
157 return false;
158 }
159
160 // Run the preServe event
161 if (eventHandler_ != null) {
162 eventHandler_.preServe();
163 }
164 stopped_ = false;
165 setServing(true);
166
167 return true;
168 }
169
170 public void serve() {
171 if (!preServe()) {
172 return;
173 }
174
175 execute();
176 waitForShutdown();
177
178 setServing(false);
179 }
180
181 protected void execute() {
182 int failureCount = 0;
183 while (!stopped_) {
184 try {
185 TTransport client = serverTransport_.accept();
186 WorkerProcess wp = new WorkerProcess(client);
187
188 int retryCount = 0;
189 long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
190 while(true) {
191 try {
192 executorService_.execute(wp);
193 break;
194 } catch(Throwable t) {
195 if (t instanceof RejectedExecutionException) {
196 retryCount++;
197 try {
198 if (remainTimeInMillis > 0) {
199 //do a truncated 20 binary exponential backoff sleep
200 long sleepTimeInMillis = ((long) (random.nextDouble() *
201 (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
202 sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
203 TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
204 remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
205 } else {
206 client.close();
207 wp = null;
208 LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
209 + " times till timedout, reason: " + t);
210 break;
211 }
212 } catch (InterruptedException e) {
213 LOGGER.warn("Interrupted while waiting to place client on executor queue.");
214 Thread.currentThread().interrupt();
215 break;
216 }
217 } else if (t instanceof Error) {
218 LOGGER.error("ExecutorService threw error: " + t, t);
219 throw (Error)t;
220 } else {
221 //for other possible runtime errors from ExecutorService, should also not kill serve
222 LOGGER.warn("ExecutorService threw error: " + t, t);
223 break;
224 }
225 }
226 }
227 } catch (TTransportException ttx) {
228 if (!stopped_) {
229 ++failureCount;
230 LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
231 }
232 }
233 }
234 }
235
236 protected void waitForShutdown() {
237 executorService_.shutdown();
238
239 // Loop until awaitTermination finally does return without a interrupted
240 // exception. If we don't do this, then we'll shut down prematurely. We want
241 // to let the executorService clear it's task queue, closing client sockets
242 // appropriately.
243 long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
244 long now = System.currentTimeMillis();
245 while (timeoutMS >= 0) {
246 try {
247 executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
248 break;
249 } catch (InterruptedException ix) {
250 long newnow = System.currentTimeMillis();
251 timeoutMS -= (newnow - now);
252 now = newnow;
253 }
254 }
255 }
256
257 public void stop() {
258 stopped_ = true;
259 serverTransport_.interrupt();
260 }
261
262 private class WorkerProcess implements Runnable {
263
264 /**
265 * Client that this services.
266 */
267 private TTransport client_;
268
269 /**
270 * Default constructor.
271 *
272 * @param client Transport to process
273 */
274 private WorkerProcess(TTransport client) {
275 client_ = client;
276 }
277
278 /**
279 * Loops on processing a client forever
280 */
281 public void run() {
282 TProcessor processor = null;
283 TTransport inputTransport = null;
284 TTransport outputTransport = null;
285 TProtocol inputProtocol = null;
286 TProtocol outputProtocol = null;
287
288 TServerEventHandler eventHandler = null;
289 ServerContext connectionContext = null;
290
291 try {
292 processor = processorFactory_.getProcessor(client_);
293 inputTransport = inputTransportFactory_.getTransport(client_);
294 outputTransport = outputTransportFactory_.getTransport(client_);
295 inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
296 outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
297
298 eventHandler = getEventHandler();
299 if (eventHandler != null) {
300 connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);
301 }
302 // we check stopped_ first to make sure we're not supposed to be shutting
303 // down. this is necessary for graceful shutdown.
304 while (true) {
305
306 if (eventHandler != null) {
307 eventHandler.processContext(connectionContext, inputTransport, outputTransport);
308 }
309
310 if (stopped_) {
311 break;
312 }
313 processor.process(inputProtocol, outputProtocol);
314 }
315 } catch (Exception x) {
316 // We'll usually receive RuntimeException types here
317 // Need to unwrap to ascertain real causing exception before we choose to ignore
318 // Ignore err-logging all transport-level/type exceptions
319 if (!isIgnorableException(x)) {
320 // Log the exception at error level and continue
321 LOGGER.error((x instanceof TException? "Thrift " : "") + "Error occurred during processing of message.", x);
322 }
323 } finally {
324 if (eventHandler != null) {
325 eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);
326 }
327 if (inputTransport != null) {
328 inputTransport.close();
329 }
330 if (outputTransport != null) {
331 outputTransport.close();
332 }
333 if (client_.isOpen()) {
334 client_.close();
335 }
336 }
337 }
338
339 private boolean isIgnorableException(Exception x) {
340 TTransportException tTransportException = null;
341
342 if (x instanceof TTransportException) {
343 tTransportException = (TTransportException)x;
344 }
345 else if (x.getCause() instanceof TTransportException) {
346 tTransportException = (TTransportException)x.getCause();
347 }
348
349 if (tTransportException != null) {
350 switch(tTransportException.getType()) {
351 case TTransportException.END_OF_FILE:
352 case TTransportException.TIMED_OUT:
353 return true;
354 }
355 }
356 return false;
357 }
358 }
359 }