]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/transport/TNonblockingServerSocket.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / transport / TNonblockingServerSocket.cpp
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <cstring>
23 #include <memory>
24 #include <stdexcept>
25 #include <sys/types.h>
26 #ifdef HAVE_SYS_SOCKET_H
27 #include <sys/socket.h>
28 #endif
29 #ifdef HAVE_SYS_UN_H
30 #include <sys/un.h>
31 #endif
32 #ifdef HAVE_SYS_POLL_H
33 #include <sys/poll.h>
34 #endif
35 #ifdef HAVE_NETINET_IN_H
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #endif
39 #ifdef HAVE_NETDB_H
40 #include <netdb.h>
41 #endif
42 #include <fcntl.h>
43 #ifdef HAVE_UNISTD_H
44 #include <unistd.h>
45 #endif
46
47 #include <thrift/transport/TSocket.h>
48 #include <thrift/transport/TNonblockingServerSocket.h>
49 #include <thrift/transport/PlatformSocket.h>
50
51 #ifndef AF_LOCAL
52 #define AF_LOCAL AF_UNIX
53 #endif
54
// Pointee type expected by the platform's setsockopt()/getsockopt() option
// argument: char on Windows, void everywhere else. A definition supplied
// before this point is left untouched.
#ifndef SOCKOPT_CAST_T
#ifdef _WIN32
#define SOCKOPT_CAST_T char
#else
#define SOCKOPT_CAST_T void
#endif // _WIN32
#endif

/**
 * Reinterprets a pointer to a read-only option value as the pointer type the
 * socket-option calls take on this platform.
 */
template <typename OptionT>
inline auto const_cast_sockopt(const OptionT* optionValue) -> const SOCKOPT_CAST_T* {
  using Target = const SOCKOPT_CAST_T*;
  return reinterpret_cast<Target>(optionValue);
}

/**
 * Reinterprets a pointer to a mutable option value as the pointer type the
 * socket-option calls take on this platform.
 */
template <typename OptionT>
inline auto cast_sockopt(OptionT* optionValue) -> SOCKOPT_CAST_T* {
  using Target = SOCKOPT_CAST_T*;
  return reinterpret_cast<Target>(optionValue);
}
72
73 namespace apache {
74 namespace thrift {
75 namespace transport {
76
77 using std::string;
78 using std::shared_ptr;
79
80 TNonblockingServerSocket::TNonblockingServerSocket(int port)
81 : port_(port),
82 listenPort_(port),
83 serverSocket_(THRIFT_INVALID_SOCKET),
84 acceptBacklog_(DEFAULT_BACKLOG),
85 sendTimeout_(0),
86 recvTimeout_(0),
87 retryLimit_(0),
88 retryDelay_(0),
89 tcpSendBuffer_(0),
90 tcpRecvBuffer_(0),
91 keepAlive_(false),
92 listening_(false) {
93 }
94
95 TNonblockingServerSocket::TNonblockingServerSocket(int port, int sendTimeout, int recvTimeout)
96 : port_(port),
97 listenPort_(port),
98 serverSocket_(THRIFT_INVALID_SOCKET),
99 acceptBacklog_(DEFAULT_BACKLOG),
100 sendTimeout_(sendTimeout),
101 recvTimeout_(recvTimeout),
102 retryLimit_(0),
103 retryDelay_(0),
104 tcpSendBuffer_(0),
105 tcpRecvBuffer_(0),
106 keepAlive_(false),
107 listening_(false) {
108 }
109
110 TNonblockingServerSocket::TNonblockingServerSocket(const string& address, int port)
111 : port_(port),
112 listenPort_(port),
113 address_(address),
114 serverSocket_(THRIFT_INVALID_SOCKET),
115 acceptBacklog_(DEFAULT_BACKLOG),
116 sendTimeout_(0),
117 recvTimeout_(0),
118 retryLimit_(0),
119 retryDelay_(0),
120 tcpSendBuffer_(0),
121 tcpRecvBuffer_(0),
122 keepAlive_(false),
123 listening_(false) {
124 }
125
126 TNonblockingServerSocket::TNonblockingServerSocket(const string& path)
127 : port_(0),
128 listenPort_(0),
129 path_(path),
130 serverSocket_(THRIFT_INVALID_SOCKET),
131 acceptBacklog_(DEFAULT_BACKLOG),
132 sendTimeout_(0),
133 recvTimeout_(0),
134 retryLimit_(0),
135 retryDelay_(0),
136 tcpSendBuffer_(0),
137 tcpRecvBuffer_(0),
138 keepAlive_(false),
139 listening_(false) {
140 }
141
142 TNonblockingServerSocket::~TNonblockingServerSocket() {
143 close();
144 }
145
146 void TNonblockingServerSocket::setSendTimeout(int sendTimeout) {
147 sendTimeout_ = sendTimeout;
148 }
149
150 void TNonblockingServerSocket::setRecvTimeout(int recvTimeout) {
151 recvTimeout_ = recvTimeout;
152 }
153
154 void TNonblockingServerSocket::setAcceptBacklog(int accBacklog) {
155 acceptBacklog_ = accBacklog;
156 }
157
158 void TNonblockingServerSocket::setRetryLimit(int retryLimit) {
159 retryLimit_ = retryLimit;
160 }
161
162 void TNonblockingServerSocket::setRetryDelay(int retryDelay) {
163 retryDelay_ = retryDelay;
164 }
165
166 void TNonblockingServerSocket::setTcpSendBuffer(int tcpSendBuffer) {
167 tcpSendBuffer_ = tcpSendBuffer;
168 }
169
170 void TNonblockingServerSocket::setTcpRecvBuffer(int tcpRecvBuffer) {
171 tcpRecvBuffer_ = tcpRecvBuffer;
172 }
173
/**
 * Creates the server socket, configures it, binds it and starts listening.
 *
 * Steps, in order:
 *  1. validate port_ and resolve address_ (or the wildcard address);
 *  2. create a PF_UNIX socket when path_ is set, otherwise a socket matching
 *     the chosen address (IPv6 preferred);
 *  3. apply socket options (address reuse, optional buffer sizes, dual-stack
 *     IPv6, linger off, keepalive, TCP_NODELAY, non-blocking mode, optional
 *     TCP_LOW_MIN_RTO);
 *  4. bind, retrying up to retryLimit_ extra times with retryDelay_ between
 *     attempts, and record the kernel-assigned port in listenPort_ when
 *     port_ is 0;
 *  5. invoke listenCallback_ (if set) and call ::listen().
 *
 * Every failure path calls close() before throwing, so a failed call leaves
 * the object with no open socket and listening_ == false.
 *
 * @throws TTransportException BAD_ARGS if port_ is outside [0, 0xFFFF];
 *         NOT_OPEN for any resolution, socket, option, bind or listen failure.
 */
void TNonblockingServerSocket::listen() {
  listening_ = true;
#ifdef _WIN32
  TWinsockSingleton::create();
#endif // _WIN32

  // Validate port number
  if (port_ < 0 || port_ > 0xFFFF) {
    // close() resets listening_, matching every other failure path below.
    close();
    throw TTransportException(TTransportException::BAD_ARGS, "Specified port is invalid");
  }

  const struct addrinfo* res;
  int error;
  char port[sizeof("65535")];
  THRIFT_SNPRINTF(port, sizeof(port), "%d", port_);

  struct addrinfo hints;
  std::memset(&hints, 0, sizeof(hints));
  hints.ai_family = PF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;

  // If address is not specified use wildcard address (NULL)
  TGetAddrInfoWrapper info(address_.empty() ? nullptr : &address_[0], port, &hints);

  error = info.init();
  if (error) {
    GlobalOutput.printf("getaddrinfo %d: %s", error, THRIFT_GAI_STRERROR(error));
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not resolve host for server socket.");
  }

  // Pick the ipv6 address first since ipv4 addresses can be mapped
  // into ipv6 space. If there is no ipv6 entry the last entry is used.
  for (res = info.res(); res; res = res->ai_next) {
    if (res->ai_family == AF_INET6 || res->ai_next == nullptr)
      break;
  }

  if (!path_.empty()) {
    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
  } else if (res != nullptr) {
    serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
  }

  if (serverSocket_ == THRIFT_INVALID_SOCKET) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() socket() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not create server socket.",
                              errno_copy);
  }

  // Set THRIFT_NO_SOCKET_CACHING to prevent 2MSL delay on accept
  int one = 1;
  if (-1 == setsockopt(serverSocket_,
                       SOL_SOCKET,
                       THRIFT_NO_SOCKET_CACHING,
                       cast_sockopt(&one),
                       sizeof(one))) {
// ignore errors coming out of this setsockopt on Windows.  This is because
// SO_EXCLUSIVEADDRUSE requires admin privileges on WinXP, but we don't
// want to force servers to be an admin.
#ifndef _WIN32
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() THRIFT_NO_SOCKET_CACHING ",
                        errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not set THRIFT_NO_SOCKET_CACHING",
                              errno_copy);
#endif
  }

  // Set TCP buffer sizes
  if (tcpSendBuffer_ > 0) {
    if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,
                         SO_SNDBUF,
                         cast_sockopt(&tcpSendBuffer_),
                         sizeof(tcpSendBuffer_))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_SNDBUF ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set SO_SNDBUF",
                                errno_copy);
    }
  }

  if (tcpRecvBuffer_ > 0) {
    if (-1 == setsockopt(serverSocket_,
                         SOL_SOCKET,
                         SO_RCVBUF,
                         cast_sockopt(&tcpRecvBuffer_),
                         sizeof(tcpRecvBuffer_))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_RCVBUF ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set SO_RCVBUF",
                                errno_copy);
    }
  }

#ifdef IPV6_V6ONLY
  // Only relevant for TCP sockets; check path_ and res before dereferencing res.
  if (path_.empty() && res != nullptr && res->ai_family == AF_INET6) {
    int zero = 0;
    if (-1 == setsockopt(serverSocket_,
                         IPPROTO_IPV6,
                         IPV6_V6ONLY,
                         cast_sockopt(&zero),
                         sizeof(zero))) {
      // Best effort: a failure here is logged but does not abort listen().
      GlobalOutput.perror("TNonblockingServerSocket::listen() IPV6_V6ONLY ", THRIFT_GET_SOCKET_ERROR);
    }
  }
#endif // #ifdef IPV6_V6ONLY

  // Turn linger off, don't want to block on calls to close
  struct linger ling = {0, 0};
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER, cast_sockopt(&ling), sizeof(ling))) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_LINGER ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
  }

  // Keepalive to ensure full result flushing
  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one))) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() SO_KEEPALIVE ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not set SO_KEEPALIVE",
                              errno_copy);
  }

// Set TCP nodelay if available, MAC OS X Hack
// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
#ifndef TCP_NOPUSH
  // Unix Sockets do not need that
  if (path_.empty()) {
    // TCP Nodelay, speed over bandwidth
    if (-1
        == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY, cast_sockopt(&one), sizeof(one))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set TCP_NODELAY",
                                errno_copy);
    }
  }
#endif

  // Set NONBLOCK on the accept socket
  int flags = THRIFT_FCNTL(serverSocket_, THRIFT_F_GETFL, 0);
  if (flags == -1) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "THRIFT_FCNTL() THRIFT_F_GETFL failed",
                              errno_copy);
  }

  if (-1 == THRIFT_FCNTL(serverSocket_, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
    int errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() THRIFT_FCNTL() THRIFT_O_NONBLOCK ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "THRIFT_FCNTL() THRIFT_F_SETFL THRIFT_O_NONBLOCK failed",
                              errno_copy);
  }

#ifdef TCP_LOW_MIN_RTO
  if (TSocket::getUseLowMinRto()) {
    if (-1 == setsockopt(serverSocket_,
                         IPPROTO_TCP,
                         TCP_LOW_MIN_RTO,
                         const_cast_sockopt(&one),
                         sizeof(one))) {
      int errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TNonblockingServerSocket::listen() setsockopt() TCP_LOW_MIN_RTO ", errno_copy);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Could not set TCP_LOW_MIN_RTO",
                                errno_copy);
    }
  }
#endif

  // prepare the port information
  // we may want to try to bind more than once, since THRIFT_NO_SOCKET_CACHING doesn't
  // always seem to work. The client can configure the retry variables.
  int retries = 0;
  int errno_copy = 0;

  if (!path_.empty()) {

#ifndef _WIN32

    // Unix Domain Socket
    size_t len = path_.size() + 1;
    if (len > sizeof(((sockaddr_un*)nullptr)->sun_path)) {
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
      // Release the socket created above; otherwise it stays open until destruction.
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                "Unix Domain socket path too long",
                                errno_copy);
    }

    struct sockaddr_un address;
    address.sun_family = AF_UNIX;
    memcpy(address.sun_path, path_.c_str(), len);

    auto structlen = static_cast<socklen_t>(sizeof(address));

    if (!address.sun_path[0]) { // abstract namespace socket
#ifdef __linux__
      // sun_path is not null-terminated in this case and structlen determines its length
      structlen -= sizeof(address.sun_path) - len;
#else
      GlobalOutput.perror("TSocket::open() Abstract Namespace Domain sockets only supported on linux: ", -99);
      close();
      throw TTransportException(TTransportException::NOT_OPEN,
                                " Abstract Namespace Domain socket path not supported");
#endif
    }

    do {
      if (0 == ::bind(serverSocket_, (struct sockaddr*)&address, structlen)) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));
#else
    GlobalOutput.perror("TSocket::open() Unix Domain socket path not supported on windows", -99);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              " Unix Domain socket path not supported");
#endif
  } else {
    do {
      if (0 == ::bind(serverSocket_, res->ai_addr, static_cast<int>(res->ai_addrlen))) {
        break;
      }
      errno_copy = THRIFT_GET_SOCKET_ERROR;
      // use short circuit evaluation here to only sleep if we need to
    } while ((retries++ < retryLimit_) && (THRIFT_SLEEP_SEC(retryDelay_) == 0));

    // retrieve bind info: with port 0 the kernel picks the port, so read it back
    if (port_ == 0 && retries <= retryLimit_) {
      struct sockaddr_storage sa;
      socklen_t len = sizeof(sa);
      std::memset(&sa, 0, len);
      if (::getsockname(serverSocket_, reinterpret_cast<struct sockaddr*>(&sa), &len) < 0) {
        errno_copy = THRIFT_GET_SOCKET_ERROR;
        GlobalOutput.perror("TNonblockingServerSocket::getPort() getsockname() ", errno_copy);
      } else {
        if (sa.ss_family == AF_INET6) {
          const auto* sin = reinterpret_cast<const struct sockaddr_in6*>(&sa);
          listenPort_ = ntohs(sin->sin6_port);
        } else {
          const auto* sin = reinterpret_cast<const struct sockaddr_in*>(&sa);
          listenPort_ = ntohs(sin->sin_port);
        }
      }
    }
  }

  // throw an error if we failed to bind properly
  if (retries > retryLimit_) {
    char errbuf[1024];
    if (!path_.empty()) {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() PATH %s", path_.c_str());
    } else {
      THRIFT_SNPRINTF(errbuf, sizeof(errbuf), "TNonblockingServerSocket::listen() BIND %d", port_);
    }
    GlobalOutput(errbuf);
    close();
    throw TTransportException(TTransportException::NOT_OPEN,
                              "Could not bind",
                              errno_copy);
  }

  if (listenCallback_)
    listenCallback_(serverSocket_);

  // Call listen
  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
    errno_copy = THRIFT_GET_SOCKET_ERROR;
    GlobalOutput.perror("TNonblockingServerSocket::listen() listen() ", errno_copy);
    close();
    throw TTransportException(TTransportException::NOT_OPEN, "Could not listen", errno_copy);
  }

  // The socket is now listening!
}
470
471 int TNonblockingServerSocket::getPort() {
472 return port_;
473 }
474
475 int TNonblockingServerSocket::getListenPort() {
476 return listenPort_;
477 }
478
479 shared_ptr<TSocket> TNonblockingServerSocket::acceptImpl() {
480 if (serverSocket_ == THRIFT_INVALID_SOCKET) {
481 throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServerSocket not listening");
482 }
483
484 struct sockaddr_storage clientAddress;
485 int size = sizeof(clientAddress);
486 THRIFT_SOCKET clientSocket
487 = ::accept(serverSocket_, (struct sockaddr*)&clientAddress, (socklen_t*)&size);
488
489 if (clientSocket == THRIFT_INVALID_SOCKET) {
490 int errno_copy = THRIFT_GET_SOCKET_ERROR;
491 GlobalOutput.perror("TNonblockingServerSocket::acceptImpl() ::accept() ", errno_copy);
492 throw TTransportException(TTransportException::UNKNOWN, "accept()", errno_copy);
493 }
494
495 // Explicitly set this socket to NONBLOCK mode
496 int flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0);
497 if (flags == -1) {
498 int errno_copy = THRIFT_GET_SOCKET_ERROR;
499 ::THRIFT_CLOSESOCKET(clientSocket);
500 GlobalOutput.perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_GETFL ", errno_copy);
501 throw TTransportException(TTransportException::UNKNOWN,
502 "THRIFT_FCNTL(THRIFT_F_GETFL)",
503 errno_copy);
504 }
505
506 if (-1 == THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK)) {
507 int errno_copy = THRIFT_GET_SOCKET_ERROR;
508 ::THRIFT_CLOSESOCKET(clientSocket);
509 GlobalOutput
510 .perror("TNonblockingServerSocket::acceptImpl() THRIFT_FCNTL() THRIFT_F_SETFL ~THRIFT_O_NONBLOCK ",
511 errno_copy);
512 throw TTransportException(TTransportException::UNKNOWN,
513 "THRIFT_FCNTL(THRIFT_F_SETFL)",
514 errno_copy);
515 }
516
517 shared_ptr<TSocket> client = createSocket(clientSocket);
518 if (sendTimeout_ > 0) {
519 client->setSendTimeout(sendTimeout_);
520 }
521 if (recvTimeout_ > 0) {
522 client->setRecvTimeout(recvTimeout_);
523 }
524 if (keepAlive_) {
525 client->setKeepAlive(keepAlive_);
526 }
527 client->setCachedAddress((sockaddr*)&clientAddress, size);
528
529 if (acceptCallback_)
530 acceptCallback_(clientSocket);
531
532 return client;
533 }
534
535 shared_ptr<TSocket> TNonblockingServerSocket::createSocket(THRIFT_SOCKET clientSocket) {
536 return std::make_shared<TSocket>(clientSocket);
537 }
538
539 void TNonblockingServerSocket::close() {
540 if (serverSocket_ != THRIFT_INVALID_SOCKET) {
541 shutdown(serverSocket_, THRIFT_SHUT_RDWR);
542 ::THRIFT_CLOSESOCKET(serverSocket_);
543 }
544 serverSocket_ = THRIFT_INVALID_SOCKET;
545 listening_ = false;
546 }
547 }
548 }
549 } // apache::thrift::transport