]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/net/dns.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / net / dns.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2016 Cloudius Systems
20 */
21
#include <arpa/nameser.h>
#include <chrono>
#include <new>
#include <vector>

#include <ares.h>
#include <boost/lexical_cast.hpp>

#include <ostream>
#include <seastar/util/std-compat.hh>
#include <seastar/net/inet_address.hh>

#include <seastar/net/ip.hh>
#include <seastar/net/api.hh>
#include <seastar/net/dns.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/print.hh>
40
41 namespace seastar::net {
42
43 // NOTE: Should be prior to <seastar/util/log.hh> include because
44 // logger::stringer_for<T> needs to see the corresponding `operator <<`
45 // declaration at the call site
46 //
47 // This doesn't need to be in the public API, so leave it there instead of placing into `inet_address.hh`
48 std::ostream& operator<<(std::ostream& os, const opt_family& f) {
49 if (f) {
50 return os << *f;
51 } else {
52 return os << "ANY";
53 }
54 }
55
56 }
57
58 #include <seastar/util/log.hh>
59
60 namespace seastar {
61
62 static logger dns_log("dns_resolver");
63
// std::error_category for c-ares status codes, so that c-ares failures can be
// reported as std::system_error values that carry the original ARES_* code.
class ares_error_category : public std::error_category {
public:
    constexpr ares_error_category() noexcept : std::error_category{} {}

    const char * name() const noexcept override {
        return "C-Ares";
    }

    // Short human-readable description of an ARES_* status code; codes not
    // listed below map to "Unknown error".
    std::string message(int error) const override {
        switch (error) {
            /* Server error codes (ARES_ENODATA indicates no relevant answer) */
            case ARES_ENODATA: return "No data";
            case ARES_EFORMERR: return "Form error";
            case ARES_ESERVFAIL: return "Server failure";
            case ARES_ENOTFOUND: return "Not found";
            case ARES_ENOTIMP: return "Not implemented";
            case ARES_EREFUSED: return "Refused";

            /* Locally generated error codes */
            case ARES_EBADQUERY: return "Bad query";
            case ARES_EBADNAME: return "Bad name";
            case ARES_EBADFAMILY: return "Bad family";
            case ARES_EBADRESP: return "Bad response";
            case ARES_ECONNREFUSED: return "Connection refused";
            case ARES_ETIMEOUT: return "Timeout";
            case ARES_EOF: return "EOF";
            case ARES_EFILE: return "File error";
            case ARES_ENOMEM: return "No memory";
            case ARES_EDESTRUCTION: return "Destruction";
            case ARES_EBADSTR: return "Bad string";

            /* ares_getnameinfo error codes */
            case ARES_EBADFLAGS: return "Invalid flags";

            /* ares_getaddrinfo error codes */
            case ARES_ENONAME: return "No name";
            case ARES_EBADHINTS: return "Bad hints";

            /* Uninitialized library error code */
            case ARES_ENOTINITIALIZED: return "Not initialized";

            /* ares_library_init error codes */
            case ARES_ELOADIPHLPAPI: return "Load IPHLPAPI";
            case ARES_EADDRGETNETWORKPARAMS: return "Get network parameters";

            /* More error codes */
            case ARES_ECANCELLED: return "Cancelled";
            default:
                return "Unknown error";
        }
    }
};
114
115 static const ares_error_category ares_errorc;
116
117 static void check_ares_error(int error) {
118 if (error != ARES_SUCCESS) {
119 throw std::system_error(error, ares_errorc);
120 }
121 }
122
123 struct ares_initializer {
124 ares_initializer() {
125 check_ares_error(ares_library_init(0));
126 }
127 ~ares_initializer() {
128 ares_library_cleanup();
129 }
130 };
131
132 class net::dns_resolver::impl
133 : public enable_shared_from_this<impl>
134 {
135 public:
136 impl(network_stack& stack, const options& opts)
137 : _stack(stack)
138 , _timeout(opts.timeout ? *opts.timeout : std::chrono::milliseconds(5000) /* from ares private */)
139 , _timer(std::bind(&impl::poll_sockets, this))
140 {
141 static const ares_initializer a_init;
142
143 // this can "block" ever so slightly, because it will
144 // look in resolv.conf etc for query setup. We could
145 // do this ourselves, and instead set ares options
146 // here, but it seems more error prone (me parsing
147 // resolv.conf -> hah!)
148 ares_options a_opts = { 0, };
149
150 // For now, use the default "fb" query order
151 // (set explicitly lest we forget).
152 // We only do querying dns server really async.
153 // Reading hosts files is doen by c-ares internally
154 // and with normal fread calls. Thus they theorectically
155 // block. This can potentially be an issue for some application
156 // and if so, we need to revisit this. For now, assume
157 // it won't block us in any measurable way.
158 char buf[3] = "fb";
159 a_opts.lookups = buf; // only net
160 // Always set the timeout
161 a_opts.timeout = _timeout.count();
162 int flags = ARES_OPT_LOOKUPS|ARES_OPT_TIMEOUTMS;
163
164 if (opts.use_tcp_query && *opts.use_tcp_query) {
165 a_opts.flags = ARES_FLAG_USEVC | ARES_FLAG_PRIMARY;
166 flags |= ARES_OPT_FLAGS;
167 }
168 std::vector<in_addr> addr_tmp;
169 if (opts.servers) {
170 std::transform(opts.servers->begin(), opts.servers->end(), std::back_inserter(addr_tmp), [](const inet_address& a) {
171 if (a.in_family() != inet_address::family::INET) {
172 throw std::invalid_argument("Servers must be ipv4 addresses");
173 }
174 in_addr in = a;
175 return in;
176 });
177 a_opts.servers = addr_tmp.data();
178 a_opts.nservers = int(addr_tmp.size());
179 flags |= ARES_OPT_SERVERS;
180 }
181 std::vector<const char *> dom_tmp;
182 if (opts.domains) {
183 std::transform(opts.domains->begin(), opts.domains->end(), std::back_inserter(dom_tmp), [](const sstring& s) {
184 return s.data();
185 });
186 a_opts.domains = const_cast<char **>(dom_tmp.data());
187 a_opts.ndomains = int(dom_tmp.size());
188 flags |= ARES_OPT_DOMAINS;
189 }
190 if (opts.tcp_port) {
191 a_opts.tcp_port = *opts.tcp_port;
192 flags |= ARES_OPT_TCP_PORT;
193 }
194 if (opts.udp_port) {
195 a_opts.udp_port = *opts.udp_port;
196 flags |= ARES_OPT_UDP_PORT;
197 }
198
199 check_ares_error(ares_init_options(&_channel, &a_opts, flags));
200
201 static auto get_impl = [](void * p) { return reinterpret_cast<impl *>(p); };
202 static const ares_socket_functions callbacks = {
203 [](int af, int type, int protocol, void * p) { return get_impl(p)->do_socket(af, type, protocol); },
204 [](ares_socket_t s, void * p) { return get_impl(p)->do_close(s); },
205 [](ares_socket_t s, const struct sockaddr * addr, socklen_t len, void * p) { return get_impl(p)->do_connect(s, addr, len); },
206 [](ares_socket_t s, void * dst, size_t len, int flags, struct sockaddr * addr, socklen_t * alen, void * p) {
207 return get_impl(p)->do_recvfrom(s, dst, len, flags, addr, alen);
208 },
209 [](ares_socket_t s, const struct iovec * vec, int len, void * p) {
210 return get_impl(p)->do_sendv(s, vec, len);
211 },
212 };
213
214 ares_set_socket_functions(_channel, &callbacks, this);
215
216 // just in case you need printf-debug.
217 // dns_log.set_level(log_level::trace);
218 }
219 ~impl() {
220 _timer.cancel();
221 if (_channel) {
222 ares_destroy(_channel);
223 }
224 }
225
226 future<inet_address> resolve_name(sstring name, opt_family family) {
227 return get_host_by_name(std::move(name), family).then([](hostent h) {
228 return make_ready_future<inet_address>(h.addr_list.front());
229 });
230 }
231
232 future<hostent> get_host_by_name(sstring name, opt_family family) {
233 class promise_wrap : public promise<hostent> {
234 public:
235 promise_wrap(sstring s)
236 : name(std::move(s))
237 {}
238 sstring name;
239 };
240
241 dns_log.debug("Query name {} ({})", name, family);
242
243 if (!family) {
244 auto res = inet_address::parse_numerical(name);
245 if (res) {
246 return make_ready_future<hostent>(hostent{ {name}, {*res}});
247 }
248 }
249
250 auto p = new promise_wrap(std::move(name));
251 auto f = p->get_future();
252
253 dns_call call(*this);
254
255 auto af = family ? int(*family) : AF_UNSPEC;
256
257 ares_gethostbyname(_channel, p->name.c_str(), af, [](void* arg, int status, int timeouts, ::hostent* host) {
258 // we do potentially allocating operations below, so wrap the pointer in a
259 // unique here.
260 std::unique_ptr<promise_wrap> p(reinterpret_cast<promise_wrap *>(arg));
261
262 switch (status) {
263 default:
264 dns_log.debug("Query failed: {}", status);
265 p->set_exception(std::system_error(status, ares_errorc, p->name));
266 break;
267 case ARES_SUCCESS:
268 p->set_value(make_hostent(*host));
269 break;
270 }
271
272 }, reinterpret_cast<void *>(p));
273
274
275 poll_sockets();
276
277 return f.finally([this] {
278 end_call();
279 });
280 }
281
282 future<hostent> get_host_by_addr(inet_address addr) {
283 class promise_wrap : public promise<hostent> {
284 public:
285 promise_wrap(inet_address a)
286 : addr(std::move(a))
287 {}
288 inet_address addr;
289 };
290
291 dns_log.debug("Query addr {}", addr);
292
293 auto p = new promise_wrap(std::move(addr));
294 auto f = p->get_future();
295
296 dns_call call(*this);
297
298 ares_gethostbyaddr(_channel, p->addr.data(), p->addr.size(), int(p->addr.in_family()), [](void* arg, int status, int timeouts, ::hostent* host) {
299 // we do potentially allocating operations below, so wrap the pointer in a
300 // unique here.
301 std::unique_ptr<promise_wrap> p(reinterpret_cast<promise_wrap *>(arg));
302
303 switch (status) {
304 default:
305 dns_log.debug("Query failed: {}", status);
306 p->set_exception(std::system_error(status, ares_errorc, boost::lexical_cast<std::string>(p->addr)));
307 break;
308 case ARES_SUCCESS:
309 p->set_value(make_hostent(*host));
310 break;
311 }
312
313 }, reinterpret_cast<void *>(p));
314
315
316 poll_sockets();
317
318 return f.finally([this] {
319 end_call();
320 });
321 }
322
323 future<srv_records> get_srv_records(srv_proto proto,
324 const sstring& service,
325 const sstring& domain) {
326 auto p = std::make_unique<promise<srv_records>>();
327 auto f = p->get_future();
328
329 const auto query = format("_{}._{}.{}",
330 service,
331 proto == srv_proto::tcp ? "tcp" : "udp",
332 domain);
333
334 dns_log.debug("Query srv {}", query);
335
336 dns_call call(*this);
337
338 ares_query(_channel, query.c_str(), ns_c_in, ns_t_srv,
339 [](void* arg, int status, int timeouts,
340 unsigned char* buf, int len) {
341 auto p = std::unique_ptr<promise<srv_records>>(
342 reinterpret_cast<promise<srv_records> *>(arg));
343 if (status != ARES_SUCCESS) {
344 dns_log.debug("Query failed: {}", status);
345 p->set_exception(std::system_error(status, ares_errorc));
346 return;
347 }
348 ares_srv_reply* start = nullptr;
349 status = ares_parse_srv_reply(buf, len, &start);
350 if (status != ARES_SUCCESS) {
351 dns_log.debug("Parse failed: {}", status);
352 p->set_exception(std::system_error(status, ares_errorc));
353 return;
354 }
355 try {
356 p->set_value(make_srv_records(start));
357 } catch (...) {
358 p->set_exception(std::current_exception());
359 }
360 ares_free_data(start);
361 }, reinterpret_cast<void *>(p.release()));
362
363
364 poll_sockets();
365
366 return f.finally([this] {
367 end_call();
368 });
369 }
370
371 future<sstring> resolve_addr(inet_address addr) {
372 return get_host_by_addr(addr).then([](hostent h) {
373 return make_ready_future<sstring>(h.names.front());
374 });
375 }
376
377 future<> close() {
378 _closed = true;
379 ares_cancel(_channel);
380 dns_log.trace("Shutting down {} sockets", _sockets.size());
381 for (auto & p : _sockets) {
382 do_close(p.first);
383 }
384 dns_log.trace("Closing gate");
385 return _gate.close();
386 }
387 private:
// Transport wrapped by a socket entry. `none` is used for entries that hold
// no active payload.
enum class type {
    none,
    tcp,
    udp,
};
391 struct dns_call {
392 dns_call(impl & i)
393 : _i(i)
394 , _c(++i._calls)
395 {}
396 ~dns_call() {
397 // If a query does not immediately complete
398 // it might never do so, unless data actually
399 // comes back to us and a waiting recv promise
400 // is fulfilled.
401 // We need to add a timer to do polling at ~timeout
402 // ms later, so the ares logic can detect this and
403 // tell us we're over.
404 if (_c == 1 && _i._calls != 0) {
405 _i._timer.arm_periodic(_i._timeout);
406 }
407 }
408 impl& _i;
409 uint64_t _c;
410 };
411
412 void end_call() {
413 if (--_calls == 0) {
414 _timer.cancel();
415 }
416 }
417 void poll_sockets() {
418 fd_set readers, writers;
419 int n = 0;
420
421 dns_log.trace("Poll sockets");
422
423 do {
424 // Retrieve the set of file descriptors that the library wants us to monitor.
425 FD_ZERO(&readers);
426 FD_ZERO(&writers);
427
428 n = ares_fds(_channel, &readers, &writers);
429
430 dns_log.trace("ares_fds: {}", n);
431
432 if (n == 0) {
433 break;
434 }
435
436 n = 0;
437
438 for (auto & p : _sockets) {
439 auto & e = p.second;
440 auto fd = p.first;
441 auto r = FD_ISSET(p.first, &readers);
442 auto w = FD_ISSET(p.first, &writers);
443 auto ra = e.avail & POLLIN;
444 auto wa = e.avail & POLLOUT;
445
446 dns_log.trace("fd {} {}{}/{}{}", fd, (r ? "r" : ""),
447 (w ? "w" : ""), (ra ? "r" : ""),
448 (wa ? "w" : ""));
449
450 if (!wa) {
451 FD_CLR(fd, &writers);
452 }
453 if (!ra) {
454 FD_CLR(fd, &readers);
455 }
456 if (FD_ISSET(fd, &writers) || FD_ISSET(fd, &readers)) {
457 ++n;
458 }
459 }
460
461 ares_process(_channel, &readers, &writers);
462 } while (n != 0);
463 }
464
465 static srv_records make_srv_records(ares_srv_reply* start) {
466 srv_records records;
467 for (auto reply = start; reply; reply = reply->next) {
468 srv_record record = {reply->priority,
469 reply->weight,
470 reply->port,
471 sstring{reply->host}};
472 records.push_back(std::move(record));
473 }
474 return records;
475 }
476
477 static hostent make_hostent(const ::hostent& host) {
478 hostent e;
479 e.names.emplace_back(host.h_name);
480 auto np = host.h_aliases;
481 while (*np != 0) {
482 e.names.emplace_back(*np++);
483 }
484 auto p = host.h_addr_list;
485 while (*p != nullptr) {
486 switch (host.h_addrtype) {
487 case AF_INET:
488 assert(size_t(host.h_length) >= sizeof(in_addr));
489 e.addr_list.emplace_back(*reinterpret_cast<const in_addr*>(*p));
490 break;
491 case AF_INET6:
492 assert(size_t(host.h_length) >= sizeof(in6_addr));
493 e.addr_list.emplace_back(*reinterpret_cast<const in6_addr*>(*p));
494 break;
495 default:
496 break;
497 }
498 ++p;
499 }
500
501 dns_log.debug("Query success: {}/{}", e.names.front(), e.addr_list.front());
502
503 return e;
504 }
505 // We need to partially ref-count our socket entries
506 // when we have pending reads/writes, so we don't erase the
507 // entry to early.
508 void use(ares_socket_t fd) {
509 _gate.enter();
510 auto& e = _sockets.at(fd);
511 ++e.pending;
512 }
513 void release(ares_socket_t fd) {
514 auto& e = _sockets.at(fd);
515 dns_log.trace("Release socket {} -> {}", fd, e.pending - 1);
516 if (--e.pending < 0) {
517 _sockets.erase(fd);
518 dns_log.trace("Released socket {}", fd);
519 }
520 _gate.leave();
521 }
522 ares_socket_t do_socket(int af, int type, int protocol) {
523 if (_closed) {
524 return -1;
525 }
526 int fd = next_fd();
527 switch (type) {
528 case SOCK_STREAM:
529 _sockets.emplace(fd, connected_socket());
530 dns_log.trace("Created tcp socket {}", fd);
531 break;
532 case SOCK_DGRAM:
533 _sockets.emplace(fd, _stack.make_udp_channel());
534 dns_log.trace("Created udp socket {}", fd);
535 break;
536 default: return -1;
537 }
538 return fd;
539 }
540 int do_close(ares_socket_t fd) {
541 dns_log.trace("Close socket {}", fd);
542 auto& e = _sockets.at(fd);
543
544 // Mark as closed.
545 if (std::exchange(e.closed, true)) {
546 return 0;
547 }
548
549 _gate.enter(); // "leave" is done in release(fd)
550
551 switch (e.typ) {
552 case type::tcp:
553 {
554 dns_log.trace("Close tcp socket {}, {} pending", fd, e.pending);
555 future<> f = make_ready_future();
556 if (e.tcp.in) {
557 e.tcp.socket.shutdown_input();
558 dns_log.trace("Closed tcp socket {} input", fd);
559 }
560 if (e.tcp.out) {
561 f = f.then([&e] {
562 return e.tcp.out->close();
563 }).then([fd] {
564 dns_log.trace("Closed tcp socket {} output", fd);
565 });
566 }
567 f = f.finally([me = shared_from_this(), fd] {
568 me->release(fd);
569 });
570 break;
571 }
572 case type::udp:
573 e.udp.channel.shutdown_input();
574 e.udp.channel.shutdown_output();
575 release(fd);
576 break;
577 default:
578 // should not happen
579 _gate.leave();
580 break;
581 }
582 return 0;
583 }
584 socket_address sock_addr(const sockaddr * addr, socklen_t len) {
585 if (addr->sa_family != AF_INET) {
586 throw std::invalid_argument("No ipv6 yet");
587 }
588 auto in = reinterpret_cast<const sockaddr_in *>(addr);
589 return *in;
590 }
591 int do_connect(ares_socket_t fd, const sockaddr * addr, socklen_t len) {
592 if (_closed) {
593 return -1;
594 }
595 try {
596 auto& e = get_socket_entry(fd);
597 auto sa = sock_addr(addr, len);
598
599 dns_log.trace("Connect {}({})->{}", fd, int(e.typ), sa);
600
601 assert(e.avail == 0);
602
603 e.avail = POLLOUT|POLLIN; // until we know otherwise
604
605 switch (e.typ) {
606 case type::tcp: {
607 auto f = _stack.connect(sa);
608 if (!f.available()) {
609 dns_log.trace("Connection pending: {}", fd);
610 e.avail = 0;
611 use(fd);
612 // FIXME: future is discarded
613 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<connected_socket> f) {
614 try {
615 e.tcp.socket = f.get0();
616 dns_log.trace("Connection complete: {}", fd);
617 } catch (...) {
618 dns_log.debug("Connect {} failed: {}", fd, std::current_exception());
619 }
620 e.avail = POLLOUT|POLLIN;
621 me->poll_sockets();
622 me->release(fd);
623 });
624 errno = EWOULDBLOCK;
625 return -1;
626 }
627 e.tcp.socket = f.get0();
628 break;
629 }
630 case type::udp:
631 // we do not have udp connect, so just keep
632 // track of the destination
633 e.udp.dst = sa;
634 break;
635 default:
636 return -1;
637 }
638 return 0;
639 } catch (...) {
640 return -1;
641 }
642 }
643 ssize_t do_recvfrom(ares_socket_t fd, void * dst, size_t len, int flags, struct sockaddr * from, socklen_t * from_len) {
644 if (_closed) {
645 return -1;
646 }
647 try {
648 auto& e = get_socket_entry(fd);
649 dns_log.trace("Read {}({})", fd, int(e.typ));
650 // check if we're already reading.
651 if (!(e.avail & POLLIN)) {
652 dns_log.trace("Read already pending {}", fd);
653 errno = EWOULDBLOCK;
654 return -1;
655 }
656 for (;;) {
657 switch (e.typ) {
658 case type::tcp: {
659 auto & tcp = e.tcp;
660 if (!tcp.indata.empty()) {
661 dns_log.trace("Read {}. {} bytes available", fd, tcp.indata.size());
662 len = std::min(len, tcp.indata.size());
663 std::copy(tcp.indata.begin(), tcp.indata.begin() + len, reinterpret_cast<char *>(dst));
664 tcp.indata.trim_front(len);
665 return len;
666 }
667 if (!tcp.in) {
668 tcp.in = tcp.socket.input();
669 }
670 auto f = tcp.in->read_up_to(len);
671 if (!f.available()) {
672 dns_log.trace("Read {}: data unavailable", fd);
673 e.avail &= ~POLLIN;
674 use(fd);
675 // FIXME: future is discarded
676 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<temporary_buffer<char>> f) {
677 try {
678 auto buf = f.get0();
679 dns_log.trace("Read {} -> {} bytes", fd, buf.size());
680 e.tcp.indata = std::move(buf);
681 } catch (...) {
682 dns_log.debug("Read {} failed: {}", fd, std::current_exception());
683 }
684 e.avail |= POLLIN; // always reset state
685 me->poll_sockets();
686 me->release(fd);
687 });
688 errno = EWOULDBLOCK;
689 return -1;
690 }
691
692 try {
693 tcp.indata = f.get0();
694 continue; // loop will take care of data
695 } catch (std::system_error& e) {
696 errno = e.code().value();
697 return -1;
698 } catch (...) {
699 }
700 return -1;
701
702 }
703 case type::udp: {
704 auto & udp = e.udp;
705 if (udp.in) {
706 auto & p = udp.in->get_data();
707
708 dns_log.trace("Read {}. {} bytes available from {}", fd, p.len(), udp.in->get_src());
709
710 if (from != nullptr) {
711 *from = socket_address(udp.in->get_src()).as_posix_sockaddr();
712 if (from_len != nullptr) {
713 // TODO: ipvv6
714 *from_len = sizeof(sockaddr_in);
715 }
716 }
717
718 len = std::min(len, size_t(p.len()));
719 size_t rem = len;
720 auto * out = reinterpret_cast<char *>(dst);
721 for (auto & f : p.fragments()) {
722 auto n = std::min(rem, f.size);
723 out = std::copy_n(f.base, n, out);
724 rem = rem - n;
725 }
726 if (p.len() == len) {
727 udp.in = {};
728 } else {
729 p.trim_front(len);
730 }
731 return len;
732 }
733 auto f = udp.channel.receive();
734 if (!f.available()) {
735 e.avail &= ~POLLIN;
736 use(fd);
737 dns_log.trace("Read {}: data unavailable", fd);
738 // FIXME: future is discarded
739 (void)f.then_wrapped([me = shared_from_this(), &e, fd](future<net::udp_datagram> f) {
740 try {
741 auto d = f.get0();
742 dns_log.trace("Read {} -> {} bytes", fd, d.get_data().len());
743 e.udp.in = std::move(d);
744 e.avail |= POLLIN;
745 } catch (...) {
746 dns_log.debug("Read {} failed: {}", fd, std::current_exception());
747 }
748 me->poll_sockets();
749 me->release(fd);
750 });
751 errno = EWOULDBLOCK;
752 return -1;
753 }
754
755 try {
756 udp.in = f.get0();
757 continue; // loop will take care of data
758 } catch (std::system_error& e) {
759 errno = e.code().value();
760 return -1;
761 } catch (...) {
762 }
763 return -1;
764 }
765 default:
766 return -1;
767 }
768 }
769 } catch (...) {
770 }
771 return -1;
772 }
773 ssize_t do_sendv(ares_socket_t fd, const iovec * vec, int len) {
774 if (_closed) {
775 return -1;
776 }
777 try {
778 auto& e = _sockets.at(fd);
779 dns_log.trace("Send {}({})", fd, int(e.typ));
780
781 // Assume we will be able to send data eventually very soon
782 // and just assume that unless we get immediate
783 // failures, we'll be ok. If we're not, the
784 // timeout logic will have to handle the problem.
785 //
786 // This saves us on two accounts:
787 // 1.) c-ares does not handle EWOULDBLOCK for
788 // udp sockets. Must pretend to finish
789 // immediately there anyway
790 // 2.) Doing so for tcp writes saves us having to
791 // match iovec->packet fragments. Downside is we
792 // have to copy the data, but we pretty much
793 // have to anyway, since we could otherwise
794 // get a query time out while we're sending
795 // with zero-copy and suddenly have freed
796 // memory in packets. Bad.
797
798
799 for (;;) {
800 // check if we're already writing.
801 if (e.typ == type::tcp && !(e.avail & POLLOUT)) {
802 dns_log.trace("Send already pending {}", fd);
803 errno = EWOULDBLOCK;
804 return -1;
805 }
806
807 net::packet p;
808 p.reserve(len);
809 for (int i = 0; i < len; ++i) {
810 p = net::packet(std::move(p), net::fragment{reinterpret_cast<char *>(vec[i].iov_base), vec[i].iov_len});
811 }
812
813 auto bytes = p.len();
814 auto f = make_ready_future();
815
816 switch (e.typ) {
817 case type::tcp:
818 if (!e.tcp.out) {
819 e.tcp.out = e.tcp.socket.output(0);
820 }
821 f = e.tcp.out->write(std::move(p));
822 break;
823 case type::udp:
824 f = e.udp.channel.send(e.udp.dst, std::move(p));
825 break;
826 default:
827 return -1;
828 }
829
830 if (!f.available()) {
831 dns_log.trace("Send {} unavailable.", fd);
832 e.avail &= ~POLLOUT;
833 use(fd);
834 // FIXME: future is discarded
835 (void)f.then_wrapped([me = shared_from_this(), &e, bytes, fd](future<> f) {
836 try {
837 f.get();
838 dns_log.trace("Send {}. {} bytes sent.", fd, bytes);
839 } catch (...) {
840 dns_log.debug("Send {} failed: {}", fd, std::current_exception());
841 }
842 e.avail |= POLLOUT;
843 me->poll_sockets();
844 me->release(fd);
845 });
846 // c-ares does _not_ use non-blocking retry for udp sockets. We just pretend
847 // all is fine even though we have no idea. Barring stack/adapter failure it
848 // is close to the same guarantee a "normal" message send would have anyway.
849 // For tcp we also pretend we're done, to make sure we don't have to deal with
850 // matching sent data
851 }
852 if (f.failed()) {
853 try {
854 f.get();
855 } catch (std::system_error& e) {
856 errno = e.code().value();
857 } catch (...) {
858 }
859 return -1;
860 }
861
862 return len;
863 }
864 } catch (...) {
865 }
866 return -1;
867 }
868
869 // Note: cannot use to much here, because fd_sets only handle
870 // ~1024 fd:s. Set to something below that in case you need to
871 // debug (maybe)
872 static constexpr ares_socket_t socket_offset = 1;
873
874 ares_socket_t next_fd() {
875 ares_socket_t fd = ares_socket_t(_sockets.size() + socket_offset);
876 while (_sockets.count(fd)) {
877 ++fd;
878 }
879 return fd;
880 }
881 struct tcp_entry {
882 tcp_entry(connected_socket s)
883 : socket(std::move(s)) {
884 }
885 ;
886 connected_socket socket;
887 std::optional<input_stream<char>> in;
888 std::optional<output_stream<char>> out;
889 temporary_buffer<char> indata;
890 };
891 struct udp_entry {
892 udp_entry(net::udp_channel c)
893 : channel(std::move(c)) {
894 }
895 net::udp_channel channel;
896 std::optional<net::udp_datagram> in;;
897 socket_address dst;
898 };
899 struct sock_entry {
900 union {
901 tcp_entry tcp;
902 udp_entry udp;
903 };
904 type typ;
905 int avail = 0;
906 int pending = 0;
907 bool closed = false;
908
909 sock_entry(sock_entry&& e)
910 : typ(e.typ)
911 , avail(e.avail)
912 {
913 e.typ = type::none;
914 switch (typ) {
915 case type::tcp:
916 tcp = std::move(e.tcp);
917 break;
918 case type::udp:
919 udp = std::move(e.udp);
920 break;
921 default:
922 break;
923 }
924 }
925 sock_entry(connected_socket s)
926 : tcp(tcp_entry{std::move(s)})
927 , typ(type::tcp)
928 {}
929 sock_entry(net::udp_channel c)
930 : udp(udp_entry{std::move(c)})
931 , typ(type::udp)
932 {}
933 ~sock_entry() {
934 switch (typ) {
935 case type::tcp: tcp.~tcp_entry(); break;
936 case type::udp: udp.~udp_entry(); break;
937 default: break;
938 }
939 }
940 };
941
942 sock_entry& get_socket_entry(ares_socket_t fd) {
943 auto& e = _sockets.at(fd);
944 if (e.closed) {
945 throw std::runtime_error("Socket closed");
946 }
947 return e;
948 }
949
950
951 typedef std::unordered_map<ares_socket_t, sock_entry> socket_map;
952
953 friend struct dns_call;
954
955 socket_map _sockets;
956 network_stack & _stack;
957
958 ares_channel _channel = {};
959 uint64_t _calls = 0;
960 std::chrono::milliseconds _timeout;
961 timer<> _timer;
962 gate _gate;
963 bool _closed = false;
964 };
965
966 net::dns_resolver::dns_resolver()
967 : dns_resolver(options())
968 {}
969
970 net::dns_resolver::dns_resolver(const options& opts)
971 : dns_resolver(engine().net(), opts)
972 {}
973
974 net::dns_resolver::dns_resolver(network_stack& stack, const options& opts)
975 : _impl(make_shared<impl>(stack, opts))
976 {}
977
978 net::dns_resolver::~dns_resolver()
979 {}
980
981 net::dns_resolver::dns_resolver(dns_resolver&&) noexcept = default;
982 net::dns_resolver& net::dns_resolver::operator=(dns_resolver&&) noexcept = default;
983
984 future<net::hostent> net::dns_resolver::get_host_by_name(const sstring& name, opt_family family) {
985 return _impl->get_host_by_name(name, family);
986 }
987
988 future<net::hostent> net::dns_resolver::get_host_by_addr(const inet_address& addr) {
989 return _impl->get_host_by_addr(addr);
990 }
991
992 future<net::inet_address> net::dns_resolver::resolve_name(const sstring& name, opt_family family) {
993 return _impl->resolve_name(name, family);
994 }
995
996 future<sstring> net::dns_resolver::resolve_addr(const inet_address& addr) {
997 return _impl->resolve_addr(addr);
998 }
999
1000 future<net::dns_resolver::srv_records> net::dns_resolver::get_srv_records(net::dns_resolver::srv_proto proto,
1001 const sstring& service,
1002 const sstring& domain) {
1003 return _impl->get_srv_records(proto, service, domain);
1004 }
1005
1006 future<> net::dns_resolver::close() {
1007 return _impl->close();
1008 }
1009
1010 static net::dns_resolver& resolver() {
1011 static thread_local net::dns_resolver resolver;
1012 return resolver;
1013 }
1014
1015
1016 future<net::hostent> net::dns::get_host_by_name(const sstring& name, opt_family family) {
1017 return resolver().get_host_by_name(name, family);
1018 }
1019
1020 future<net::hostent> net::dns::get_host_by_addr(const inet_address& addr) {
1021 return resolver().get_host_by_addr(addr);
1022 }
1023
1024 future<net::inet_address> net::dns::resolve_name(const sstring& name, opt_family family) {
1025 return resolver().resolve_name(name, family);
1026 }
1027
1028 future<sstring> net::dns::resolve_addr(const inet_address& addr) {
1029 return resolver().resolve_addr(addr);
1030 }
1031
1032 future<net::dns_resolver::srv_records> net::dns::get_srv_records(net::dns_resolver::srv_proto proto,
1033 const sstring& service,
1034 const sstring& domain) {
1035 return resolver().get_srv_records(proto, service, domain);
1036 }
1037
1038 future<sstring> net::inet_address::hostname() const {
1039 return dns::resolve_addr(*this);
1040 }
1041
1042 future<std::vector<sstring>> net::inet_address::aliases() const {
1043 return dns::get_host_by_addr(*this).then([](hostent e) {
1044 return make_ready_future<std::vector<sstring>>(std::move(e.names));
1045 });
1046 }
1047
1048 future<net::inet_address> net::inet_address::find(
1049 const sstring& name) {
1050 return dns::resolve_name(name);
1051 }
1052
1053 future<net::inet_address> net::inet_address::find(
1054 const sstring& name, family f) {
1055 return dns::resolve_name(name, f);
1056 }
1057
1058 future<std::vector<net::inet_address>> net::inet_address::find_all(
1059 const sstring& name) {
1060 return dns::get_host_by_name(name).then([](hostent e) {
1061 return make_ready_future<std::vector<net::inet_address>>(std::move(e.addr_list));
1062 });
1063 }
1064
1065 future<std::vector<net::inet_address>> net::inet_address::find_all(
1066 const sstring& name, family f) {
1067 return dns::get_host_by_name(name, f).then([](hostent e) {
1068 return make_ready_future<std::vector<net::inet_address>>(std::move(e.addr_list));
1069 });
1070 }
1071
1072 }