]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / server / TThreadedSelectorServer.java
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package org.apache.thrift.server;
21
22import org.apache.thrift.transport.TNonblockingServerTransport;
23import org.apache.thrift.transport.TNonblockingTransport;
24import org.apache.thrift.transport.TTransportException;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
28import java.io.IOException;
29import java.nio.channels.ClosedChannelException;
30import java.nio.channels.SelectableChannel;
31import java.nio.channels.SelectionKey;
32import java.nio.channels.Selector;
33import java.nio.channels.spi.SelectorProvider;
34import java.util.ArrayList;
35import java.util.Collection;
36import java.util.Collections;
37import java.util.HashSet;
38import java.util.Iterator;
39import java.util.Set;
40import java.util.concurrent.ArrayBlockingQueue;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Executors;
44import java.util.concurrent.LinkedBlockingQueue;
45import java.util.concurrent.RejectedExecutionException;
46import java.util.concurrent.TimeUnit;
47
48/**
49 * A Half-Sync/Half-Async server with a separate pool of threads to handle
50 * non-blocking I/O. Accepts are handled on a single thread, and a configurable
51 * number of nonblocking selector threads manage reading and writing of client
52 * connections. A synchronous worker thread pool handles processing of requests.
53 *
54 * Performs better than TNonblockingServer/THsHaServer in multi-core
55 * environments when the bottleneck is CPU on the single selector thread
56 * handling I/O. In addition, because the accept handling is decoupled from
57 * reads/writes and invocation, the server has better ability to handle back-
58 * pressure from new connections (e.g. stop accepting when busy).
59 *
60 * Like TNonblockingServer, it relies on the use of TFramedTransport.
61 */
62public class TThreadedSelectorServer extends AbstractNonblockingServer {
63 private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
64
65 public static class Args extends AbstractNonblockingServerArgs<Args> {
66
67 /** The number of threads for selecting on already-accepted connections */
68 public int selectorThreads = 2;
69 /**
70 * The size of the executor service (if none is specified) that will handle
71 * invocations. This may be set to 0, in which case invocations will be
72 * handled directly on the selector threads (as is in TNonblockingServer)
73 */
74 private int workerThreads = 5;
75 /** Time to wait for server to stop gracefully */
76 private int stopTimeoutVal = 60;
77 private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
78 /** The ExecutorService for handling dispatched requests */
79 private ExecutorService executorService = null;
80 /**
81 * The size of the blocking queue per selector thread for passing accepted
82 * connections to the selector thread
83 */
84 private int acceptQueueSizePerThread = 4;
85
86 /**
87 * Determines the strategy for handling new accepted connections.
88 */
89 public static enum AcceptPolicy {
90 /**
91 * Require accepted connection registration to be handled by the executor.
92 * If the worker pool is saturated, further accepts will be closed
93 * immediately. Slightly increases latency due to an extra scheduling.
94 */
95 FAIR_ACCEPT,
96 /**
97 * Handle the accepts as fast as possible, disregarding the status of the
98 * executor service.
99 */
100 FAST_ACCEPT
101 }
102
103 private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
104
105 public Args(TNonblockingServerTransport transport) {
106 super(transport);
107 }
108
109 public Args selectorThreads(int i) {
110 selectorThreads = i;
111 return this;
112 }
113
114 public int getSelectorThreads() {
115 return selectorThreads;
116 }
117
118 public Args workerThreads(int i) {
119 workerThreads = i;
120 return this;
121 }
122
123 public int getWorkerThreads() {
124 return workerThreads;
125 }
126
127 public int getStopTimeoutVal() {
128 return stopTimeoutVal;
129 }
130
131 public Args stopTimeoutVal(int stopTimeoutVal) {
132 this.stopTimeoutVal = stopTimeoutVal;
133 return this;
134 }
135
136 public TimeUnit getStopTimeoutUnit() {
137 return stopTimeoutUnit;
138 }
139
140 public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
141 this.stopTimeoutUnit = stopTimeoutUnit;
142 return this;
143 }
144
145 public ExecutorService getExecutorService() {
146 return executorService;
147 }
148
149 public Args executorService(ExecutorService executorService) {
150 this.executorService = executorService;
151 return this;
152 }
153
154 public int getAcceptQueueSizePerThread() {
155 return acceptQueueSizePerThread;
156 }
157
158 public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
159 this.acceptQueueSizePerThread = acceptQueueSizePerThread;
160 return this;
161 }
162
163 public AcceptPolicy getAcceptPolicy() {
164 return acceptPolicy;
165 }
166
167 public Args acceptPolicy(AcceptPolicy acceptPolicy) {
168 this.acceptPolicy = acceptPolicy;
169 return this;
170 }
171
172 public void validate() {
173 if (selectorThreads <= 0) {
174 throw new IllegalArgumentException("selectorThreads must be positive.");
175 }
176 if (workerThreads < 0) {
177 throw new IllegalArgumentException("workerThreads must be non-negative.");
178 }
179 if (acceptQueueSizePerThread <= 0) {
180 throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
181 }
182 }
183 }
184
185 // The thread handling all accepts
186 private AcceptThread acceptThread;
187
188 // Threads handling events on client transports
189 private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
190
191 // This wraps all the functionality of queueing and thread pool management
192 // for the passing of Invocations from the selector thread(s) to the workers
193 // (if any).
194 private final ExecutorService invoker;
195
196 private final Args args;
197
/**
 * Creates the server from the given configuration. The configuration is
 * validated, and the invoker is either the caller-supplied executor or, when
 * none was supplied, the one built by {@link #createDefaultExecutor(Args)}.
 *
 * @param args the server configuration
 * @throws IllegalArgumentException if {@link Args#validate()} rejects the configuration
 */
public TThreadedSelectorServer(Args args) {
  super(args);
  args.validate();
  this.args = args;
  if (args.executorService != null) {
    invoker = args.executorService;
  } else {
    invoker = createDefaultExecutor(args);
  }
}
207
208 /**
209 * Start the accept and selector threads running to deal with clients.
210 *
211 * @return true if everything went ok, false if we couldn't start for some
212 * reason.
213 */
214 @Override
215 protected boolean startThreads() {
216 try {
217 for (int i = 0; i < args.selectorThreads; ++i) {
218 selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
219 }
220 acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
221 createSelectorThreadLoadBalancer(selectorThreads));
222 for (SelectorThread thread : selectorThreads) {
223 thread.start();
224 }
225 acceptThread.start();
226 return true;
227 } catch (IOException e) {
228 LOGGER.error("Failed to start threads!", e);
229 return false;
230 }
231 }
232
233 /**
234 * Joins the accept and selector threads and shuts down the executor service.
235 */
236 @Override
237 protected void waitForShutdown() {
238 try {
239 joinThreads();
240 } catch (InterruptedException e) {
241 // Non-graceful shutdown occurred
242 LOGGER.error("Interrupted while joining threads!", e);
243 }
244 gracefullyShutdownInvokerPool();
245 }
246
247 protected void joinThreads() throws InterruptedException {
248 // wait until the io threads exit
249 acceptThread.join();
250 for (SelectorThread thread : selectorThreads) {
251 thread.join();
252 }
253 }
254
255 /**
256 * Stop serving and shut everything down.
257 */
258 @Override
259 public void stop() {
260 stopped_ = true;
261
262 // Stop queuing connect attempts asap
263 stopListening();
264
265 if (acceptThread != null) {
266 acceptThread.wakeupSelector();
267 }
268 if (selectorThreads != null) {
269 for (SelectorThread thread : selectorThreads) {
270 if (thread != null)
271 thread.wakeupSelector();
272 }
273 }
274 }
275
276 protected void gracefullyShutdownInvokerPool() {
277 // try to gracefully shut down the executor service
278 invoker.shutdown();
279
280 // Loop until awaitTermination finally does return without a interrupted
281 // exception. If we don't do this, then we'll shut down prematurely. We want
282 // to let the executorService clear it's task queue, closing client sockets
283 // appropriately.
284 long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
285 long now = System.currentTimeMillis();
286 while (timeoutMS >= 0) {
287 try {
288 invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
289 break;
290 } catch (InterruptedException ix) {
291 long newnow = System.currentTimeMillis();
292 timeoutMS -= (newnow - now);
293 now = newnow;
294 }
295 }
296 }
297
298 /**
299 * We override the standard invoke method here to queue the invocation for
300 * invoker service instead of immediately invoking. If there is no thread
301 * pool, handle the invocation inline on this thread
302 */
303 @Override
304 protected boolean requestInvoke(FrameBuffer frameBuffer) {
305 Runnable invocation = getRunnable(frameBuffer);
306 if (invoker != null) {
307 try {
308 invoker.execute(invocation);
309 return true;
310 } catch (RejectedExecutionException rx) {
311 LOGGER.warn("ExecutorService rejected execution!", rx);
312 return false;
313 }
314 } else {
315 // Invoke on the caller's thread
316 invocation.run();
317 return true;
318 }
319 }
320
321 protected Runnable getRunnable(FrameBuffer frameBuffer) {
322 return new Invocation(frameBuffer);
323 }
324
325 /**
326 * Helper to create the invoker if one is not specified
327 */
328 protected static ExecutorService createDefaultExecutor(Args options) {
329 return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
330 }
331
332 private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
333 if (queueSize == 0) {
334 // Unbounded queue
335 return new LinkedBlockingQueue<TNonblockingTransport>();
336 }
337 return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
338 }
339
340 /**
341 * The thread that selects on the server transport (listen socket) and accepts
342 * new connections to hand off to the IO selector threads
343 */
344 protected class AcceptThread extends Thread {
345
346 // The listen socket to accept on
347 private final TNonblockingServerTransport serverTransport;
348 private final Selector acceptSelector;
349
350 private final SelectorThreadLoadBalancer threadChooser;
351
352 /**
353 * Set up the AcceptThead
354 *
355 * @throws IOException
356 */
357 public AcceptThread(TNonblockingServerTransport serverTransport,
358 SelectorThreadLoadBalancer threadChooser) throws IOException {
359 this.serverTransport = serverTransport;
360 this.threadChooser = threadChooser;
361 this.acceptSelector = SelectorProvider.provider().openSelector();
362 this.serverTransport.registerSelector(acceptSelector);
363 }
364
365 /**
366 * The work loop. Selects on the server transport and accepts. If there was
367 * a server transport that had blocking accepts, and returned on blocking
368 * client transports, that should be used instead
369 */
370 public void run() {
371 try {
372 if (eventHandler_ != null) {
373 eventHandler_.preServe();
374 }
375
376 while (!stopped_) {
377 select();
378 }
379 } catch (Throwable t) {
380 LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
381 } finally {
382 try {
383 acceptSelector.close();
384 } catch (IOException e) {
385 LOGGER.error("Got an IOException while closing accept selector!", e);
386 }
387 // This will wake up the selector threads
388 TThreadedSelectorServer.this.stop();
389 }
390 }
391
392 /**
393 * If the selector is blocked, wake it up.
394 */
395 public void wakeupSelector() {
396 acceptSelector.wakeup();
397 }
398
399 /**
400 * Select and process IO events appropriately: If there are connections to
401 * be accepted, accept them.
402 */
403 private void select() {
404 try {
405 // wait for connect events.
406 acceptSelector.select();
407
408 // process the io events we received
409 Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
410 while (!stopped_ && selectedKeys.hasNext()) {
411 SelectionKey key = selectedKeys.next();
412 selectedKeys.remove();
413
414 // skip if not valid
415 if (!key.isValid()) {
416 continue;
417 }
418
419 if (key.isAcceptable()) {
420 handleAccept();
421 } else {
422 LOGGER.warn("Unexpected state in select! " + key.interestOps());
423 }
424 }
425 } catch (IOException e) {
426 LOGGER.warn("Got an IOException while selecting!", e);
427 }
428 }
429
430 /**
431 * Accept a new connection.
432 */
433 private void handleAccept() {
434 final TNonblockingTransport client = doAccept();
435 if (client != null) {
436 // Pass this connection to a selector thread
437 final SelectorThread targetThread = threadChooser.nextThread();
438
439 if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
440 doAddAccept(targetThread, client);
441 } else {
442 // FAIR_ACCEPT
443 try {
444 invoker.submit(new Runnable() {
445 public void run() {
446 doAddAccept(targetThread, client);
447 }
448 });
449 } catch (RejectedExecutionException rx) {
450 LOGGER.warn("ExecutorService rejected accept registration!", rx);
451 // close immediately
452 client.close();
453 }
454 }
455 }
456 }
457
458 private TNonblockingTransport doAccept() {
459 try {
460 return (TNonblockingTransport) serverTransport.accept();
461 } catch (TTransportException tte) {
462 // something went wrong accepting.
463 LOGGER.warn("Exception trying to accept!", tte);
464 return null;
465 }
466 }
467
468 private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
469 if (!thread.addAcceptedConnection(client)) {
470 client.close();
471 }
472 }
473 } // AcceptThread
474
475 /**
476 * The SelectorThread(s) will be doing all the selecting on accepted active
477 * connections.
478 */
479 protected class SelectorThread extends AbstractSelectThread {
480
481 // Accepted connections added by the accept thread.
482 private final BlockingQueue<TNonblockingTransport> acceptedQueue;
483 private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
484 private long MONITOR_PERIOD = 1000L;
485 private int jvmBug = 0;
486
/**
 * Sets up the SelectorThread with an unbounded queue for incoming accepts.
 *
 * @throws IOException
 *           if a selector cannot be created
 */
public SelectorThread() throws IOException {
  // A size of 0 yields the unbounded LinkedBlockingQueue.
  this(createDefaultAcceptQueue(0));
}

/**
 * Sets up the SelectorThread with a bounded queue for incoming accepts.
 *
 * @param maxPendingAccepts
 *          The capacity of the queue; 0 yields an unbounded queue.
 * @throws IOException
 *           if a selector cannot be created
 */
public SelectorThread(int maxPendingAccepts) throws IOException {
  this(createDefaultAcceptQueue(maxPendingAccepts));
}

/**
 * Sets up the SelectorThread with a specified queue for connections.
 *
 * @param acceptedQueue
 *          The BlockingQueue implementation for holding incoming accepted
 *          connections.
 * @throws IOException
 *           if a selector cannot be created.
 */
public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
  super();
  this.acceptedQueue = acceptedQueue;
}
519
520 /**
521 * Hands off an accepted connection to be handled by this thread. This
522 * method will block if the queue for new connections is at capacity.
523 *
524 * @param accepted
525 * The connection that has been accepted.
526 * @return true if the connection has been successfully added.
527 */
528 public boolean addAcceptedConnection(TNonblockingTransport accepted) {
529 try {
530 acceptedQueue.put(accepted);
531 } catch (InterruptedException e) {
532 LOGGER.warn("Interrupted while adding accepted connection!", e);
533 return false;
534 }
535 selector.wakeup();
536 return true;
537 }
538
539 /**
540 * The work loop. Handles selecting (read/write IO), dispatching, and
541 * managing the selection preferences of all existing connections.
542 */
543 public void run() {
544 try {
545 while (!stopped_) {
546 select();
547 processAcceptedConnections();
548 processInterestChanges();
549 }
550 for (SelectionKey selectionKey : selector.keys()) {
551 cleanupSelectionKey(selectionKey);
552 }
553 } catch (Throwable t) {
554 LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
555 } finally {
556 try {
557 selector.close();
558 } catch (IOException e) {
559 LOGGER.error("Got an IOException while closing selector!", e);
560 }
561 // This will wake up the accept thread and the other selector threads
562 TThreadedSelectorServer.this.stop();
563 }
564 }
565
566 /**
567 * Select and process IO events appropriately: If there are existing
568 * connections with data waiting to be read, read it, buffering until a
569 * whole frame has been read. If there are any pending responses, buffer
570 * them until their target client is available, and then send the data.
571 */
572 private void select() {
573 try {
574
575 doSelect();
576
577 // process the io events we received
578 Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
579 while (!stopped_ && selectedKeys.hasNext()) {
580 SelectionKey key = selectedKeys.next();
581 selectedKeys.remove();
582
583 // skip if not valid
584 if (!key.isValid()) {
585 cleanupSelectionKey(key);
586 continue;
587 }
588
589 if (key.isReadable()) {
590 // deal with reads
591 handleRead(key);
592 } else if (key.isWritable()) {
593 // deal with writes
594 handleWrite(key);
595 } else {
596 LOGGER.warn("Unexpected state in select! " + key.interestOps());
597 }
598 }
599 } catch (IOException e) {
600 LOGGER.warn("Got an IOException while selecting!", e);
601 }
602 }
603
604 /**
605 * Do select and judge epoll bug happen.
606 * See : https://issues.apache.org/jira/browse/THRIFT-4251
607 */
608 private void doSelect() throws IOException {
609 long beforeSelect = System.currentTimeMillis();
610 int selectedNums = selector.select();
611 long afterSelect = System.currentTimeMillis();
612
613 if (selectedNums == 0) {
614 jvmBug++;
615 } else {
616 jvmBug = 0;
617 }
618
619 long selectedTime = afterSelect - beforeSelect;
620 if (selectedTime >= MONITOR_PERIOD) {
621 jvmBug = 0;
622 } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
623 LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
624 rebuildSelector();
625 selector.selectNow();
626 jvmBug = 0;
627 }
628
629 }
630
631 /**
632 * Replaces the current Selector of this SelectorThread with newly created Selector to work
633 * around the infamous epoll 100% CPU bug.
634 */
635 private synchronized void rebuildSelector() {
636 final Selector oldSelector = selector;
637 if (oldSelector == null) {
638 return;
639 }
640 Selector newSelector = null;
641 try {
642 newSelector = Selector.open();
643 LOGGER.warn("Created new Selector.");
644 } catch (IOException e) {
645 LOGGER.error("Create new Selector error.", e);
646 }
647
648 for (SelectionKey key : oldSelector.selectedKeys()) {
649 if (!key.isValid() && key.readyOps() == 0)
650 continue;
651 SelectableChannel channel = key.channel();
652 Object attachment = key.attachment();
653
654 try {
655 if (attachment == null) {
656 channel.register(newSelector, key.readyOps());
657 } else {
658 channel.register(newSelector, key.readyOps(), attachment);
659 }
660 } catch (ClosedChannelException e) {
661 LOGGER.error("Register new selector key error.", e);
662 }
663
664 }
665
666 selector = newSelector;
667 try {
668 oldSelector.close();
669 } catch (IOException e) {
670 LOGGER.error("Close old selector error.", e);
671 }
672 LOGGER.warn("Replace new selector success.");
673 }
674
675 private void processAcceptedConnections() {
676 // Register accepted connections
677 while (!stopped_) {
678 TNonblockingTransport accepted = acceptedQueue.poll();
679 if (accepted == null) {
680 break;
681 }
682 registerAccepted(accepted);
683 }
684 }
685
686 protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
687 final SelectionKey selectionKey,
688 final AbstractSelectThread selectThread) {
689 return processorFactory_.isAsyncProcessor() ?
690 new AsyncFrameBuffer(trans, selectionKey, selectThread) :
691 new FrameBuffer(trans, selectionKey, selectThread);
692 }
693
694 private void registerAccepted(TNonblockingTransport accepted) {
695 SelectionKey clientKey = null;
696 try {
697 clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
698
699 FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
700
701 clientKey.attach(frameBuffer);
702 } catch (IOException e) {
703 LOGGER.warn("Failed to register accepted connection to selector!", e);
704 if (clientKey != null) {
705 cleanupSelectionKey(clientKey);
706 }
707 accepted.close();
708 }
709 }
710 } // SelectorThread
711
712 /**
713 * Creates a SelectorThreadLoadBalancer to be used by the accept thread for
714 * assigning newly accepted connections across the threads.
715 */
716 protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
717 return new SelectorThreadLoadBalancer(threads);
718 }
719
720 /**
721 * A round robin load balancer for choosing selector threads for new
722 * connections.
723 */
724 protected static class SelectorThreadLoadBalancer {
725 private final Collection<? extends SelectorThread> threads;
726 private Iterator<? extends SelectorThread> nextThreadIterator;
727
728 public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
729 if (threads.isEmpty()) {
730 throw new IllegalArgumentException("At least one selector thread is required");
731 }
732 this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
733 nextThreadIterator = this.threads.iterator();
734 }
735
736 public SelectorThread nextThread() {
737 // Choose a selector thread (round robin)
738 if (!nextThreadIterator.hasNext()) {
739 nextThreadIterator = threads.iterator();
740 }
741 return nextThreadIterator.next();
742 }
743 }
744}