]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TSocket.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / transport / TSocket.cpp
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <sstream>
24 #ifdef HAVE_SYS_IOCTL_H
25 #include <sys/ioctl.h>
26 #endif
27 #ifdef HAVE_SYS_SOCKET_H
28 #include <sys/socket.h>
29 #endif
30 #ifdef HAVE_SYS_UN_H
31 #include <sys/un.h>
32 #endif
33 #ifdef HAVE_SYS_POLL_H
34 #include <sys/poll.h>
35 #endif
36 #include <sys/types.h>
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #include <netinet/tcp.h>
40 #endif
41 #ifdef HAVE_UNISTD_H
42 #include <unistd.h>
43 #endif
44 #include <fcntl.h>
45
46 #include <thrift/concurrency/Monitor.h>
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TTransportException.h>
49 #include <thrift/transport/PlatformSocket.h>
50
51 #ifndef SOCKOPT_CAST_T
52 #ifndef _WIN32
53 #define SOCKOPT_CAST_T void
54 #else
55 #define SOCKOPT_CAST_T char
56 #endif // _WIN32
57 #endif
58
59 template <class T>
60 inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
61 return reinterpret_cast<const SOCKOPT_CAST_T*>(v);
62 }
63
64 template <class T>
65 inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
66 return reinterpret_cast<SOCKOPT_CAST_T*>(v);
67 }
68
69 using std::string;
70
71 namespace apache {
72 namespace thrift {
73 namespace transport {
74
75 /**
76 * TSocket implementation.
77 *
78 */
79
80 TSocket::TSocket(const string& host, int port)
81 : host_(host),
82 port_(port),
83 socket_(THRIFT_INVALID_SOCKET),
84 peerPort_(0),
85 connTimeout_(0),
86 sendTimeout_(0),
87 recvTimeout_(0),
88 keepAlive_(false),
89 lingerOn_(1),
90 lingerVal_(0),
91 noDelay_(1),
92 maxRecvRetries_(5) {
93 }
94
95 TSocket::TSocket(const string& path)
96 : port_(0),
97 path_(path),
98 socket_(THRIFT_INVALID_SOCKET),
99 peerPort_(0),
100 connTimeout_(0),
101 sendTimeout_(0),
102 recvTimeout_(0),
103 keepAlive_(false),
104 lingerOn_(1),
105 lingerVal_(0),
106 noDelay_(1),
107 maxRecvRetries_(5) {
108 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
109 }
110
111 TSocket::TSocket()
112 : port_(0),
113 socket_(THRIFT_INVALID_SOCKET),
114 peerPort_(0),
115 connTimeout_(0),
116 sendTimeout_(0),
117 recvTimeout_(0),
118 keepAlive_(false),
119 lingerOn_(1),
120 lingerVal_(0),
121 noDelay_(1),
122 maxRecvRetries_(5) {
123 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
124 }
125
126 TSocket::TSocket(THRIFT_SOCKET socket)
127 : port_(0),
128 socket_(socket),
129 peerPort_(0),
130 connTimeout_(0),
131 sendTimeout_(0),
132 recvTimeout_(0),
133 keepAlive_(false),
134 lingerOn_(1),
135 lingerVal_(0),
136 noDelay_(1),
137 maxRecvRetries_(5) {
138 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
139 #ifdef SO_NOSIGPIPE
140 {
141 int one = 1;
142 setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
143 }
144 #endif
145 }
146
147 TSocket::TSocket(THRIFT_SOCKET socket, std::shared_ptr<THRIFT_SOCKET> interruptListener)
148 : port_(0),
149 socket_(socket),
150 peerPort_(0),
151 interruptListener_(interruptListener),
152 connTimeout_(0),
153 sendTimeout_(0),
154 recvTimeout_(0),
155 keepAlive_(false),
156 lingerOn_(1),
157 lingerVal_(0),
158 noDelay_(1),
159 maxRecvRetries_(5) {
160 cachedPeerAddr_.ipv4.sin_family = AF_UNSPEC;
161 #ifdef SO_NOSIGPIPE
162 {
163 int one = 1;
164 setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
165 }
166 #endif
167 }
168
169 TSocket::~TSocket() {
170 close();
171 }
172
173 bool TSocket::hasPendingDataToRead() {
174 if (!isOpen()) {
175 return false;
176 }
177
178 int32_t retries = 0;
179 THRIFT_IOCTL_SOCKET_NUM_BYTES_TYPE numBytesAvailable;
180 try_again:
181 int r = THRIFT_IOCTL_SOCKET(socket_, FIONREAD, &numBytesAvailable);
182 if (r == -1) {
183 int errno_copy = THRIFT_GET_SOCKET_ERROR;
184 if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
185 goto try_again;
186 }
187 GlobalOutput.perror("TSocket::hasPendingDataToRead() THRIFT_IOCTL_SOCKET() " + getSocketInfo(), errno_copy);
188 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
189 }
190 return numBytesAvailable > 0;
191 }
192
193 bool TSocket::isOpen() const {
194 return (socket_ != THRIFT_INVALID_SOCKET);
195 }
196
197 bool TSocket::peek() {
198 if (!isOpen()) {
199 return false;
200 }
201 if (interruptListener_) {
202 for (int retries = 0;;) {
203 struct THRIFT_POLLFD fds[2];
204 std::memset(fds, 0, sizeof(fds));
205 fds[0].fd = socket_;
206 fds[0].events = THRIFT_POLLIN;
207 fds[1].fd = *(interruptListener_.get());
208 fds[1].events = THRIFT_POLLIN;
209 int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
210 int errno_copy = THRIFT_GET_SOCKET_ERROR;
211 if (ret < 0) {
212 // error cases
213 if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
214 continue;
215 }
216 GlobalOutput.perror("TSocket::peek() THRIFT_POLL() ", errno_copy);
217 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
218 } else if (ret > 0) {
219 // Check the interruptListener
220 if (fds[1].revents & THRIFT_POLLIN) {
221 return false;
222 }
223 // There must be data or a disconnection, fall through to the PEEK
224 break;
225 } else {
226 // timeout
227 return false;
228 }
229 }
230 }
231
232 // Check to see if data is available or if the remote side closed
233 uint8_t buf;
234 int r = static_cast<int>(recv(socket_, cast_sockopt(&buf), 1, MSG_PEEK));
235 if (r == -1) {
236 int errno_copy = THRIFT_GET_SOCKET_ERROR;
237 #if defined __FreeBSD__ || defined __MACH__
238 /* shigin:
239 * freebsd returns -1 and THRIFT_ECONNRESET if socket was closed by
240 * the other side
241 */
242 if (errno_copy == THRIFT_ECONNRESET) {
243 return false;
244 }
245 #endif
246 GlobalOutput.perror("TSocket::peek() recv() " + getSocketInfo(), errno_copy);
247 throw TTransportException(TTransportException::UNKNOWN, "recv()", errno_copy);
248 }
249 return (r > 0);
250 }
251
252 void TSocket::openConnection(struct addrinfo* res) {
253
254 if (isOpen()) {
255 return;
256 }
257
258 if (!path_.empty()) {
259 socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
260 } else {
261 socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
262 }
263
264 if (socket_ == THRIFT_INVALID_SOCKET) {
265 int errno_copy = THRIFT_GET_SOCKET_ERROR;
266 GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
267 throw TTransportException(TTransportException::NOT_OPEN, "socket()", errno_copy);
268 }
269
270 // Send timeout
271 if (sendTimeout_ > 0) {
272 setSendTimeout(sendTimeout_);
273 }
274
275 // Recv timeout
276 if (recvTimeout_ > 0) {
277 setRecvTimeout(recvTimeout_);
278 }
279
280 if (keepAlive_) {
281 setKeepAlive(keepAlive_);
282 }
283
284 // Linger
285 setLinger(lingerOn_, lingerVal_);
286
287 // No delay
288 setNoDelay(noDelay_);
289
290 #ifdef SO_NOSIGPIPE
291 {
292 int one = 1;
293 setsockopt(socket_, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one));
294 }
295 #endif
296
297 // Uses a low min RTO if asked to.
298 #ifdef TCP_LOW_MIN_RTO
299 if (getUseLowMinRto()) {
300 int one = 1;
301 setsockopt(socket_, IPPROTO_TCP, TCP_LOW_MIN_RTO, &one, sizeof(one));
302 }
303 #endif
304
305 // Set the socket to be non blocking for connect if a timeout exists
306 int flags = THRIFT_FCNTL(socket_, THRIFT_F_GETFL, 0);
307 if (connTimeout_ > 0) {
308 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
309 int errno_copy = THRIFT_GET_SOCKET_ERROR;
310 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL() " + getSocketInfo(), errno_copy);
311 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
312 }
313 } else {
314 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags & ~THRIFT_O_NONBLOCK)) {
315 int errno_copy = THRIFT_GET_SOCKET_ERROR;
316 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
317 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
318 }
319 }
320
321 // Connect the socket
322 int ret;
323 if (!path_.empty()) {
324
325 #ifndef _WIN32
326 size_t len = path_.size() + 1;
327 if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) {
328 int errno_copy = THRIFT_GET_SOCKET_ERROR;
329 GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
330 throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
331 }
332
333 struct sockaddr_un address;
334 address.sun_family = AF_UNIX;
335 memcpy(address.sun_path, path_.c_str(), len);
336
337 auto structlen = static_cast<socklen_t>(sizeof(address));
338
339 if (!address.sun_path[0]) { // abstract namespace socket
340 #ifdef __linux__
341 // sun_path is not null-terminated in this case and structlen determines its length
342 structlen -= sizeof(address.sun_path) - len;
343 #else
344 GlobalOutput.perror("TSocket::open() Abstract Namespace Domain sockets only supported on linux: ", -99);
345 throw TTransportException(TTransportException::NOT_OPEN,
346 " Abstract Namespace Domain socket path not supported");
347 #endif
348 }
349
350 ret = connect(socket_, (struct sockaddr*)&address, structlen);
351 #else
352 GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
353 throw TTransportException(TTransportException::NOT_OPEN,
354 " Unix Domain socket path not supported");
355 #endif
356
357 } else {
358 ret = connect(socket_, res->ai_addr, static_cast<int>(res->ai_addrlen));
359 }
360
361 // success case
362 if (ret == 0) {
363 goto done;
364 }
365
366 if ((THRIFT_GET_SOCKET_ERROR != THRIFT_EINPROGRESS)
367 && (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK)) {
368 int errno_copy = THRIFT_GET_SOCKET_ERROR;
369 GlobalOutput.perror("TSocket::open() connect() " + getSocketInfo(), errno_copy);
370 throw TTransportException(TTransportException::NOT_OPEN, "connect() failed", errno_copy);
371 }
372
373 struct THRIFT_POLLFD fds[1];
374 std::memset(fds, 0, sizeof(fds));
375 fds[0].fd = socket_;
376 fds[0].events = THRIFT_POLLOUT;
377 ret = THRIFT_POLL(fds, 1, connTimeout_);
378
379 if (ret > 0) {
380 // Ensure the socket is connected and that there are no errors set
381 int val;
382 socklen_t lon;
383 lon = sizeof(int);
384 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, cast_sockopt(&val), &lon);
385 if (ret2 == -1) {
386 int errno_copy = THRIFT_GET_SOCKET_ERROR;
387 GlobalOutput.perror("TSocket::open() getsockopt() " + getSocketInfo(), errno_copy);
388 throw TTransportException(TTransportException::NOT_OPEN, "getsockopt()", errno_copy);
389 }
390 // no errors on socket, go to town
391 if (val == 0) {
392 goto done;
393 }
394 GlobalOutput.perror("TSocket::open() error on socket (after THRIFT_POLL) " + getSocketInfo(),
395 val);
396 throw TTransportException(TTransportException::NOT_OPEN, "socket open() error", val);
397 } else if (ret == 0) {
398 // socket timed out
399 string errStr = "TSocket::open() timed out " + getSocketInfo();
400 GlobalOutput(errStr.c_str());
401 throw TTransportException(TTransportException::NOT_OPEN, "open() timed out");
402 } else {
403 // error on THRIFT_POLL()
404 int errno_copy = THRIFT_GET_SOCKET_ERROR;
405 GlobalOutput.perror("TSocket::open() THRIFT_POLL() " + getSocketInfo(), errno_copy);
406 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_POLL() failed", errno_copy);
407 }
408
409 done:
410 // Set socket back to normal mode (blocking)
411 if (-1 == THRIFT_FCNTL(socket_, THRIFT_F_SETFL, flags)) {
412 int errno_copy = THRIFT_GET_SOCKET_ERROR;
413 GlobalOutput.perror("TSocket::open() THRIFT_FCNTL " + getSocketInfo(), errno_copy);
414 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_FCNTL() failed", errno_copy);
415 }
416
417 if (path_.empty()) {
418 setCachedAddress(res->ai_addr, static_cast<socklen_t>(res->ai_addrlen));
419 }
420 }
421
422 void TSocket::open() {
423 if (isOpen()) {
424 return;
425 }
426 if (!path_.empty()) {
427 unix_open();
428 } else {
429 local_open();
430 }
431 }
432
433 void TSocket::unix_open() {
434 if (!path_.empty()) {
435 // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
436 openConnection(nullptr);
437 }
438 }
439
440 void TSocket::local_open() {
441
442 #ifdef _WIN32
443 TWinsockSingleton::create();
444 #endif // _WIN32
445
446 if (isOpen()) {
447 return;
448 }
449
450 // Validate port number
451 if (port_ < 0 || port_ > 0xFFFF) {
452 throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
453 }
454
455 struct addrinfo hints, *res, *res0;
456 res = nullptr;
457 res0 = nullptr;
458 int error;
459 char port[sizeof("65535")];
460 std::memset(&hints, 0, sizeof(hints));
461 hints.ai_family = PF_UNSPEC;
462 hints.ai_socktype = SOCK_STREAM;
463 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
464 sprintf(port, "%d", port_);
465
466 error = getaddrinfo(host_.c_str(), port, &hints, &res0);
467
468 #ifdef _WIN32
469 if (error == WSANO_DATA) {
470 hints.ai_flags &= ~AI_ADDRCONFIG;
471 error = getaddrinfo(host_.c_str(), port, &hints, &res0);
472 }
473 #endif
474
475 if (error) {
476 string errStr = "TSocket::open() getaddrinfo() " + getSocketInfo()
477 + string(THRIFT_GAI_STRERROR(error));
478 GlobalOutput(errStr.c_str());
479 close();
480 throw TTransportException(TTransportException::NOT_OPEN,
481 "Could not resolve host for client socket.");
482 }
483
484 // Cycle through all the returned addresses until one
485 // connects or push the exception up.
486 for (res = res0; res; res = res->ai_next) {
487 try {
488 openConnection(res);
489 break;
490 } catch (TTransportException&) {
491 if (res->ai_next) {
492 close();
493 } else {
494 close();
495 freeaddrinfo(res0); // cleanup on failure
496 throw;
497 }
498 }
499 }
500
501 // Free address structure memory
502 freeaddrinfo(res0);
503 }
504
505 void TSocket::close() {
506 if (socket_ != THRIFT_INVALID_SOCKET) {
507 shutdown(socket_, THRIFT_SHUT_RDWR);
508 ::THRIFT_CLOSESOCKET(socket_);
509 }
510 socket_ = THRIFT_INVALID_SOCKET;
511 }
512
513 void TSocket::setSocketFD(THRIFT_SOCKET socket) {
514 if (socket_ != THRIFT_INVALID_SOCKET) {
515 close();
516 }
517 socket_ = socket;
518 }
519
520 uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
521 if (socket_ == THRIFT_INVALID_SOCKET) {
522 throw TTransportException(TTransportException::NOT_OPEN, "Called read on non-open socket");
523 }
524
525 int32_t retries = 0;
526
527 // THRIFT_EAGAIN can be signalled both when a timeout has occurred and when
528 // the system is out of resources (an awesome undocumented feature).
529 // The following is an approximation of the time interval under which
530 // THRIFT_EAGAIN is taken to indicate an out of resources error.
531 uint32_t eagainThresholdMicros = 0;
532 if (recvTimeout_) {
533 // if a readTimeout is specified along with a max number of recv retries, then
534 // the threshold will ensure that the read timeout is not exceeded even in the
535 // case of resource errors
536 eagainThresholdMicros = (recvTimeout_ * 1000) / ((maxRecvRetries_ > 0) ? maxRecvRetries_ : 2);
537 }
538
539 try_again:
540 // Read from the socket
541 struct timeval begin;
542 if (recvTimeout_ > 0) {
543 THRIFT_GETTIMEOFDAY(&begin, nullptr);
544 } else {
545 // if there is no read timeout we don't need the TOD to determine whether
546 // an THRIFT_EAGAIN is due to a timeout or an out-of-resource condition.
547 begin.tv_sec = begin.tv_usec = 0;
548 }
549
550 int got = 0;
551
552 if (interruptListener_) {
553 struct THRIFT_POLLFD fds[2];
554 std::memset(fds, 0, sizeof(fds));
555 fds[0].fd = socket_;
556 fds[0].events = THRIFT_POLLIN;
557 fds[1].fd = *(interruptListener_.get());
558 fds[1].events = THRIFT_POLLIN;
559
560 int ret = THRIFT_POLL(fds, 2, (recvTimeout_ == 0) ? -1 : recvTimeout_);
561 int errno_copy = THRIFT_GET_SOCKET_ERROR;
562 if (ret < 0) {
563 // error cases
564 if (errno_copy == THRIFT_EINTR && (retries++ < maxRecvRetries_)) {
565 goto try_again;
566 }
567 GlobalOutput.perror("TSocket::read() THRIFT_POLL() ", errno_copy);
568 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
569 } else if (ret > 0) {
570 // Check the interruptListener
571 if (fds[1].revents & THRIFT_POLLIN) {
572 throw TTransportException(TTransportException::INTERRUPTED, "Interrupted");
573 }
574 } else /* ret == 0 */ {
575 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
576 }
577
578 // falling through means there is something to recv and it cannot block
579 }
580
581 got = static_cast<int>(recv(socket_, cast_sockopt(buf), len, 0));
582 // THRIFT_GETTIMEOFDAY can change THRIFT_GET_SOCKET_ERROR
583 int errno_copy = THRIFT_GET_SOCKET_ERROR;
584
585 // Check for error on read
586 if (got < 0) {
587 if (errno_copy == THRIFT_EAGAIN) {
588 // if no timeout we can assume that resource exhaustion has occurred.
589 if (recvTimeout_ == 0) {
590 throw TTransportException(TTransportException::TIMED_OUT,
591 "THRIFT_EAGAIN (unavailable resources)");
592 }
593 // check if this is the lack of resources or timeout case
594 struct timeval end;
595 THRIFT_GETTIMEOFDAY(&end, nullptr);
596 auto readElapsedMicros = static_cast<uint32_t>(((end.tv_sec - begin.tv_sec) * 1000 * 1000)
597 + (end.tv_usec - begin.tv_usec));
598
599 if (!eagainThresholdMicros || (readElapsedMicros < eagainThresholdMicros)) {
600 if (retries++ < maxRecvRetries_) {
601 THRIFT_SLEEP_USEC(50);
602 goto try_again;
603 } else {
604 throw TTransportException(TTransportException::TIMED_OUT,
605 "THRIFT_EAGAIN (unavailable resources)");
606 }
607 } else {
608 // infer that timeout has been hit
609 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_EAGAIN (timed out)");
610 }
611 }
612
613 // If interrupted, try again
614 if (errno_copy == THRIFT_EINTR && retries++ < maxRecvRetries_) {
615 goto try_again;
616 }
617
618 if (errno_copy == THRIFT_ECONNRESET) {
619 return 0;
620 }
621
622 // This ish isn't open
623 if (errno_copy == THRIFT_ENOTCONN) {
624 throw TTransportException(TTransportException::NOT_OPEN, "THRIFT_ENOTCONN");
625 }
626
627 // Timed out!
628 if (errno_copy == THRIFT_ETIMEDOUT) {
629 throw TTransportException(TTransportException::TIMED_OUT, "THRIFT_ETIMEDOUT");
630 }
631
632 // Now it's not a try again case, but a real probblez
633 GlobalOutput.perror("TSocket::read() recv() " + getSocketInfo(), errno_copy);
634
635 // Some other error, whatevz
636 throw TTransportException(TTransportException::UNKNOWN, "Unknown", errno_copy);
637 }
638
639 return got;
640 }
641
642 void TSocket::write(const uint8_t* buf, uint32_t len) {
643 uint32_t sent = 0;
644
645 while (sent < len) {
646 uint32_t b = write_partial(buf + sent, len - sent);
647 if (b == 0) {
648 // This should only happen if the timeout set with SO_SNDTIMEO expired.
649 // Raise an exception.
650 throw TTransportException(TTransportException::TIMED_OUT, "send timeout expired");
651 }
652 sent += b;
653 }
654 }
655
656 uint32_t TSocket::write_partial(const uint8_t* buf, uint32_t len) {
657 if (socket_ == THRIFT_INVALID_SOCKET) {
658 throw TTransportException(TTransportException::NOT_OPEN, "Called write on non-open socket");
659 }
660
661 uint32_t sent = 0;
662
663 int flags = 0;
664 #ifdef MSG_NOSIGNAL
665 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
666 // check for the THRIFT_EPIPE return condition and close the socket in that case
667 flags |= MSG_NOSIGNAL;
668 #endif // ifdef MSG_NOSIGNAL
669
670 int b = static_cast<int>(send(socket_, const_cast_sockopt(buf + sent), len - sent, flags));
671
672 if (b < 0) {
673 if (THRIFT_GET_SOCKET_ERROR == THRIFT_EWOULDBLOCK || THRIFT_GET_SOCKET_ERROR == THRIFT_EAGAIN) {
674 return 0;
675 }
676 // Fail on a send error
677 int errno_copy = THRIFT_GET_SOCKET_ERROR;
678 GlobalOutput.perror("TSocket::write_partial() send() " + getSocketInfo(), errno_copy);
679
680 if (errno_copy == THRIFT_EPIPE || errno_copy == THRIFT_ECONNRESET
681 || errno_copy == THRIFT_ENOTCONN) {
682 throw TTransportException(TTransportException::NOT_OPEN, "write() send()", errno_copy);
683 }
684
685 throw TTransportException(TTransportException::UNKNOWN, "write() send()", errno_copy);
686 }
687
688 // Fail on blocked send
689 if (b == 0) {
690 throw TTransportException(TTransportException::NOT_OPEN, "Socket send returned 0.");
691 }
692 return b;
693 }
694
695 std::string TSocket::getHost() {
696 return host_;
697 }
698
699 int TSocket::getPort() {
700 return port_;
701 }
702
703 void TSocket::setHost(string host) {
704 host_ = host;
705 }
706
707 void TSocket::setPort(int port) {
708 port_ = port;
709 }
710
711 void TSocket::setLinger(bool on, int linger) {
712 lingerOn_ = on;
713 lingerVal_ = linger;
714 if (socket_ == THRIFT_INVALID_SOCKET) {
715 return;
716 }
717
718 #ifndef _WIN32
719 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
720 #else
721 struct linger l = {static_cast<u_short>(lingerOn_ ? 1 : 0), static_cast<u_short>(lingerVal_)};
722 #endif
723
724 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&l), sizeof(l));
725 if (ret == -1) {
726 int errno_copy
727 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
728 GlobalOutput.perror("TSocket::setLinger() setsockopt() " + getSocketInfo(), errno_copy);
729 }
730 }
731
732 void TSocket::setNoDelay(bool noDelay) {
733 noDelay_ = noDelay;
734 if (socket_ == THRIFT_INVALID_SOCKET || !path_.empty()) {
735 return;
736 }
737
738 // Set socket to NODELAY
739 int v = noDelay_ ? 1 : 0;
740 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&v), sizeof(v));
741 if (ret == -1) {
742 int errno_copy
743 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
744 GlobalOutput.perror("TSocket::setNoDelay() setsockopt() " + getSocketInfo(), errno_copy);
745 }
746 }
747
748 void TSocket::setConnTimeout(int ms) {
749 connTimeout_ = ms;
750 }
751
752 void setGenericTimeout(THRIFT_SOCKET s, int timeout_ms, int optname) {
753 if (timeout_ms < 0) {
754 char errBuf[512];
755 sprintf(errBuf, "TSocket::setGenericTimeout with negative input: %d\n", timeout_ms);
756 GlobalOutput(errBuf);
757 return;
758 }
759
760 if (s == THRIFT_INVALID_SOCKET) {
761 return;
762 }
763
764 #ifdef _WIN32
765 DWORD platform_time = static_cast<DWORD>(timeout_ms);
766 #else
767 struct timeval platform_time = {(int)(timeout_ms / 1000), (int)((timeout_ms % 1000) * 1000)};
768 #endif
769
770 int ret = setsockopt(s, SOL_SOCKET, optname, cast_sockopt(&platform_time), sizeof(platform_time));
771 if (ret == -1) {
772 int errno_copy
773 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
774 GlobalOutput.perror("TSocket::setGenericTimeout() setsockopt() ", errno_copy);
775 }
776 }
777
778 void TSocket::setRecvTimeout(int ms) {
779 setGenericTimeout(socket_, ms, SO_RCVTIMEO);
780 recvTimeout_ = ms;
781 }
782
783 void TSocket::setSendTimeout(int ms) {
784 setGenericTimeout(socket_, ms, SO_SNDTIMEO);
785 sendTimeout_ = ms;
786 }
787
788 void TSocket::setKeepAlive(bool keepAlive) {
789 keepAlive_ = keepAlive;
790
791 if (socket_ == THRIFT_INVALID_SOCKET) {
792 return;
793 }
794
795 int value = keepAlive_;
796 int ret
797 = setsockopt(socket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&value), sizeof(value));
798
799 if (ret == -1) {
800 int errno_copy
801 = THRIFT_GET_SOCKET_ERROR; // Copy THRIFT_GET_SOCKET_ERROR because we're allocating memory.
802 GlobalOutput.perror("TSocket::setKeepAlive() setsockopt() " + getSocketInfo(), errno_copy);
803 }
804 }
805
806 void TSocket::setMaxRecvRetries(int maxRecvRetries) {
807 maxRecvRetries_ = maxRecvRetries;
808 }
809
810 string TSocket::getSocketInfo() const {
811 std::ostringstream oss;
812 if (path_.empty()) {
813 if (host_.empty() || port_ == 0) {
814 oss << "<Host: " << getPeerAddress();
815 oss << " Port: " << getPeerPort() << ">";
816 } else {
817 oss << "<Host: " << host_ << " Port: " << port_ << ">";
818 }
819 } else {
820 oss << "<Path: " << path_ << ">";
821 }
822 return oss.str();
823 }
824
825 std::string TSocket::getPeerHost() const {
826 if (peerHost_.empty() && path_.empty()) {
827 struct sockaddr_storage addr;
828 struct sockaddr* addrPtr;
829 socklen_t addrLen;
830
831 if (socket_ == THRIFT_INVALID_SOCKET) {
832 return host_;
833 }
834
835 addrPtr = getCachedAddress(&addrLen);
836
837 if (addrPtr == nullptr) {
838 addrLen = sizeof(addr);
839 if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
840 return peerHost_;
841 }
842 addrPtr = (sockaddr*)&addr;
843
844 const_cast<TSocket&>(*this).setCachedAddress(addrPtr, addrLen);
845 }
846
847 char clienthost[NI_MAXHOST];
848 char clientservice[NI_MAXSERV];
849
850 getnameinfo((sockaddr*)addrPtr,
851 addrLen,
852 clienthost,
853 sizeof(clienthost),
854 clientservice,
855 sizeof(clientservice),
856 0);
857
858 peerHost_ = clienthost;
859 }
860 return peerHost_;
861 }
862
863 std::string TSocket::getPeerAddress() const {
864 if (peerAddress_.empty() && path_.empty()) {
865 struct sockaddr_storage addr;
866 struct sockaddr* addrPtr;
867 socklen_t addrLen;
868
869 if (socket_ == THRIFT_INVALID_SOCKET) {
870 return peerAddress_;
871 }
872
873 addrPtr = getCachedAddress(&addrLen);
874
875 if (addrPtr == nullptr) {
876 addrLen = sizeof(addr);
877 if (getpeername(socket_, (sockaddr*)&addr, &addrLen) != 0) {
878 return peerAddress_;
879 }
880 addrPtr = (sockaddr*)&addr;
881
882 const_cast<TSocket&>(*this).setCachedAddress(addrPtr, addrLen);
883 }
884
885 char clienthost[NI_MAXHOST];
886 char clientservice[NI_MAXSERV];
887
888 getnameinfo(addrPtr,
889 addrLen,
890 clienthost,
891 sizeof(clienthost),
892 clientservice,
893 sizeof(clientservice),
894 NI_NUMERICHOST | NI_NUMERICSERV);
895
896 peerAddress_ = clienthost;
897 peerPort_ = std::atoi(clientservice);
898 }
899 return peerAddress_;
900 }
901
902 int TSocket::getPeerPort() const {
903 getPeerAddress();
904 return peerPort_;
905 }
906
907 void TSocket::setCachedAddress(const sockaddr* addr, socklen_t len) {
908 if (!path_.empty()) {
909 return;
910 }
911
912 switch (addr->sa_family) {
913 case AF_INET:
914 if (len == sizeof(sockaddr_in)) {
915 memcpy((void*)&cachedPeerAddr_.ipv4, (void*)addr, len);
916 }
917 break;
918
919 case AF_INET6:
920 if (len == sizeof(sockaddr_in6)) {
921 memcpy((void*)&cachedPeerAddr_.ipv6, (void*)addr, len);
922 }
923 break;
924 }
925 peerAddress_.clear();
926 peerHost_.clear();
927 }
928
929 sockaddr* TSocket::getCachedAddress(socklen_t* len) const {
930 switch (cachedPeerAddr_.ipv4.sin_family) {
931 case AF_INET:
932 *len = sizeof(sockaddr_in);
933 return (sockaddr*)&cachedPeerAddr_.ipv4;
934
935 case AF_INET6:
936 *len = sizeof(sockaddr_in6);
937 return (sockaddr*)&cachedPeerAddr_.ipv6;
938
939 default:
940 return nullptr;
941 }
942 }
943
944 bool TSocket::useLowMinRto_ = false;
945 void TSocket::setUseLowMinRto(bool useLowMinRto) {
946 useLowMinRto_ = useLowMinRto;
947 }
948 bool TSocket::getUseLowMinRto() {
949 return useLowMinRto_;
950 }
951
952 const std::string TSocket::getOrigin() const {
953 std::ostringstream oss;
954 oss << getPeerHost() << ":" << getPeerPort();
955 return oss.str();
956 }
957 }
958 }
959 } // apache::thrift::transport