git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/net/udp.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / net / udp.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21
22 #include <seastar/net/ip.hh>
23 #include <seastar/net/stack.hh>
24 #include <seastar/net/inet_address.hh>
25
26 namespace seastar {
27
28 using namespace net;
29
30 namespace net {
31 namespace ipv4_udp_impl {
32
33 static inline
34 ipv4_addr
35 to_ipv4_addr(ipv4_address a, uint16_t port) {
36 return {a.ip, port};
37 }
38
39 class native_datagram : public udp_datagram_impl {
40 private:
41 ipv4_addr _src;
42 ipv4_addr _dst;
43 packet _p;
44 public:
45 native_datagram(ipv4_address src, ipv4_address dst, packet p)
46 : _p(std::move(p)) {
47 udp_hdr* hdr = _p.get_header<udp_hdr>();
48 auto h = ntoh(*hdr);
49 _p.trim_front(sizeof(*hdr));
50 _src = to_ipv4_addr(src, h.src_port);
51 _dst = to_ipv4_addr(dst, h.dst_port);
52 }
53
54 virtual socket_address get_src() override {
55 return _src;
56 };
57
58 virtual socket_address get_dst() override {
59 return _dst;
60 };
61
62 virtual uint16_t get_dst_port() override {
63 return _dst.port;
64 }
65
66 virtual packet& get_data() override {
67 return _p;
68 }
69 };
70
71 class native_channel : public udp_channel_impl {
72 private:
73 ipv4_udp& _proto;
74 ipv4_udp::registration _reg;
75 bool _closed;
76 lw_shared_ptr<udp_channel_state> _state;
77
78 public:
79 native_channel(ipv4_udp &proto, ipv4_udp::registration reg, lw_shared_ptr<udp_channel_state> state)
80 : _proto(proto)
81 , _reg(reg)
82 , _closed(false)
83 , _state(state)
84 {
85 }
86
87 ~native_channel()
88 {
89 if (!_closed)
90 close();
91 }
92
93 socket_address local_address() const override {
94 return socket_address(_proto.inet().host_address(), _reg.port());
95 }
96
97 virtual future<udp_datagram> receive() override {
98 return _state->_queue.pop_eventually();
99 }
100
101 virtual future<> send(const socket_address& dst, const char* msg) override {
102 return send(dst, packet::from_static_data(msg, strlen(msg)));
103 }
104
105 virtual future<> send(const socket_address& dst, packet p) override {
106 auto len = p.len();
107 return _state->wait_for_send_buffer(len).then([this, dst, p = std::move(p), len] () mutable {
108 p = packet(std::move(p), make_deleter([s = _state, len] { s->complete_send(len); }));
109 _proto.send(_reg.port(), dst, std::move(p));
110 });
111 }
112
113 virtual bool is_closed() const override {
114 return _closed;
115 }
116
117 virtual void shutdown_input() override {
118 _state->_queue.abort(std::make_exception_ptr(std::system_error(EBADF, std::system_category())));
119 }
120
121 virtual void shutdown_output() override {
122 _state->_queue.abort(std::make_exception_ptr(std::system_error(EPIPE, std::system_category())));
123 }
124
125 virtual void close() override {
126 _reg.unregister();
127 _closed = true;
128 }
129 };
130
131 } /* namespace ipv4_udp_impl */
132
133 using namespace net::ipv4_udp_impl;
134
135 const int ipv4_udp::default_queue_size = 1024;
136
137 ipv4_udp::ipv4_udp(ipv4& inet)
138 : _inet(inet)
139 {
140 _inet.register_packet_provider([this] {
141 std::optional<ipv4_traits::l4packet> l4p;
142 if (!_packetq.empty()) {
143 l4p = std::move(_packetq.front());
144 _packetq.pop_front();
145 }
146 return l4p;
147 });
148 }
149
150 bool ipv4_udp::forward(forward_hash& out_hash_data, packet& p, size_t off)
151 {
152 auto uh = p.get_header<udp_hdr>(off);
153
154 if (uh) {
155 out_hash_data.push_back(uh->src_port);
156 out_hash_data.push_back(uh->dst_port);
157 }
158 return true;
159 }
160
161 void ipv4_udp::received(packet p, ipv4_address from, ipv4_address to)
162 {
163 udp_datagram dgram(std::make_unique<native_datagram>(from, to, std::move(p)));
164
165 auto chan_it = _channels.find(dgram.get_dst_port());
166 if (chan_it != _channels.end()) {
167 auto chan = chan_it->second;
168 chan->_queue.push(std::move(dgram));
169 }
170 }
171
172 void ipv4_udp::send(uint16_t src_port, ipv4_addr dst, packet &&p)
173 {
174 auto src = _inet.host_address();
175 auto hdr = p.prepend_header<udp_hdr>();
176 hdr->src_port = src_port;
177 hdr->dst_port = dst.port;
178 hdr->len = p.len();
179 *hdr = hton(*hdr);
180
181 offload_info oi;
182 checksummer csum;
183 ipv4_traits::udp_pseudo_header_checksum(csum, src, dst, p.len());
184 bool needs_frag = ipv4::needs_frag(p, ip_protocol_num::udp, _inet.hw_features());
185 if (_inet.hw_features().tx_csum_l4_offload && !needs_frag) {
186 hdr->cksum = ~csum.get();
187 oi.needs_csum = true;
188 } else {
189 csum.sum(p);
190 hdr->cksum = csum.get();
191 oi.needs_csum = false;
192 }
193 oi.protocol = ip_protocol_num::udp;
194 p.set_offload_info(oi);
195
196 // FIXME: future is discarded
197 (void)_inet.get_l2_dst_address(dst).then([this, dst, p = std::move(p)] (ethernet_address e_dst) mutable {
198 _packetq.emplace_back(ipv4_traits::l4packet{dst, std::move(p), e_dst, ip_protocol_num::udp});
199 });
200 }
201
202 uint16_t ipv4_udp::next_port(uint16_t port) {
203 return (port + 1) == 0 ? min_anonymous_port : port + 1;
204 }
205
206 udp_channel
207 ipv4_udp::make_channel(ipv4_addr addr) {
208 if (!is_ip_unspecified(addr)) {
209 throw std::runtime_error("Binding to specific IP not supported yet");
210 }
211
212 uint16_t bind_port;
213
214 if (!is_port_unspecified(addr)) {
215 if (_channels.count(addr.port)) {
216 throw std::runtime_error("Address already in use");
217 }
218 bind_port = addr.port;
219 } else {
220 auto starting_port = _next_anonymous_port;
221 while (_channels.count(_next_anonymous_port)) {
222 _next_anonymous_port = next_port(_next_anonymous_port);
223 if (starting_port == _next_anonymous_port) {
224 throw std::runtime_error("No free port");
225 }
226 }
227
228 bind_port = _next_anonymous_port;
229 _next_anonymous_port = next_port(_next_anonymous_port);
230 }
231
232 auto chan_state = make_lw_shared<udp_channel_state>(_queue_size);
233 _channels[bind_port] = chan_state;
234 return udp_channel(std::make_unique<native_channel>(*this, registration(*this, bind_port), chan_state));
235 }
236
237 } /* namespace net */
238
239 }
240