]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | package org.apache.thrift.server; | |
21 | ||
22 | import org.apache.thrift.transport.TNonblockingServerTransport; | |
23 | import org.apache.thrift.transport.TNonblockingTransport; | |
24 | import org.apache.thrift.transport.TTransportException; | |
25 | import org.slf4j.Logger; | |
26 | import org.slf4j.LoggerFactory; | |
27 | ||
28 | import java.io.IOException; | |
29 | import java.nio.channels.ClosedChannelException; | |
30 | import java.nio.channels.SelectableChannel; | |
31 | import java.nio.channels.SelectionKey; | |
32 | import java.nio.channels.Selector; | |
33 | import java.nio.channels.spi.SelectorProvider; | |
34 | import java.util.ArrayList; | |
35 | import java.util.Collection; | |
36 | import java.util.Collections; | |
37 | import java.util.HashSet; | |
38 | import java.util.Iterator; | |
39 | import java.util.Set; | |
40 | import java.util.concurrent.ArrayBlockingQueue; | |
41 | import java.util.concurrent.BlockingQueue; | |
42 | import java.util.concurrent.ExecutorService; | |
43 | import java.util.concurrent.Executors; | |
44 | import java.util.concurrent.LinkedBlockingQueue; | |
45 | import java.util.concurrent.RejectedExecutionException; | |
46 | import java.util.concurrent.TimeUnit; | |
47 | ||
48 | /** | |
49 | * A Half-Sync/Half-Async server with a separate pool of threads to handle | |
50 | * non-blocking I/O. Accepts are handled on a single thread, and a configurable | |
51 | * number of nonblocking selector threads manage reading and writing of client | |
52 | * connections. A synchronous worker thread pool handles processing of requests. | |
53 | * | |
54 | * Performs better than TNonblockingServer/THsHaServer in multi-core | |
55 | * environments when the the bottleneck is CPU on the single selector thread | |
56 | * handling I/O. In addition, because the accept handling is decoupled from | |
57 | * reads/writes and invocation, the server has better ability to handle back- | |
58 | * pressure from new connections (e.g. stop accepting when busy). | |
59 | * | |
60 | * Like TNonblockingServer, it relies on the use of TFramedTransport. | |
61 | */ | |
62 | public class TThreadedSelectorServer extends AbstractNonblockingServer { | |
63 | private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName()); | |
64 | ||
65 | public static class Args extends AbstractNonblockingServerArgs<Args> { | |
66 | ||
67 | /** The number of threads for selecting on already-accepted connections */ | |
68 | public int selectorThreads = 2; | |
69 | /** | |
70 | * The size of the executor service (if none is specified) that will handle | |
71 | * invocations. This may be set to 0, in which case invocations will be | |
72 | * handled directly on the selector threads (as is in TNonblockingServer) | |
73 | */ | |
74 | private int workerThreads = 5; | |
75 | /** Time to wait for server to stop gracefully */ | |
76 | private int stopTimeoutVal = 60; | |
77 | private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; | |
78 | /** The ExecutorService for handling dispatched requests */ | |
79 | private ExecutorService executorService = null; | |
80 | /** | |
81 | * The size of the blocking queue per selector thread for passing accepted | |
82 | * connections to the selector thread | |
83 | */ | |
84 | private int acceptQueueSizePerThread = 4; | |
85 | ||
86 | /** | |
87 | * Determines the strategy for handling new accepted connections. | |
88 | */ | |
89 | public static enum AcceptPolicy { | |
90 | /** | |
91 | * Require accepted connection registration to be handled by the executor. | |
92 | * If the worker pool is saturated, further accepts will be closed | |
93 | * immediately. Slightly increases latency due to an extra scheduling. | |
94 | */ | |
95 | FAIR_ACCEPT, | |
96 | /** | |
97 | * Handle the accepts as fast as possible, disregarding the status of the | |
98 | * executor service. | |
99 | */ | |
100 | FAST_ACCEPT | |
101 | } | |
102 | ||
103 | private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT; | |
104 | ||
105 | public Args(TNonblockingServerTransport transport) { | |
106 | super(transport); | |
107 | } | |
108 | ||
109 | public Args selectorThreads(int i) { | |
110 | selectorThreads = i; | |
111 | return this; | |
112 | } | |
113 | ||
114 | public int getSelectorThreads() { | |
115 | return selectorThreads; | |
116 | } | |
117 | ||
118 | public Args workerThreads(int i) { | |
119 | workerThreads = i; | |
120 | return this; | |
121 | } | |
122 | ||
123 | public int getWorkerThreads() { | |
124 | return workerThreads; | |
125 | } | |
126 | ||
127 | public int getStopTimeoutVal() { | |
128 | return stopTimeoutVal; | |
129 | } | |
130 | ||
131 | public Args stopTimeoutVal(int stopTimeoutVal) { | |
132 | this.stopTimeoutVal = stopTimeoutVal; | |
133 | return this; | |
134 | } | |
135 | ||
136 | public TimeUnit getStopTimeoutUnit() { | |
137 | return stopTimeoutUnit; | |
138 | } | |
139 | ||
140 | public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) { | |
141 | this.stopTimeoutUnit = stopTimeoutUnit; | |
142 | return this; | |
143 | } | |
144 | ||
145 | public ExecutorService getExecutorService() { | |
146 | return executorService; | |
147 | } | |
148 | ||
149 | public Args executorService(ExecutorService executorService) { | |
150 | this.executorService = executorService; | |
151 | return this; | |
152 | } | |
153 | ||
154 | public int getAcceptQueueSizePerThread() { | |
155 | return acceptQueueSizePerThread; | |
156 | } | |
157 | ||
158 | public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) { | |
159 | this.acceptQueueSizePerThread = acceptQueueSizePerThread; | |
160 | return this; | |
161 | } | |
162 | ||
163 | public AcceptPolicy getAcceptPolicy() { | |
164 | return acceptPolicy; | |
165 | } | |
166 | ||
167 | public Args acceptPolicy(AcceptPolicy acceptPolicy) { | |
168 | this.acceptPolicy = acceptPolicy; | |
169 | return this; | |
170 | } | |
171 | ||
172 | public void validate() { | |
173 | if (selectorThreads <= 0) { | |
174 | throw new IllegalArgumentException("selectorThreads must be positive."); | |
175 | } | |
176 | if (workerThreads < 0) { | |
177 | throw new IllegalArgumentException("workerThreads must be non-negative."); | |
178 | } | |
179 | if (acceptQueueSizePerThread <= 0) { | |
180 | throw new IllegalArgumentException("acceptQueueSizePerThread must be positive."); | |
181 | } | |
182 | } | |
183 | } | |
184 | ||
185 | // The thread handling all accepts | |
186 | private AcceptThread acceptThread; | |
187 | ||
188 | // Threads handling events on client transports | |
189 | private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>(); | |
190 | ||
191 | // This wraps all the functionality of queueing and thread pool management | |
192 | // for the passing of Invocations from the selector thread(s) to the workers | |
193 | // (if any). | |
194 | private final ExecutorService invoker; | |
195 | ||
196 | private final Args args; | |
197 | ||
198 | /** | |
199 | * Create the server with the specified Args configuration | |
200 | */ | |
201 | public TThreadedSelectorServer(Args args) { | |
202 | super(args); | |
203 | args.validate(); | |
204 | invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService; | |
205 | this.args = args; | |
206 | } | |
207 | ||
208 | /** | |
209 | * Start the accept and selector threads running to deal with clients. | |
210 | * | |
211 | * @return true if everything went ok, false if we couldn't start for some | |
212 | * reason. | |
213 | */ | |
214 | @Override | |
215 | protected boolean startThreads() { | |
216 | try { | |
217 | for (int i = 0; i < args.selectorThreads; ++i) { | |
218 | selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); | |
219 | } | |
220 | acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, | |
221 | createSelectorThreadLoadBalancer(selectorThreads)); | |
222 | for (SelectorThread thread : selectorThreads) { | |
223 | thread.start(); | |
224 | } | |
225 | acceptThread.start(); | |
226 | return true; | |
227 | } catch (IOException e) { | |
228 | LOGGER.error("Failed to start threads!", e); | |
229 | return false; | |
230 | } | |
231 | } | |
232 | ||
233 | /** | |
234 | * Joins the accept and selector threads and shuts down the executor service. | |
235 | */ | |
236 | @Override | |
237 | protected void waitForShutdown() { | |
238 | try { | |
239 | joinThreads(); | |
240 | } catch (InterruptedException e) { | |
241 | // Non-graceful shutdown occurred | |
242 | LOGGER.error("Interrupted while joining threads!", e); | |
243 | } | |
244 | gracefullyShutdownInvokerPool(); | |
245 | } | |
246 | ||
247 | protected void joinThreads() throws InterruptedException { | |
248 | // wait until the io threads exit | |
249 | acceptThread.join(); | |
250 | for (SelectorThread thread : selectorThreads) { | |
251 | thread.join(); | |
252 | } | |
253 | } | |
254 | ||
255 | /** | |
256 | * Stop serving and shut everything down. | |
257 | */ | |
258 | @Override | |
259 | public void stop() { | |
260 | stopped_ = true; | |
261 | ||
262 | // Stop queuing connect attempts asap | |
263 | stopListening(); | |
264 | ||
265 | if (acceptThread != null) { | |
266 | acceptThread.wakeupSelector(); | |
267 | } | |
268 | if (selectorThreads != null) { | |
269 | for (SelectorThread thread : selectorThreads) { | |
270 | if (thread != null) | |
271 | thread.wakeupSelector(); | |
272 | } | |
273 | } | |
274 | } | |
275 | ||
276 | protected void gracefullyShutdownInvokerPool() { | |
277 | // try to gracefully shut down the executor service | |
278 | invoker.shutdown(); | |
279 | ||
280 | // Loop until awaitTermination finally does return without a interrupted | |
281 | // exception. If we don't do this, then we'll shut down prematurely. We want | |
282 | // to let the executorService clear it's task queue, closing client sockets | |
283 | // appropriately. | |
284 | long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); | |
285 | long now = System.currentTimeMillis(); | |
286 | while (timeoutMS >= 0) { | |
287 | try { | |
288 | invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); | |
289 | break; | |
290 | } catch (InterruptedException ix) { | |
291 | long newnow = System.currentTimeMillis(); | |
292 | timeoutMS -= (newnow - now); | |
293 | now = newnow; | |
294 | } | |
295 | } | |
296 | } | |
297 | ||
298 | /** | |
299 | * We override the standard invoke method here to queue the invocation for | |
300 | * invoker service instead of immediately invoking. If there is no thread | |
301 | * pool, handle the invocation inline on this thread | |
302 | */ | |
303 | @Override | |
304 | protected boolean requestInvoke(FrameBuffer frameBuffer) { | |
305 | Runnable invocation = getRunnable(frameBuffer); | |
306 | if (invoker != null) { | |
307 | try { | |
308 | invoker.execute(invocation); | |
309 | return true; | |
310 | } catch (RejectedExecutionException rx) { | |
311 | LOGGER.warn("ExecutorService rejected execution!", rx); | |
312 | return false; | |
313 | } | |
314 | } else { | |
315 | // Invoke on the caller's thread | |
316 | invocation.run(); | |
317 | return true; | |
318 | } | |
319 | } | |
320 | ||
321 | protected Runnable getRunnable(FrameBuffer frameBuffer) { | |
322 | return new Invocation(frameBuffer); | |
323 | } | |
324 | ||
325 | /** | |
326 | * Helper to create the invoker if one is not specified | |
327 | */ | |
328 | protected static ExecutorService createDefaultExecutor(Args options) { | |
329 | return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null; | |
330 | } | |
331 | ||
332 | private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) { | |
333 | if (queueSize == 0) { | |
334 | // Unbounded queue | |
335 | return new LinkedBlockingQueue<TNonblockingTransport>(); | |
336 | } | |
337 | return new ArrayBlockingQueue<TNonblockingTransport>(queueSize); | |
338 | } | |
339 | ||
340 | /** | |
341 | * The thread that selects on the server transport (listen socket) and accepts | |
342 | * new connections to hand off to the IO selector threads | |
343 | */ | |
344 | protected class AcceptThread extends Thread { | |
345 | ||
346 | // The listen socket to accept on | |
347 | private final TNonblockingServerTransport serverTransport; | |
348 | private final Selector acceptSelector; | |
349 | ||
350 | private final SelectorThreadLoadBalancer threadChooser; | |
351 | ||
352 | /** | |
353 | * Set up the AcceptThead | |
354 | * | |
355 | * @throws IOException | |
356 | */ | |
357 | public AcceptThread(TNonblockingServerTransport serverTransport, | |
358 | SelectorThreadLoadBalancer threadChooser) throws IOException { | |
359 | this.serverTransport = serverTransport; | |
360 | this.threadChooser = threadChooser; | |
361 | this.acceptSelector = SelectorProvider.provider().openSelector(); | |
362 | this.serverTransport.registerSelector(acceptSelector); | |
363 | } | |
364 | ||
365 | /** | |
366 | * The work loop. Selects on the server transport and accepts. If there was | |
367 | * a server transport that had blocking accepts, and returned on blocking | |
368 | * client transports, that should be used instead | |
369 | */ | |
370 | public void run() { | |
371 | try { | |
372 | if (eventHandler_ != null) { | |
373 | eventHandler_.preServe(); | |
374 | } | |
375 | ||
376 | while (!stopped_) { | |
377 | select(); | |
378 | } | |
379 | } catch (Throwable t) { | |
380 | LOGGER.error("run() on AcceptThread exiting due to uncaught error", t); | |
381 | } finally { | |
382 | try { | |
383 | acceptSelector.close(); | |
384 | } catch (IOException e) { | |
385 | LOGGER.error("Got an IOException while closing accept selector!", e); | |
386 | } | |
387 | // This will wake up the selector threads | |
388 | TThreadedSelectorServer.this.stop(); | |
389 | } | |
390 | } | |
391 | ||
392 | /** | |
393 | * If the selector is blocked, wake it up. | |
394 | */ | |
395 | public void wakeupSelector() { | |
396 | acceptSelector.wakeup(); | |
397 | } | |
398 | ||
399 | /** | |
400 | * Select and process IO events appropriately: If there are connections to | |
401 | * be accepted, accept them. | |
402 | */ | |
403 | private void select() { | |
404 | try { | |
405 | // wait for connect events. | |
406 | acceptSelector.select(); | |
407 | ||
408 | // process the io events we received | |
409 | Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator(); | |
410 | while (!stopped_ && selectedKeys.hasNext()) { | |
411 | SelectionKey key = selectedKeys.next(); | |
412 | selectedKeys.remove(); | |
413 | ||
414 | // skip if not valid | |
415 | if (!key.isValid()) { | |
416 | continue; | |
417 | } | |
418 | ||
419 | if (key.isAcceptable()) { | |
420 | handleAccept(); | |
421 | } else { | |
422 | LOGGER.warn("Unexpected state in select! " + key.interestOps()); | |
423 | } | |
424 | } | |
425 | } catch (IOException e) { | |
426 | LOGGER.warn("Got an IOException while selecting!", e); | |
427 | } | |
428 | } | |
429 | ||
430 | /** | |
431 | * Accept a new connection. | |
432 | */ | |
433 | private void handleAccept() { | |
434 | final TNonblockingTransport client = doAccept(); | |
435 | if (client != null) { | |
436 | // Pass this connection to a selector thread | |
437 | final SelectorThread targetThread = threadChooser.nextThread(); | |
438 | ||
439 | if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { | |
440 | doAddAccept(targetThread, client); | |
441 | } else { | |
442 | // FAIR_ACCEPT | |
443 | try { | |
444 | invoker.submit(new Runnable() { | |
445 | public void run() { | |
446 | doAddAccept(targetThread, client); | |
447 | } | |
448 | }); | |
449 | } catch (RejectedExecutionException rx) { | |
450 | LOGGER.warn("ExecutorService rejected accept registration!", rx); | |
451 | // close immediately | |
452 | client.close(); | |
453 | } | |
454 | } | |
455 | } | |
456 | } | |
457 | ||
458 | private TNonblockingTransport doAccept() { | |
459 | try { | |
460 | return (TNonblockingTransport) serverTransport.accept(); | |
461 | } catch (TTransportException tte) { | |
462 | // something went wrong accepting. | |
463 | LOGGER.warn("Exception trying to accept!", tte); | |
464 | return null; | |
465 | } | |
466 | } | |
467 | ||
468 | private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { | |
469 | if (!thread.addAcceptedConnection(client)) { | |
470 | client.close(); | |
471 | } | |
472 | } | |
473 | } // AcceptThread | |
474 | ||
475 | /** | |
476 | * The SelectorThread(s) will be doing all the selecting on accepted active | |
477 | * connections. | |
478 | */ | |
479 | protected class SelectorThread extends AbstractSelectThread { | |
480 | ||
481 | // Accepted connections added by the accept thread. | |
482 | private final BlockingQueue<TNonblockingTransport> acceptedQueue; | |
483 | private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512; | |
484 | private long MONITOR_PERIOD = 1000L; | |
485 | private int jvmBug = 0; | |
486 | ||
487 | /** | |
488 | * Set up the SelectorThread with an unbounded queue for incoming accepts. | |
489 | * | |
490 | * @throws IOException | |
491 | * if a selector cannot be created | |
492 | */ | |
493 | public SelectorThread() throws IOException { | |
494 | this(new LinkedBlockingQueue<TNonblockingTransport>()); | |
495 | } | |
496 | ||
497 | /** | |
498 | * Set up the SelectorThread with an bounded queue for incoming accepts. | |
499 | * | |
500 | * @throws IOException | |
501 | * if a selector cannot be created | |
502 | */ | |
503 | public SelectorThread(int maxPendingAccepts) throws IOException { | |
504 | this(createDefaultAcceptQueue(maxPendingAccepts)); | |
505 | } | |
506 | ||
507 | /** | |
508 | * Set up the SelectorThread with a specified queue for connections. | |
509 | * | |
510 | * @param acceptedQueue | |
511 | * The BlockingQueue implementation for holding incoming accepted | |
512 | * connections. | |
513 | * @throws IOException | |
514 | * if a selector cannot be created. | |
515 | */ | |
516 | public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException { | |
517 | this.acceptedQueue = acceptedQueue; | |
518 | } | |
519 | ||
520 | /** | |
521 | * Hands off an accepted connection to be handled by this thread. This | |
522 | * method will block if the queue for new connections is at capacity. | |
523 | * | |
524 | * @param accepted | |
525 | * The connection that has been accepted. | |
526 | * @return true if the connection has been successfully added. | |
527 | */ | |
528 | public boolean addAcceptedConnection(TNonblockingTransport accepted) { | |
529 | try { | |
530 | acceptedQueue.put(accepted); | |
531 | } catch (InterruptedException e) { | |
532 | LOGGER.warn("Interrupted while adding accepted connection!", e); | |
533 | return false; | |
534 | } | |
535 | selector.wakeup(); | |
536 | return true; | |
537 | } | |
538 | ||
539 | /** | |
540 | * The work loop. Handles selecting (read/write IO), dispatching, and | |
541 | * managing the selection preferences of all existing connections. | |
542 | */ | |
543 | public void run() { | |
544 | try { | |
545 | while (!stopped_) { | |
546 | select(); | |
547 | processAcceptedConnections(); | |
548 | processInterestChanges(); | |
549 | } | |
550 | for (SelectionKey selectionKey : selector.keys()) { | |
551 | cleanupSelectionKey(selectionKey); | |
552 | } | |
553 | } catch (Throwable t) { | |
554 | LOGGER.error("run() on SelectorThread exiting due to uncaught error", t); | |
555 | } finally { | |
556 | try { | |
557 | selector.close(); | |
558 | } catch (IOException e) { | |
559 | LOGGER.error("Got an IOException while closing selector!", e); | |
560 | } | |
561 | // This will wake up the accept thread and the other selector threads | |
562 | TThreadedSelectorServer.this.stop(); | |
563 | } | |
564 | } | |
565 | ||
566 | /** | |
567 | * Select and process IO events appropriately: If there are existing | |
568 | * connections with data waiting to be read, read it, buffering until a | |
569 | * whole frame has been read. If there are any pending responses, buffer | |
570 | * them until their target client is available, and then send the data. | |
571 | */ | |
572 | private void select() { | |
573 | try { | |
574 | ||
575 | doSelect(); | |
576 | ||
577 | // process the io events we received | |
578 | Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); | |
579 | while (!stopped_ && selectedKeys.hasNext()) { | |
580 | SelectionKey key = selectedKeys.next(); | |
581 | selectedKeys.remove(); | |
582 | ||
583 | // skip if not valid | |
584 | if (!key.isValid()) { | |
585 | cleanupSelectionKey(key); | |
586 | continue; | |
587 | } | |
588 | ||
589 | if (key.isReadable()) { | |
590 | // deal with reads | |
591 | handleRead(key); | |
592 | } else if (key.isWritable()) { | |
593 | // deal with writes | |
594 | handleWrite(key); | |
595 | } else { | |
596 | LOGGER.warn("Unexpected state in select! " + key.interestOps()); | |
597 | } | |
598 | } | |
599 | } catch (IOException e) { | |
600 | LOGGER.warn("Got an IOException while selecting!", e); | |
601 | } | |
602 | } | |
603 | ||
604 | /** | |
605 | * Do select and judge epoll bug happen. | |
606 | * See : https://issues.apache.org/jira/browse/THRIFT-4251 | |
607 | */ | |
608 | private void doSelect() throws IOException { | |
609 | long beforeSelect = System.currentTimeMillis(); | |
610 | int selectedNums = selector.select(); | |
611 | long afterSelect = System.currentTimeMillis(); | |
612 | ||
613 | if (selectedNums == 0) { | |
614 | jvmBug++; | |
615 | } else { | |
616 | jvmBug = 0; | |
617 | } | |
618 | ||
619 | long selectedTime = afterSelect - beforeSelect; | |
620 | if (selectedTime >= MONITOR_PERIOD) { | |
621 | jvmBug = 0; | |
622 | } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) { | |
623 | LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug); | |
624 | rebuildSelector(); | |
625 | selector.selectNow(); | |
626 | jvmBug = 0; | |
627 | } | |
628 | ||
629 | } | |
630 | ||
631 | /** | |
632 | * Replaces the current Selector of this SelectorThread with newly created Selector to work | |
633 | * around the infamous epoll 100% CPU bug. | |
634 | */ | |
635 | private synchronized void rebuildSelector() { | |
636 | final Selector oldSelector = selector; | |
637 | if (oldSelector == null) { | |
638 | return; | |
639 | } | |
640 | Selector newSelector = null; | |
641 | try { | |
642 | newSelector = Selector.open(); | |
643 | LOGGER.warn("Created new Selector."); | |
644 | } catch (IOException e) { | |
645 | LOGGER.error("Create new Selector error.", e); | |
646 | } | |
647 | ||
648 | for (SelectionKey key : oldSelector.selectedKeys()) { | |
649 | if (!key.isValid() && key.readyOps() == 0) | |
650 | continue; | |
651 | SelectableChannel channel = key.channel(); | |
652 | Object attachment = key.attachment(); | |
653 | ||
654 | try { | |
655 | if (attachment == null) { | |
656 | channel.register(newSelector, key.readyOps()); | |
657 | } else { | |
658 | channel.register(newSelector, key.readyOps(), attachment); | |
659 | } | |
660 | } catch (ClosedChannelException e) { | |
661 | LOGGER.error("Register new selector key error.", e); | |
662 | } | |
663 | ||
664 | } | |
665 | ||
666 | selector = newSelector; | |
667 | try { | |
668 | oldSelector.close(); | |
669 | } catch (IOException e) { | |
670 | LOGGER.error("Close old selector error.", e); | |
671 | } | |
672 | LOGGER.warn("Replace new selector success."); | |
673 | } | |
674 | ||
675 | private void processAcceptedConnections() { | |
676 | // Register accepted connections | |
677 | while (!stopped_) { | |
678 | TNonblockingTransport accepted = acceptedQueue.poll(); | |
679 | if (accepted == null) { | |
680 | break; | |
681 | } | |
682 | registerAccepted(accepted); | |
683 | } | |
684 | } | |
685 | ||
686 | protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, | |
687 | final SelectionKey selectionKey, | |
688 | final AbstractSelectThread selectThread) { | |
689 | return processorFactory_.isAsyncProcessor() ? | |
690 | new AsyncFrameBuffer(trans, selectionKey, selectThread) : | |
691 | new FrameBuffer(trans, selectionKey, selectThread); | |
692 | } | |
693 | ||
694 | private void registerAccepted(TNonblockingTransport accepted) { | |
695 | SelectionKey clientKey = null; | |
696 | try { | |
697 | clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); | |
698 | ||
699 | FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this); | |
700 | ||
701 | clientKey.attach(frameBuffer); | |
702 | } catch (IOException e) { | |
703 | LOGGER.warn("Failed to register accepted connection to selector!", e); | |
704 | if (clientKey != null) { | |
705 | cleanupSelectionKey(clientKey); | |
706 | } | |
707 | accepted.close(); | |
708 | } | |
709 | } | |
710 | } // SelectorThread | |
711 | ||
712 | /** | |
713 | * Creates a SelectorThreadLoadBalancer to be used by the accept thread for | |
714 | * assigning newly accepted connections across the threads. | |
715 | */ | |
716 | protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) { | |
717 | return new SelectorThreadLoadBalancer(threads); | |
718 | } | |
719 | ||
720 | /** | |
721 | * A round robin load balancer for choosing selector threads for new | |
722 | * connections. | |
723 | */ | |
724 | protected static class SelectorThreadLoadBalancer { | |
725 | private final Collection<? extends SelectorThread> threads; | |
726 | private Iterator<? extends SelectorThread> nextThreadIterator; | |
727 | ||
728 | public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) { | |
729 | if (threads.isEmpty()) { | |
730 | throw new IllegalArgumentException("At least one selector thread is required"); | |
731 | } | |
732 | this.threads = Collections.unmodifiableList(new ArrayList<T>(threads)); | |
733 | nextThreadIterator = this.threads.iterator(); | |
734 | } | |
735 | ||
736 | public SelectorThread nextThread() { | |
737 | // Choose a selector thread (round robin) | |
738 | if (!nextThreadIterator.hasNext()) { | |
739 | nextThreadIterator = threads.iterator(); | |
740 | } | |
741 | return nextThreadIterator.next(); | |
742 | } | |
743 | } | |
744 | } |