2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/thrift-config.h>
24 #ifdef HAVE_SYS_IOCTL_H
25 #include <sys/ioctl.h>
27 #ifdef HAVE_SYS_SOCKET_H
28 #include <sys/socket.h>
33 #ifdef HAVE_SYS_POLL_H
36 #include <sys/types.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
46 #include <thrift/concurrency/Monitor.h>
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TTransportException.h>
49 #include <thrift/transport/PlatformSocket.h>
51 #ifndef SOCKOPT_CAST_T
53 #define SOCKOPT_CAST_T void
55 #define SOCKOPT_CAST_T char
// Reinterprets a pointer-to-const of any type T as `const SOCKOPT_CAST_T*`,
// the pointer type this file hands to socket calls such as send() and
// setsockopt(). Only the pointer type changes; the pointee is not touched.
60 inline const SOCKOPT_CAST_T
* const_cast_sockopt(const T
* v
) {
61 return reinterpret_cast<const SOCKOPT_CAST_T
*>(v
);
// Mutable counterpart of const_cast_sockopt(): reinterprets a pointer of any
// type T as `SOCKOPT_CAST_T*`. Used in this file for recv() buffers and for
// the option-value arguments of getsockopt()/setsockopt().
65 inline SOCKOPT_CAST_T
* cast_sockopt(T
* v
) {
66 return reinterpret_cast<SOCKOPT_CAST_T
*>(v
);
76 * TSocket implementation.
// Constructs a TSocket addressed by host name and port.
// The descriptor starts out as THRIFT_INVALID_SOCKET, so isOpen() reports
// false for a freshly constructed object.
80 TSocket::TSocket(const string
& host
, int port
)
83 socket_(THRIFT_INVALID_SOCKET
),
// Constructs a TSocket addressed by a Unix domain socket path.
// The descriptor starts out as THRIFT_INVALID_SOCKET.
95 TSocket::TSocket(const string
& path
)
98 socket_(THRIFT_INVALID_SOCKET
),
// AF_UNSPEC marks the cached peer address as "nothing cached yet";
// getCachedAddress() switches on this family field.
108 cachedPeerAddr_
.ipv4
.sin_family
= AF_UNSPEC
;
113 socket_(THRIFT_INVALID_SOCKET
),
123 cachedPeerAddr_
.ipv4
.sin_family
= AF_UNSPEC
;
// Constructs a TSocket around an already existing socket descriptor.
126 TSocket::TSocket(THRIFT_SOCKET socket
)
// No peer address is cached yet (see getCachedAddress()).
138 cachedPeerAddr_
.ipv4
.sin_family
= AF_UNSPEC
;
// Best effort: the result of this setsockopt() call is not checked.
// NOTE(review): presumably guarded by an SO_NOSIGPIPE #ifdef - confirm.
142 setsockopt(socket_
, SOL_SOCKET
, SO_NOSIGPIPE
, &one
, sizeof(one
));
// Constructs a TSocket around an already existing socket descriptor and
// remembers an interrupt-listener descriptor. peek() and read() poll that
// listener together with the socket so a blocked call can be interrupted.
147 TSocket::TSocket(THRIFT_SOCKET socket
, std::shared_ptr
<THRIFT_SOCKET
> interruptListener
)
151 interruptListener_(interruptListener
),
// No peer address is cached yet (see getCachedAddress()).
160 cachedPeerAddr_
.ipv4
.sin_family
= AF_UNSPEC
;
// Best effort: the result of this setsockopt() call is not checked.
// NOTE(review): presumably guarded by an SO_NOSIGPIPE #ifdef - confirm.
164 setsockopt(socket_
, SOL_SOCKET
, SO_NOSIGPIPE
, &one
, sizeof(one
));
169 TSocket::~TSocket() {
// Reports whether the socket has unread bytes queued, using a FIONREAD ioctl.
//
// @return true when the ioctl reports more than zero readable bytes.
// @throws TTransportException(UNKNOWN) when the ioctl fails for a reason
//         other than a retried THRIFT_EINTR.
173 bool TSocket::hasPendingDataToRead() {
179 THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable
;
181 int r
= THRIFT_IOCTL_SOCKET(socket_
, FIONREAD
, &numBytesAvailable
);
// Capture the error code right away, before other calls can change it.
183 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
// An interrupted call is retried, bounded by maxRecvRetries_.
184 if (errno_copy
== THRIFT_EINTR
&& (retries
++ < maxRecvRetries_
)) {
187 GlobalOutput
.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy
);
188 throw TTransportException(TTransportException::UNKNOWN
, "Unknown", errno_copy
);
190 return numBytesAvailable
> 0;
// Returns true when this object currently holds a socket descriptor, i.e.
// socket_ differs from THRIFT_INVALID_SOCKET. No system call is made, so this
// does not verify that the peer is still connected.
193 bool TSocket::isOpen() const {
194 return (socket_
!= THRIFT_INVALID_SOCKET
);
// Probes the socket with a one-byte MSG_PEEK recv(), which does not consume
// the byte. When an interrupt listener is configured, the socket and the
// listener are polled first.
//
// @throws TTransportException(UNKNOWN) when THRIFT_POLL() or recv() fails
//         with an error that is not handled specially below.
197 bool TSocket::peek() {
201 if (interruptListener_
) {
// Loop so that a poll interrupted by a signal can be retried.
202 for (int retries
= 0;;) {
203 struct THRIFT_POLLFD fds
[2];
204 std::memset(fds
, 0, sizeof(fds
));
206 fds
[0].events
= THRIFT_POLLIN
;
207 fds
[1].fd
= *(interruptListener_
.get());
208 fds
[1].events
= THRIFT_POLLIN
;
// A recvTimeout_ of 0 means "no timeout", which poll expresses as -1.
209 int ret
= THRIFT_POLL(fds
, 2, (recvTimeout_
== 0) ? -1 : recvTimeout_
);
210 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
// An interrupted poll is retried, bounded by maxRecvRetries_.
213 if (errno_copy
== THRIFT_EINTR
&& (retries
++ < maxRecvRetries_
)) {
216 GlobalOutput
.perror("TSocket::peek() THRIFT_POLL() ", errno_copy
);
217 throw TTransportException(TTransportException::UNKNOWN
, "Unknown", errno_copy
);
218 } else if (ret
> 0) {
219 // Check the interruptListener
220 if (fds
[1].revents
& THRIFT_POLLIN
) {
223 // There must be data or a disconnection, fall through to the PEEK
232 // Check to see if data is available or if the remote side closed
234 int r
= static_cast<int>(recv(socket_
, cast_sockopt(&buf
), 1, MSG_PEEK
));
236 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
237 #if defined __FreeBSD__ || defined __MACH__
239 * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
242 if (errno_copy
== THRIFT_ECONNRESET
) {
246 GlobalOutput
.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy
);
247 throw TTransportException(TTransportException::UNKNOWN
, "recv()", errno_copy
);
// Creates the socket descriptor and connects it, either to the Unix domain
// socket named by path_ or to the address described by `res`.
//
// Visible steps, in order: create the descriptor; apply the configured
// send/receive timeouts, keep-alive, linger and no-delay settings; switch the
// descriptor to non-blocking mode when connTimeout_ > 0; connect; wait for
// the connect to finish with THRIFT_POLL() and check SO_ERROR; restore the
// original descriptor flags; cache the peer address taken from `res`.
//
// @param res  Address to connect to. unix_open() passes nullptr, in which
//             case path_ is non-empty and `res` is not needed for socket()
//             or connect().
// @throws TTransportException(NOT_OPEN) for every failure reported here:
//         socket(), THRIFT_FCNTL(), connect(), getsockopt(), a pending socket
//         error, THRIFT_POLL() failure or timeout, and unsupported or overlong
//         Unix domain socket paths.
252 void TSocket::openConnection(struct addrinfo
* res
) {
258 if (!path_
.empty()) {
259 socket_
= socket(PF_UNIX
, SOCK_STREAM
, IPPROTO_IP
);
261 socket_
= socket(res
->ai_family
, res
->ai_socktype
, res
->ai_protocol
);
264 if (socket_
== THRIFT_INVALID_SOCKET
) {
265 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
266 GlobalOutput
.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy
);
267 throw TTransportException(TTransportException::NOT_OPEN
, "socket()", errno_copy
);
// Re-apply the options stored on this object to the new descriptor.
271 if (sendTimeout_
> 0) {
272 setSendTimeout(sendTimeout_
);
276 if (recvTimeout_
> 0) {
277 setRecvTimeout(recvTimeout_
);
281 setKeepAlive(keepAlive_
);
285 setLinger(lingerOn_
, lingerVal_
);
288 setNoDelay(noDelay_
);
// Best effort: the result of this setsockopt() call is not checked.
293 setsockopt(socket_
, SOL_SOCKET
, SO_NOSIGPIPE
, &one
, sizeof(one
));
297 // Uses a low min RTO if asked to.
298 #ifdef TCP_LOW_MIN_RTO
299 if (getUseLowMinRto()) {
301 setsockopt(socket_
, IPPROTO_TCP
, TCP_LOW_MIN_RTO
, &one
, sizeof(one
));
305 // Set the socket to be non blocking for connect if a timeout exists
// `flags` keeps the original descriptor flags so they can be restored once
// the connect has finished.
306 int flags
= THRIFT_FCNTL(socket_
, THRIFT_F_GETFL
, 0);
307 if (connTimeout_
> 0) {
308 if (-1 == THRIFT_FCNTL(socket_
, THRIFT_F_SETFL
, flags
| THRIFT_O_NONBLOCK
)) {
309 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
310 GlobalOutput
.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy
);
311 throw TTransportException(TTransportException::NOT_OPEN
, "THRIFT_FCNTL() failed", errno_copy
);
314 if (-1 == THRIFT_FCNTL(socket_
, THRIFT_F_SETFL
, flags
& ~THRIFT_O_NONBLOCK
)) {
315 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
316 GlobalOutput
.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy
);
317 throw TTransportException(TTransportException::NOT_OPEN
, "THRIFT_FCNTL() failed", errno_copy
);
321 // Connect the socket
323 if (!path_
.empty()) {
// len counts the terminating NUL as well, so the whole C string must fit
// into sun_path.
326 size_t len
= path_
.size() + 1;
327 if (len
> sizeof(((sockaddr_un
*)nullptr)->sun_path
)) {
// NOTE(review): no system call has failed at this point, so the error code
// logged below is whatever was left over from an earlier call - confirm
// whether logging it is intended.
328 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
329 GlobalOutput
.perror("TSocket::open() Unix Domain socket path too long", errno_copy
);
330 throw TTransportException(TTransportException::NOT_OPEN
, " Unix Domain socket path too long");
333 struct sockaddr_un address
;
334 address
.sun_family
= AF_UNIX
;
335 memcpy(address
.sun_path
, path_
.c_str(), len
);
337 auto structlen
= static_cast<socklen_t
>(sizeof(address
));
339 if (!address
.sun_path
[0]) { // abstract namespace socket
341 // sun_path is not null-terminated in this case and structlen determines its length
342 structlen
-= sizeof(address
.sun_path
) - len
;
344 GlobalOutput
.perror("TSocket::open() Abstract Namespace Domain sockets only supported on linux: ", -99);
345 throw TTransportException(TTransportException::NOT_OPEN
,
346 " Abstract Namespace Domain socket path not supported");
350 ret
= connect(socket_
, (struct sockaddr
*)&address
, structlen
);
352 GlobalOutput
.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
353 throw TTransportException(TTransportException::NOT_OPEN
,
354 " Unix Domain socket path not supported");
358 ret
= connect(socket_
, res
->ai_addr
, static_cast<int>(res
->ai_addrlen
));
// THRIFT_EINPROGRESS / THRIFT_EWOULDBLOCK are expected for a non-blocking
// connect; any other error code is treated as a connect failure.
366 if ((THRIFT_GET_SOCKET_ERROR
!= THRIFT_EINPROGRESS
)
367 && (THRIFT_GET_SOCKET_ERROR
!= THRIFT_EWOULDBLOCK
)) {
368 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
369 GlobalOutput
.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy
);
370 throw TTransportException(TTransportException::NOT_OPEN
, "connect() failed", errno_copy
);
// Wait up to connTimeout_ milliseconds for the socket to become writable.
373 struct THRIFT_POLLFD fds
[1];
374 std::memset(fds
, 0, sizeof(fds
));
376 fds
[0].events
= THRIFT_POLLOUT
;
377 ret
= THRIFT_POLL(fds
, 1, connTimeout_
);
380 // Ensure the socket is connected and that there are no errors set
384 int ret2
= getsockopt(socket_
, SOL_SOCKET
, SO_ERROR
, cast_sockopt(&val
), &lon
);
386 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
387 GlobalOutput
.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy
);
388 throw TTransportException(TTransportException::NOT_OPEN
, "getsockopt()", errno_copy
);
390 // no errors on socket, go to town
394 GlobalOutput
.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(),
396 throw TTransportException(TTransportException::NOT_OPEN
, "socket open() error", val
);
397 } else if (ret
== 0) {
399 string errStr
= "TSocket::open() timed out " + getSocketInfo();
400 GlobalOutput(errStr
.c_str());
401 throw TTransportException(TTransportException::NOT_OPEN
, "open() timed out");
403 // error on THRIFT_POLL()
404 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
405 GlobalOutput
.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy
);
406 throw TTransportException(TTransportException::NOT_OPEN
, "THRIFT_POLL() failed", errno_copy
);
410 // Set socket back to normal mode (blocking)
411 if (-1 == THRIFT_FCNTL(socket_
, THRIFT_F_SETFL
, flags
)) {
412 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
413 GlobalOutput
.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy
);
414 throw TTransportException(TTransportException::NOT_OPEN
, "THRIFT_FCNTL() failed", errno_copy
);
// Remember the address that was connected to so later peer lookups can use
// the cached copy.
418 setCachedAddress(res
->ai_addr
, static_cast<socklen_t
>(res
->ai_addrlen
));
// Opens the socket. A non-empty path_ (Unix domain socket) takes a separate
// branch from the host/port case.
// NOTE(review): presumably dispatches to unix_open() / local_open() - confirm.
422 void TSocket::open() {
426 if (!path_
.empty()) {
// Opens a Unix domain socket connection. Does nothing unless path_ is set.
433 void TSocket::unix_open() {
434 if (!path_
.empty()) {
435 // A Unix domain socket does not need an addrinfo struct, so we pass nullptr
436 openConnection(nullptr);
// Resolves host_/port_ with getaddrinfo() and walks the returned address list
// in order.
//
// @throws TTransportException(BAD_ARGS) when port_ is outside 0..65535.
// @throws TTransportException(NOT_OPEN) when the host cannot be resolved.
440 void TSocket::local_open() {
443 TWinsockSingleton::create();
450 // Validate port number
451 if (port_
< 0 || port_
> 0xFFFF) {
452 throw TTransportException(TTransportException::BAD_ARGS
, "Specified port is invalid");
455 struct addrinfo hints
, *res
, *res0
;
// Sized for the longest valid port string plus its terminating NUL.
459 char port
[sizeof("65535")];
460 std::memset(&hints
, 0, sizeof(hints
));
461 hints
.ai_family
= PF_UNSPEC
;
462 hints
.ai_socktype
= SOCK_STREAM
;
463 hints
.ai_flags
= AI_PASSIVE
| AI_ADDRCONFIG
;
// The range check above keeps port_ within 0..65535, so the formatted text
// always fits into `port`.
464 sprintf(port
, "%d", port_
);
466 error
= getaddrinfo(host_
.c_str(), port
, &hints
, &res0
);
// On WSANO_DATA the lookup is repeated without AI_ADDRCONFIG.
469 if (error
== WSANO_DATA
) {
470 hints
.ai_flags
&= ~AI_ADDRCONFIG
;
471 error
= getaddrinfo(host_
.c_str(), port
, &hints
, &res0
);
476 string errStr
= "TSocket::open() getaddrinfo() " + getSocketInfo()
477 + string(THRIFT_GAI_STRERROR(error
));
478 GlobalOutput(errStr
.c_str());
480 throw TTransportException(TTransportException::NOT_OPEN
,
481 "Could not resolve host for client socket.");
484 // Cycle through all the returned addresses until one
485 // connects or push the exception up.
486 for (res
= res0
; res
; res
= res
->ai_next
) {
490 } catch (TTransportException
&) {
495 freeaddrinfo(res0
); // cleanup on failure
501 // Free address structure memory
// Closes the socket if one is held: shuts down both directions, closes the
// descriptor, and resets socket_ to THRIFT_INVALID_SOCKET.
// The return values of shutdown() and the close call are not checked.
505 void TSocket::close() {
506 if (socket_
!= THRIFT_INVALID_SOCKET
) {
507 shutdown(socket_
, THRIFT_SHUT_RDWR
);
508 ::THRIFT_CLOSESOCKET(socket_
);
510 socket_
= THRIFT_INVALID_SOCKET
;
// Replaces the descriptor managed by this object with `socket`.
// An already held descriptor is handled first by the branch below.
// NOTE(review): presumably the old descriptor is closed there - confirm.
513 void TSocket::setSocketFD(THRIFT_SOCKET socket
) {
514 if (socket_
!= THRIFT_INVALID_SOCKET
) {
// Reads up to `len` bytes from the socket into `buf` with recv().
//
// When an interrupt listener is configured, the socket and the listener are
// polled first so that a blocked read can be interrupted.
//
// @param buf  destination buffer.
// @param len  maximum number of bytes to read.
// @throws TTransportException(NOT_OPEN)     socket not open, or THRIFT_ENOTCONN.
// @throws TTransportException(INTERRUPTED)  the interrupt listener fired.
// @throws TTransportException(TIMED_OUT)    poll timeout, THRIFT_EAGAIN, or
//                                           THRIFT_ETIMEDOUT.
// @throws TTransportException(UNKNOWN)      any other poll/recv error.
520 uint32_t TSocket::read(uint8_t* buf
, uint32_t len
) {
521 if (socket_
== THRIFT_INVALID_SOCKET
) {
522 throw TTransportException(TTransportException::NOT_OPEN
, "Called read on non-open socket");
527 // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
528 // the system is out of resources (an awesome undocumented feature).
529 // The following is an approximation of the time interval under which
530 // THRIFT_EAGAIN is taken to indicate an out of resources error.
531 uint32_t eagainThresholdMicros
= 0;
533 // if a readTimeout is specified along with a max number of recv retries, then
534 // the threshold will ensure that the read timeout is not exceeded even in the
535 // case of resource errors
// NOTE(review): recvTimeout_ looks like an int of milliseconds; if so,
// `recvTimeout_ * 1000` can overflow for very large timeouts - confirm.
536 eagainThresholdMicros
= (recvTimeout_
* 1000) / ((maxRecvRetries_
> 0) ? maxRecvRetries_
: 2);
540 // Read from the socket
541 struct timeval begin
;
542 if (recvTimeout_
> 0) {
543 THRIFT_GETTIMEOFDAY(&begin
, nullptr);
545 // if there is no read timeout we don't need the TOD to determine whether
546 // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
547 begin
.tv_sec
= begin
.tv_usec
= 0;
552 if (interruptListener_
) {
553 struct THRIFT_POLLFD fds
[2];
554 std::memset(fds
, 0, sizeof(fds
));
556 fds
[0].events
= THRIFT_POLLIN
;
557 fds
[1].fd
= *(interruptListener_
.get());
558 fds
[1].events
= THRIFT_POLLIN
;
// A recvTimeout_ of 0 means "no timeout", which poll expresses as -1.
560 int ret
= THRIFT_POLL(fds
, 2, (recvTimeout_
== 0) ? -1 : recvTimeout_
);
561 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
// An interrupted poll is retried, bounded by maxRecvRetries_.
564 if (errno_copy
== THRIFT_EINTR
&& (retries
++ < maxRecvRetries_
)) {
567 GlobalOutput
.perror("TSocket::read() THRIFT_POLL() ", errno_copy
);
568 throw TTransportException(TTransportException::UNKNOWN
, "Unknown", errno_copy
);
569 } else if (ret
> 0) {
570 // Check the interruptListener
571 if (fds
[1].revents
& THRIFT_POLLIN
) {
572 throw TTransportException(TTransportException::INTERRUPTED
, "Interrupted");
574 } else /* ret == 0 */ {
575 throw TTransportException(TTransportException::TIMED_OUT
, "THRIFT_EAGAIN (timed out)");
578 // falling through means there is something to recv and it cannot block
581 got
= static_cast<int>(recv(socket_
, cast_sockopt(buf
), len
, 0));
582 // THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
583 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
585 // Check for error on read
587 if (errno_copy
== THRIFT_EAGAIN
) {
588 // if no timeout we can assume that resource exhaustion has occurred.
589 if (recvTimeout_
== 0) {
590 throw TTransportException(TTransportException::TIMED_OUT
,
591 "THRIFT_EAGAIN (unavailable resources)");
593 // check if this is the lack of resources or timeout case
595 THRIFT_GETTIMEOFDAY(&end
, nullptr);
596 auto readElapsedMicros
= static_cast<uint32_t>(((end
.tv_sec
- begin
.tv_sec
) * 1000 * 1000)
597 + (end
.tv_usec
- begin
.tv_usec
));
// THRIFT_EAGAIN arriving sooner than the threshold is treated as a resource
// shortage: back off briefly and retry while retries remain.
599 if (!eagainThresholdMicros
|| (readElapsedMicros
< eagainThresholdMicros
)) {
600 if (retries
++ < maxRecvRetries_
) {
601 THRIFT_SLEEP_USEC(50);
604 throw TTransportException(TTransportException::TIMED_OUT
,
605 "THRIFT_EAGAIN (unavailable resources)");
608 // infer that timeout has been hit
609 throw TTransportException(TTransportException::TIMED_OUT
, "THRIFT_EAGAIN (timed out)");
613 // If interrupted, try again
614 if (errno_copy
== THRIFT_EINTR
&& retries
++ < maxRecvRetries_
) {
618 if (errno_copy
== THRIFT_ECONNRESET
) {
622 // The socket is not connected
623 if (errno_copy
== THRIFT_ENOTCONN
) {
624 throw TTransportException(TTransportException::NOT_OPEN
, "THRIFT_ENOTCONN");
628 if (errno_copy
== THRIFT_ETIMEDOUT
) {
629 throw TTransportException(TTransportException::TIMED_OUT
, "THRIFT_ETIMEDOUT");
632 // Not a retryable case: this is a real error
633 GlobalOutput
.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy
);
635 // Any other error is reported as UNKNOWN
636 throw TTransportException(TTransportException::UNKNOWN
, "Unknown", errno_copy
);
// Writes `len` bytes from `buf`, using write_partial() on the part of the
// buffer that has not been sent yet (`buf + sent`, `len - sent`).
//
// @throws TTransportException(TIMED_OUT) when the send timeout expired.
//         NOTE(review): presumably triggered by write_partial() returning 0.
642 void TSocket::write(const uint8_t* buf
, uint32_t len
) {
646 uint32_t b
= write_partial(buf
+ sent
, len
- sent
);
648 // This should only happen if the timeout set with SO_SNDTIMEO expired.
649 // Raise an exception.
650 throw TTransportException(TTransportException::TIMED_OUT
, "send timeout expired");
// Attempts a single send() of the given buffer.
//
// @param buf  bytes to send.
// @param len  number of bytes available in `buf`.
// @throws TTransportException(NOT_OPEN)  socket not open; send() failed with
//         THRIFT_EPIPE, THRIFT_ECONNRESET or THRIFT_ENOTCONN; or send()
//         returned 0.
// @throws TTransportException(UNKNOWN)   any other send() failure.
// THRIFT_EWOULDBLOCK / THRIFT_EAGAIN are checked separately before the
// generic failure path.
656 uint32_t TSocket::write_partial(const uint8_t* buf
, uint32_t len
) {
657 if (socket_
== THRIFT_INVALID_SOCKET
) {
658 throw TTransportException(TTransportException::NOT_OPEN
, "Called write on non-open socket");
665 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
666 // check for the THRIFT_EPIPE return condition and close the socket in that case
667 flags
|= MSG_NOSIGNAL
;
668 #endif // ifdef MSG_NOSIGNAL
670 int b
= static_cast<int>(send(socket_
, const_cast_sockopt(buf
+ sent
), len
- sent
, flags
));
673 if (THRIFT_GET_SOCKET_ERROR
== THRIFT_EWOULDBLOCK
|| THRIFT_GET_SOCKET_ERROR
== THRIFT_EAGAIN
) {
676 // Fail on a send error
677 int errno_copy
= THRIFT_GET_SOCKET_ERROR
;
678 GlobalOutput
.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy
);
// Errors that mean the connection is gone are reported as NOT_OPEN.
680 if (errno_copy
== THRIFT_EPIPE
|| errno_copy
== THRIFT_ECONNRESET
681 || errno_copy
== THRIFT_ENOTCONN
) {
682 throw TTransportException(TTransportException::NOT_OPEN
, "write() send()", errno_copy
);
685 throw TTransportException(TTransportException::UNKNOWN
, "write() send()", errno_copy
);
688 // Fail on blocked send
690 throw TTransportException(TTransportException::NOT_OPEN
, "Socket send returned 0.");
695 std::string
TSocket::getHost() {
699 int TSocket::getPort() {
703 void TSocket::setHost(string host
) {
707 void TSocket::setPort(int port
) {
// Applies the SO_LINGER option to the socket from lingerOn_ / lingerVal_.
// A setsockopt() failure is logged through GlobalOutput.perror(); no throw is
// visible on that path.
//
// @param on      whether lingering is enabled.
// @param linger  linger value passed through to the option.
// NOTE(review): presumably `on`/`linger` are stored into lingerOn_/lingerVal_
// before use, and an invalid socket returns early - confirm.
711 void TSocket::setLinger(bool on
, int linger
) {
714 if (socket_
== THRIFT_INVALID_SOCKET
) {
// Two alternative initializations of the same option struct; the second
// casts both fields to u_short.
// NOTE(review): presumably selected by a platform #ifdef - confirm.
719 struct linger l
= {(lingerOn_
? 1 : 0), lingerVal_
};
721 struct linger l
= {static_cast<u_short
>(lingerOn_
? 1 : 0), static_cast<u_short
>(lingerVal_
)};
724 int ret
= setsockopt(socket_
, SOL_SOCKET
, SO_LINGER
, cast_sockopt(&l
), sizeof(l
));
727 = THRIFT_GET_SOCKET_ERROR
; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
728 GlobalOutput
.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy
);
// Applies the TCP_NODELAY option to the socket from noDelay_.
// The option is skipped when no socket is open or when a Unix domain socket
// path is configured. A setsockopt() failure is logged through
// GlobalOutput.perror().
//
// @param noDelay  requested TCP_NODELAY state.
// NOTE(review): presumably stored into noDelay_ before use - confirm.
732 void TSocket::setNoDelay(bool noDelay
) {
734 if (socket_
== THRIFT_INVALID_SOCKET
|| !path_
.empty()) {
738 // Set socket to NODELAY
739 int v
= noDelay_
? 1 : 0;
740 int ret
= setsockopt(socket_
, IPPROTO_TCP
, TCP_NODELAY
, cast_sockopt(&v
), sizeof(v
));
743 = THRIFT_GET_SOCKET_ERROR
; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
744 GlobalOutput
.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy
);
748 void TSocket::setConnTimeout(int ms
) {
// Shared helper behind setRecvTimeout()/setSendTimeout(): sets a socket-level
// timeout option on `s`.
//
// @param s           socket to configure.
// @param timeout_ms  timeout in milliseconds; a negative value is logged.
// @param optname     SOL_SOCKET option to set (SO_RCVTIMEO or SO_SNDTIMEO in
//                    this file).
// A setsockopt() failure is logged through GlobalOutput.perror().
752 void setGenericTimeout(THRIFT_SOCKET s
, int timeout_ms
, int optname
) {
753 if (timeout_ms
< 0) {
// NOTE(review): errBuf's declaration is not visible here; confirm it is
// large enough for this message plus an 11-character "%d" expansion.
755 sprintf(errBuf
, "TSocket::setGenericTimeout with negative input: %d\n", timeout_ms
);
756 GlobalOutput(errBuf
);
760 if (s
== THRIFT_INVALID_SOCKET
) {
// Two alternative representations of the timeout: a DWORD holding
// milliseconds, or a timeval split into seconds and microseconds.
// NOTE(review): presumably selected by a platform #ifdef - confirm.
765 DWORD platform_time
= static_cast<DWORD
>(timeout_ms
);
767 struct timeval platform_time
= {(int)(timeout_ms
/ 1000), (int)((timeout_ms
% 1000) * 1000)};
770 int ret
= setsockopt(s
, SOL_SOCKET
, optname
, cast_sockopt(&platform_time
), sizeof(platform_time
));
773 = THRIFT_GET_SOCKET_ERROR
; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
774 GlobalOutput
.perror("TSocket::setGenericTimeout() setsockopt() ", errno_copy
);
// Sets the receive timeout on the socket via setGenericTimeout() with
// SO_RCVTIMEO.
// @param ms  timeout in milliseconds.
778 void TSocket::setRecvTimeout(int ms
) {
779 setGenericTimeout(socket_
, ms
, SO_RCVTIMEO
);
// Sets the send timeout on the socket via setGenericTimeout() with
// SO_SNDTIMEO.
// @param ms  timeout in milliseconds.
783 void TSocket::setSendTimeout(int ms
) {
784 setGenericTimeout(socket_
, ms
, SO_SNDTIMEO
);
// Records the keep-alive preference in keepAlive_ and applies SO_KEEPALIVE to
// the socket. A setsockopt() failure is logged through GlobalOutput.perror().
//
// @param keepAlive  whether keep-alive should be enabled.
// NOTE(review): presumably returns early when no socket is open, leaving only
// the stored preference - confirm.
788 void TSocket::setKeepAlive(bool keepAlive
) {
789 keepAlive_
= keepAlive
;
791 if (socket_
== THRIFT_INVALID_SOCKET
) {
// The option value is passed as an int holding the boolean as 0/1.
795 int value
= keepAlive_
;
797 = setsockopt(socket_
, SOL_SOCKET
, SO_KEEPALIVE
, const_cast_sockopt(&value
), sizeof(value
));
801 = THRIFT_GET_SOCKET_ERROR
; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
802 GlobalOutput
.perror("TSocket::setKeepAlive() setsockopt() " + getSocketInfo(), errno_copy
);
// Sets maxRecvRetries_, the retry bound used by hasPendingDataToRead(),
// peek() and read() when a call is interrupted (THRIFT_EINTR) and by read()
// when it retries after THRIFT_EAGAIN.
806 void TSocket::setMaxRecvRetries(int maxRecvRetries
) {
807 maxRecvRetries_
= maxRecvRetries
;
// Builds a short description of this socket for log and error messages.
// Three forms are produced:
//   "<Host: H Port: P>" from the peer address and port when host_ is empty
//                       or port_ is 0,
//   "<Host: H Port: P>" from host_ and port_ otherwise,
//   "<Path: P>"         from path_.
// NOTE(review): the condition choosing the path form is not visible here -
// presumably a non-empty path_; confirm.
810 string
TSocket::getSocketInfo() const {
811 std::ostringstream oss
;
813 if (host_
.empty() || port_
== 0) {
814 oss
<< "<Host: " << getPeerAddress();
815 oss
<< " Port: " << getPeerPort() << ">";
817 oss
<< "<Host: " << host_
<< " Port: " << port_
<< ">";
820 oss
<< "<Path: " << path_
<< ">";
// Returns the peer's host name, resolving it on first use.
// The lookup runs only while peerHost_ is empty and no Unix domain path is
// set. The peer address comes from the cached address when available,
// otherwise from getpeername(), and is then cached. The name produced by
// getnameinfo() is stored in peerHost_.
825 std::string
TSocket::getPeerHost() const {
826 if (peerHost_
.empty() && path_
.empty()) {
827 struct sockaddr_storage addr
;
828 struct sockaddr
* addrPtr
;
831 if (socket_
== THRIFT_INVALID_SOCKET
) {
835 addrPtr
= getCachedAddress(&addrLen
);
// No cached address: ask the OS for the peer address.
837 if (addrPtr
== nullptr) {
838 addrLen
= sizeof(addr
);
839 if (getpeername(socket_
, (sockaddr
*)&addr
, &addrLen
) != 0) {
842 addrPtr
= (sockaddr
*)&addr
;
// const_cast is needed because this method is const but updates the cache.
844 const_cast<TSocket
&>(*this).setCachedAddress(addrPtr
, addrLen
);
847 char clienthost
[NI_MAXHOST
];
848 char clientservice
[NI_MAXSERV
];
850 getnameinfo((sockaddr
*)addrPtr
,
855 sizeof(clientservice
),
858 peerHost_
= clienthost
;
// Returns the peer's address in numeric form, resolving it on first use.
// The lookup runs only while peerAddress_ is empty and no Unix domain path is
// set. The peer address comes from the cached address when available,
// otherwise from getpeername(), and is then cached. The numeric host is
// stored in peerAddress_ and the numeric service in peerPort_.
863 std::string
TSocket::getPeerAddress() const {
864 if (peerAddress_
.empty() && path_
.empty()) {
865 struct sockaddr_storage addr
;
866 struct sockaddr
* addrPtr
;
869 if (socket_
== THRIFT_INVALID_SOCKET
) {
873 addrPtr
= getCachedAddress(&addrLen
);
// No cached address: ask the OS for the peer address.
875 if (addrPtr
== nullptr) {
876 addrLen
= sizeof(addr
);
877 if (getpeername(socket_
, (sockaddr
*)&addr
, &addrLen
) != 0) {
880 addrPtr
= (sockaddr
*)&addr
;
// const_cast is needed because this method is const but updates the cache.
882 const_cast<TSocket
&>(*this).setCachedAddress(addrPtr
, addrLen
);
885 char clienthost
[NI_MAXHOST
];
886 char clientservice
[NI_MAXSERV
];
893 sizeof(clientservice
),
// Numeric flags: the host and service are formatted as numbers instead of
// being resolved to names.
894 NI_NUMERICHOST
| NI_NUMERICSERV
);
896 peerAddress_
= clienthost
;
897 peerPort_
= std::atoi(clientservice
);
902 int TSocket::getPeerPort() const {
// Stores a copy of the peer address in cachedPeerAddr_.
// The copy goes to the ipv4 or ipv6 member depending on addr->sa_family, and
// only when `len` equals the size of the matching sockaddr structure.
// peerAddress_ is cleared afterwards.
// NOTE(review): presumably a non-empty path_ (Unix domain socket) returns
// early without caching - confirm.
//
// @param addr  address to cache.
// @param len   length of `addr` in bytes.
907 void TSocket::setCachedAddress(const sockaddr
* addr
, socklen_t len
) {
908 if (!path_
.empty()) {
912 switch (addr
->sa_family
) {
914 if (len
== sizeof(sockaddr_in
)) {
915 memcpy((void*)&cachedPeerAddr_
.ipv4
, (void*)addr
, len
);
920 if (len
== sizeof(sockaddr_in6
)) {
921 memcpy((void*)&cachedPeerAddr_
.ipv6
, (void*)addr
, len
);
// Drop the formatted peer address so it is recomputed from the new cache.
925 peerAddress_
.clear();
// Returns a pointer to the cached peer address and writes its size to *len.
// The address family stored in cachedPeerAddr_.ipv4.sin_family selects
// between the ipv4 and ipv6 members.
// NOTE(review): presumably returns nullptr for any other family, e.g. the
// AF_UNSPEC set by the constructors; getPeerHost() and getPeerAddress() check
// for nullptr - confirm.
//
// @param len  out-parameter receiving the size of the returned structure.
929 sockaddr
* TSocket::getCachedAddress(socklen_t
* len
) const {
930 switch (cachedPeerAddr_
.ipv4
.sin_family
) {
932 *len
= sizeof(sockaddr_in
);
933 return (sockaddr
*)&cachedPeerAddr_
.ipv4
;
936 *len
= sizeof(sockaddr_in6
);
937 return (sockaddr
*)&cachedPeerAddr_
.ipv6
;
// Class-wide flag, off by default. openConnection() reads it through
// getUseLowMinRto() to decide whether to set TCP_LOW_MIN_RTO on new sockets.
944 bool TSocket::useLowMinRto_
= false;
// Sets the class-wide low-min-RTO flag.
945 void TSocket::setUseLowMinRto(bool useLowMinRto
) {
946 useLowMinRto_
= useLowMinRto
;
// Returns the current value of the class-wide low-min-RTO flag.
948 bool TSocket::getUseLowMinRto() {
949 return useLowMinRto_
;
// Describes the peer as "<peer host>:<peer port>", built from getPeerHost()
// and getPeerPort().
952 const std::string
TSocket::getOrigin() const {
953 std::ostringstream oss
;
954 oss
<< getPeerHost() << ":" << getPeerPort();
959 } // apache::thrift::transport