/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <iostream>

#ifdef HAVE_POLL_H
#include <poll.h>
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#elif HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif

namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using std::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
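
// For orientation, a framed request on the wire looks like this (a sketch
// inferred from the read/write paths below, which apply ntohl()/htonl() to a
// 4-byte length prefix; this is the TFramedTransport framing):
//
//   +-------------------------+----------------------------------+
//   | 4-byte frame length N   | N bytes of serialized payload    |
//   | (big-endian / network)  | (the Thrift message itself)      |
//   +-------------------------+----------------------------------+
//
// SOCKET_RECV_FRAMING reads the length prefix, SOCKET_RECV reads the N
// payload bytes, and SOCKET_SEND writes a response framed the same way.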

/**
 * Application states for the nonblocking server. A connection moves through:
 * 1) initialize
 * 2) read 4 byte frame size
 * 3) read frame of data
 * 4) wait for a worker task to complete (when a thread pool is in use)
 * 5) send back data (if any)
 * 6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
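
// The usual per-request progression, as driven by transition() below
// (a summary of the existing state machine, not additional machinery):
//
//   APP_INIT -> APP_READ_FRAME_SIZE -> APP_READ_REQUEST
//            -> APP_WAIT_TASK (thread pool) or inline process()
//            -> APP_SEND_RESULT (if there is a reply) -> back to APP_INIT
//
// APP_CLOSE_CONNECTION is entered out of band, e.g. by forceClose() when a
// queued task is dropped under overload or expires.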

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  std::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  std::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needs to be read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  std::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  std::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  std::shared_ptr<TTransport> factoryInputTransport_;
  std::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  std::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  std::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  std::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(std::shared_ptr<TSocket> socket,
              TNonblockingIOThread* ioThread) {
    readBuffer_ = nullptr;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// set socket for connection
  void setSocket(std::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};

class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(std::shared_ptr<TProcessor> processor,
       std::shared_ptr<TProtocol> input,
       std::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() override {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  std::shared_ptr<TProcessor> processor_;
  std::shared_ptr<TProtocol> input_;
  std::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  std::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = nullptr;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/output transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
  } else {
    connectionContext_ = nullptr;
  }

  // Get the processor
  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
}

void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
  tSocket_ = socket;
}

void TNonblockingServer::TConnection::workSocket() {
  int got = 0, left = 0, sent = 0;
  uint32_t fetch = 0;

  switch (socketState_) {
  case SOCKET_RECV_FRAMING:
    union {
      uint8_t buf[sizeof(uint32_t)];
      uint32_t size;
    } framing;

    // if we've already received some bytes, we kept them here
    framing.size = readWant_;
    // determine size of this frame
    try {
      // Read from the socket
      fetch = tSocket_->read(&framing.buf[readBufferPos_],
                             uint32_t(sizeof(framing.size) - readBufferPos_));
      if (fetch == 0) {
        // Whenever we get here it means a remote disconnect
        close();
        return;
      }
      readBufferPos_ += fetch;
    } catch (TTransportException& te) {
      // In nonblocking SSL sockets some operations need to be retried.
      // The current approach is to parse the exception message, but a better
      // solution needs to be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();

        return;
      }
    }

    if (readBufferPos_ < sizeof(framing.size)) {
      // more needed before frame size is known -- save what we have so far
      readWant_ = framing.size;
      return;
    }

    readWant_ = ntohl(framing.size);
    if (readWant_ > server_->getMaxFrameSize()) {
      // Don't allow giant frame sizes. This prevents bad clients from
      // causing us to try and allocate a giant buffer.
      GlobalOutput.printf(
          "TNonblockingServer: frame size too large "
          "(%" PRIu32 " > %" PRIu64
          ") from client %s. "
          "Remote side not using TFramedTransport?",
          readWant_,
          (uint64_t)server_->getMaxFrameSize(),
          tSocket_->getSocketInfo().c_str());
      close();
      return;
    }
    // size known; now get the rest of the frame
    transition();

    // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
    // regular sockets, because if there is more data, libevent will fire the event handler registered for read
    // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
    // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
    // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
    // despite having more data.
    if (tSocket_->hasPendingDataToRead())
    {
      workSocket();
    }

    return;

  case SOCKET_RECV:
    // It is an error to be in this state if we already have all the data
    assert(readBufferPos_ < readWant_);

    try {
      // Read from the socket
      fetch = readWant_ - readBufferPos_;
      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
    } catch (TTransportException& te) {
      // In nonblocking SSL sockets some operations need to be retried.
      // The current approach is to parse the exception message, but a better
      // solution needs to be investigated.
      if (!strstr(te.what(), "retry")) {
        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
        close();
      }

      return;
    }

    if (got > 0) {
      // Move along in the buffer
      readBufferPos_ += got;

      // Check that we did not overdo it
      assert(readBufferPos_ <= readWant_);

      // We are done reading, move onto the next state
      if (readBufferPos_ == readWant_) {
        transition();
      }
      return;
    }

    // Whenever we get down here it means a remote disconnect
    close();

    return;

  case SOCKET_SEND:
    // Should never have position past size
    assert(writeBufferPos_ <= writeBufferSize_);

    // If there is no data to send, then let us move on
    if (writeBufferPos_ == writeBufferSize_) {
      GlobalOutput("WARNING: Send state with no data to send");
      transition();
      return;
    }

    try {
      left = writeBufferSize_ - writeBufferPos_;
      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
    } catch (TTransportException& te) {
      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
      close();
      return;
    }

    writeBufferPos_ += sent;

    // Did we overdo it?
    assert(writeBufferPos_ <= writeBufferSize_);

    // We are done!
    if (writeBufferPos_ == writeBufferSize_) {
      transition();
    }

    return;

  default:
    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
    assert(0);
  }
}

bool TNonblockingServer::getHeaderTransport() {
  // Currently if there is no output protocol factory,
  // we assume header transport (without having to create
  // a new transport and check)
  return getOutputProtocolFactory() == nullptr;
}

/**
 * This is called when the application transitions from one state into
 * another. This means that it has finished writing the data that it needed
 * to, or finished receiving the data that it needed to.
 */
void TNonblockingServer::TConnection::transition() {
  // ensure this connection is active right now
  assert(ioThread_);
  assert(server_);

  // Switch upon the state that we are currently in and move to a new state
  switch (appState_) {

  case APP_READ_REQUEST:
    // We are done reading the request, package the read buffer into transport
    // and get back some data from the dispatch function
    if (server_->getHeaderTransport()) {
      inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
      outputTransport_->resetBuffer();
    } else {
      // We saved room for the framing size in case header transport needed it,
      // but just skip it for the non-header case
      inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
      outputTransport_->resetBuffer();

      // Prepend four bytes of blank space to the buffer so we can
      // write the frame size there later.
      outputTransport_->getWritePtr(4);
      outputTransport_->wroteBytes(4);
    }

    server_->incrementActiveProcessors();

    if (server_->isThreadPoolProcessing()) {
      // We are setting up a Task to do this work and we will wait on it

      // Create task and dispatch to the thread manager
      std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
          new Task(processor_, inputProtocol_, outputProtocol_, this));
      // The application is now waiting on the task to finish
      appState_ = APP_WAIT_TASK;

      // Set this connection idle so that libevent doesn't process more
      // data on it while we're still waiting for the threadmanager to
      // finish this task
      setIdle();

      try {
        server_->addTask(task);
      } catch (IllegalStateException& ise) {
        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
        server_->decrementActiveProcessors();
        close();
      } catch (TimedOutException& to) {
        GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
        server_->decrementActiveProcessors();
        close();
      }

      return;
    } else {
      try {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, getTSocket());
        }
        // Invoke the processor
        processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
      } catch (const TTransportException& ttx) {
        GlobalOutput.printf(
            "TNonblockingServer transport error in "
            "process(): %s",
            ttx.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (const std::exception& x) {
        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
                            typeid(x).name(),
                            x.what());
        server_->decrementActiveProcessors();
        close();
        return;
      } catch (...) {
        GlobalOutput.printf("Server::process() unknown exception");
        server_->decrementActiveProcessors();
        close();
        return;
      }
    }
    // fallthrough

    // Intentionally fall through here, the call to process has written into
    // the writeBuffer_

  case APP_WAIT_TASK:
    // We have now finished processing a task and the result has been written
    // into the outputTransport_, so we grab its contents and place them into
    // the writeBuffer_ for actual writing by the libevent thread

    server_->decrementActiveProcessors();
    // Get the result of the operation
    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

    // If the function call generated return data, then move into the send
    // state and get going
    // 4 bytes were reserved for frame size
    if (writeBufferSize_ > 4) {

      // Move into write state
      writeBufferPos_ = 0;
      socketState_ = SOCKET_SEND;

      // Put the frame size into the write buffer
      auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
      memcpy(writeBuffer_, &frameSize, 4);

      // Socket into write mode
      appState_ = APP_SEND_RESULT;
      setWrite();

      return;
    }

    // In this case, the request was oneway and we should fall through
    // right back into the read frame header state
    goto LABEL_APP_INIT;

  case APP_SEND_RESULT:
    // it's now safe to perform buffer size housekeeping.
    if (writeBufferSize_ > largestWriteBufferSize_) {
      largestWriteBufferSize_ = writeBufferSize_;
    }
    if (server_->getResizeBufferEveryN() > 0
        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                              server_->getIdleWriteBufferLimit());
      callsForResize_ = 0;
    }
    // fallthrough

    // N.B.: We also intentionally fall through here into the INIT state!

  LABEL_APP_INIT:
  case APP_INIT:

    // Clear write buffer variables
    writeBuffer_ = nullptr;
    writeBufferPos_ = 0;
    writeBufferSize_ = 0;

    // Into read4 state we go
    socketState_ = SOCKET_RECV_FRAMING;
    appState_ = APP_READ_FRAME_SIZE;

    readBufferPos_ = 0;

    // Register read event
    setRead();

    return;

  case APP_READ_FRAME_SIZE:
    readWant_ += 4;

    // We just read the request length
    // Double the buffer size until it is big enough
    if (readWant_ > readBufferSize_) {
      if (readBufferSize_ == 0) {
        readBufferSize_ = 1;
      }
      uint32_t newSize = readBufferSize_;
      while (readWant_ > newSize) {
        newSize *= 2;
      }

      auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
      if (newBuffer == nullptr) {
        // nothing else to be done...
        throw std::bad_alloc();
      }
      readBuffer_ = newBuffer;
      readBufferSize_ = newSize;
    }

    readBufferPos_ = 4;
    *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);

    // Move into read request state
    socketState_ = SOCKET_RECV;
    appState_ = APP_READ_REQUEST;

    return;

  case APP_CLOSE_CONNECTION:
    server_->decrementActiveProcessors();
    close();
    return;

  default:
    GlobalOutput.printf("Unexpected Application State %d", appState_);
    assert(0);
  }
}

void TNonblockingServer::TConnection::setFlags(short eventFlags) {
  // Catch the do nothing case
  if (eventFlags_ == eventFlags) {
    return;
  }

  // Delete a previously existing event
  if (eventFlags_ && event_del(&event_) == -1) {
    GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
    return;
  }

  // Update in memory structure
  eventFlags_ = eventFlags;

  // Do not call event_set if there are no flags
  if (!eventFlags_) {
    return;
  }

  /*
   * event_set:
   *
   * Prepares the event structure &event to be used in future calls to
   * event_add() and event_del(). The event will be prepared to call the
   * eventHandler using the 'sock' file descriptor to monitor events.
   *
   * The events can be either EV_READ, EV_WRITE, or both, indicating
   * that an application can read or write from the file respectively without
   * blocking.
   *
   * The eventHandler will be called with the file descriptor that triggered
   * the event and the type of event which will be one of: EV_TIMEOUT,
   * EV_SIGNAL, EV_READ, EV_WRITE.
   *
   * The additional flag EV_PERSIST makes an event_add() persistent until
   * event_del() has been called.
   *
   * Once initialized, the &event struct can be used repeatedly with
   * event_add() and event_del() and does not need to be reinitialized unless
   * the eventHandler and/or the argument to it are to be changed. However,
   * when an ev structure has been added to libevent using event_add() the
   * structure must persist until the event occurs (assuming EV_PERSIST
   * is not set) or is removed using event_del(). You may not reuse the same
   * ev structure for multiple monitored descriptors; each descriptor needs
   * its own ev.
   */
  event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
  event_base_set(ioThread_->getEventBase(), &event_);

  // Add the event
  if (event_add(&event_, nullptr) == -1) {
    GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
  }
}

/**
 * Closes a connection
 */
void TNonblockingServer::TConnection::close() {
  setIdle();

  if (serverEventHandler_) {
    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
  }
  ioThread_ = nullptr;

  // Close the socket
  tSocket_->close();

  // close any factory produced transports
  factoryInputTransport_->close();
  factoryOutputTransport_->close();

  // release processor and handler
  processor_.reset();

  // Give this object back to the server that owns it
  server_->returnConnection(this);
}

void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
  if (readLimit > 0 && readBufferSize_ > readLimit) {
    free(readBuffer_);
    readBuffer_ = nullptr;
    readBufferSize_ = 0;
  }

  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
    // just start over
    outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
    largestWriteBufferSize_ = 0;
  }
}

TNonblockingServer::~TNonblockingServer() {
  // Close any active connections (moves them to the idle connection stack)
  while (activeConnections_.size()) {
    activeConnections_.front()->close();
  }
  // Clean up unused TConnection objects in connectionStack_
  while (!connectionStack_.empty()) {
    TConnection* connection = connectionStack_.top();
    connectionStack_.pop();
    delete connection;
  }
  // The TNonblockingIOThread objects have shared_ptrs to the Thread
  // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
  // objects (as runnable) so these objects will never deallocate without help.
  while (!ioThreads_.empty()) {
    std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
    ioThreads_.pop_back();
    iot->setThread(std::shared_ptr<Thread>());
  }
}

/**
 * Creates a new connection either by reusing an object off the stack or
 * by allocating a new one entirely
 */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
  // Check the stack
  Guard g(connMutex_);

  // pick an IO thread to handle this connection -- currently round robin
  assert(nextIOThread_ < ioThreads_.size());
  int selectedThreadIdx = nextIOThread_;
  nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());

  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();

  // Check the connection stack to see if we can re-use
  TConnection* result = nullptr;
  if (connectionStack_.empty()) {
    result = new TConnection(socket, ioThread);
    ++numTConnections_;
  } else {
    result = connectionStack_.top();
    connectionStack_.pop();
    result->setSocket(socket);
    result->init(ioThread);
  }
  activeConnections_.push_back(result);
  return result;
}

/**
 * Returns a connection to the stack
 */
void TNonblockingServer::returnConnection(TConnection* connection) {
  Guard g(connMutex_);

  activeConnections_.erase(std::remove(activeConnections_.begin(),
                                       activeConnections_.end(),
                                       connection),
                           activeConnections_.end());

  if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
    delete connection;
    --numTConnections_;
  } else {
    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
    connectionStack_.push(connection);
  }
}

/**
 * Server socket had something happen. We accept all waiting client
 * connections on fd and assign TConnection objects to handle those requests.
 */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
  (void)which;
  // Make sure that libevent didn't mess up the socket handles
  assert(fd == serverSocket_);

  // Going to accept a new client socket
  std::shared_ptr<TSocket> clientSocket;

  clientSocket = serverTransport_->accept();
  if (clientSocket) {
    // If we're overloaded, take action here
    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
      Guard g(connMutex_);
      nConnectionsDropped_++;
      nTotalConnectionsDropped_++;
      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
        clientSocket->close();
        return;
      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
        if (!drainPendingTask()) {
          // Nothing left to discard, so we drop connection instead.
          clientSocket->close();
          return;
        }
      }
    }

    // Create a new TConnection for this client socket.
    TConnection* clientConnection = createConnection(clientSocket);

    // Fail fast if we could not create a TConnection object
    if (clientConnection == nullptr) {
      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
      clientSocket->close();
      return;
    }

    /*
     * Either notify the ioThread that is assigned this connection to
     * start processing, or if it is us, we'll just ask this
     * connection to do its initial state change here.
     *
     * (We need to avoid writing to our own notification pipe, to
     * avoid possible deadlocks if the pipe is full.)
     *
     * The IO thread #0 is the only one that handles these listen
     * events, so unless the connection has been assigned to thread #0
     * we know it's not on our thread.
     */
    if (clientConnection->getIOThreadNumber() == 0) {
      clientConnection->transition();
    } else {
      if (!clientConnection->notifyIOThread()) {
        GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
        clientConnection->close();
      }
    }
  }
}

/**
 * Creates a socket to listen on and binds it to the local port.
 */
void TNonblockingServer::createAndListenOnSocket() {
  serverTransport_->listen();
  serverSocket_ = serverTransport_->getSocketFD();
}


void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
  threadManager_ = threadManager;
  if (threadManager) {
    threadManager->setExpireCallback(
        std::bind(&TNonblockingServer::expireClose,
                  this,
                  std::placeholders::_1));
    threadPoolProcessing_ = true;
  } else {
    threadPoolProcessing_ = false;
  }
}
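
// How a caller typically wires this up (a hedged sketch, not part of this
// translation unit; it assumes a TProcessor named `processor`, and the exact
// constructor/factory signatures vary between Thrift versions):
//
//   auto threadManager = ThreadManager::newSimpleThreadManager(/*workers*/ 4);
//   threadManager->threadFactory(std::make_shared<ThreadFactory>());
//   threadManager->start();
//
//   auto serverSocket = std::make_shared<transport::TNonblockingServerSocket>(9090);
//   TNonblockingServer server(processor, serverSocket);
//   server.setThreadManager(threadManager);  // enables thread-pool processing
//   server.serve();                          // runs the libevent loop(s)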

bool TNonblockingServer::serverOverloaded() {
  size_t activeConnections = numTConnections_ - connectionStack_.size();
  if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
    if (!overloaded_) {
      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
      overloaded_ = true;
    }
  } else {
    if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
        && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
      GlobalOutput.printf(
          "TNonblockingServer: overload ended; "
          "%u dropped (%llu total)",
          nConnectionsDropped_,
          nTotalConnectionsDropped_);
      nConnectionsDropped_ = 0;
      overloaded_ = false;
    }
  }

  return overloaded_;
}
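
// A configuration sketch for the overload path above (the limits and the
// overload action correspond to the members used in serverOverloaded() and
// handleEvent(); the setter names are assumptions taken from the server's
// header and may differ by version):
//
//   server.setMaxConnections(10000);        // bound on live TConnections
//   server.setMaxActiveProcessors(500);     // bound on in-flight requests
//   server.setOverloadHysteresis(0.8);      // fraction at which overload ends
//   server.setOverloadAction(T_OVERLOAD_DRAIN_TASK_QUEUE);
//
// With T_OVERLOAD_DRAIN_TASK_QUEUE, a newly accepted socket is kept while the
// next pending task's connection is force-closed (falling back to dropping
// the new socket if nothing is pending); T_OVERLOAD_CLOSE_ON_ACCEPT simply
// closes the newly accepted socket.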

bool TNonblockingServer::drainPendingTask() {
  if (threadManager_) {
    std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
    if (task) {
      TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
      assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
      connection->forceClose();
      return true;
    }
  }
  return false;
}

void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
  TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
  assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
  connection->forceClose();
}

void TNonblockingServer::stop() {
  // Breaks the event loop in all threads so that they end ASAP.
  for (auto & ioThread : ioThreads_) {
    ioThread->stop();
  }
}

void TNonblockingServer::registerEvents(event_base* user_event_base) {
  userEventBase_ = user_event_base;

  // init listen socket
  if (serverSocket_ == THRIFT_INVALID_SOCKET)
    createAndListenOnSocket();

  // set up the IO threads
  assert(ioThreads_.empty());
  if (!numIOThreads_) {
    numIOThreads_ = DEFAULT_IO_THREADS;
  }
  // A user-provided event base doesn't work for multi-threaded servers
  assert(numIOThreads_ == 1 || !userEventBase_);

  for (uint32_t id = 0; id < numIOThreads_; ++id) {
    // the first IO thread also does the listening on server socket
    THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);

    shared_ptr<TNonblockingIOThread> thread(
        new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
    ioThreads_.push_back(thread);
  }

  // Notify handler of the preServe event
  if (eventHandler_) {
    eventHandler_->preServe();
  }

  // Start all of our helper IO threads. Note that the threads run forever,
  // only terminating if stop() is called.
  assert(ioThreads_.size() == numIOThreads_);
  assert(ioThreads_.size() > 0);

  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                      ioThreads_.size());

  // Launch all the secondary IO threads in separate threads
  if (ioThreads_.size() > 1) {
    ioThreadFactory_.reset(new ThreadFactory(
        false // detached
        ));

    assert(ioThreadFactory_.get());

    // intentionally starting at thread 1, not 0
    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
      ioThreads_[i]->setThread(thread);
      thread->start();
    }
  }

  // Register the events for the primary (listener) IO thread
  ioThreads_[0]->registerEvents();
}

/**
 * Main workhorse function, starts up the server listening on a port and
 * loops over the libevent handler.
 */
void TNonblockingServer::serve() {

  if (ioThreads_.empty())
    registerEvents(nullptr);

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}

TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
                                           int number,
                                           THRIFT_SOCKET listenSocket,
                                           bool useHighPriority)
  : server_(server),
    number_(number),
    threadId_{},
    listenSocket_(listenSocket),
    useHighPriority_(useHighPriority),
    eventBase_(nullptr),
    ownEventBase_(false),
    serverEvent_{},
    notificationEvent_{} {
  notificationPipeFDs_[0] = -1;
  notificationPipeFDs_[1] = -1;
}

TNonblockingIOThread::~TNonblockingIOThread() {
  // make sure our associated thread is fully finished
  join();

  if (eventBase_ && ownEventBase_) {
    event_base_free(eventBase_);
    ownEventBase_ = false;
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
    }
    listenSocket_ = THRIFT_INVALID_SOCKET;
  }

  for (auto notificationPipeFD : notificationPipeFDs_) {
    if (notificationPipeFD >= 0) {
      if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
                            THRIFT_GET_SOCKET_ERROR);
      }
      notificationPipeFD = THRIFT_INVALID_SOCKET;
    }
  }
}

void TNonblockingIOThread::createNotificationPipe() {
  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
    throw TException("can't create notification pipe");
  }
  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
      || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
    ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
    throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
  }
  for (auto notificationPipeFD : notificationPipeFDs_) {
#if LIBEVENT_VERSION_NUMBER < 0x02000000
    int flags;
    if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
        || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
#else
    if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
#endif
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
      ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
      throw TException(
          "TNonblockingServer::createNotificationPipe() "
          "FD_CLOEXEC");
    }
  }
}
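
// The notification "protocol" carried over that socketpair, summarizing
// notify() and notifyHandler() below rather than adding any machinery:
//
//   notify(conn):      send(fd, &conn, sizeof(conn), 0);   // raw pointer bytes
//   notifyHandler():   recv(fd, &conn, sizeof(conn), 0);
//                      conn ? conn->transition()           // task finished
//                           : breakLoop(false);            // nullptr = stop thread
//
// Exactly sizeof(TConnection*) bytes are written and read per notification; a
// short read is treated as an error in notifyHandler().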

/**
 * Register the core libevent events onto the proper base.
 */
void TNonblockingIOThread::registerEvents() {
  threadId_ = Thread::get_current();

  assert(eventBase_ == nullptr);
  eventBase_ = getServer()->getUserEventBase();
  if (eventBase_ == nullptr) {
    eventBase_ = event_base_new();
    ownEventBase_ = true;
  }

  // Print some libevent stats
  if (number_ == 0) {
    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
                        event_get_version(),
                        event_base_get_method(eventBase_));
  }

  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    // Register the server event
    event_set(&serverEvent_,
              listenSocket_,
              EV_READ | EV_PERSIST,
              TNonblockingIOThread::listenHandler,
              server_);
    event_base_set(eventBase_, &serverEvent_);

    // Add the event and start up the server
    if (-1 == event_add(&serverEvent_, nullptr)) {
      throw TException(
          "TNonblockingServer::serve(): "
          "event_add() failed on server listen event");
    }
    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
  }

  createNotificationPipe();

  // Create an event to be notified when a task finishes
  event_set(&notificationEvent_,
            getNotificationRecvFD(),
            EV_READ | EV_PERSIST,
            TNonblockingIOThread::notifyHandler,
            this);

  // Attach to the base
  event_base_set(eventBase_, &notificationEvent_);

  // Add the event and start up the server
  if (-1 == event_add(&notificationEvent_, nullptr)) {
    throw TException(
        "TNonblockingServer::serve(): "
        "event_add() failed on task-done notification event");
  }
  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
}

bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
  auto fd = getNotificationSendFD();
  if (fd < 0) {
    return false;
  }

  int ret = -1;
  long kSize = sizeof(conn);
  const char* pos = (const char*)const_cast_sockopt(&conn);

#if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
  struct pollfd pfd = {fd, POLLOUT, 0};

  while (kSize > 0) {
    pfd.revents = 0;
    ret = poll(&pfd, 1, -1);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (pfd.revents & POLLOUT) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#else
  fd_set wfds, efds;

  while (kSize > 0) {
    FD_ZERO(&wfds);
    FD_ZERO(&efds);
    FD_SET(fd, &wfds);
    FD_SET(fd, &efds);
    ret = select(static_cast<int>(fd + 1), NULL, &wfds, &efds, NULL);
    if (ret < 0) {
      return false;
    } else if (ret == 0) {
      continue;
    }

    if (FD_ISSET(fd, &efds)) {
      ::THRIFT_CLOSESOCKET(fd);
      return false;
    }

    if (FD_ISSET(fd, &wfds)) {
      ret = send(fd, pos, kSize, 0);
      if (ret < 0) {
        if (errno == EAGAIN) {
          continue;
        }

        ::THRIFT_CLOSESOCKET(fd);
        return false;
      }

      kSize -= ret;
      pos += ret;
    }
  }
#endif

  return true;
}

/* static */
void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
  auto* ioThread = (TNonblockingIOThread*)v;
  assert(ioThread);
  (void)which;

  while (true) {
    TNonblockingServer::TConnection* connection = nullptr;
    const int kSize = sizeof(connection);
    long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
    if (nBytes == kSize) {
      if (connection == nullptr) {
        // this is the command to stop our thread, exit the handler!
        ioThread->breakLoop(false);
        return;
      }
      connection->transition();
    } else if (nBytes > 0) {
      // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
      ioThread->breakLoop(true);
      return;
    } else if (nBytes == 0) {
      GlobalOutput.printf("notifyHandler: Notify socket closed!");
      ioThread->breakLoop(false);
      // exit the loop
      break;
    } else { // nBytes < 0
      if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
          && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
        GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
        ioThread->breakLoop(true);
        return;
      }
      // exit the loop
      break;
    }
  }
}

void TNonblockingIOThread::breakLoop(bool error) {
  if (error) {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
    // TODO: figure out something better to do here, but for now kill the
    // whole process.
    GlobalOutput.printf("TNonblockingServer: aborting process.");
    ::abort();
  }

  // If we're running in the same thread, we can't use the notify(0)
  // mechanism to stop the thread, but happily if we're running in the
  // same thread, this means the thread can't be blocking in the event
  // loop either.
  if (!Thread::is_current(threadId_)) {
    notify(nullptr);
  } else {
    // cause the loop to stop ASAP - even if it has things to do in it
    event_base_loopbreak(eventBase_);
  }
}

void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
#ifdef HAVE_SCHED_H
  // Start out with a standard, low-priority setup for the sched params.
  struct sched_param sp;
  bzero((void*)&sp, sizeof(sp));
  int policy = SCHED_OTHER;

  // If desired, set up high-priority sched params structure.
  if (value) {
    // FIFO scheduler, ranked above default SCHED_OTHER queue
    policy = SCHED_FIFO;
    // The priority only compares us to other SCHED_FIFO threads, so we
    // just pick a random priority halfway between min & max.
    const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;

    sp.sched_priority = priority;
  }

  // Actually set the sched params for the current thread.
  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
    GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
  } else {
    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
  }
#else
  THRIFT_UNUSED_VARIABLE(value);
#endif
}

void TNonblockingIOThread::run() {
  if (eventBase_ == nullptr) {
    registerEvents();
  }
  if (useHighPriority_) {
    setCurrentThreadHighPriority(true);
  }

  if (eventBase_ != nullptr)
  {
    GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Run the libevent engine; it invokes calls to eventHandler and only
    // returns once the loop has been broken (e.g. via stop()/breakLoop()).
    event_base_loop(eventBase_, 0);

    if (useHighPriority_) {
      setCurrentThreadHighPriority(false);
    }

    // cleans up our registered events
    cleanupEvents();
  }

  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}

void TNonblockingIOThread::cleanupEvents() {
  // stop the listen socket, if any
  if (listenSocket_ != THRIFT_INVALID_SOCKET) {
    if (event_del(&serverEvent_) == -1) {
      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
    }
  }

  event_del(&notificationEvent_);
}

void TNonblockingIOThread::stop() {
  // This should cause the thread to fall out of its event loop ASAP.
  breakLoop(false);
}

void TNonblockingIOThread::join() {
  // If this was a thread created by a factory (not the thread that called
  // serve()), we join() it to make sure we shut down fully.
  if (thread_) {
    try {
      // Note that it is safe both to join() twice and to join the current
      // thread, as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}
}
}
} // apache::thrift::server