]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/net/posix-stack.cc
192eb3a81869ff1cccac0b6b7b7a515e6082ee8d
[ceph.git] / ceph / src / seastar / src / net / posix-stack.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22 #include <random>
23
24 #include <linux/if.h>
25 #include <linux/netlink.h>
26 #include <linux/rtnetlink.h>
27 #include <net/route.h>
28
29 #include <seastar/net/posix-stack.hh>
30 #include <seastar/net/net.hh>
31 #include <seastar/net/packet.hh>
32 #include <seastar/net/api.hh>
33 #include <seastar/net/inet_address.hh>
34 #include <seastar/util/std-compat.hh>
35 #include <netinet/tcp.h>
36 #include <netinet/sctp.h>
37
38 namespace std {
39
40 template <>
41 struct hash<seastar::net::posix_ap_server_socket_impl::protocol_and_socket_address> {
42 size_t operator()(const seastar::net::posix_ap_server_socket_impl::protocol_and_socket_address& t_sa) const {
43 auto h1 = std::hash<int>()(std::get<0>(t_sa));
44 auto h2 = std::hash<seastar::net::socket_address>()(std::get<1>(t_sa));
45 return h1 ^ h2;
46 }
47 };
48
49 }
50
51 namespace seastar {
52
53 namespace net {
54
55 using namespace seastar;
56
57 class posix_connected_socket_operations {
58 public:
59 virtual ~posix_connected_socket_operations() = default;
60 virtual void set_nodelay(file_desc& fd, bool nodelay) const = 0;
61 virtual bool get_nodelay(file_desc& fd) const = 0;
62 virtual void set_keepalive(file_desc& _fd, bool keepalive) const = 0;
63 virtual bool get_keepalive(file_desc& _fd) const = 0;
64 virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) const = 0;
65 virtual keepalive_params get_keepalive_parameters(file_desc& _fd) const = 0;
66 };
67
// Per-thread state shared by every posix_ap_server_socket_impl on this thread,
// keyed by (protocol, listen address):
//  - sockets: promises of accept() calls that are waiting for a connection;
//  - conn_q:  connections handed over from another shard (move_connected_socket)
//             before any accept() was waiting for them.
68 thread_local posix_ap_server_socket_impl::sockets_map_t posix_ap_server_socket_impl::sockets{};
69 thread_local posix_ap_server_socket_impl::conn_map_t posix_ap_server_socket_impl::conn_q{};
70
71 class posix_tcp_connected_socket_operations : public posix_connected_socket_operations {
72 public:
73 virtual void set_nodelay(file_desc& _fd, bool nodelay) const override {
74 _fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, int(nodelay));
75 }
76 virtual bool get_nodelay(file_desc& _fd) const override {
77 return _fd.getsockopt<int>(IPPROTO_TCP, TCP_NODELAY);
78 }
79 virtual void set_keepalive(file_desc& _fd, bool keepalive) const override {
80 _fd.setsockopt(SOL_SOCKET, SO_KEEPALIVE, int(keepalive));
81 }
82 virtual bool get_keepalive(file_desc& _fd) const override {
83 return _fd.getsockopt<int>(SOL_SOCKET, SO_KEEPALIVE);
84 }
85 virtual void set_keepalive_parameters(file_desc& _fd, const keepalive_params& params) const override {
86 const tcp_keepalive_params& pms = compat::get<tcp_keepalive_params>(params);
87 _fd.setsockopt(IPPROTO_TCP, TCP_KEEPCNT, pms.count);
88 _fd.setsockopt(IPPROTO_TCP, TCP_KEEPIDLE, int(pms.idle.count()));
89 _fd.setsockopt(IPPROTO_TCP, TCP_KEEPINTVL, int(pms.interval.count()));
90 }
91 virtual keepalive_params get_keepalive_parameters(file_desc& _fd) const override {
92 return tcp_keepalive_params {
93 std::chrono::seconds(_fd.getsockopt<int>(IPPROTO_TCP, TCP_KEEPIDLE)),
94 std::chrono::seconds(_fd.getsockopt<int>(IPPROTO_TCP, TCP_KEEPINTVL)),
95 _fd.getsockopt<unsigned>(IPPROTO_TCP, TCP_KEEPCNT)
96 };
97 }
98 };
99
/// SCTP socket options. Nagle maps to SCTP_NODELAY. "Keepalive" maps to the
/// peer heartbeat settings in SCTP_PEER_ADDR_PARAMS:
///  - SPP_HB_ENABLE in spp_flags turns heartbeats on or off;
///  - spp_hbinterval is the heartbeat interval in milliseconds;
///  - spp_pathmaxrxt is the retransmission count.
class posix_sctp_connected_socket_operations : public posix_connected_socket_operations {
    /// Duration type of sctp_keepalive_params::interval. Conversions to and from
    /// spp_hbinterval (milliseconds) go through duration_cast, so they stay correct
    /// whatever unit that field uses.
    using interval_type = decltype(sctp_keepalive_params::interval);
public:
    void set_nodelay(file_desc& fd, bool nodelay) const override {
        fd.setsockopt(SOL_SCTP, SCTP_NODELAY, int(nodelay));
    }
    bool get_nodelay(file_desc& fd) const override {
        return fd.getsockopt<int>(SOL_SCTP, SCTP_NODELAY);
    }
    void set_keepalive(file_desc& fd, bool keepalive) const override {
        // Read-modify-write so other peer-address parameters are preserved.
        auto heartbeat = fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS);
        if (keepalive) {
            heartbeat.spp_flags |= SPP_HB_ENABLE;
        } else {
            heartbeat.spp_flags &= ~SPP_HB_ENABLE;
        }
        fd.setsockopt(SOL_SCTP, SCTP_PEER_ADDR_PARAMS, heartbeat);
    }
    bool get_keepalive(file_desc& fd) const override {
        return fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS).spp_flags & SPP_HB_ENABLE;
    }
    void set_keepalive_parameters(file_desc& fd, const keepalive_params& kpms) const override {
        const sctp_keepalive_params& pms = compat::get<sctp_keepalive_params>(kpms);
        auto params = fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS);
        // spp_hbinterval is in milliseconds; convert explicitly instead of assuming
        // `interval` counts whole seconds.
        params.spp_hbinterval = std::chrono::duration_cast<std::chrono::milliseconds>(pms.interval).count();
        params.spp_pathmaxrxt = pms.count;
        fd.setsockopt(SOL_SCTP, SCTP_PEER_ADDR_PARAMS, params);
    }
    keepalive_params get_keepalive_parameters(file_desc& fd) const override {
        auto params = fd.getsockopt<sctp_paddrparams>(SOL_SCTP, SCTP_PEER_ADDR_PARAMS);
        return sctp_keepalive_params {
            // spp_hbinterval is milliseconds; truncate into the interval's own unit.
            std::chrono::duration_cast<interval_type>(std::chrono::milliseconds(params.spp_hbinterval)),
            params.spp_pathmaxrxt
        };
    }
};
135
136 class posix_unix_stream_connected_socket_operations : public posix_connected_socket_operations {
137 public:
138 virtual void set_nodelay(file_desc& fd, bool nodelay) const override {
139 assert(nodelay); // make sure nobody actually tries to use this non-existing functionality
140 }
141 virtual bool get_nodelay(file_desc& fd) const override {
142 return true;
143 }
144 virtual void set_keepalive(file_desc& fd, bool keepalive) const override {}
145 virtual bool get_keepalive(file_desc& fd) const override {
146 return false;
147 }
148 virtual void set_keepalive_parameters(file_desc& fd, const keepalive_params& p) const override {}
149 virtual keepalive_params get_keepalive_parameters(file_desc& fd) const override {
150 return keepalive_params{};
151 }
152 };
153
154 static const posix_connected_socket_operations*
155 get_posix_connected_socket_ops(sa_family_t family, int protocol) {
156 static posix_tcp_connected_socket_operations tcp_ops;
157 static posix_sctp_connected_socket_operations sctp_ops;
158 static posix_unix_stream_connected_socket_operations unix_ops;
159 switch (family) {
160 case AF_INET:
161 case AF_INET6:
162 switch (protocol) {
163 case IPPROTO_TCP: return &tcp_ops;
164 case IPPROTO_SCTP: return &sctp_ops;
165 default: abort();
166 }
167 case AF_UNIX:
168 return &unix_ops;
169 default:
170 abort();
171 }
172 }
173
174 class posix_connected_socket_impl final : public connected_socket_impl {
175 lw_shared_ptr<pollable_fd> _fd;
176 const posix_connected_socket_operations* _ops;
177 conntrack::handle _handle;
178 compat::polymorphic_allocator<char>* _allocator;
179 private:
180 explicit posix_connected_socket_impl(sa_family_t family, int protocol, lw_shared_ptr<pollable_fd> fd, compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) :
181 _fd(std::move(fd)), _ops(get_posix_connected_socket_ops(family, protocol)), _allocator(allocator) {}
182 explicit posix_connected_socket_impl(sa_family_t family, int protocol, lw_shared_ptr<pollable_fd> fd, conntrack::handle&& handle,
183 compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _fd(std::move(fd))
184 , _ops(get_posix_connected_socket_ops(family, protocol)), _handle(std::move(handle)), _allocator(allocator) {}
185 public:
186 virtual data_source source() override {
187 return data_source(std::make_unique< posix_data_source_impl>(_fd, _allocator));
188 }
189 virtual data_sink sink() override {
190 return data_sink(std::make_unique< posix_data_sink_impl>(_fd));
191 }
192 virtual void shutdown_input() override {
193 _fd->shutdown(SHUT_RD);
194 }
195 virtual void shutdown_output() override {
196 _fd->shutdown(SHUT_WR);
197 }
198 virtual void set_nodelay(bool nodelay) override {
199 return _ops->set_nodelay(_fd->get_file_desc(), nodelay);
200 }
201 virtual bool get_nodelay() const override {
202 return _ops->get_nodelay(_fd->get_file_desc());
203 }
204 void set_keepalive(bool keepalive) override {
205 return _ops->set_keepalive(_fd->get_file_desc(), keepalive);
206 }
207 bool get_keepalive() const override {
208 return _ops->get_keepalive(_fd->get_file_desc());
209 }
210 void set_keepalive_parameters(const keepalive_params& p) override {
211 return _ops->set_keepalive_parameters(_fd->get_file_desc(), p);
212 }
213 keepalive_params get_keepalive_parameters() const override {
214 return _ops->get_keepalive_parameters(_fd->get_file_desc());
215 }
216 friend class posix_server_socket_impl;
217 friend class posix_ap_server_socket_impl;
218 friend class posix_reuseport_server_socket_impl;
219 friend class posix_network_stack;
220 friend class posix_ap_network_stack;
221 friend class posix_socket_impl;
222 };
223
224 static void resolve_outgoing_address(socket_address& a) {
225 if (a.family() != AF_INET6
226 || a.as_posix_sockaddr_in6().sin6_scope_id != inet_address::invalid_scope
227 || !IN6_IS_ADDR_LINKLOCAL(&a.as_posix_sockaddr_in6().sin6_addr)
228 ) {
229 return;
230 }
231
232 FILE *f;
233
234 if (!(f = fopen("/proc/net/ipv6_route", "r"))) {
235 throw std::system_error(errno, std::system_category(), "resolve_address");
236 }
237
238 auto holder = std::unique_ptr<FILE, int(*)(FILE *)>(f, &::fclose);
239
240 /**
241 Here all configured IPv6 routes are shown in a special format. The example displays for loopback interface only. The meaning is shown below (see net/ipv6/route.c for more).
242
243 # cat /proc/net/ipv6_route
244 00000000000000000000000000000000 00 00000000000000000000000000000000 00 00000000000000000000000000000000 ffffffff 00000001 00000001 00200200 lo
245 +------------------------------+ ++ +------------------------------+ ++ +------------------------------+ +------+ +------+ +------+ +------+ ++
246 | | | | | | | | | |
247 1 2 3 4 5 6 7 8 9 10
248
249 1: IPv6 destination network displayed in 32 hexadecimal chars without colons as separator
250
251 2: IPv6 destination prefix length in hexadecimal
252
253 3: IPv6 source network displayed in 32 hexadecimal chars without colons as separator
254
255 4: IPv6 source prefix length in hexadecimal
256
257 5: IPv6 next hop displayed in 32 hexadecimal chars without colons as separator
258
259 6: Metric in hexadecimal
260
261 7: Reference counter
262
263 8: Use counter
264
265 9: Flags
266
267 10: Device name
268
269 */
270
271 uint32_t prefix_len, src_prefix_len;
272 unsigned long flags;
273 char device[16];
274 char dest_str[40];
275
276 for (;;) {
277 auto n = fscanf(f, "%4s%4s%4s%4s%4s%4s%4s%4s %02x "
278 "%*4s%*4s%*4s%*4s%*4s%*4s%*4s%*4s %02x "
279 "%*4s%*4s%*4s%*4s%*4s%*4s%*4s%*4s "
280 "%*08x %*08x %*08x %08lx %8s",
281 &dest_str[0], &dest_str[5], &dest_str[10], &dest_str[15],
282 &dest_str[20], &dest_str[25], &dest_str[30], &dest_str[35],
283 &prefix_len,
284 &src_prefix_len,
285 &flags, device);
286 if (n != 12) {
287 break;
288 }
289
290 if ((prefix_len > 128) || (src_prefix_len != 0)
291 || (flags & (RTF_POLICY | RTF_FLOW))
292 || ((flags & RTF_REJECT) && prefix_len == 0) /* reject all */) {
293 continue;
294 }
295
296 dest_str[4] = dest_str[9] = dest_str[14] = dest_str[19] = dest_str[24] = dest_str[29] = dest_str[34] = ':';
297 dest_str[39] = '\0';
298
299 struct in6_addr addr;
300 if (inet_pton(AF_INET6, dest_str, &addr) < 0) {
301 /* not an Ipv6 address */
302 continue;
303 }
304
305 auto bytes = prefix_len / 8;
306 auto bits = prefix_len % 8;
307
308 auto& src = a.as_posix_sockaddr_in6().sin6_addr;
309
310 if (bytes > 0 && memcmp(&src, &addr, bytes)) {
311 continue;
312 }
313 if (bits > 0) {
314 auto c1 = src.s6_addr[bytes];
315 auto c2 = addr.s6_addr[bytes];
316 auto mask = 0xffu << (8 - bits);
317 if ((c1 & mask) != (c2 & mask)) {
318 continue;
319 }
320 }
321
322 // found the route.
323 for (auto& nif : engine().net().network_interfaces()) {
324 if (nif.name() == device || nif.display_name() == device) {
325 a.as_posix_sockaddr_in6().sin6_scope_id = nif.index();
326 return;
327 }
328 }
329 }
330 }
331
// Client-side socket implementation (created by posix_network_stack::socket()).
// AF_UNIX destinations are connected directly. For other destinations a fresh
// pollable_fd is created for each connect attempt, and the local port may be
// chosen so that it maps back to this shard (see find_port_and_connect).
332 class posix_socket_impl final : public socket_impl {
333 lw_shared_ptr<pollable_fd> _fd;
334 compat::polymorphic_allocator<char>* _allocator;
// SO_REUSEADDR value applied to every fd created by a connect attempt.
335 bool _reuseaddr = false;
336
// Connects _fd to `sa`, bound to `local`. An unspecified `local` becomes the
// wildcard of sa's family.
// When no local port was requested and proto is TCP, the first 5 attempts use a
// random port p with p % smp::count == this shard's cpu_id, roughly within
// 49152-65535. An EADDRINUSE/EADDRNOTAVAIL failure on such a port triggers
// another attempt. Afterwards the requested port (0) is used and any error
// propagates to the caller.
// NOTE(review): the port is read and written through the sockaddr_in view even
// for IPv6; this relies on sin_port and sin6_port sharing the same offset.
337 future<> find_port_and_connect(socket_address sa, socket_address local, transport proto = transport::TCP) {
338 static thread_local std::default_random_engine random_engine{std::random_device{}()};
339 static thread_local std::uniform_int_distribution<uint16_t> u(49152/smp::count + 1, 65535/smp::count - 1);
340 // If no explicit local address, set to dest address family wildcard.
341 if (local.is_unspecified()) {
342 local = net::inet_address(sa.addr().in_family());
343 }
// Give a scope-less IPv6 link-local destination an interface scope id.
344 resolve_outgoing_address(sa);
345 return repeat([this, sa, local, proto, attempts = 0, requested_port = ntoh(local.as_posix_sockaddr_in().sin_port)] () mutable {
// A new fd per attempt; assigning it releases the previous attempt's fd.
346 _fd = engine().make_pollable_fd(sa, int(proto));
347 _fd->get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(_reuseaddr));
348 uint16_t port = attempts++ < 5 && requested_port == 0 && proto == transport::TCP ? u(random_engine) * smp::count + engine().cpu_id() : requested_port;
349 local.as_posix_sockaddr_in().sin_port = hton(port);
350 return futurize_apply([this, sa, local] { return engine().posix_connect(_fd, sa, local); }).then_wrapped([port, requested_port] (future<> f) {
351 try {
352 f.get();
353 return stop_iteration::yes;
354 } catch (std::system_error& err) {
// Only a self-chosen port may be retried; errors for the requested port propagate.
355 if (port != requested_port && (err.code().value() == EADDRINUSE || err.code().value() == EADDRNOTAVAIL)) {
356 return stop_iteration::no;
357 }
358 throw;
359 }
360 });
361 });
362 }
363
364 /// Unix-domain connect: no port selection and no SO_REUSEADDR; the fd is created with protocol 0.
365 future<connected_socket> connect_unix_domain(socket_address sa, socket_address local) {
366 // note that if the 'local' address was not set by the client, it was created as an undefined address
367 if (local.is_unspecified()) {
368 local = socket_address{unix_domain_addr{std::string{}}};
369 }
370
371 _fd = engine().make_pollable_fd(sa, 0);
372 return engine().posix_connect(_fd, sa, local).then(
373 [fd = _fd, allocator = _allocator](){
374 // a problem with 'private' interaction with 'unique_ptr'
375 std::unique_ptr<connected_socket_impl> csi;
376 csi.reset(new posix_connected_socket_impl{AF_UNIX, 0, std::move(fd), allocator});
377 return make_ready_future<connected_socket>(connected_socket(std::move(csi)));
378 }
379 );
380 }
381
382 public:
383 explicit posix_socket_impl(compat::polymorphic_allocator<char>* allocator=memory::malloc_allocator) : _allocator(allocator) {}
384
// Connects to `sa`. On success, returns a connected_socket wrapping the fd of the
// successful attempt (_fd at that point).
385 virtual future<connected_socket> connect(socket_address sa, socket_address local, transport proto = transport::TCP) override {
386 if (sa.is_af_unix()) {
387 return connect_unix_domain(sa, local);
388 }
389 return find_port_and_connect(sa, local, proto).then([this, sa, proto, allocator = _allocator] () mutable {
390 std::unique_ptr<connected_socket_impl> csi;
391 csi.reset(new posix_connected_socket_impl(sa.family(), static_cast<int>(proto), _fd, allocator));
392 return make_ready_future<connected_socket>(connected_socket(std::move(csi)));
393 });
394 }
395
// Remembers the flag for future connect attempts and applies it to the current fd, if any.
396 void set_reuseaddr(bool reuseaddr) override {
397 _reuseaddr = reuseaddr;
398 if (_fd) {
399 _fd->get_file_desc().setsockopt(SOL_SOCKET, SO_REUSEADDR, int(reuseaddr));
400 }
401 }
402
// Reads the live option from the fd when one exists; otherwise returns the remembered flag.
403 bool get_reuseaddr() const override {
404 if(_fd) {
405 return _fd->get_file_desc().getsockopt<int>(SOL_SOCKET, SO_REUSEADDR);
406 } else {
407 return _reuseaddr;
408 }
409 }
410
// Shuts down both directions of the current fd. ENOTCONN is ignored; any other error propagates.
411 virtual void shutdown() override {
412 if (_fd) {
413 try {
414 _fd->shutdown(SHUT_RDWR);
415 } catch (std::system_error& e) {
416 if (e.code().value() != ENOTCONN) {
417 throw;
418 }
419 }
420 }
421 }
422 };
423
// Accepts the next connection on the listening fd and picks the shard that will
// own it, according to _lba:
//  - connection_distribution: _conntrack chooses the shard;
//  - port: shard = peer port % smp::count (read through the sockaddr_in view,
//    whose sin_port shares its offset with sin6_port);
//  - fixed: always _fixed_cpu.
// If the chosen shard is this one, the connection is returned directly.
// Otherwise the raw file_desc is shipped to that shard, where
// posix_ap_server_socket_impl::move_connected_socket() hands it to a waiting
// accept() or queues it. This shard then recursively accepts the next connection.
424 future<accept_result>
425 posix_server_socket_impl::accept() {
426 return _lfd.accept().then([this] (std::tuple<pollable_fd, socket_address> fd_sa) {
427 auto& fd = std::get<0>(fd_sa);
428 auto& sa = std::get<1>(fd_sa);
429 auto cth = [this, &sa] {
430 switch(_lba) {
431 case server_socket::load_balancing_algorithm::connection_distribution:
432 return _conntrack.get_handle();
433 case server_socket::load_balancing_algorithm::port:
434 return _conntrack.get_handle(ntoh(sa.as_posix_sockaddr_in().sin_port) % smp::count);
435 case server_socket::load_balancing_algorithm::fixed:
436 return _conntrack.get_handle(_fixed_cpu);
437 default: abort();
438 }
439 } ();
440 auto cpu = cth.cpu();
441 if (cpu == engine().cpu_id()) {
442 std::unique_ptr<connected_socket_impl> csi(
443 new posix_connected_socket_impl(sa.family(), _protocol, make_lw_shared(std::move(fd)), std::move(cth), _allocator));
444 return make_ready_future<accept_result>(
445 accept_result{connected_socket(std::move(csi)), sa});
446 } else {
// The raw file_desc (not the pollable_fd) crosses shards; the target shard wraps it
// in a new pollable_fd.
447 // FIXME: future is discarded
448 (void)smp::submit_to(cpu, [protocol = _protocol, ssa = _sa, fd = std::move(fd.get_file_desc()), sa, cth = std::move(cth), allocator = _allocator] () mutable {
449 posix_ap_server_socket_impl::move_connected_socket(protocol, ssa, pollable_fd(std::move(fd)), sa, std::move(cth), allocator);
450 });
451 return accept();
452 }
453 });
454 }
455
456 void
457 posix_server_socket_impl::abort_accept() {
458 _lfd.abort_reader();
459 }
460
461 socket_address posix_server_socket_impl::local_address() const {
462 return _lfd.get_file_desc().get_address();
463 }
464
465 future<accept_result> posix_ap_server_socket_impl::accept() {
466 auto t_sa = std::make_tuple(_protocol, _sa);
467 auto conni = conn_q.find(t_sa);
468 if (conni != conn_q.end()) {
469 connection c = std::move(conni->second);
470 conn_q.erase(conni);
471 try {
472 std::unique_ptr<connected_socket_impl> csi(
473 new posix_connected_socket_impl(_sa.family(), _protocol, make_lw_shared(std::move(c.fd)), std::move(c.connection_tracking_handle), _allocator));
474 return make_ready_future<accept_result>(accept_result{connected_socket(std::move(csi)), std::move(c.addr)});
475 } catch (...) {
476 return make_exception_future<accept_result>(std::current_exception());
477 }
478 } else {
479 try {
480 auto i = sockets.emplace(std::piecewise_construct, std::make_tuple(t_sa), std::make_tuple());
481 assert(i.second);
482 return i.first->second.get_future();
483 } catch (...) {
484 return make_exception_future<accept_result>(std::current_exception());
485 }
486 }
487 }
488
489 void
490 posix_ap_server_socket_impl::abort_accept() {
491 auto t_sa = std::make_tuple(_protocol, _sa);
492 conn_q.erase(t_sa);
493 auto i = sockets.find(t_sa);
494 if (i != sockets.end()) {
495 i->second.set_exception(std::system_error(ECONNABORTED, std::system_category()));
496 sockets.erase(i);
497 }
498 }
499
500 future<accept_result>
501 posix_reuseport_server_socket_impl::accept() {
502 return _lfd.accept().then([allocator = _allocator, protocol = _protocol] (std::tuple<pollable_fd, socket_address> fd_sa) {
503 auto& fd = std::get<0>(fd_sa);
504 auto& sa = std::get<1>(fd_sa);
505 std::unique_ptr<connected_socket_impl> csi(
506 new posix_connected_socket_impl(sa.family(), protocol, make_lw_shared(std::move(fd)), allocator));
507 return make_ready_future<accept_result>(
508 accept_result{connected_socket(std::move(csi)), sa});
509 });
510 }
511
512 void
513 posix_reuseport_server_socket_impl::abort_accept() {
514 _lfd.abort_reader();
515 }
516
517 socket_address posix_reuseport_server_socket_impl::local_address() const {
518 return _lfd.get_file_desc().get_address();
519 }
520
521 void
522 posix_ap_server_socket_impl::move_connected_socket(int protocol, socket_address sa, pollable_fd fd, socket_address addr, conntrack::handle cth, compat::polymorphic_allocator<char>* allocator) {
523 auto t_sa = std::make_tuple(protocol, sa);
524 auto i = sockets.find(t_sa);
525 if (i != sockets.end()) {
526 try {
527 std::unique_ptr<connected_socket_impl> csi(new posix_connected_socket_impl(sa.family(), protocol, make_lw_shared(std::move(fd)), std::move(cth), allocator));
528 i->second.set_value(accept_result{connected_socket(std::move(csi)), std::move(addr)});
529 } catch (...) {
530 i->second.set_exception(std::current_exception());
531 }
532 sockets.erase(i);
533 } else {
534 conn_q.emplace(std::piecewise_construct, std::make_tuple(t_sa), std::make_tuple(std::move(fd), std::move(addr), std::move(cth)));
535 }
536 }
537
538 future<temporary_buffer<char>>
539 posix_data_source_impl::get() {
540 return _fd->read_some(_buf.get_write(), _buf_size).then([this] (size_t size) {
541 _buf.trim(size);
542 auto ret = std::move(_buf);
543 _buf = make_temporary_buffer<char>(_buffer_allocator, _buf_size);
544 return make_ready_future<temporary_buffer<char>>(std::move(ret));
545 });
546 }
547
548 future<> posix_data_source_impl::close() {
549 _fd->shutdown(SHUT_RD);
550 return make_ready_future<>();
551 }
552
553 std::vector<struct iovec> to_iovec(const packet& p) {
554 std::vector<struct iovec> v;
555 v.reserve(p.nr_frags());
556 for (auto&& f : p.fragments()) {
557 v.push_back({.iov_base = f.base, .iov_len = f.size});
558 }
559 return v;
560 }
561
562 std::vector<iovec> to_iovec(std::vector<temporary_buffer<char>>& buf_vec) {
563 std::vector<iovec> v;
564 v.reserve(buf_vec.size());
565 for (auto& buf : buf_vec) {
566 v.push_back({.iov_base = buf.get_write(), .iov_len = buf.size()});
567 }
568 return v;
569 }
570
571 future<>
572 posix_data_sink_impl::put(temporary_buffer<char> buf) {
573 return _fd->write_all(buf.get(), buf.size()).then([d = buf.release()] {});
574 }
575
576 future<>
577 posix_data_sink_impl::put(packet p) {
578 _p = std::move(p);
579 return _fd->write_all(_p).then([this] { _p.reset(); });
580 }
581
582 future<>
583 posix_data_sink_impl::close() {
584 _fd->shutdown(SHUT_WR);
585 return make_ready_future<>();
586 }
587
588 server_socket
589 posix_network_stack::listen(socket_address sa, listen_options opt) {
590 using server_socket = seastar::api_v2::server_socket;
591 // allow unspecified bind address -> default to ipv4 wildcard
592 if (sa.is_unspecified()) {
593 sa = inet_address(inet_address::family::INET);
594 }
595 if (sa.is_af_unix()) {
596 return server_socket(std::make_unique<posix_server_socket_impl>(0, sa, engine().posix_listen(sa, opt), opt.lba, opt.fixed_cpu, _allocator));
597 }
598 auto protocol = static_cast<int>(opt.proto);
599 return _reuseport ?
600 server_socket(std::make_unique<posix_reuseport_server_socket_impl>(protocol, sa, engine().posix_listen(sa, opt), _allocator))
601 :
602 server_socket(std::make_unique<posix_server_socket_impl>(protocol, sa, engine().posix_listen(sa, opt), opt.lba, opt.fixed_cpu, _allocator));
603 }
604
605 ::seastar::socket posix_network_stack::socket() {
606 return ::seastar::socket(std::make_unique<posix_socket_impl>(_allocator));
607 }
608
609 server_socket
610 posix_ap_network_stack::listen(socket_address sa, listen_options opt) {
611 using server_socket = seastar::api_v2::server_socket;
612 // allow unspecified bind address -> default to ipv4 wildcard
613 if (sa.is_unspecified()) {
614 sa = inet_address(inet_address::family::INET);
615 }
616 if (sa.is_af_unix()) {
617 return server_socket(std::make_unique<posix_ap_server_socket_impl>(0, sa, _allocator));
618 }
619 auto protocol = static_cast<int>(opt.proto);
620 return _reuseport ?
621 server_socket(std::make_unique<posix_reuseport_server_socket_impl>(protocol, sa, engine().posix_listen(sa, opt), _allocator))
622 :
623 server_socket(std::make_unique<posix_ap_server_socket_impl>(protocol, sa, _allocator));
624 }
625
626 struct cmsg_with_pktinfo {
627 struct cmsghdrcmh;
628 union {
629 struct in_pktinfo pktinfo;
630 struct in6_pktinfo pkt6info;
631 };
632 };
633
634 class posix_udp_channel : public udp_channel_impl {
635 private:
636 static constexpr int MAX_DATAGRAM_SIZE = 65507;
637 struct recv_ctx {
638 struct msghdr _hdr;
639 struct iovec _iov;
640 socket_address _src_addr;
641 char* _buffer;
642 cmsg_with_pktinfo _cmsg;
643
644 recv_ctx() {
645 memset(&_hdr, 0, sizeof(_hdr));
646 _hdr.msg_iov = &_iov;
647 _hdr.msg_iovlen = 1;
648 _hdr.msg_name = &_src_addr.u.sa;
649 _hdr.msg_namelen = sizeof(_src_addr.u.sas);
650 memset(&_cmsg, 0, sizeof(_cmsg));
651 _hdr.msg_control = &_cmsg;
652 _hdr.msg_controllen = sizeof(_cmsg);
653 }
654
655 void prepare() {
656 _buffer = new char[MAX_DATAGRAM_SIZE];
657 _iov.iov_base = _buffer;
658 _iov.iov_len = MAX_DATAGRAM_SIZE;
659 }
660 };
661 struct send_ctx {
662 struct msghdr _hdr;
663 std::vector<struct iovec> _iovecs;
664 socket_address _dst;
665 packet _p;
666
667 send_ctx() {
668 memset(&_hdr, 0, sizeof(_hdr));
669 _hdr.msg_name = &_dst.u.sa;
670 _hdr.msg_namelen = sizeof(_dst.u.sas);
671 }
672
673 void prepare(const socket_address& dst, packet p) {
674 _dst = dst;
675 _p = std::move(p);
676 _iovecs = to_iovec(_p);
677 _hdr.msg_iov = _iovecs.data();
678 _hdr.msg_iovlen = _iovecs.size();
679 resolve_outgoing_address(_dst);
680 }
681 };
682 std::unique_ptr<pollable_fd> _fd;
683 socket_address _address;
684 recv_ctx _recv;
685 send_ctx _send;
686 bool _closed;
687 public:
688 posix_udp_channel(const socket_address& bind_address)
689 : _closed(false) {
690 auto sa = bind_address.is_unspecified() ? socket_address(inet_address(inet_address::family::INET)) : bind_address;
691 file_desc fd = file_desc::socket(sa.u.sa.sa_family, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
692 fd.setsockopt(SOL_IP, IP_PKTINFO, true);
693 if (engine().posix_reuseport_available()) {
694 fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);
695 }
696 fd.bind(sa.u.sa, sizeof(sa.u.sas));
697 _address = fd.get_address();
698 _fd = std::make_unique<pollable_fd>(std::move(fd));
699 }
700 virtual ~posix_udp_channel() { if (!_closed) close(); };
701 virtual future<udp_datagram> receive() override;
702 virtual future<> send(const socket_address& dst, const char *msg) override;
703 virtual future<> send(const socket_address& dst, packet p) override;
704 virtual void shutdown_input() override {
705 _fd->abort_reader();
706 }
707 virtual void shutdown_output() override {
708 _fd->abort_writer();
709 }
710 virtual void close() override {
711 _closed = true;
712 _fd.reset();
713 }
714 virtual bool is_closed() const override { return _closed; }
715 socket_address local_address() const override {
716 assert(_address.u.sas.ss_family != AF_INET6 || (_address.addr_length > 20));
717 return _address;
718 }
719 };
720
721 future<> posix_udp_channel::send(const socket_address& dst, const char *message) {
722 auto len = strlen(message);
723 auto a = dst;
724 resolve_outgoing_address(a);
725 return _fd->sendto(a, message, len)
726 .then([len] (size_t size) { assert(size == len); });
727 }
728
729 future<> posix_udp_channel::send(const socket_address& dst, packet p) {
730 auto len = p.len();
731 _send.prepare(dst, std::move(p));
732 return _fd->sendmsg(&_send._hdr)
733 .then([len] (size_t size) { assert(size == len); });
734 }
735
736 udp_channel
737 posix_network_stack::make_udp_channel(const socket_address& addr) {
738 return udp_channel(std::make_unique<posix_udp_channel>(addr));
739 }
740
741 bool
742 posix_network_stack::supports_ipv6() const {
743 static bool has_ipv6 = [] {
744 try {
745 posix_udp_channel c(ipv6_addr{"::1"});
746 c.close();
747 return true;
748 } catch (...) {}
749 return false;
750 }();
751
752 return has_ipv6;
753 }
754
755 class posix_datagram : public udp_datagram_impl {
756 private:
757 socket_address _src;
758 socket_address _dst;
759 packet _p;
760 public:
761 posix_datagram(const socket_address& src, const socket_address& dst, packet p) : _src(src), _dst(dst), _p(std::move(p)) {}
762 virtual socket_address get_src() override { return _src; }
763 virtual socket_address get_dst() override { return _dst; }
764 virtual uint16_t get_dst_port() override { return _dst.port(); }
765 virtual packet& get_data() override { return _p; }
766 };
767
// Receives one datagram.
// _recv.prepare() allocates a fresh MAX_DATAGRAM_SIZE buffer. On success, the
// returned packet's deleter takes ownership of it; if recvmsg fails, the
// handle_exception continuation frees it.
// The destination address comes from an IP_PKTINFO (ipi_addr) or IPV6_PKTINFO
// (ipi6_addr) control message, combined with this channel's local port. With no
// such message, it stays a default-constructed socket_address.
// NOTE(review): the constructor only enables IP_PKTINFO; unless IPV6_RECVPKTINFO
// is set elsewhere, the IPV6_PKTINFO branch presumably never fires.
// NOTE(review): if anything throws inside the `then` continuation after the packet
// (whose deleter owns the buffer) is built, the buffer would be freed by the packet
// and then again by handle_exception -- confirm this path cannot throw.
768 future<udp_datagram>
769 posix_udp_channel::receive() {
770 _recv.prepare();
771 return _fd->recvmsg(&_recv._hdr).then([this] (size_t size) {
772 socket_address dst;
773 for (auto* cmsg = CMSG_FIRSTHDR(&_recv._hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&_recv._hdr, cmsg)) {
774 if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
775 dst = ipv4_addr(reinterpret_cast<const in_pktinfo*>(CMSG_DATA(cmsg))->ipi_addr, _address.port());
776 break;
777 } else if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
778 dst = ipv6_addr(reinterpret_cast<const in6_pktinfo*>(CMSG_DATA(cmsg))->ipi6_addr, _address.port());
779 break;
780 }
781 }
// The packet wraps the first `size` bytes of the buffer and deletes the buffer when released.
782 return make_ready_future<udp_datagram>(udp_datagram(std::make_unique<posix_datagram>(
783 _recv._src_addr, dst, packet(fragment{_recv._buffer, size}, make_deleter([buf = _recv._buffer] { delete[] buf; })))));
784 }).handle_exception([p = _recv._buffer](auto ep) {
785 delete[] p;
786 return make_exception_future<udp_datagram>(std::move(ep));
787 });
788 }
789
790 void register_posix_stack() {
791 register_network_stack("posix", boost::program_options::options_description(),
792 [](boost::program_options::variables_map ops) {
793 return smp::main_thread() ? posix_network_stack::create(ops)
794 : posix_ap_network_stack::create(ops);
795 },
796 true);
797 }
798
799 // nw interface stuff
800
801 std::vector<network_interface> posix_network_stack::network_interfaces() {
802 class posix_network_interface_impl final : public network_interface_impl {
803 public:
804 uint32_t _index = 0, _mtu = 0;
805 sstring _name, _display_name;
806 std::vector<net::inet_address> _addresses;
807 std::vector<uint8_t> _hardware_address;
808 bool _loopback = false, _virtual = false, _up = false;
809
810 uint32_t index() const override {
811 return _index;
812 }
813 uint32_t mtu() const override {
814 return _mtu;
815 }
816 const sstring& name() const override {
817 return _name;
818 }
819 const sstring& display_name() const override {
820 return _display_name.empty() ? name() : _display_name;
821 }
822 const std::vector<net::inet_address>& addresses() const override {
823 return _addresses;
824 }
825 const std::vector<uint8_t> hardware_address() const override {
826 return _hardware_address;
827 }
828 bool is_loopback() const override {
829 return _loopback;
830 }
831 bool is_virtual() const override {
832 return _virtual;
833 }
834 bool is_up() const override {
835 // TODO: should be checked on query?
836 return _up;
837 }
838 bool supports_ipv6() const override {
839 // TODO: this is not 100% correct.
840 return std::any_of(_addresses.begin(), _addresses.end(), std::mem_fn(&inet_address::is_ipv6));
841 }
842 };
843
844 // For now, keep an immutable set of interfaces created on start, shared across
845 // shards
846 static const std::vector<posix_network_interface_impl> global_interfaces = [] {
847 auto fd = ::socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
848 throw_system_error_on(fd < 0, "could not open netlink socket");
849
850 std::unique_ptr<int, void(*)(int*)> fd_guard(&fd, [](int* p) { ::close(*p); });
851
852 auto pid = ::getpid();
853
854 sockaddr_nl local = { 0, };
855 local.nl_family = AF_NETLINK;
856 local.nl_pid = pid;
857 local.nl_groups = RTMGRP_IPV6_IFADDR|RTMGRP_IPV4_IFADDR;
858
859 throw_system_error_on(bind(fd, (struct sockaddr *) &local, sizeof(local)) < 0, "could not bind netlink socket");
860
861 /* RTNL socket is ready for use, prepare and send requests */
862
863 std::vector<posix_network_interface_impl> res;
864
865 for (auto msg : { RTM_GETLINK, RTM_GETADDR}) {
866 struct nl_req {
867 nlmsghdr hdr;
868 union {
869 rtgenmsg gen;
870 ifaddrmsg addr;
871 };
872 } req = { {0}, };
873
874 sockaddr_nl kernel = { 0, };
875 msghdr rtnl_msg = { 0, };
876
877 kernel.nl_family = AF_NETLINK; /* fill-in kernel address (destination of our message) */
878
879 req.hdr.nlmsg_len = NLMSG_LENGTH(sizeof(struct rtgenmsg));
880 req.hdr.nlmsg_type = msg;
881 req.hdr.nlmsg_flags = NLM_F_REQUEST | NLM_F_ROOT;
882 req.hdr.nlmsg_seq = 1;
883 req.hdr.nlmsg_pid = pid;
884
885 if (msg == RTM_GETLINK) {
886 req.gen.rtgen_family = AF_PACKET; /* no preferred AF, we will get *all* interfaces */
887 } else {
888 req.addr.ifa_family = AF_UNSPEC;
889 }
890
891 iovec io;
892
893 io.iov_base = &req;
894 io.iov_len = req.hdr.nlmsg_len;
895
896 rtnl_msg.msg_iov = &io;
897 rtnl_msg.msg_iovlen = 1;
898 rtnl_msg.msg_name = &kernel;
899 rtnl_msg.msg_namelen = sizeof(kernel);
900
901 throw_system_error_on(::sendmsg(fd, (struct msghdr *) &rtnl_msg, 0) < 0, "could not send netlink request");
902 /* parse reply */
903
904 constexpr size_t reply_buffer_size = 8192;
905 char reply[reply_buffer_size];
906
907 bool done = false;
908
909 while (!done) {
910 msghdr rtnl_reply = { 0, };
911 iovec io_reply = { 0, };
912
913 io_reply.iov_base = reply;
914 io_reply.iov_len = reply_buffer_size;
915 rtnl_reply.msg_iov = &io_reply;
916 rtnl_reply.msg_iovlen = 1;
917 rtnl_reply.msg_name = &kernel;
918 rtnl_reply.msg_namelen = sizeof(kernel);
919
920 auto len = ::recvmsg(fd, &rtnl_reply, 0); /* read as much data as fits in the receive buffer */
921 if (len <= 0) {
922 return res;
923 }
924
925 for (auto* msg_ptr = (struct nlmsghdr *) reply; NLMSG_OK(msg_ptr, len); msg_ptr = NLMSG_NEXT(msg_ptr, len)) {
926 switch(msg_ptr->nlmsg_type) {
927 case NLMSG_DONE: // that is all
928 done = true;
929 break;
930 case RTM_NEWLINK:
931 {
932 auto* iface = reinterpret_cast<const ifinfomsg*>(NLMSG_DATA(msg_ptr));
933 auto ilen = msg_ptr->nlmsg_len - NLMSG_LENGTH(sizeof(ifinfomsg));
934
935 // todo: filter any non-network interfaces (family)
936
937 posix_network_interface_impl nwif;
938
939 nwif._index = iface->ifi_index;
940 nwif._loopback = (iface->ifi_flags & IFF_LOOPBACK) != 0;
941 nwif._up = (iface->ifi_flags & IFF_UP) != 0;
942 #if defined(IFF_802_1Q_VLAN) && defined(IFF_EBRIDGE) && defined(IFF_SLAVE_INACTIVE)
943 nwif._virtual = (iface->ifi_flags & (IFF_802_1Q_VLAN|IFF_EBRIDGE|IFF_SLAVE_INACTIVE)) != 0;
944 #endif
945 for (auto* attribute = IFLA_RTA(iface); RTA_OK(attribute, ilen); attribute = RTA_NEXT(attribute, ilen)) {
946 switch(attribute->rta_type) {
947 case IFLA_IFNAME:
948 nwif._name = reinterpret_cast<const char *>(RTA_DATA(attribute));
949 break;
950 case IFLA_MTU:
951 nwif._mtu = *reinterpret_cast<const uint32_t *>(RTA_DATA(attribute));
952 break;
953 case IFLA_ADDRESS:
954 nwif._hardware_address.assign(reinterpret_cast<const uint8_t *>(RTA_DATA(attribute)), reinterpret_cast<const uint8_t *>(RTA_DATA(attribute)) + RTA_PAYLOAD(attribute));
955 break;
956 default:
957 break;
958 }
959 }
960
961 res.emplace_back(std::move(nwif));
962
963 break;
964 }
965 case RTM_NEWADDR:
966 {
967 auto* addr = reinterpret_cast<const ifaddrmsg*>(NLMSG_DATA(msg_ptr));
968 auto ilen = msg_ptr->nlmsg_len - NLMSG_LENGTH(sizeof(ifaddrmsg));
969
970 for (auto& nwif : res) {
971 if (nwif._index == addr->ifa_index) {
972 for (auto* attribute = IFA_RTA(addr); RTA_OK(attribute, ilen); attribute = RTA_NEXT(attribute, ilen)) {
973 compat::optional<inet_address> ia;
974
975 switch(attribute->rta_type) {
976 case IFA_LOCAL:
977 case IFA_ADDRESS: // ipv6 addresses are reported only as "ADDRESS"
978
979 if (RTA_PAYLOAD(attribute) == sizeof(::in_addr)) {
980 ia.emplace(*reinterpret_cast<const ::in_addr *>(RTA_DATA(attribute)));
981 } else if (RTA_PAYLOAD(attribute) == sizeof(::in6_addr)) {
982 ia.emplace(*reinterpret_cast<const ::in6_addr *>(RTA_DATA(attribute)), nwif.index());
983 }
984
985 if (ia && std::find(nwif._addresses.begin(), nwif._addresses.end(), *ia) == nwif._addresses.end()) {
986 nwif._addresses.emplace_back(*ia);
987 }
988
989 break;
990 default:
991 break;
992 }
993 }
994
995 break;
996 }
997 }
998 }
999 default:
1000 break;
1001 }
1002 }
1003 }
1004 }
1005
1006 return res;
1007 }();
1008
1009 // And a similarly immutable set of shared_ptr to network_interface_impl per shard, ready
1010 // to be handed out to callers with minimal overhead
1011 static const thread_local std::vector<shared_ptr<posix_network_interface_impl>> thread_local_interfaces = [] {
1012 std::vector<shared_ptr<posix_network_interface_impl>> res;
1013 res.reserve(global_interfaces.size());
1014 std::transform(global_interfaces.begin(), global_interfaces.end(), std::back_inserter(res), [](const posix_network_interface_impl& impl) {
1015 return make_shared<posix_network_interface_impl>(impl);
1016 });
1017 return res;
1018 }();
1019
1020 return std::vector<network_interface>(thread_local_interfaces.begin(), thread_local_interfaces.end());
1021 }
1022
1023 }
1024
1025 }