]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/net/dns.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / src / net / dns.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2016 Cloudius Systems
20 */
21
22 #include <arpa/nameser.h>
23 #include <chrono>
24
25 #include <ares.h>
26 #include <boost/lexical_cast.hpp>
27
28 #include <ostream>
29 #include <seastar/util/std-compat.hh>
30 #include <seastar/net/inet_address.hh>
31
32 #include <seastar/net/ip.hh>
33 #include <seastar/net/api.hh>
34 #include <seastar/net/dns.hh>
35 #include <seastar/core/sstring.hh>
36 #include <seastar/core/timer.hh>
37 #include <seastar/core/reactor.hh>
38 #include <seastar/core/gate.hh>
39 #include <seastar/core/print.hh>
40
41 namespace seastar::net {
42
43 // NOTE: Should be prior to <seastar/util/log.hh> include because
44 // logger::stringer_for<T> needs to see the corresponding `operator <<`
45 // declaration at the call site
46 //
47 // This doesn't need to be in the public API, so leave it there instead of placing into `inet_address.hh`
48 std::ostream& operator<<(std::ostream& os, const opt_family& f) {
49 if (f) {
50 return os << *f;
51 } else {
52 return os << "ANY";
53 }
54 }
55
56 }
57
58 #if FMT_VERSION >= 90000
// Formatter specialization for opt_family, only compiled for fmt >= 9 (see the
// surrounding FMT_VERSION check); it derives from fmt::ostream_formatter, i.e.
// formatting goes through the type's std::ostream operator<<.
59 template <> struct fmt::formatter<seastar::net::opt_family> : fmt::ostream_formatter {};
60 #endif
61
62 #include <seastar/util/log.hh>
63
64 namespace seastar {
65
// File-local logger used by the resolver code below; its name is "dns_resolver".
66 static logger dns_log("dns_resolver");
67
// std::error_category for c-ares status codes (the ARES_* constants), so that
// they can be carried by std::error_code / std::system_error.
class ares_error_category : public std::error_category {
public:
    constexpr ares_error_category() noexcept : std::error_category{} {}

    // Name of this category.
    const char * name() const noexcept override {
        return "C-Ares";
    }

    // Short description for the c-ares status code `error`.
    // Codes that are not listed below yield "Unknown error".
    std::string message(int error) const override {
        switch (error) {
        /* Server error codes (ARES_ENODATA indicates no relevant answer) */
        case ARES_ENODATA: return "No data";
        case ARES_EFORMERR: return "Form error";
        case ARES_ESERVFAIL: return "Server failure";
        case ARES_ENOTFOUND: return "Not found";
        case ARES_ENOTIMP: return "Not implemented";
        case ARES_EREFUSED: return "Refused";

        /* Locally generated error codes */
        case ARES_EBADQUERY: return "Bad query";
        case ARES_EBADNAME: return "Bad name";
        case ARES_EBADFAMILY: return "Bad family";
        case ARES_EBADRESP: return "Bad response";
        case ARES_ECONNREFUSED: return "Connection refused";
        case ARES_ETIMEOUT: return "Timeout";
        case ARES_EOF: return "EOF";
        case ARES_EFILE: return "File error";
        case ARES_ENOMEM: return "No memory";
        case ARES_EDESTRUCTION: return "Destruction";
        case ARES_EBADSTR: return "Bad string";

        /* ares_getnameinfo error codes */
        case ARES_EBADFLAGS: return "Invalid flags";

        /* ares_getaddrinfo error codes */
        case ARES_ENONAME: return "No name";
        case ARES_EBADHINTS: return "Bad hints";

        /* Uninitialized library error code */
        case ARES_ENOTINITIALIZED: return "Not initialized";

        /* ares_library_init error codes */
        case ARES_ELOADIPHLPAPI: return "Load IPHLPAPI";
        case ARES_EADDRGETNETWORKPARAMS: return "Get network parameters";

        /* More error codes */
        case ARES_ECANCELLED: return "Cancelled";
        default:
            return "Unknown error";
        }
    }
};
118
// The single category instance referenced by every std::system_error this file
// creates for a c-ares status code.
119 static const ares_error_category ares_errorc;
120
121 static void check_ares_error(int error) {
122 if (error != ARES_SUCCESS) {
123 throw std::system_error(error, ares_errorc);
124 }
125 }
126
127 struct ares_initializer {
128 ares_initializer() {
129 check_ares_error(ares_library_init(0));
130 }
131 ~ares_initializer() {
132 ares_library_cleanup();
133 }
134 };
135
136 class net::dns_resolver::impl
137 : public enable_shared_from_this<impl>
138 {
139 public:
140 impl(network_stack& stack, const options& opts)
141 : _stack(stack)
142 , _timeout(opts.timeout ? *opts.timeout : std::chrono::milliseconds(5000) /* from ares private */)
143 , _timer(std::bind(&impl::poll_sockets, this))
144 {
145 static const ares_initializer a_init;
146
147 // this can "block" ever so slightly, because it will
148 // look in resolv.conf etc for query setup. We could
149 // do this ourselves, and instead set ares options
150 // here, but it seems more error prone (me parsing
151 // resolv.conf -> hah!)
152 ares_options a_opts = { 0, };
153
154 // For now, use the default "fb" query order
155 // (set explicitly lest we forget).
156 // We only do querying dns server really async.
157 // Reading hosts files is doen by c-ares internally
158 // and with normal fread calls. Thus they theorectically
159 // block. This can potentially be an issue for some application
160 // and if so, we need to revisit this. For now, assume
161 // it won't block us in any measurable way.
162 char buf[3] = "fb";
163 a_opts.lookups = buf; // only net
164 // Always set the timeout
165 a_opts.timeout = _timeout.count();
166 int flags = ARES_OPT_LOOKUPS|ARES_OPT_TIMEOUTMS;
167
168 if (opts.use_tcp_query && *opts.use_tcp_query) {
169 a_opts.flags = ARES_FLAG_USEVC | ARES_FLAG_PRIMARY;
170 flags |= ARES_OPT_FLAGS;
171 }
172 std::vector<in_addr> addr_tmp;
173 if (opts.servers) {
174 std::transform(opts.servers->begin(), opts.servers->end(), std::back_inserter(addr_tmp), [](const inet_address& a) {
175 if (a.in_family() != inet_address::family::INET) {
176 throw std::invalid_argument("Servers must be ipv4 addresses");
177 }
178 in_addr in = a;
179 return in;
180 });
181 a_opts.servers = addr_tmp.data();
182 a_opts.nservers = int(addr_tmp.size());
183 flags |= ARES_OPT_SERVERS;
184 }
185 std::vector<const char *> dom_tmp;
186 if (opts.domains) {
187 std::transform(opts.domains->begin(), opts.domains->end(), std::back_inserter(dom_tmp), [](const sstring& s) {
188 return s.data();
189 });
190 a_opts.domains = const_cast<char **>(dom_tmp.data());
191 a_opts.ndomains = int(dom_tmp.size());
192 flags |= ARES_OPT_DOMAINS;
193 }
194 if (opts.tcp_port) {
195 a_opts.tcp_port = *opts.tcp_port;
196 flags |= ARES_OPT_TCP_PORT;
197 }
198 if (opts.udp_port) {
199 a_opts.udp_port = *opts.udp_port;
200 flags |= ARES_OPT_UDP_PORT;
201 }
202
203 check_ares_error(ares_init_options(&_channel, &a_opts, flags));
204
205 static auto get_impl = [](void * p) { return reinterpret_cast<impl *>(p); };
206 static const ares_socket_functions callbacks = {
207 [](int af, int type, int protocol, void * p) { return get_impl(p)->do_socket(af, type, protocol); },
208 [](ares_socket_t s, void * p) { return get_impl(p)->do_close(s); },
209 [](ares_socket_t s, const struct sockaddr * addr, socklen_t len, void * p) { return get_impl(p)->do_connect(s, addr, len); },
210 [](ares_socket_t s, void * dst, size_t len, int flags, struct sockaddr * addr, socklen_t * alen, void * p) {
211 return get_impl(p)->do_recvfrom(s, dst, len, flags, addr, alen);
212 },
213 [](ares_socket_t s, const struct iovec * vec, int len, void * p) {
214 return get_impl(p)->do_sendv(s, vec, len);
215 },
216 };
217
218 ares_set_socket_functions(_channel, &callbacks, this);
219
220 // just in case you need printf-debug.
221 // dns_log.set_level(log_level::trace);
222 }
223 ~impl() {
224 _timer.cancel();
225 if (_channel) {
226 ares_destroy(_channel);
227 }
228 }
229
230 future<inet_address> resolve_name(sstring name, opt_family family) {
231 return get_host_by_name(std::move(name), family).then([](hostent h) {
232 return make_ready_future<inet_address>(h.addr_list.front());
233 });
234 }
235
236 future<hostent> get_host_by_name(sstring name, opt_family family) {
237 class promise_wrap : public promise<hostent> {
238 public:
239 promise_wrap(sstring s)
240 : name(std::move(s))
241 {}
242 sstring name;
243 };
244
245 dns_log.debug("Query name {} ({})", name, family);
246
247 if (!family) {
248 auto res = inet_address::parse_numerical(name);
249 if (res) {
250 return make_ready_future<hostent>(hostent{ {name}, {*res}});
251 }
252 }
253
254 auto p = new promise_wrap(std::move(name));
255 auto f = p->get_future();
256
257 dns_call call(*this);
258
259 auto af = family ? int(*family) : AF_UNSPEC;
260
261 // The following pragma is needed to work around a false-positive warning
262 // in Gcc 11 (see https://gcc.gnu.org/bugzilla/show_bug.cgi?id=96003).
263 #pragma GCC diagnostic ignored "-Wnonnull"
264 ares_gethostbyname(_channel, p->name.c_str(), af, [](void* arg, int status, int timeouts, ::hostent* host) {
265 // we do potentially allocating operations below, so wrap the pointer in a
266 // unique here.
267 std::unique_ptr<promise_wrap> p(reinterpret_cast<promise_wrap *>(arg));
268
269 switch (status) {
270 default:
271 dns_log.debug("Query failed: {}", status);
272 p->set_exception(std::system_error(status, ares_errorc, p->name));
273 break;
274 case ARES_SUCCESS:
275 p->set_value(make_hostent(*host));
276 break;
277 }
278
279 }, reinterpret_cast<void *>(p));
280
281
282 poll_sockets();
283
284 return f.finally([this] {
285 end_call();
286 });
287 }
288
289 future<hostent> get_host_by_addr(inet_address addr) {
290 class promise_wrap : public promise<hostent> {
291 public:
292 promise_wrap(inet_address a)
293 : addr(std::move(a))
294 {}
295 inet_address addr;
296 };
297
298 dns_log.debug("Query addr {}", addr);
299
300 auto p = new promise_wrap(std::move(addr));
301 auto f = p->get_future();
302
303 dns_call call(*this);
304
305 ares_gethostbyaddr(_channel, p->addr.data(), p->addr.size(), int(p->addr.in_family()), [](void* arg, int status, int timeouts, ::hostent* host) {
306 // we do potentially allocating operations below, so wrap the pointer in a
307 // unique here.
308 std::unique_ptr<promise_wrap> p(reinterpret_cast<promise_wrap *>(arg));
309
310 switch (status) {
311 default:
312 dns_log.debug("Query failed: {}", status);
313 p->set_exception(std::system_error(status, ares_errorc, boost::lexical_cast<std::string>(p->addr)));
314 break;
315 case ARES_SUCCESS:
316 p->set_value(make_hostent(*host));
317 break;
318 }
319
320 }, reinterpret_cast<void *>(p));
321
322
323 poll_sockets();
324
325 return f.finally([this] {
326 end_call();
327 });
328 }
329
330 future<srv_records> get_srv_records(srv_proto proto,
331 const sstring& service,
332 const sstring& domain) {
333 auto p = std::make_unique<promise<srv_records>>();
334 auto f = p->get_future();
335
336 const auto query = format("_{}._{}.{}",
337 service,
338 proto == srv_proto::tcp ? "tcp" : "udp",
339 domain);
340
341 dns_log.debug("Query srv {}", query);
342
343 dns_call call(*this);
344
345 ares_query(_channel, query.c_str(), ns_c_in, ns_t_srv,
346 [](void* arg, int status, int timeouts,
347 unsigned char* buf, int len) {
348 auto p = std::unique_ptr<promise<srv_records>>(
349 reinterpret_cast<promise<srv_records> *>(arg));
350 if (status != ARES_SUCCESS) {
351 dns_log.debug("Query failed: {}", status);
352 p->set_exception(std::system_error(status, ares_errorc));
353 return;
354 }
355 ares_srv_reply* start = nullptr;
356 status = ares_parse_srv_reply(buf, len, &start);
357 if (status != ARES_SUCCESS) {
358 dns_log.debug("Parse failed: {}", status);
359 p->set_exception(std::system_error(status, ares_errorc));
360 return;
361 }
362 try {
363 p->set_value(make_srv_records(start));
364 } catch (...) {
365 p->set_exception(std::current_exception());
366 }
367 ares_free_data(start);
368 }, reinterpret_cast<void *>(p.release()));
369
370
371 poll_sockets();
372
373 return f.finally([this] {
374 end_call();
375 });
376 }
377
378 future<sstring> resolve_addr(inet_address addr) {
379 return get_host_by_addr(addr).then([](hostent h) {
380 return make_ready_future<sstring>(h.names.front());
381 });
382 }
383
384 future<> close() {
385 _closed = true;
386 ares_cancel(_channel);
387 dns_log.trace("Shutting down {} sockets", _sockets.size());
388 for (auto & p : _sockets) {
389 do_close(p.first);
390 }
391 dns_log.trace("Closing gate");
392 return _gate.close();
393 }
394 private:
// Discriminator stored in sock_entry::typ: it records which member of the
// entry's union is live (tcp or udp); `none` is what a moved-from entry is
// set to.
395 enum class type {
396 none, tcp, udp
397 };
398 struct dns_call {
399 dns_call(impl & i)
400 : _i(i)
401 , _c(++i._calls)
402 {}
403 ~dns_call() {
404 // If a query does not immediately complete
405 // it might never do so, unless data actually
406 // comes back to us and a waiting recv promise
407 // is fulfilled.
408 // We need to add a timer to do polling at ~timeout
409 // ms later, so the ares logic can detect this and
410 // tell us we're over.
411 if (_c == 1 && _i._calls != 0) {
412 _i._timer.arm_periodic(_i._timeout);
413 }
414 }
415 impl& _i;
416 uint64_t _c;
417 };
418
419 void end_call() {
420 if (--_calls == 0) {
421 _timer.cancel();
422 }
423 }
424 void poll_sockets() {
425 fd_set readers, writers;
426 int n = 0;
427
428 dns_log.trace("Poll sockets");
429
430 do {
431 // Retrieve the set of file descriptors that the library wants us to monitor.
432 FD_ZERO(&readers);
433 FD_ZERO(&writers);
434
435 n = ares_fds(_channel, &readers, &writers);
436
437 dns_log.trace("ares_fds: {}", n);
438
439 if (n == 0) {
440 break;
441 }
442
443 n = 0;
444
445 for (auto & p : _sockets) {
446 auto & e = p.second;
447 auto fd = p.first;
448 auto r = FD_ISSET(p.first, &readers);
449 auto w = FD_ISSET(p.first, &writers);
450 auto ra = e.avail & POLLIN;
451 auto wa = e.avail & POLLOUT;
452
453 dns_log.trace("fd {} {}{}/{}{}", fd, (r ? "r" : ""),
454 (w ? "w" : ""), (ra ? "r" : ""),
455 (wa ? "w" : ""));
456
457 if (!wa) {
458 FD_CLR(fd, &writers);
459 }
460 if (!ra) {
461 FD_CLR(fd, &readers);
462 }
463 if (FD_ISSET(fd, &writers) || FD_ISSET(fd, &readers)) {
464 ++n;
465 }
466 }
467
468 ares_process(_channel, &readers, &writers);
469 } while (n != 0);
470 }
471
472 static srv_records make_srv_records(ares_srv_reply* start) {
473 srv_records records;
474 for (auto reply = start; reply; reply = reply->next) {
475 srv_record record = {reply->priority,
476 reply->weight,
477 reply->port,
478 sstring{reply->host}};
479 records.push_back(std::move(record));
480 }
481 return records;
482 }
483
484 static hostent make_hostent(const ::hostent& host) {
485 hostent e;
486 e.names.emplace_back(host.h_name);
487 auto np = host.h_aliases;
488 while (*np != 0) {
489 e.names.emplace_back(*np++);
490 }
491 auto p = host.h_addr_list;
492 while (*p != nullptr) {
493 switch (host.h_addrtype) {
494 case AF_INET:
495 assert(size_t(host.h_length) >= sizeof(in_addr));
496 e.addr_list.emplace_back(*reinterpret_cast<const in_addr*>(*p));
497 break;
498 case AF_INET6:
499 assert(size_t(host.h_length) >= sizeof(in6_addr));
500 e.addr_list.emplace_back(*reinterpret_cast<const in6_addr*>(*p));
501 break;
502 default:
503 break;
504 }
505 ++p;
506 }
507
508 dns_log.debug("Query success: {}/{}", e.names.front(), e.addr_list.front());
509
510 return e;
511 }
512 // We need to partially ref-count our socket entries
513 // when we have pending reads/writes, so we don't erase the
514 // entry to early.
515 void use(ares_socket_t fd) {
516 _gate.enter();
517 auto& e = _sockets.at(fd);
518 ++e.pending;
519 }
520 void release(ares_socket_t fd) {
521 auto& e = _sockets.at(fd);
522 dns_log.trace("Release socket {} -> {}", fd, e.pending - 1);
523 if (--e.pending < 0) {
524 _sockets.erase(fd);
525 dns_log.trace("Released socket {}", fd);
526 }
527 _gate.leave();
528 }
529 ares_socket_t do_socket(int af, int type, int protocol) {
530 if (_closed) {
531 return -1;
532 }
533 int fd = next_fd();
534 switch (type) {
535 case SOCK_STREAM:
536 _sockets.emplace(fd, connected_socket());
537 dns_log.trace("Created tcp socket {}", fd);
538 break;
539 case SOCK_DGRAM:
540 _sockets.emplace(fd, _stack.make_udp_channel());
541 dns_log.trace("Created udp socket {}", fd);
542 break;
543 default: return -1;
544 }
545 return fd;
546 }
547 int do_close(ares_socket_t fd) {
548 dns_log.trace("Close socket {}", fd);
549 auto& e = _sockets.at(fd);
550
551 // Mark as closed.
552 if (std::exchange(e.closed, true)) {
553 return 0;
554 }
555
556 _gate.enter(); // "leave" is done in release(fd)
557
558 switch (e.typ) {
559 case type::tcp:
560 {
561 dns_log.trace("Close tcp socket {}, {} pending", fd, e.pending);
562 future<> f = make_ready_future();
563 if (e.tcp.in) {
564 e.tcp.socket.shutdown_input();
565 dns_log.trace("Closed tcp socket {} input", fd);
566 }
567 if (e.tcp.out) {
568 f = f.then([&e] {
569 return e.tcp.out->close();
570 }).then([fd] {
571 dns_log.trace("Closed tcp socket {} output", fd);
572 });
573 }
574 f = f.finally([me = shared_from_this(), fd] {
575 me->release(fd);
576 });
577 break;
578 }
579 case type::udp:
580 e.udp.channel.shutdown_input();
581 e.udp.channel.shutdown_output();
582 release(fd);
583 break;
584 default:
585 // should not happen
586 _gate.leave();
587 break;
588 }
589 return 0;
590 }
591 socket_address sock_addr(const sockaddr * addr, socklen_t len) {
592 if (addr->sa_family != AF_INET) {
593 throw std::invalid_argument("No ipv6 yet");
594 }
595 auto in = reinterpret_cast<const sockaddr_in *>(addr);
596 return *in;
597 }
598 int do_connect(ares_socket_t fd, const sockaddr * addr, socklen_t len) {
599 if (_closed) {
600 return -1;
601 }
602 try {
603 auto& e = get_socket_entry(fd);
604 auto sa = sock_addr(addr, len);
605
606 dns_log.trace("Connect {}({})->{}", fd, int(e.typ), sa);
607
608 assert(e.avail == 0);
609
610 e.avail = POLLOUT|POLLIN; // until we know otherwise
611
612 switch (e.typ) {
613 case type::tcp: {
614 auto f = _stack.connect(sa);
615 if (!f.available()) {
616 dns_log.trace("Connection pending: {}", fd);
617 e.avail = 0;
618 use(fd);
619 // FIXME: future is discarded
620 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<connected_socket> f) {
621 try {
622 e.tcp.socket = f.get0();
623 dns_log.trace("Connection complete: {}", fd);
624 } catch (...) {
625 dns_log.debug("Connect {} failed: {}", fd, std::current_exception());
626 }
627 e.avail = POLLOUT|POLLIN;
628 me->poll_sockets();
629 me->release(fd);
630 });
631 errno = EWOULDBLOCK;
632 return -1;
633 }
634 e.tcp.socket = f.get0();
635 break;
636 }
637 case type::udp:
638 // we do not have udp connect, so just keep
639 // track of the destination
640 e.udp.dst = sa;
641 break;
642 default:
643 return -1;
644 }
645 return 0;
646 } catch (...) {
647 return -1;
648 }
649 }
650 ssize_t do_recvfrom(ares_socket_t fd, void * dst, size_t len, int flags, struct sockaddr * from, socklen_t * from_len) {
651 if (_closed) {
652 return -1;
653 }
654 try {
655 auto& e = get_socket_entry(fd);
656 dns_log.trace("Read {}({})", fd, int(e.typ));
657 // check if we're already reading.
658 if (!(e.avail & POLLIN)) {
659 dns_log.trace("Read already pending {}", fd);
660 errno = EWOULDBLOCK;
661 return -1;
662 }
663 for (;;) {
664 switch (e.typ) {
665 case type::tcp: {
666 auto & tcp = e.tcp;
667 if (!tcp.indata.empty()) {
668 dns_log.trace("Read {}. {} bytes available", fd, tcp.indata.size());
669 len = std::min(len, tcp.indata.size());
670 std::copy(tcp.indata.begin(), tcp.indata.begin() + len, reinterpret_cast<char *>(dst));
671 tcp.indata.trim_front(len);
672 return len;
673 }
674 if (!tcp.socket) {
675 errno = ENOTCONN;
676 return -1;
677 }
678 if (!tcp.in) {
679 tcp.in = tcp.socket.input();
680 }
681 auto f = tcp.in->read_up_to(len);
682 if (!f.available()) {
683 dns_log.trace("Read {}: data unavailable", fd);
684 e.avail &= ~POLLIN;
685 use(fd);
686 // FIXME: future is discarded
687 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<temporary_buffer<char>> f) {
688 try {
689 auto buf = f.get0();
690 dns_log.trace("Read {} -> {} bytes", fd, buf.size());
691 e.tcp.indata = std::move(buf);
692 } catch (...) {
693 dns_log.debug("Read {} failed: {}", fd, std::current_exception());
694 }
695 e.avail |= POLLIN; // always reset state
696 me->poll_sockets();
697 me->release(fd);
698 });
699 errno = EWOULDBLOCK;
700 return -1;
701 }
702
703 try {
704 tcp.indata = f.get0();
705 continue; // loop will take care of data
706 } catch (std::system_error& e) {
707 errno = e.code().value();
708 return -1;
709 } catch (...) {
710 }
711 return -1;
712
713 }
714 case type::udp: {
715 auto & udp = e.udp;
716 if (udp.in) {
717 auto & p = udp.in->get_data();
718
719 dns_log.trace("Read {}. {} bytes available from {}", fd, p.len(), udp.in->get_src());
720
721 if (from != nullptr) {
722 *from = socket_address(udp.in->get_src()).as_posix_sockaddr();
723 if (from_len != nullptr) {
724 // TODO: ipvv6
725 *from_len = sizeof(sockaddr_in);
726 }
727 }
728
729 len = std::min(len, size_t(p.len()));
730 size_t rem = len;
731 auto * out = reinterpret_cast<char *>(dst);
732 for (auto & f : p.fragments()) {
733 auto n = std::min(rem, f.size);
734 out = std::copy_n(f.base, n, out);
735 rem = rem - n;
736 }
737 if (p.len() == len) {
738 udp.in = {};
739 } else {
740 p.trim_front(len);
741 }
742 return len;
743 }
744 auto f = udp.channel.receive();
745 if (!f.available()) {
746 e.avail &= ~POLLIN;
747 use(fd);
748 dns_log.trace("Read {}: data unavailable", fd);
749 // FIXME: future is discarded
750 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<net::udp_datagram> f) {
751 try {
752 auto d = f.get0();
753 dns_log.trace("Read {} -> {} bytes", fd, d.get_data().len());
754 e.udp.in = std::move(d);
755 e.avail |= POLLIN;
756 } catch (...) {
757 dns_log.debug("Read {} failed: {}", fd, std::current_exception());
758 }
759 me->poll_sockets();
760 me->release(fd);
761 });
762 errno = EWOULDBLOCK;
763 return -1;
764 }
765
766 try {
767 udp.in = f.get0();
768 continue; // loop will take care of data
769 } catch (std::system_error& e) {
770 errno = e.code().value();
771 return -1;
772 } catch (...) {
773 }
774 return -1;
775 }
776 default:
777 return -1;
778 }
779 }
780 } catch (...) {
781 }
782 return -1;
783 }
784 ssize_t do_sendv(ares_socket_t fd, const iovec * vec, int len) {
785 if (_closed) {
786 return -1;
787 }
788 try {
789 auto& e = _sockets.at(fd);
790 dns_log.trace("Send {}({})", fd, int(e.typ));
791
792 // Assume we will be able to send data eventually very soon
793 // and just assume that unless we get immediate
794 // failures, we'll be ok. If we're not, the
795 // timeout logic will have to handle the problem.
796 //
797 // This saves us on two accounts:
798 // 1.) c-ares does not handle EWOULDBLOCK for
799 // udp sockets. Must pretend to finish
800 // immediately there anyway
801 // 2.) Doing so for tcp writes saves us having to
802 // match iovec->packet fragments. Downside is we
803 // have to copy the data, but we pretty much
804 // have to anyway, since we could otherwise
805 // get a query time out while we're sending
806 // with zero-copy and suddenly have freed
807 // memory in packets. Bad.
808
809
810 for (;;) {
811 // check if we're already writing.
812 if (e.typ == type::tcp && !(e.avail & POLLOUT)) {
813 dns_log.trace("Send already pending {}", fd);
814 errno = EWOULDBLOCK;
815 return -1;
816 }
817
818 if (!e.tcp.socket) {
819 errno = ENOTCONN;
820 return -1;
821 }
822
823 net::packet p;
824 p.reserve(len);
825 for (int i = 0; i < len; ++i) {
826 p = net::packet(std::move(p), net::fragment{reinterpret_cast<char *>(vec[i].iov_base), vec[i].iov_len});
827 }
828
829 auto bytes = p.len();
830 auto f = make_ready_future();
831
832 use(fd);
833
834 switch (e.typ) {
835 case type::tcp:
836 if (!e.tcp.out) {
837 e.tcp.out = e.tcp.socket.output(0);
838 }
839 f = e.tcp.out->write(std::move(p));
840 break;
841 case type::udp:
842 // always chain UDP sends
843 e.udp.f = e.udp.f.finally([&e, p = std::move(p)]() mutable {
844 return e.udp.channel.send(e.udp.dst, std::move(p));;
845 }).finally([fd, me = shared_from_this()] {
846 me->release(fd);
847 });
848 // if we have a fast-fail, give error.
849 if (e.udp.f.failed()) {
850 try {
851 e.udp.f.get();
852 } catch (std::system_error& e) {
853 errno = e.code().value();
854 } catch (...) {
855 }
856 e.udp.f = make_ready_future<>();
857 return -1;
858 }
859 // c-ares does _not_ use non-blocking retry for udp sockets. We just pretend
860 // all is fine even though we have no idea. Barring stack/adapter failure it
861 // is close to the same guarantee a "normal" message send would have anyway.
862 return bytes;
863 default:
864 return -1;
865 }
866
867 if (!f.available()) {
868 dns_log.trace("Send {} unavailable.", fd);
869 e.avail &= ~POLLOUT;
870 // FIXME: future is discarded
871 (void)f.then_wrapped([me = shared_from_this(), &e, bytes, fd](future<> f) {
872 try {
873 f.get();
874 dns_log.trace("Send {}. {} bytes sent.", fd, bytes);
875 } catch (...) {
876 dns_log.debug("Send {} failed: {}", fd, std::current_exception());
877 }
878 e.avail |= POLLOUT;
879 me->poll_sockets();
880 me->release(fd);
881 });
882
883 // For tcp we also pretend we're done, to make sure we don't have to deal with
884 // matching sent data
885 return bytes;
886 }
887
888 release(fd);
889
890 if (f.failed()) {
891 try {
892 f.get();
893 } catch (std::system_error& e) {
894 errno = e.code().value();
895 } catch (...) {
896 }
897 return -1;
898 }
899
900 return bytes;
901 }
902 } catch (...) {
903 }
904 return -1;
905 }
906
907 // Note: cannot use too much here, because fd_sets only handle
908 // ~1024 fd:s. Set to something below that in case you need to
909 // debug (maybe)
// This value is added to the socket count by next_fd() when it picks the
// first candidate descriptor.
910 static constexpr ares_socket_t socket_offset = 1;
911
912 ares_socket_t next_fd() {
913 ares_socket_t fd = ares_socket_t(_sockets.size() + socket_offset);
914 while (_sockets.count(fd)) {
915 ++fd;
916 }
917 return fd;
918 }
919 struct tcp_entry {
920 tcp_entry(connected_socket s)
921 : socket(std::move(s)) {
922 }
923 ;
924 connected_socket socket;
925 std::optional<input_stream<char>> in;
926 std::optional<output_stream<char>> out;
927 temporary_buffer<char> indata;
928 };
929 struct udp_entry {
930 udp_entry(net::udp_channel c)
931 : channel(std::move(c)) {
932 }
933 net::udp_channel channel;
934 std::optional<net::udp_datagram> in;;
935 socket_address dst;
936 future<> f = make_ready_future<>();
937 };
938 struct sock_entry {
939 union {
940 tcp_entry tcp;
941 udp_entry udp;
942 };
943 type typ;
944 int avail = 0;
945 int pending = 0;
946 bool closed = false;
947
948 sock_entry(sock_entry&& e)
949 : typ(e.typ)
950 , avail(e.avail)
951 {
952 e.typ = type::none;
953 switch (typ) {
954 case type::tcp:
955 new (&tcp) tcp_entry(std::move(e.tcp));
956 break;
957 case type::udp:
958 new (&udp) udp_entry(std::move(e.udp));
959 break;
960 default:
961 break;
962 }
963 }
964 sock_entry(connected_socket s)
965 : tcp(tcp_entry{std::move(s)})
966 , typ(type::tcp)
967 {}
968 sock_entry(net::udp_channel c)
969 : udp(udp_entry{std::move(c)})
970 , typ(type::udp)
971 {}
972 ~sock_entry() {
973 switch (typ) {
974 case type::tcp: tcp.~tcp_entry(); break;
975 case type::udp: udp.~udp_entry(); break;
976 default: break;
977 }
978 }
979 };
980
981 sock_entry& get_socket_entry(ares_socket_t fd) {
982 auto& e = _sockets.at(fd);
983 if (e.closed) {
984 throw std::runtime_error("Socket closed");
985 }
986 return e;
987 }
988
989
// Synthetic descriptor (as produced by next_fd) -> state of that socket.
990 using socket_map = std::unordered_map<ares_socket_t, sock_entry>;
991
// dns_call reads and updates _calls, _timer and _timeout.
992 friend struct dns_call;
993
// All sockets currently handed out to c-ares.
994 socket_map _sockets;
// Network stack used to create udp channels and tcp connections.
995 network_stack & _stack;
996
// The c-ares channel; created in the constructor, destroyed in the destructor.
997 ares_channel _channel = {};
// Number of queries in flight (incremented by dns_call, decremented by end_call).
998 uint64_t _calls = 0;
// Query timeout handed to c-ares; also the period of _timer.
999 std::chrono::milliseconds _timeout;
// Runs poll_sockets() periodically while queries are outstanding.
1000 timer<> _timer;
// Entered for every pending socket operation; closed by close().
1001 gate _gate;
// Set by close(); the socket callbacks refuse to work once it is set.
1002 bool _closed = false;
1003 };
1004
// Default resolver: default options, delegating to the constructors below.
1005 net::dns_resolver::dns_resolver()
1006 : dns_resolver(options())
1007 {}
1008
// Resolver with the given options on the network stack of the current engine().
1009 net::dns_resolver::dns_resolver(const options& opts)
1010 : dns_resolver(engine().net(), opts)
1011 {}
1012
// Resolver with the given options on an explicitly chosen network stack;
// this is where the implementation object is created.
1013 net::dns_resolver::dns_resolver(network_stack& stack, const options& opts)
1014 : _impl(make_shared<impl>(stack, opts))
1015 {}
1016
// Defined here, where `impl` is a complete type.
1017 net::dns_resolver::~dns_resolver()
1018 {}
1019
// Move construction and move assignment are the compiler-generated ones.
1020 net::dns_resolver::dns_resolver(dns_resolver&&) noexcept = default;
1021 net::dns_resolver& net::dns_resolver::operator=(dns_resolver&&) noexcept = default;
1022
1023 future<net::hostent> net::dns_resolver::get_host_by_name(const sstring& name, opt_family family) {
1024 return _impl->get_host_by_name(name, family);
1025 }
1026
1027 future<net::hostent> net::dns_resolver::get_host_by_addr(const inet_address& addr) {
1028 return _impl->get_host_by_addr(addr);
1029 }
1030
1031 future<net::inet_address> net::dns_resolver::resolve_name(const sstring& name, opt_family family) {
1032 return _impl->resolve_name(name, family);
1033 }
1034
1035 future<sstring> net::dns_resolver::resolve_addr(const inet_address& addr) {
1036 return _impl->resolve_addr(addr);
1037 }
1038
1039 future<net::dns_resolver::srv_records> net::dns_resolver::get_srv_records(net::dns_resolver::srv_proto proto,
1040 const sstring& service,
1041 const sstring& domain) {
1042 return _impl->get_srv_records(proto, service, domain);
1043 }
1044
1045 future<> net::dns_resolver::close() {
1046 return _impl->close();
1047 }
1048
1049 static net::dns_resolver& resolver() {
1050 static thread_local net::dns_resolver resolver;
1051 return resolver;
1052 }
1053
1054
1055 future<net::hostent> net::dns::get_host_by_name(const sstring& name, opt_family family) {
1056 return resolver().get_host_by_name(name, family);
1057 }
1058
1059 future<net::hostent> net::dns::get_host_by_addr(const inet_address& addr) {
1060 return resolver().get_host_by_addr(addr);
1061 }
1062
1063 future<net::inet_address> net::dns::resolve_name(const sstring& name, opt_family family) {
1064 return resolver().resolve_name(name, family);
1065 }
1066
1067 future<sstring> net::dns::resolve_addr(const inet_address& addr) {
1068 return resolver().resolve_addr(addr);
1069 }
1070
1071 future<net::dns_resolver::srv_records> net::dns::get_srv_records(net::dns_resolver::srv_proto proto,
1072 const sstring& service,
1073 const sstring& domain) {
1074 return resolver().get_srv_records(proto, service, domain);
1075 }
1076
1077 future<sstring> net::inet_address::hostname() const {
1078 return dns::resolve_addr(*this);
1079 }
1080
1081 future<std::vector<sstring>> net::inet_address::aliases() const {
1082 return dns::get_host_by_addr(*this).then([](hostent e) {
1083 return make_ready_future<std::vector<sstring>>(std::move(e.names));
1084 });
1085 }
1086
1087 future<net::inet_address> net::inet_address::find(
1088 const sstring& name) {
1089 return dns::resolve_name(name);
1090 }
1091
1092 future<net::inet_address> net::inet_address::find(
1093 const sstring& name, family f) {
1094 return dns::resolve_name(name, f);
1095 }
1096
1097 future<std::vector<net::inet_address>> net::inet_address::find_all(
1098 const sstring& name) {
1099 return dns::get_host_by_name(name).then([](hostent e) {
1100 return make_ready_future<std::vector<net::inet_address>>(std::move(e.addr_list));
1101 });
1102 }
1103
1104 future<std::vector<net::inet_address>> net::inet_address::find_all(
1105 const sstring& name, family f) {
1106 return dns::get_host_by_name(name, f).then([](hostent e) {
1107 return make_ready_future<std::vector<net::inet_address>>(std::move(e.addr_list));
1108 });
1109 }
1110
1111 }