]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/net/dns.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / src / net / dns.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2016 Cloudius Systems
20 */
21
#include <arpa/nameser.h>
#include <chrono>
#include <new>

#include <ares.h>
#include <boost/lexical_cast.hpp>

#include <ostream>
#include <seastar/util/std-compat.hh>
#include <seastar/net/inet_address.hh>

#include <seastar/net/ip.hh>
#include <seastar/net/api.hh>
#include <seastar/net/dns.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/print.hh>
40
41 namespace seastar::net {
42
43 // NOTE: Should be prior to <seastar/util/log.hh> include because
44 // logger::stringer_for<T> needs to see the corresponding `operator <<`
45 // declaration at the call site
46 //
47 // This doesn't need to be in the public API, so leave it there instead of placing into `inet_address.hh`
48 std::ostream& operator<<(std::ostream& os, const opt_family& f) {
49 if (f) {
50 return os << *f;
51 } else {
52 return os << "ANY";
53 }
54 }
55
56 }
57
58 #include <seastar/util/log.hh>
59
60 namespace seastar {
61
62 static logger dns_log("dns_resolver");
63
/// std::error_category for c-ares status codes (the ARES_* constants), so that
/// c-ares failures can be reported as std::system_error.
class ares_error_category : public std::error_category {
public:
    constexpr ares_error_category() noexcept : std::error_category{} {}

    /// @return the category name, "C-Ares".
    const char * name() const noexcept override {
        return "C-Ares";
    }

    /// @param error a c-ares status code.
    /// @return a short description of `error`, or "Unknown error" for codes
    ///         not listed here.
    std::string message(int error) const override {
        switch (error) {
        /* Server error codes (ARES_ENODATA indicates no relevant answer) */
        case ARES_ENODATA: return "No data";
        case ARES_EFORMERR: return "Form error";
        case ARES_ESERVFAIL: return "Server failure";
        case ARES_ENOTFOUND: return "Not found";
        case ARES_ENOTIMP: return "Not implemented";
        case ARES_EREFUSED: return "Refused";

        /* Locally generated error codes */
        case ARES_EBADQUERY: return "Bad query";
        case ARES_EBADNAME: return "Bad name";
        case ARES_EBADFAMILY: return "Bad family";
        case ARES_EBADRESP: return "Bad response";
        case ARES_ECONNREFUSED: return "Connection refused";
        case ARES_ETIMEOUT: return "Timeout";
        case ARES_EOF: return "EOF";
        case ARES_EFILE: return "File error";
        case ARES_ENOMEM: return "No memory";
        case ARES_EDESTRUCTION: return "Destruction";
        case ARES_EBADSTR: return "Bad string";

        /* ares_getnameinfo error codes */
        case ARES_EBADFLAGS: return "Invalid flags";

        /* ares_getaddrinfo error codes */
        case ARES_ENONAME: return "No name";
        case ARES_EBADHINTS: return "Bad hints";

        /* Uninitialized library error code */
        case ARES_ENOTINITIALIZED: return "Not initialized";

        /* ares_library_init error codes */
        // The code is named after the Windows IP Helper API (iphlpapi).
        case ARES_ELOADIPHLPAPI: return "Load IPHLPAPI";
        case ARES_EADDRGETNETWORKPARAMS: return "Get network parameters";

        /* More error codes */
        case ARES_ECANCELLED: return "Cancelled";
        default:
            return "Unknown error";
        }
    }
};
114
115 static const ares_error_category ares_errorc;
116
117 static void check_ares_error(int error) {
118 if (error != ARES_SUCCESS) {
119 throw std::system_error(error, ares_errorc);
120 }
121 }
122
123 struct ares_initializer {
124 ares_initializer() {
125 check_ares_error(ares_library_init(0));
126 }
127 ~ares_initializer() {
128 ares_library_cleanup();
129 }
130 };
131
132 class net::dns_resolver::impl
133 : public enable_shared_from_this<impl>
134 {
135 public:
136 impl(network_stack& stack, const options& opts)
137 : _stack(stack)
138 , _timeout(opts.timeout ? *opts.timeout : std::chrono::milliseconds(5000) /* from ares private */)
139 , _timer(std::bind(&impl::poll_sockets, this))
140 {
141 static const ares_initializer a_init;
142
143 // this can "block" ever so slightly, because it will
144 // look in resolv.conf etc for query setup. We could
145 // do this ourselves, and instead set ares options
146 // here, but it seems more error prone (me parsing
147 // resolv.conf -> hah!)
148 ares_options a_opts = { 0, };
149
150 // For now, use the default "fb" query order
151 // (set explicitly lest we forget).
152 // We only do querying dns server really async.
153 // Reading hosts files is doen by c-ares internally
154 // and with normal fread calls. Thus they theorectically
155 // block. This can potentially be an issue for some application
156 // and if so, we need to revisit this. For now, assume
157 // it won't block us in any measurable way.
158 char buf[3] = "fb";
159 a_opts.lookups = buf; // only net
160 // Always set the timeout
161 a_opts.timeout = _timeout.count();
162 int flags = ARES_OPT_LOOKUPS|ARES_OPT_TIMEOUTMS;
163
164 if (opts.use_tcp_query && *opts.use_tcp_query) {
165 a_opts.flags = ARES_FLAG_USEVC | ARES_FLAG_PRIMARY;
166 flags |= ARES_OPT_FLAGS;
167 }
168 std::vector<in_addr> addr_tmp;
169 if (opts.servers) {
170 std::transform(opts.servers->begin(), opts.servers->end(), std::back_inserter(addr_tmp), [](const inet_address& a) {
171 if (a.in_family() != inet_address::family::INET) {
172 throw std::invalid_argument("Servers must be ipv4 addresses");
173 }
174 in_addr in = a;
175 return in;
176 });
177 a_opts.servers = addr_tmp.data();
178 a_opts.nservers = int(addr_tmp.size());
179 flags |= ARES_OPT_SERVERS;
180 }
181 std::vector<const char *> dom_tmp;
182 if (opts.domains) {
183 std::transform(opts.domains->begin(), opts.domains->end(), std::back_inserter(dom_tmp), [](const sstring& s) {
184 return s.data();
185 });
186 a_opts.domains = const_cast<char **>(dom_tmp.data());
187 a_opts.ndomains = int(dom_tmp.size());
188 flags |= ARES_OPT_DOMAINS;
189 }
190 if (opts.tcp_port) {
191 a_opts.tcp_port = *opts.tcp_port;
192 flags |= ARES_OPT_TCP_PORT;
193 }
194 if (opts.udp_port) {
195 a_opts.udp_port = *opts.udp_port;
196 flags |= ARES_OPT_UDP_PORT;
197 }
198
199 check_ares_error(ares_init_options(&_channel, &a_opts, flags));
200
201 static auto get_impl = [](void * p) { return reinterpret_cast<impl *>(p); };
202 static const ares_socket_functions callbacks = {
203 [](int af, int type, int protocol, void * p) { return get_impl(p)->do_socket(af, type, protocol); },
204 [](ares_socket_t s, void * p) { return get_impl(p)->do_close(s); },
205 [](ares_socket_t s, const struct sockaddr * addr, socklen_t len, void * p) { return get_impl(p)->do_connect(s, addr, len); },
206 [](ares_socket_t s, void * dst, size_t len, int flags, struct sockaddr * addr, socklen_t * alen, void * p) {
207 return get_impl(p)->do_recvfrom(s, dst, len, flags, addr, alen);
208 },
209 [](ares_socket_t s, const struct iovec * vec, int len, void * p) {
210 return get_impl(p)->do_sendv(s, vec, len);
211 },
212 };
213
214 ares_set_socket_functions(_channel, &callbacks, this);
215
216 // just in case you need printf-debug.
217 // dns_log.set_level(log_level::trace);
218 }
219 ~impl() {
220 _timer.cancel();
221 if (_channel) {
222 ares_destroy(_channel);
223 }
224 }
225
226 future<inet_address> resolve_name(sstring name, opt_family family) {
227 return get_host_by_name(std::move(name), family).then([](hostent h) {
228 return make_ready_future<inet_address>(h.addr_list.front());
229 });
230 }
231
232 future<hostent> get_host_by_name(sstring name, opt_family family) {
233 class promise_wrap : public promise<hostent> {
234 public:
235 promise_wrap(sstring s)
236 : name(std::move(s))
237 {}
238 sstring name;
239 };
240
241 dns_log.debug("Query name {} ({})", name, family);
242
243 if (!family) {
244 auto res = inet_address::parse_numerical(name);
245 if (res) {
246 return make_ready_future<hostent>(hostent{ {name}, {*res}});
247 }
248 }
249
250 auto p = new promise_wrap(std::move(name));
251 auto f = p->get_future();
252
253 dns_call call(*this);
254
255 auto af = family ? int(*family) : AF_UNSPEC;
256
257 // The following pragma is needed to work around a false-positive warning
258 // in Gcc 11 (see https://gcc.gnu.org/bugzilla/show_bug.cgi?id=96003).
259 #pragma GCC diagnostic ignored "-Wnonnull"
260 ares_gethostbyname(_channel, p->name.c_str(), af, [](void* arg, int status, int timeouts, ::hostent* host) {
261 // we do potentially allocating operations below, so wrap the pointer in a
262 // unique here.
263 std::unique_ptr<promise_wrap> p(reinterpret_cast<promise_wrap *>(arg));
264
265 switch (status) {
266 default:
267 dns_log.debug("Query failed: {}", status);
268 p->set_exception(std::system_error(status, ares_errorc, p->name));
269 break;
270 case ARES_SUCCESS:
271 p->set_value(make_hostent(*host));
272 break;
273 }
274
275 }, reinterpret_cast<void *>(p));
276
277
278 poll_sockets();
279
280 return f.finally([this] {
281 end_call();
282 });
283 }
284
285 future<hostent> get_host_by_addr(inet_address addr) {
286 class promise_wrap : public promise<hostent> {
287 public:
288 promise_wrap(inet_address a)
289 : addr(std::move(a))
290 {}
291 inet_address addr;
292 };
293
294 dns_log.debug("Query addr {}", addr);
295
296 auto p = new promise_wrap(std::move(addr));
297 auto f = p->get_future();
298
299 dns_call call(*this);
300
301 ares_gethostbyaddr(_channel, p->addr.data(), p->addr.size(), int(p->addr.in_family()), [](void* arg, int status, int timeouts, ::hostent* host) {
302 // we do potentially allocating operations below, so wrap the pointer in a
303 // unique here.
304 std::unique_ptr<promise_wrap> p(reinterpret_cast<promise_wrap *>(arg));
305
306 switch (status) {
307 default:
308 dns_log.debug("Query failed: {}", status);
309 p->set_exception(std::system_error(status, ares_errorc, boost::lexical_cast<std::string>(p->addr)));
310 break;
311 case ARES_SUCCESS:
312 p->set_value(make_hostent(*host));
313 break;
314 }
315
316 }, reinterpret_cast<void *>(p));
317
318
319 poll_sockets();
320
321 return f.finally([this] {
322 end_call();
323 });
324 }
325
326 future<srv_records> get_srv_records(srv_proto proto,
327 const sstring& service,
328 const sstring& domain) {
329 auto p = std::make_unique<promise<srv_records>>();
330 auto f = p->get_future();
331
332 const auto query = format("_{}._{}.{}",
333 service,
334 proto == srv_proto::tcp ? "tcp" : "udp",
335 domain);
336
337 dns_log.debug("Query srv {}", query);
338
339 dns_call call(*this);
340
341 ares_query(_channel, query.c_str(), ns_c_in, ns_t_srv,
342 [](void* arg, int status, int timeouts,
343 unsigned char* buf, int len) {
344 auto p = std::unique_ptr<promise<srv_records>>(
345 reinterpret_cast<promise<srv_records> *>(arg));
346 if (status != ARES_SUCCESS) {
347 dns_log.debug("Query failed: {}", status);
348 p->set_exception(std::system_error(status, ares_errorc));
349 return;
350 }
351 ares_srv_reply* start = nullptr;
352 status = ares_parse_srv_reply(buf, len, &start);
353 if (status != ARES_SUCCESS) {
354 dns_log.debug("Parse failed: {}", status);
355 p->set_exception(std::system_error(status, ares_errorc));
356 return;
357 }
358 try {
359 p->set_value(make_srv_records(start));
360 } catch (...) {
361 p->set_exception(std::current_exception());
362 }
363 ares_free_data(start);
364 }, reinterpret_cast<void *>(p.release()));
365
366
367 poll_sockets();
368
369 return f.finally([this] {
370 end_call();
371 });
372 }
373
374 future<sstring> resolve_addr(inet_address addr) {
375 return get_host_by_addr(addr).then([](hostent h) {
376 return make_ready_future<sstring>(h.names.front());
377 });
378 }
379
380 future<> close() {
381 _closed = true;
382 ares_cancel(_channel);
383 dns_log.trace("Shutting down {} sockets", _sockets.size());
384 for (auto & p : _sockets) {
385 do_close(p.first);
386 }
387 dns_log.trace("Closing gate");
388 return _gate.close();
389 }
390 private:
391 enum class type {
392 none, tcp, udp
393 };
394 struct dns_call {
395 dns_call(impl & i)
396 : _i(i)
397 , _c(++i._calls)
398 {}
399 ~dns_call() {
400 // If a query does not immediately complete
401 // it might never do so, unless data actually
402 // comes back to us and a waiting recv promise
403 // is fulfilled.
404 // We need to add a timer to do polling at ~timeout
405 // ms later, so the ares logic can detect this and
406 // tell us we're over.
407 if (_c == 1 && _i._calls != 0) {
408 _i._timer.arm_periodic(_i._timeout);
409 }
410 }
411 impl& _i;
412 uint64_t _c;
413 };
414
415 void end_call() {
416 if (--_calls == 0) {
417 _timer.cancel();
418 }
419 }
420 void poll_sockets() {
421 fd_set readers, writers;
422 int n = 0;
423
424 dns_log.trace("Poll sockets");
425
426 do {
427 // Retrieve the set of file descriptors that the library wants us to monitor.
428 FD_ZERO(&readers);
429 FD_ZERO(&writers);
430
431 n = ares_fds(_channel, &readers, &writers);
432
433 dns_log.trace("ares_fds: {}", n);
434
435 if (n == 0) {
436 break;
437 }
438
439 n = 0;
440
441 for (auto & p : _sockets) {
442 auto & e = p.second;
443 auto fd = p.first;
444 auto r = FD_ISSET(p.first, &readers);
445 auto w = FD_ISSET(p.first, &writers);
446 auto ra = e.avail & POLLIN;
447 auto wa = e.avail & POLLOUT;
448
449 dns_log.trace("fd {} {}{}/{}{}", fd, (r ? "r" : ""),
450 (w ? "w" : ""), (ra ? "r" : ""),
451 (wa ? "w" : ""));
452
453 if (!wa) {
454 FD_CLR(fd, &writers);
455 }
456 if (!ra) {
457 FD_CLR(fd, &readers);
458 }
459 if (FD_ISSET(fd, &writers) || FD_ISSET(fd, &readers)) {
460 ++n;
461 }
462 }
463
464 ares_process(_channel, &readers, &writers);
465 } while (n != 0);
466 }
467
468 static srv_records make_srv_records(ares_srv_reply* start) {
469 srv_records records;
470 for (auto reply = start; reply; reply = reply->next) {
471 srv_record record = {reply->priority,
472 reply->weight,
473 reply->port,
474 sstring{reply->host}};
475 records.push_back(std::move(record));
476 }
477 return records;
478 }
479
480 static hostent make_hostent(const ::hostent& host) {
481 hostent e;
482 e.names.emplace_back(host.h_name);
483 auto np = host.h_aliases;
484 while (*np != 0) {
485 e.names.emplace_back(*np++);
486 }
487 auto p = host.h_addr_list;
488 while (*p != nullptr) {
489 switch (host.h_addrtype) {
490 case AF_INET:
491 assert(size_t(host.h_length) >= sizeof(in_addr));
492 e.addr_list.emplace_back(*reinterpret_cast<const in_addr*>(*p));
493 break;
494 case AF_INET6:
495 assert(size_t(host.h_length) >= sizeof(in6_addr));
496 e.addr_list.emplace_back(*reinterpret_cast<const in6_addr*>(*p));
497 break;
498 default:
499 break;
500 }
501 ++p;
502 }
503
504 dns_log.debug("Query success: {}/{}", e.names.front(), e.addr_list.front());
505
506 return e;
507 }
508 // We need to partially ref-count our socket entries
509 // when we have pending reads/writes, so we don't erase the
510 // entry to early.
511 void use(ares_socket_t fd) {
512 _gate.enter();
513 auto& e = _sockets.at(fd);
514 ++e.pending;
515 }
516 void release(ares_socket_t fd) {
517 auto& e = _sockets.at(fd);
518 dns_log.trace("Release socket {} -> {}", fd, e.pending - 1);
519 if (--e.pending < 0) {
520 _sockets.erase(fd);
521 dns_log.trace("Released socket {}", fd);
522 }
523 _gate.leave();
524 }
525 ares_socket_t do_socket(int af, int type, int protocol) {
526 if (_closed) {
527 return -1;
528 }
529 int fd = next_fd();
530 switch (type) {
531 case SOCK_STREAM:
532 _sockets.emplace(fd, connected_socket());
533 dns_log.trace("Created tcp socket {}", fd);
534 break;
535 case SOCK_DGRAM:
536 _sockets.emplace(fd, _stack.make_udp_channel());
537 dns_log.trace("Created udp socket {}", fd);
538 break;
539 default: return -1;
540 }
541 return fd;
542 }
543 int do_close(ares_socket_t fd) {
544 dns_log.trace("Close socket {}", fd);
545 auto& e = _sockets.at(fd);
546
547 // Mark as closed.
548 if (std::exchange(e.closed, true)) {
549 return 0;
550 }
551
552 _gate.enter(); // "leave" is done in release(fd)
553
554 switch (e.typ) {
555 case type::tcp:
556 {
557 dns_log.trace("Close tcp socket {}, {} pending", fd, e.pending);
558 future<> f = make_ready_future();
559 if (e.tcp.in) {
560 e.tcp.socket.shutdown_input();
561 dns_log.trace("Closed tcp socket {} input", fd);
562 }
563 if (e.tcp.out) {
564 f = f.then([&e] {
565 return e.tcp.out->close();
566 }).then([fd] {
567 dns_log.trace("Closed tcp socket {} output", fd);
568 });
569 }
570 f = f.finally([me = shared_from_this(), fd] {
571 me->release(fd);
572 });
573 break;
574 }
575 case type::udp:
576 e.udp.channel.shutdown_input();
577 e.udp.channel.shutdown_output();
578 release(fd);
579 break;
580 default:
581 // should not happen
582 _gate.leave();
583 break;
584 }
585 return 0;
586 }
587 socket_address sock_addr(const sockaddr * addr, socklen_t len) {
588 if (addr->sa_family != AF_INET) {
589 throw std::invalid_argument("No ipv6 yet");
590 }
591 auto in = reinterpret_cast<const sockaddr_in *>(addr);
592 return *in;
593 }
594 int do_connect(ares_socket_t fd, const sockaddr * addr, socklen_t len) {
595 if (_closed) {
596 return -1;
597 }
598 try {
599 auto& e = get_socket_entry(fd);
600 auto sa = sock_addr(addr, len);
601
602 dns_log.trace("Connect {}({})->{}", fd, int(e.typ), sa);
603
604 assert(e.avail == 0);
605
606 e.avail = POLLOUT|POLLIN; // until we know otherwise
607
608 switch (e.typ) {
609 case type::tcp: {
610 auto f = _stack.connect(sa);
611 if (!f.available()) {
612 dns_log.trace("Connection pending: {}", fd);
613 e.avail = 0;
614 use(fd);
615 // FIXME: future is discarded
616 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<connected_socket> f) {
617 try {
618 e.tcp.socket = f.get0();
619 dns_log.trace("Connection complete: {}", fd);
620 } catch (...) {
621 dns_log.debug("Connect {} failed: {}", fd, std::current_exception());
622 }
623 e.avail = POLLOUT|POLLIN;
624 me->poll_sockets();
625 me->release(fd);
626 });
627 errno = EWOULDBLOCK;
628 return -1;
629 }
630 e.tcp.socket = f.get0();
631 break;
632 }
633 case type::udp:
634 // we do not have udp connect, so just keep
635 // track of the destination
636 e.udp.dst = sa;
637 break;
638 default:
639 return -1;
640 }
641 return 0;
642 } catch (...) {
643 return -1;
644 }
645 }
646 ssize_t do_recvfrom(ares_socket_t fd, void * dst, size_t len, int flags, struct sockaddr * from, socklen_t * from_len) {
647 if (_closed) {
648 return -1;
649 }
650 try {
651 auto& e = get_socket_entry(fd);
652 dns_log.trace("Read {}({})", fd, int(e.typ));
653 // check if we're already reading.
654 if (!(e.avail & POLLIN)) {
655 dns_log.trace("Read already pending {}", fd);
656 errno = EWOULDBLOCK;
657 return -1;
658 }
659 for (;;) {
660 switch (e.typ) {
661 case type::tcp: {
662 auto & tcp = e.tcp;
663 if (!tcp.indata.empty()) {
664 dns_log.trace("Read {}. {} bytes available", fd, tcp.indata.size());
665 len = std::min(len, tcp.indata.size());
666 std::copy(tcp.indata.begin(), tcp.indata.begin() + len, reinterpret_cast<char *>(dst));
667 tcp.indata.trim_front(len);
668 return len;
669 }
670 if (!tcp.in) {
671 tcp.in = tcp.socket.input();
672 }
673 auto f = tcp.in->read_up_to(len);
674 if (!f.available()) {
675 dns_log.trace("Read {}: data unavailable", fd);
676 e.avail &= ~POLLIN;
677 use(fd);
678 // FIXME: future is discarded
679 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<temporary_buffer<char>> f) {
680 try {
681 auto buf = f.get0();
682 dns_log.trace("Read {} -> {} bytes", fd, buf.size());
683 e.tcp.indata = std::move(buf);
684 } catch (...) {
685 dns_log.debug("Read {} failed: {}", fd, std::current_exception());
686 }
687 e.avail |= POLLIN; // always reset state
688 me->poll_sockets();
689 me->release(fd);
690 });
691 errno = EWOULDBLOCK;
692 return -1;
693 }
694
695 try {
696 tcp.indata = f.get0();
697 continue; // loop will take care of data
698 } catch (std::system_error& e) {
699 errno = e.code().value();
700 return -1;
701 } catch (...) {
702 }
703 return -1;
704
705 }
706 case type::udp: {
707 auto & udp = e.udp;
708 if (udp.in) {
709 auto & p = udp.in->get_data();
710
711 dns_log.trace("Read {}. {} bytes available from {}", fd, p.len(), udp.in->get_src());
712
713 if (from != nullptr) {
714 *from = socket_address(udp.in->get_src()).as_posix_sockaddr();
715 if (from_len != nullptr) {
716 // TODO: ipvv6
717 *from_len = sizeof(sockaddr_in);
718 }
719 }
720
721 len = std::min(len, size_t(p.len()));
722 size_t rem = len;
723 auto * out = reinterpret_cast<char *>(dst);
724 for (auto & f : p.fragments()) {
725 auto n = std::min(rem, f.size);
726 out = std::copy_n(f.base, n, out);
727 rem = rem - n;
728 }
729 if (p.len() == len) {
730 udp.in = {};
731 } else {
732 p.trim_front(len);
733 }
734 return len;
735 }
736 auto f = udp.channel.receive();
737 if (!f.available()) {
738 e.avail &= ~POLLIN;
739 use(fd);
740 dns_log.trace("Read {}: data unavailable", fd);
741 // FIXME: future is discarded
742 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<net::udp_datagram> f) {
743 try {
744 auto d = f.get0();
745 dns_log.trace("Read {} -> {} bytes", fd, d.get_data().len());
746 e.udp.in = std::move(d);
747 e.avail |= POLLIN;
748 } catch (...) {
749 dns_log.debug("Read {} failed: {}", fd, std::current_exception());
750 }
751 me->poll_sockets();
752 me->release(fd);
753 });
754 errno = EWOULDBLOCK;
755 return -1;
756 }
757
758 try {
759 udp.in = f.get0();
760 continue; // loop will take care of data
761 } catch (std::system_error& e) {
762 errno = e.code().value();
763 return -1;
764 } catch (...) {
765 }
766 return -1;
767 }
768 default:
769 return -1;
770 }
771 }
772 } catch (...) {
773 }
774 return -1;
775 }
776 ssize_t do_sendv(ares_socket_t fd, const iovec * vec, int len) {
777 if (_closed) {
778 return -1;
779 }
780 try {
781 auto& e = _sockets.at(fd);
782 dns_log.trace("Send {}({})", fd, int(e.typ));
783
784 // Assume we will be able to send data eventually very soon
785 // and just assume that unless we get immediate
786 // failures, we'll be ok. If we're not, the
787 // timeout logic will have to handle the problem.
788 //
789 // This saves us on two accounts:
790 // 1.) c-ares does not handle EWOULDBLOCK for
791 // udp sockets. Must pretend to finish
792 // immediately there anyway
793 // 2.) Doing so for tcp writes saves us having to
794 // match iovec->packet fragments. Downside is we
795 // have to copy the data, but we pretty much
796 // have to anyway, since we could otherwise
797 // get a query time out while we're sending
798 // with zero-copy and suddenly have freed
799 // memory in packets. Bad.
800
801
802 for (;;) {
803 // check if we're already writing.
804 if (e.typ == type::tcp && !(e.avail & POLLOUT)) {
805 dns_log.trace("Send already pending {}", fd);
806 errno = EWOULDBLOCK;
807 return -1;
808 }
809
810 net::packet p;
811 p.reserve(len);
812 for (int i = 0; i < len; ++i) {
813 p = net::packet(std::move(p), net::fragment{reinterpret_cast<char *>(vec[i].iov_base), vec[i].iov_len});
814 }
815
816 auto bytes = p.len();
817 auto f = make_ready_future();
818
819 switch (e.typ) {
820 case type::tcp:
821 if (!e.tcp.out) {
822 e.tcp.out = e.tcp.socket.output(0);
823 }
824 f = e.tcp.out->write(std::move(p));
825 break;
826 case type::udp:
827 f = e.udp.channel.send(e.udp.dst, std::move(p));
828 break;
829 default:
830 return -1;
831 }
832
833 if (!f.available()) {
834 dns_log.trace("Send {} unavailable.", fd);
835 e.avail &= ~POLLOUT;
836 use(fd);
837 // FIXME: future is discarded
838 (void)f.then_wrapped([me = shared_from_this(), &e, bytes, fd](future<> f) {
839 try {
840 f.get();
841 dns_log.trace("Send {}. {} bytes sent.", fd, bytes);
842 } catch (...) {
843 dns_log.debug("Send {} failed: {}", fd, std::current_exception());
844 }
845 e.avail |= POLLOUT;
846 me->poll_sockets();
847 me->release(fd);
848 });
849 // c-ares does _not_ use non-blocking retry for udp sockets. We just pretend
850 // all is fine even though we have no idea. Barring stack/adapter failure it
851 // is close to the same guarantee a "normal" message send would have anyway.
852 // For tcp we also pretend we're done, to make sure we don't have to deal with
853 // matching sent data
854 return bytes;
855 }
856 if (f.failed()) {
857 try {
858 f.get();
859 } catch (std::system_error& e) {
860 errno = e.code().value();
861 } catch (...) {
862 }
863 return -1;
864 }
865
866 return bytes;
867 }
868 } catch (...) {
869 }
870 return -1;
871 }
872
873 // Note: cannot use to much here, because fd_sets only handle
874 // ~1024 fd:s. Set to something below that in case you need to
875 // debug (maybe)
876 static constexpr ares_socket_t socket_offset = 1;
877
878 ares_socket_t next_fd() {
879 ares_socket_t fd = ares_socket_t(_sockets.size() + socket_offset);
880 while (_sockets.count(fd)) {
881 ++fd;
882 }
883 return fd;
884 }
885 struct tcp_entry {
886 tcp_entry(connected_socket s)
887 : socket(std::move(s)) {
888 }
889 ;
890 connected_socket socket;
891 std::optional<input_stream<char>> in;
892 std::optional<output_stream<char>> out;
893 temporary_buffer<char> indata;
894 };
895 struct udp_entry {
896 udp_entry(net::udp_channel c)
897 : channel(std::move(c)) {
898 }
899 net::udp_channel channel;
900 std::optional<net::udp_datagram> in;;
901 socket_address dst;
902 };
903 struct sock_entry {
904 union {
905 tcp_entry tcp;
906 udp_entry udp;
907 };
908 type typ;
909 int avail = 0;
910 int pending = 0;
911 bool closed = false;
912
913 sock_entry(sock_entry&& e)
914 : typ(e.typ)
915 , avail(e.avail)
916 {
917 e.typ = type::none;
918 switch (typ) {
919 case type::tcp:
920 tcp = std::move(e.tcp);
921 break;
922 case type::udp:
923 udp = std::move(e.udp);
924 break;
925 default:
926 break;
927 }
928 }
929 sock_entry(connected_socket s)
930 : tcp(tcp_entry{std::move(s)})
931 , typ(type::tcp)
932 {}
933 sock_entry(net::udp_channel c)
934 : udp(udp_entry{std::move(c)})
935 , typ(type::udp)
936 {}
937 ~sock_entry() {
938 switch (typ) {
939 case type::tcp: tcp.~tcp_entry(); break;
940 case type::udp: udp.~udp_entry(); break;
941 default: break;
942 }
943 }
944 };
945
946 sock_entry& get_socket_entry(ares_socket_t fd) {
947 auto& e = _sockets.at(fd);
948 if (e.closed) {
949 throw std::runtime_error("Socket closed");
950 }
951 return e;
952 }
953
954
955 typedef std::unordered_map<ares_socket_t, sock_entry> socket_map;
956
957 friend struct dns_call;
958
959 socket_map _sockets;
960 network_stack & _stack;
961
962 ares_channel _channel = {};
963 uint64_t _calls = 0;
964 std::chrono::milliseconds _timeout;
965 timer<> _timer;
966 gate _gate;
967 bool _closed = false;
968 };
969
970 net::dns_resolver::dns_resolver()
971 : dns_resolver(options())
972 {}
973
974 net::dns_resolver::dns_resolver(const options& opts)
975 : dns_resolver(engine().net(), opts)
976 {}
977
978 net::dns_resolver::dns_resolver(network_stack& stack, const options& opts)
979 : _impl(make_shared<impl>(stack, opts))
980 {}
981
982 net::dns_resolver::~dns_resolver()
983 {}
984
985 net::dns_resolver::dns_resolver(dns_resolver&&) noexcept = default;
986 net::dns_resolver& net::dns_resolver::operator=(dns_resolver&&) noexcept = default;
987
988 future<net::hostent> net::dns_resolver::get_host_by_name(const sstring& name, opt_family family) {
989 return _impl->get_host_by_name(name, family);
990 }
991
992 future<net::hostent> net::dns_resolver::get_host_by_addr(const inet_address& addr) {
993 return _impl->get_host_by_addr(addr);
994 }
995
996 future<net::inet_address> net::dns_resolver::resolve_name(const sstring& name, opt_family family) {
997 return _impl->resolve_name(name, family);
998 }
999
1000 future<sstring> net::dns_resolver::resolve_addr(const inet_address& addr) {
1001 return _impl->resolve_addr(addr);
1002 }
1003
1004 future<net::dns_resolver::srv_records> net::dns_resolver::get_srv_records(net::dns_resolver::srv_proto proto,
1005 const sstring& service,
1006 const sstring& domain) {
1007 return _impl->get_srv_records(proto, service, domain);
1008 }
1009
1010 future<> net::dns_resolver::close() {
1011 return _impl->close();
1012 }
1013
1014 static net::dns_resolver& resolver() {
1015 static thread_local net::dns_resolver resolver;
1016 return resolver;
1017 }
1018
1019
1020 future<net::hostent> net::dns::get_host_by_name(const sstring& name, opt_family family) {
1021 return resolver().get_host_by_name(name, family);
1022 }
1023
1024 future<net::hostent> net::dns::get_host_by_addr(const inet_address& addr) {
1025 return resolver().get_host_by_addr(addr);
1026 }
1027
1028 future<net::inet_address> net::dns::resolve_name(const sstring& name, opt_family family) {
1029 return resolver().resolve_name(name, family);
1030 }
1031
1032 future<sstring> net::dns::resolve_addr(const inet_address& addr) {
1033 return resolver().resolve_addr(addr);
1034 }
1035
1036 future<net::dns_resolver::srv_records> net::dns::get_srv_records(net::dns_resolver::srv_proto proto,
1037 const sstring& service,
1038 const sstring& domain) {
1039 return resolver().get_srv_records(proto, service, domain);
1040 }
1041
1042 future<sstring> net::inet_address::hostname() const {
1043 return dns::resolve_addr(*this);
1044 }
1045
1046 future<std::vector<sstring>> net::inet_address::aliases() const {
1047 return dns::get_host_by_addr(*this).then([](hostent e) {
1048 return make_ready_future<std::vector<sstring>>(std::move(e.names));
1049 });
1050 }
1051
1052 future<net::inet_address> net::inet_address::find(
1053 const sstring& name) {
1054 return dns::resolve_name(name);
1055 }
1056
1057 future<net::inet_address> net::inet_address::find(
1058 const sstring& name, family f) {
1059 return dns::resolve_name(name, f);
1060 }
1061
1062 future<std::vector<net::inet_address>> net::inet_address::find_all(
1063 const sstring& name) {
1064 return dns::get_host_by_name(name).then([](hostent e) {
1065 return make_ready_future<std::vector<net::inet_address>>(std::move(e.addr_list));
1066 });
1067 }
1068
1069 future<std::vector<net::inet_address>> net::inet_address::find_all(
1070 const sstring& name, family f) {
1071 return dns::get_host_by_name(name, f).then([](hostent e) {
1072 return make_ready_future<std::vector<net::inet_address>>(std::move(e.addr_list));
1073 });
1074 }
1075
1076 }