2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/thrift-config.h>
25 #include <sys/types.h>
26 #ifdef HAVE_SYS_SOCKET_H
27 #include <sys/socket.h>
32 #ifdef HAVE_SYS_POLL_H
35 #ifdef HAVE_NETINET_IN_H
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TNonblockingServerSocket.h>
49 #include <thrift/transport/PlatformSocket.h>
52 #define AF_LOCAL AF_UNIX
55 #ifndef SOCKOPT_CAST_T
57 #define SOCKOPT_CAST_T void
59 #define SOCKOPT_CAST_T char
64 inline const SOCKOPT_CAST_T
* const_cast_sockopt(const T
* v
) {
65 return reinterpret_cast<const SOCKOPT_CAST_T
*>(v
);
69 inline SOCKOPT_CAST_T
* cast_sockopt(T
* v
) {
70 return reinterpret_cast<SOCKOPT_CAST_T
*>(v
);
78 using std::shared_ptr
;
// Constructor taking only a port number.
// Visible initializers: serverSocket_ starts as THRIFT_INVALID_SOCKET (no
// socket yet) and acceptBacklog_ starts as DEFAULT_BACKLOG.
// NOTE(review): the rest of the initializer list and the body are not visible
// in this excerpt — confirm against the full source before relying on defaults.
80 TNonblockingServerSocket::TNonblockingServerSocket(int port
)
83 serverSocket_(THRIFT_INVALID_SOCKET
),
84 acceptBacklog_(DEFAULT_BACKLOG
),
// Constructor taking a port plus send/receive timeouts.
// Visible initializers: serverSocket_ = THRIFT_INVALID_SOCKET,
// acceptBacklog_ = DEFAULT_BACKLOG, and the two timeouts are stored verbatim in
// sendTimeout_ / recvTimeout_ (acceptImpl() later applies them to accepted
// client sockets when they are > 0).
// NOTE(review): the remaining initializers and the body are not visible in
// this excerpt.
95 TNonblockingServerSocket::TNonblockingServerSocket(int port
, int sendTimeout
, int recvTimeout
)
98 serverSocket_(THRIFT_INVALID_SOCKET
),
99 acceptBacklog_(DEFAULT_BACKLOG
),
100 sendTimeout_(sendTimeout
),
101 recvTimeout_(recvTimeout
),
// Constructor taking a bind address and a port.
// Visible initializers: serverSocket_ = THRIFT_INVALID_SOCKET and
// acceptBacklog_ = DEFAULT_BACKLOG.
// NOTE(review): how `address` and `port` are stored is not visible in this
// excerpt (listen() reads address_ and port_, so presumably there — verify).
110 TNonblockingServerSocket::TNonblockingServerSocket(const string
& address
, int port
)
114 serverSocket_(THRIFT_INVALID_SOCKET
),
115 acceptBacklog_(DEFAULT_BACKLOG
),
// Constructor taking a filesystem path (listen() treats a non-empty path_ as a
// request for a Unix domain socket).
// Visible initializers: serverSocket_ = THRIFT_INVALID_SOCKET and
// acceptBacklog_ = DEFAULT_BACKLOG.
// NOTE(review): how `path` is stored and the remaining initializers are not
// visible in this excerpt.
126 TNonblockingServerSocket::TNonblockingServerSocket(const string
& path
)
130 serverSocket_(THRIFT_INVALID_SOCKET
),
131 acceptBacklog_(DEFAULT_BACKLOG
),
// Destructor.
// NOTE(review): the body is not visible in this excerpt; presumably it
// releases the server socket via close() — verify against the full source.
142 TNonblockingServerSocket::~TNonblockingServerSocket() {
146 void TNonblockingServerSocket::setSendTimeout(int sendTimeout
) {
147 sendTimeout_
= sendTimeout
;
150 void TNonblockingServerSocket::setRecvTimeout(int recvTimeout
) {
151 recvTimeout_
= recvTimeout
;
154 void TNonblockingServerSocket::setAcceptBacklog(int accBacklog
) {
155 acceptBacklog_
= accBacklog
;
158 void TNonblockingServerSocket::setRetryLimit(int retryLimit
) {
159 retryLimit_
= retryLimit
;
162 void TNonblockingServerSocket::setRetryDelay(int retryDelay
) {
163 retryDelay_
= retryDelay
;
166 void TNonblockingServerSocket::setTcpSendBuffer(int tcpSendBuffer
) {
167 tcpSendBuffer_
= tcpSendBuffer
;
170 void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer
) {
171 tcpRecvBuffer_
= tcpRecvBuffer
;
/**
 * Create the server socket, configure it, bind it and start listening.
 *
 * Order of operations:
 *  1. validate port_ and resolve address_/port_ with getaddrinfo (wildcard
 *     address when address_ is empty), preferring an IPv6 result;
 *  2. create a PF_UNIX socket when path_ is set, otherwise a socket matching
 *     the chosen addrinfo entry;
 *  3. apply socket options: THRIFT_NO_SOCKET_CACHING, optional send/receive
 *     buffer sizes, IPV6_V6ONLY off (failure only logged), SO_LINGER off,
 *     SO_KEEPALIVE, TCP_NODELAY (TCP sockets only), O_NONBLOCK and, where
 *     available and enabled, TCP_LOW_MIN_RTO;
 *  4. bind, retrying up to retryLimit_ times with retryDelay_ between tries;
 *  5. when port_ is 0, read the assigned port back into listenPort_;
 *  6. invoke listenCallback_ (if set) and call ::listen().
 *
 * @throws TTransportException BAD_ARGS when port_ is outside [0, 0xFFFF];
 *         NOT_OPEN when resolution, socket creation, a mandatory option,
 *         fcntl, bind (after all retries) or ::listen() fails.
 *
 * NOTE(review): this body was rebuilt from a listing that omitted some lines.
 * The close() calls before each throw, the short local declarations (error,
 * one, zero, retries, errno_copy, errbuf), the preprocessor guards, the
 * listening_ update, the break statements and the "Could not bind" message
 * were restored by inference — diff against the repository copy before merging.
 */
void TNonblockingServerSocket::listen() {
  listening_ = true;
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  // Validate port number
  if (port_ < 0 || port_ > 0xFFFF) {
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  const struct addrinfo* res;
  int error;
  char port[sizeof("65535")];
  THRIFT_SNPRINTF(port, sizeof(port), "%d", port_);

  struct addrinfo hints;
  std::memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // If address is not specified use wildcard address (NULL)
  TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints);

  error = info.init();
  if (error) {
    GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not resolve host for server socket.");
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. Falls back to the last entry when no IPv6 entry exists.
  for (res = info.res(); res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == nullptr)
      break;
  }

  if (!path_.empty()) {
    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  } else if (res != nullptr) {
    serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }

  if (serverSocket_ == THRIFT_INVALID_SOCKET) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not create server socket.",
                              errno_copy);
  }

  // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
  int one = 1;
  if (-1 == setsockopt(serverSocket_,
                       SOL_SOCKET,
                       THRIFT_NO_SOCKET_CACHING,
                       cast_sockopt(&one),
                       sizeof(one))) {
// ignore errors coming out of this setsockopt on Windows.  This is because
// SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
// want to force servers to be an admin.
#ifndef _WIN32
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
                        errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not set THRIFT_NO_SOCKET_CACHING",
                              errno_copy);
#endif
  }

  // Set TCP buffer sizes
  if (tcpSendBuffer_ > 0) {
    if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,
                         SO_SNDBUF,
                         cast_sockopt(&tcpSendBuffer_),
                         sizeof(tcpSendBuffer_))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set SO_SNDBUF",
                                errno_copy);
    }
  }

  if (tcpRecvBuffer_ > 0) {
    if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,
                         SO_RCVBUF,
                         cast_sockopt(&tcpRecvBuffer_),
                         sizeof(tcpRecvBuffer_))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set SO_RCVBUF",
                                errno_copy);
    }
  }

#ifdef IPV6_V6ONLY
  if (res->ai_family == AF_INET6 && path_.empty()) {
    int zero = 0;
    if (-1 == setsockopt(serverSocket_,
                         IPPROTO_IPV6,
                         IPV6_V6ONLY,
                         cast_sockopt(&zero),
                         sizeof(zero))) {
      // Best effort: a failure here is logged but does not abort listen().
      GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
    }
  }
#endif // #ifdef IPV6_V6ONLY

  // Turn linger off, don't want to block on calls to close
  struct linger ling = {0, 0};
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
  }

  // Keepalive to ensure full result flushing
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one))) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_KEEPALIVE ", errno_copy);
    close();
    // Fixed: the exception used to name TCP_NODELAY although SO_KEEPALIVE failed.
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not set SO_KEEPALIVE",
                              errno_copy);
  }

// Set TCP nodelay if available, MAC OS X Hack
// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
  // Unix Sockets do not need that
  if (path_.empty()) {
    // TCP Nodelay, speed over bandwidth
    if (-1
        == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set TCP_NODELAY",
                                errno_copy);
    }
  }
#endif

  // Set NONBLOCK on the accept socket
  int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
  if (flags == -1) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "THRIFT_FCNTL() THRIFT_F_GETFL failed",
                              errno_copy);
  }

  if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
                              errno_copy);
  }

#ifdef TCP_LOW_MIN_RTO
  if (TSocket::getUseLowMinRto()) {
    // Fixed: this call used an undeclared identifier `s` instead of the server
    // socket, and the exception named TCP_NODELAY instead of TCP_LOW_MIN_RTO.
    if (-1 == setsockopt(serverSocket_,
                         IPPROTO_TCP,
                         TCP_LOW_MIN_RTO,
                         const_cast_sockopt(&one),
                         sizeof(one))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_LOW_MIN_RTO ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set TCP_LOW_MIN_RTO",
                                errno_copy);
    }
  }
#endif

  // prepare the port information
  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int errno_copy = 0;

  if (!path_.empty()) {

#ifndef _WIN32

    // Unix Domain Socket
    // len counts the terminating NUL so the whole C string is copied below.
    size_t len = path_.size() + 1;
    if (len > sizeof(sockaddr_un::sun_path)) {
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() Unix Domain socket path too long",
                          errno_copy);
      close(); // release the socket created above, like every other failure path
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Unix Domain socket path too long",
                                errno_copy);
    }

    struct sockaddr_un address;
    address.sun_family = AF_UNIX;
    memcpy(address.sun_path, path_.c_str(), len);

    auto structlen = static_cast<socklen_t>(sizeof(address));

    if (!address.sun_path[0]) { // abstract namespace socket
#ifdef __linux__
      // sun_path is not null-terminated in this case and structlen determines its length
      structlen -= sizeof(address.sun_path) - len;
#else
      GlobalOutput.perror("TNonblockingServerSocket::listen() Abstract Namespace Domain sockets only supported on linux: ", -99);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                " Abstract Namespace Domain socket path not supported");
#endif
    }

    do {
      if (0 == ::bind(serverSocket_, reinterpret_cast<struct sockaddr*>(&address), structlen)) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
    GlobalOutput.perror("TNonblockingServerSocket::listen() Unix Domain socket path not supported on windows", -99);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif
  } else {
    do {
      if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

    // retrieve bind info
    // Only needed when the OS picked the port (port_ == 0) and bind succeeded.
    if (port_ == 0 && retries <= retryLimit_) {
      struct sockaddr_storage sa;
      socklen_t len = sizeof(sa);
      std::memset(&sa, 0, len);
      if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        GlobalOutput.perror("TNonblockingServerSocket::getPort() getsockname() ", errno_copy);
      } else {
        if (sa.ss_family == AF_INET6) {
          const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
          listenPort_ = ntohs(sin->sin6_port);
        } else {
          const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
          listenPort_ = ntohs(sin->sin_port);
        }
      }
    }
  }

  // throw an error if we failed to bind properly
  if (retries > retryLimit_) {
    char errbuf[1024];
    if (!path_.empty()) {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
    } else {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
    }
    GlobalOutput(errbuf);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not bind",
                              errno_copy);
  }

  if (listenCallback_)
    listenCallback_(serverSocket_);

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
    errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() listen() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
  }

  // The socket is now listening!
}
// Port accessor.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// the configured port_ — verify against the full source.
471 int TNonblockingServerSocket::getPort() {
// Listening-port accessor.
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// listenPort_, which listen() fills from getsockname() when port_ is 0 — verify.
475 int TNonblockingServerSocket::getListenPort() {
479 shared_ptr
<TSocket
> TNonblockingServerSocket::acceptImpl() {
480 if (serverSocket_
== THRIFT_INVALID_SOCKET
) {
481 throw TTransportException(TTransportException::NOT_OPEN
, "TNonblockingServerSocket not listening");
484 struct sockaddr_storage clientAddress
;
485 int size
= sizeof(clientAddress
);
486 THRIFT_SOCKET clientSocket
487 = ::accept(serverSocket_
, (struct sockaddr
*)&clientAddress
, (socklen_t
*)&size
);
489 if (clientSocket
== THRIFT_INVALID_SOCKET
) {
490 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
491 GlobalOutput
.perror("TNonblockingServerSocket::acceptImpl() ::accept() ", errno_copy
);
492 throw TTransportException(TTransportException::UNKNOWN
, "accept()", errno_copy
);
495 // Explicitly set this socket to NONBLOCK mode
496 int flags
= THRIFT_FCNTL(clientSocket
, THRIFT_F_GETFL
, 0);
498 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
499 ::THRIFT_CLOSESOCKET(clientSocket
);
500 GlobalOutput
.perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy
);
501 throw TTransportException(TTransportException::UNKNOWN
,
502 "THRIFT_FCNTL(THRIFT_F_GETFL)",
506 if (-1 == THRIFT_FCNTL(clientSocket
, THRIFT_F_SETFL
, flags
| THRIFT_O_NONBLOCK
)) {
507 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
508 ::THRIFT_CLOSESOCKET(clientSocket
);
510 .perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ",
512 throw TTransportException(TTransportException::UNKNOWN
,
513 "THRIFT_FCNTL(THRIFT_F_SETFL)",
517 shared_ptr
<TSocket
> client
= createSocket(clientSocket
);
518 if (sendTimeout_
> 0) {
519 client
->setSendTimeout(sendTimeout_
);
521 if (recvTimeout_
> 0) {
522 client
->setRecvTimeout(recvTimeout_
);
525 client
->setKeepAlive(keepAlive_
);
527 client
->setCachedAddress((sockaddr
*)&clientAddress
, size
);
530 acceptCallback_(clientSocket
);
535 shared_ptr
<TSocket
> TNonblockingServerSocket::createSocket(THRIFT_SOCKET clientSocket
) {
536 return std::make_shared
<TSocket
>(clientSocket
);
// Shut down and close the server socket if one is open, then mark the handle
// as THRIFT_INVALID_SOCKET so later calls see the transport as closed.
// shutdown() is issued for both directions (THRIFT_SHUT_RDWR) before the
// descriptor is released.
// NOTE(review): this excerpt omits some lines of the function (closing braces
// and possibly a trailing statement) — check the full source.
539 void TNonblockingServerSocket::close() {
540 if (serverSocket_
!= THRIFT_INVALID_SOCKET
) {
541 shutdown(serverSocket_
, THRIFT_SHUT_RDWR
);
542 ::THRIFT_CLOSESOCKET(serverSocket_
);
544 serverSocket_
= THRIFT_INVALID_SOCKET
;
549 } // apache::thrift::transport