/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <functional>

#ifdef HAVE_POLL_H
#include <poll.h>
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#elif HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif
namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using std::shared_ptr;
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };

/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back data (if any)
 *  5) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
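// For illustration: the framed wire format these states parse is the one
// produced by TFramedTransport -- a 4-byte frame length in network byte
// order, followed by that many bytes of payload. A 16-byte message is sent
// as:
//
//   00 00 00 10 | <16 bytes of serialized Thrift payload>
//
// SOCKET_RECV_FRAMING reads exactly those 4 bytes (converted with ntohl),
// SOCKET_RECV reads the payload, and SOCKET_SEND writes the reply back with
// its own 4-byte prefix.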
/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handling this connection
  TNonblockingServer* server_;

  /// Processor for this connection
  std::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  std::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  std::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  std::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  std::shared_ptr<TTransport> factoryInputTransport_;
  std::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  std::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  std::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  std::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(std::shared_ptr<TSocket> socket, TNonblockingIOThread* ioThread) {
    readBuffer_ = nullptr;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// set socket for connection
  void setSocket(std::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};
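// For illustration, the per-connection state machine driven by transition()
// and workSocket() typically cycles like this for one request/response call:
//
//   APP_INIT -> APP_READ_FRAME_SIZE  (SOCKET_RECV_FRAMING: read 4-byte size)
//            -> APP_READ_REQUEST     (SOCKET_RECV: read the frame payload)
//            -> APP_WAIT_TASK        (only when a ThreadManager is attached)
//            -> APP_SEND_RESULT      (SOCKET_SEND: write the framed reply)
//            -> back to APP_INIT for the next request (oneway calls produce
//               no reply data and fall straight through to APP_INIT)
//
// APP_CLOSE_CONNECTION is entered out-of-band via forceClose(), e.g. when an
// overloaded server drains a pending task.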
class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(std::shared_ptr<TProcessor> processor,
       std::shared_ptr<TProtocol> input,
       std::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() override {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  std::shared_ptr<TProcessor> processor_;
  std::shared_ptr<TProtocol> input_;
  std::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  std::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};
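// For illustration, the thread-pool handoff works roughly like this: the IO
// thread wraps the connection's protocols in a Task inside transition()
// (APP_READ_REQUEST) and hands it to the ThreadManager; a worker thread then
// runs Task::run(), which drives processor_->process() and finally writes the
// TConnection* back over the owning IO thread's notification pipe
// (notifyIOThread()), so the libevent thread can resume the state machine in
// notifyHandler() -> transition().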
void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = nullptr;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = nullptr;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}
void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
  tSocket_ = socket;
}
void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      //In Nonblocking SSLSocket some operations need to be retried again.
      //Current approach is parsing exception message, but a better solution needs to be investigated.
      if(!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();
        return;
      }
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf(
          "TNonblockingServer: frame size too large "
          "(%" PRIu32 " > %" PRIu64
          ") from client %s. "
          "Remote side not using TFramedTransport?",
          readWant_,
          (uint64_t)server_->getMaxFrameSize(),
          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();

    // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
    // regular sockets, because if there is more data, libevent will fire the event handler registered for read
    // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
    // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
    // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
    // despite having more data.
    if (tSocket_->hasPendingDataToRead()) {
      workSocket();
    }

    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    } catch (TTransportException& te) {
      //In Nonblocking SSLSocket some operations need to be retried again.
      //Current approach is parsing exception message, but a better solution needs to be investigated.
      if(!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();
        return;
      }
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();
    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }
    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}
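// Note on the pattern above: because the socket is nonblocking, every read()
// and write_partial() may move fewer bytes than requested. workSocket()
// therefore only advances readBufferPos_ / writeBufferPos_ by however much the
// kernel accepted and then returns; libevent (EV_READ / EV_WRITE with
// EV_PERSIST) calls it again when the socket is ready, and transition() only
// runs once a full frame has been received or the whole reply has been sent.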
bool TNonblockingServer::getHeaderTransport() {
  // Currently if there is no output protocol factory,
  // we assume header transport (without having to create
  // a new transport and check)
  return getOutputProtocolFactory() == nullptr;
}
/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }

      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }
    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }
    // N.B.: We also intentionally fall through here into the INIT state!

  case APP_INIT:
    // Clear write buffer variables
    writeBuffer_ = nullptr;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();
    return;

  case APP_READ_FRAME_SIZE:
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == nullptr) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    // Register read event
    setRead();
    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}
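// Worked example for the APP_READ_FRAME_SIZE buffer growth above: with an
// empty buffer (readBufferSize_ == 0) and a 1000-byte frame, readWant_
// becomes 1004 (frame plus the 4 bytes reserved for the frame header), the
// size is seeded to 1 and doubled 1 -> 2 -> ... -> 1024, and a single
// realloc() to 1024 bytes services the request; later frames up to 1020
// bytes then reuse the same buffer without reallocating.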
void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ && event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
    return;
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, nullptr) == -1) {
    GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
  }
}
/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  setIdle();

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}
void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    std::free(readBuffer_);
    readBuffer_ = nullptr;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}
TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(std::shared_ptr<Thread>());
  }
}
/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = nullptr;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->setSocket(socket);
    result->init(ioThread);
  }
  activeConnections_.push_back(result);
  return result;
}
/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}
/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Going to accept a new client socket
  std::shared_ptr<TSocket> clientSocket;

  clientSocket = serverTransport_->accept();

  // If we're overloaded, take action here
  if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
    nConnectionsDropped_++;
    nTotalConnectionsDropped_++;
    if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
      clientSocket->close();
      return;
    } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
      if (!drainPendingTask()) {
        // Nothing left to discard, so we drop connection instead.
        clientSocket->close();
        return;
      }
    }
  }

  // Create a new TConnection for this client socket.
  TConnection* clientConnection = createConnection(clientSocket);

  // Fail fast if we could not create a TConnection object
  if (clientConnection == nullptr) {
    GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
    clientSocket->close();
    return;
  }

  /*
   * Either notify the ioThread that is assigned this connection to
   * start processing, or if it is us, we'll just ask this
   * connection to do its initial state change here.
   *
   * (We need to avoid writing to our own notification pipe, to
   * avoid possible deadlocks if the pipe is full.)
   *
   * The IO thread #0 is the only one that handles these listen
   * events, so unless the connection has been assigned to thread #0
   * we know it's not on our thread.
   */
  if (clientConnection->getIOThreadNumber() == 0) {
    clientConnection->transition();
  } else {
    if (!clientConnection->notifyIOThread()) {
      GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
      clientConnection->close();
    }
  }
}
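// Sketch of how the overload knobs used above are typically configured; the
// setter names are assumptions to be checked against TNonblockingServer.h for
// the Thrift version in use:
//
//   server.setMaxConnections(10000);        // bound on live TConnections
//   server.setMaxActiveProcessors(512);     // bound on in-flight requests
//   server.setOverloadAction(T_OVERLOAD_DRAIN_TASK_QUEUE);
//
// With T_OVERLOAD_CLOSE_ON_ACCEPT the new socket is simply closed; with
// T_OVERLOAD_DRAIN_TASK_QUEUE a queued task is force-closed first and the new
// connection is only dropped if there was nothing left to drain.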
/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  serverTransport_->listen();
  serverSocket_ = serverTransport_->getSocketFD();
}
void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(
        std::bind(&TNonblockingServer::expireClose,
                  this,
                  std::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}
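// Sketch of attaching a thread pool (hedged: the helper names follow the
// usual concurrency API, e.g. ThreadManager::newSimpleThreadManager, but
// verify them against the headers shipped with your Thrift version):
//
//   std::shared_ptr<ThreadManager> tm = ThreadManager::newSimpleThreadManager(8);
//   tm->threadFactory(std::make_shared<ThreadFactory>());
//   tm->start();
//   server.setThreadManager(tm);   // enables APP_WAIT_TASK dispatching
//
// Without a ThreadManager, threadPoolProcessing_ stays false and requests are
// processed inline on the IO thread inside transition().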
bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
        && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf(
          "TNonblockingServer: overload ended; "
          "%u dropped (%llu total)",
          nConnectionsDropped_,
          nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}
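// Numeric example of the hysteresis above: with maxActiveProcessors_ = 100
// and overloadHysteresis_ = 0.8, the server flags overload once a 101st
// request is in flight and only clears the flag again when the count falls
// back to 80 or fewer (and the active-connection count satisfies the
// analogous bound), which keeps the state from flapping around the limit.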
bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}
void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
  TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}
void TNonblockingServer::stop() {
  // Breaks the event loop in all threads so that they end ASAP.
  for (auto& ioThread : ioThreads_) {
    ioThread->stop();
  }
}
void TNonblockingServer::registerEvents(event_base* user_event_base) {
  userEventBase_ = user_event_base;

  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }
  // User-provided event-base doesn't work for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);

    shared_ptr<TNonblockingIOThread> thread(
        new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                      ioThreads_.size());

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new ThreadFactory(false /* detached */));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Register the events for the primary (listener) IO thread
  ioThreads_[0]->registerEvents();
}
/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {

  if (ioThreads_.empty())
    registerEvents(nullptr);

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}
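/*
 * Sketch of standing up and shutting down this server. Constructor overloads
 * differ across Thrift releases, and MyServiceProcessor/handler are
 * placeholders for your generated processor, so treat the exact signatures
 * below as assumptions to be checked against TNonblockingServer.h:
 *
 *   std::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
 *   auto socket = std::make_shared<transport::TNonblockingServerSocket>(9090);
 *   TNonblockingServer server(processor, socket);
 *   server.serve();   // blocks: runs IO thread #0's libevent loop here
 *   // ... later, from another thread:
 *   server.stop();    // breaks every IO thread's loop; serve() returns
 */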
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           THRIFT_SOCKET listenSocket,
                                           bool useHighPriority)
  : server_(server),
    number_(number),
    listenSocket_(listenSocket),
    useHighPriority_(useHighPriority),
    eventBase_(nullptr),
    ownEventBase_(false),
    serverEvent_{},
    notificationEvent_{} {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}
TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_ && ownEventBase_) {
    event_base_free(eventBase_);
    ownEventBase_ = false;
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
    }
    listenSocket_ = THRIFT_INVALID_SOCKET;
  }

  for (auto& notificationPipeFD : notificationPipeFDs_) {
    if (notificationPipeFD >= 0) {
      if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            THRIFT_GET_SOCKET_ERROR);
      }
      notificationPipeFD = THRIFT_INVALID_SOCKET;
    }
  }
}
void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}
/**
 * Register the core libevent events onto the proper base.
 */
void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == nullptr);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == nullptr) {
    eventBase_ = event_base_new();
    ownEventBase_ = true;
  }

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, nullptr)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, nullptr)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}
bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  auto fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  long kSize = sizeof(conn);
  const char* pos = (const char*)const_cast_sockopt(&conn);

#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
  struct pollfd pfd = {fd, POLLOUT, 0};

  while (kSize > 0) {
    pfd.revents = 0;
    int ret = poll(&pfd, 1, -1);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (pfd.revents & POLLOUT) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#else
  fd_set wfds, efds;

  while (kSize > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    int ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#endif

  return true;
}
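// How the notification channel above is used: notify() writes the raw
// TConnection pointer (sizeof(conn) bytes) over one end of the socketpair
// created by createNotificationPipe(); notifyHandler() on the IO thread reads
// pointers back off the other end and calls transition() on each. A nullptr
// written through the same channel (see breakLoop()) is the sentinel that
// tells the IO thread to exit its event loop.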
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  auto* ioThread = (TNonblockingIOThread*)v;
  THRIFT_UNUSED_VARIABLE(which);

  while (true) {
    TNonblockingServer::TConnection* connection = nullptr;
    const int kSize = sizeof(connection);
    long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == nullptr) {
        // this is the command to stop our thread, exit the handler!
        ioThread->breakLoop(false);
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      ioThread->breakLoop(false);
      // exit the loop
      break;
    } else { // nBytes < 0
      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
          && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
        GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
        ioThread->breakLoop(true);
        return;
      }
      // exit the loop
      break;
    }
  }
}
void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(nullptr);
  } else {
    // cause the loop to stop ASAP - even if it has things to do in it
    event_base_loopbreak(eventBase_);
  }
}
void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*)&sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a random priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
  }
#else
  THRIFT_UNUSED_VARIABLE(value);
#endif
}
void TNonblockingIOThread::run() {
  if (eventBase_ == nullptr) {
    registerEvents();
  }
  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  if (eventBase_ != nullptr) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Run libevent engine, never returns, invokes calls to eventHandler
    event_base_loop(eventBase_, 0);

    if (useHighPriority_) {
      setCurrentThreadHighPriority(false);
    }

    // cleans up our registered events
    cleanupEvents();
  }

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
void TNonblockingIOThread::cleanupEvents() {
  // stop the listen socket, if any
  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
    }
  }

  event_del(&notificationEvent_);
}
void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}
void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe to both join() ourselves twice, as well as join
      // the current thread as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}
}
}
} // apache::thrift::server