]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/net/net.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / src / net / net.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 *
21 */
22
23 #include <boost/asio/ip/address_v4.hpp>
24 #include <boost/algorithm/string.hpp>
25 #include <seastar/net/net.hh>
26 #include <utility>
27 #include <seastar/net/toeplitz.hh>
28 #include <seastar/core/reactor.hh>
29 #include <seastar/core/metrics.hh>
30 #include <seastar/core/print.hh>
31 #include <seastar/net/inet_address.hh>
32
33 namespace seastar {
34
// Compile-time guarantees that the address value types can be default-,
// copy- and move-constructed without throwing; the noexcept constructors
// further down in this file rely on these properties.
static_assert(std::is_nothrow_default_constructible_v<ipv4_addr>);
static_assert(std::is_nothrow_copy_constructible_v<ipv4_addr>);
static_assert(std::is_nothrow_move_constructible_v<ipv4_addr>);

static_assert(std::is_nothrow_default_constructible_v<ipv6_addr>);
static_assert(std::is_nothrow_copy_constructible_v<ipv6_addr>);
static_assert(std::is_nothrow_move_constructible_v<ipv6_addr>);

// NOTE(review): a namespace-scope `using std::move;` is unusual and easy to
// shadow; kept as-is because code below may call move() unqualified.
using std::move;
44
45 ipv4_addr::ipv4_addr(const std::string &addr) {
46 std::vector<std::string> items;
47 boost::split(items, addr, boost::is_any_of(":"));
48 if (items.size() == 1) {
49 ip = boost::asio::ip::address_v4::from_string(addr).to_ulong();
50 port = 0;
51 } else if (items.size() == 2) {
52 ip = boost::asio::ip::address_v4::from_string(items[0]).to_ulong();
53 port = std::stoul(items[1]);
54 } else {
55 throw std::invalid_argument("invalid format: " + addr);
56 }
57 }
58
59 ipv4_addr::ipv4_addr(const std::string &addr, uint16_t port_) : ip(boost::asio::ip::address_v4::from_string(addr).to_ulong()), port(port_) {}
60
// Builds from a resolved inet_address by converting it to the native
// ::in_addr representation and delegating.
ipv4_addr::ipv4_addr(const net::inet_address& a, uint16_t port)
    : ipv4_addr(::in_addr(a), port)
{}
64
// Extracts the address and port from a generic socket_address and
// delegates to the inet_address constructor.
ipv4_addr::ipv4_addr(const socket_address &sa) noexcept
    : ipv4_addr(sa.addr(), sa.port())
{}
68
// Builds from a raw ::in_addr; s_addr is in network byte order, so it is
// converted to host order for the ip member.
ipv4_addr::ipv4_addr(const ::in_addr& in, uint16_t p) noexcept
        : ip(net::ntoh(in.s_addr)), port(p)
{}
72
73 namespace net {
74
75 inline
76 bool qp::poll_tx() {
77 if (_tx_packetq.size() < 16) {
78 // refill send queue from upper layers
79 uint32_t work;
80 do {
81 work = 0;
82 for (auto&& pr : _pkt_providers) {
83 auto p = pr();
84 if (p) {
85 work++;
86 _tx_packetq.push_back(std::move(p.value()));
87 if (_tx_packetq.size() == 128) {
88 break;
89 }
90 }
91 }
92 } while (work && _tx_packetq.size() < 128);
93 }
94 if (!_tx_packetq.empty()) {
95 _stats.tx.good.update_pkts_bunch(send(_tx_packetq));
96 return true;
97 }
98
99 return false;
100 }
101
// Constructs a queue pair: installs a reactor poller that drains the Tx
// queue via poll_tx(), and registers per-queue metrics (named
// "queue<qid>_*") under stats_plugin_name. When register_copy_stats is
// true, additional non-zero-copy byte/fragment counters are exposed.
qp::qp(bool register_copy_stats,
       const std::string stats_plugin_name, uint8_t qid)
        : _tx_poller(std::make_unique<internal::poller>(reactor::poller::simple([this] { return poll_tx(); })))
        , _stats_plugin_name(stats_plugin_name)
        , _queue_name(std::string("queue") + std::to_string(qid))
{
    namespace sm = metrics;

    _metrics.add_group(_stats_plugin_name, {
        //
        // Packets rate: DERIVE:0:u
        //
        sm::make_counter(_queue_name + "_rx_packets", _stats.rx.good.packets,
                        sm::description("This metric is a receive packet rate for this queue.")),

        sm::make_counter(_queue_name + "_tx_packets", _stats.tx.good.packets,
                        sm::description("This metric is a transmit packet rate for this queue.")),
        //
        // Bytes rate: DERIVE:0:U
        //
        sm::make_counter(_queue_name + "_rx_bytes", _stats.rx.good.bytes,
                        sm::description("This metric is a receive throughput for this queue.")),

        sm::make_counter(_queue_name + "_tx_bytes", _stats.tx.good.bytes,
                        sm::description("This metric is a transmit throughput for this queue.")),
        //
        // Queue length: GAUGE:0:U
        //
        // Tx
        sm::make_gauge(_queue_name + "_tx_packet_queue", [this] { return _tx_packetq.size(); },
                        sm::description("Holds a number of packets pending to be sent. "
                                        "This metric will have high values if the network backend doesn't keep up with the upper layers or if upper layers send big bursts of packets.")),

        //
        // Linearization counter: DERIVE:0:U
        //
        sm::make_counter(_queue_name + "_xmit_linearized", _stats.tx.linearized,
                        sm::description("Counts a number of linearized Tx packets. High value indicates that we send too fragmented packets.")),

        //
        // Number of packets in last bunch: GAUGE:0:U
        //
        // Tx
        sm::make_gauge(_queue_name + "_tx_packet_queue_last_bunch", _stats.tx.good.last_bunch,
                        sm::description(format("Holds a number of packets sent in the bunch. "
                                        "A high value in conjunction with a high value of a {} indicates an efficient Tx packets bulking.", _queue_name + "_tx_packet_queue"))),
        // Rx
        sm::make_gauge(_queue_name + "_rx_packet_queue_last_bunch", _stats.rx.good.last_bunch,
                        sm::description("Holds a number of packets received in the last Rx bunch. High value indicates an efficient Rx packets bulking.")),

        //
        // Fragments rate: DERIVE:0:U
        //
        // Tx
        sm::make_counter(_queue_name + "_tx_frags", _stats.tx.good.nr_frags,
                        sm::description(format("Counts a number of sent fragments. Divide this value by a {} to get an average number of fragments in a Tx packet.", _queue_name + "_tx_packets"))),
        // Rx
        sm::make_counter(_queue_name + "_rx_frags", _stats.rx.good.nr_frags,
                        sm::description(format("Counts a number of received fragments. Divide this value by a {} to get an average number of fragments in an Rx packet.", _queue_name + "_rx_packets"))),
    });

    if (register_copy_stats) {
        _metrics.add_group(_stats_plugin_name, {
            //
            // Non-zero-copy data bytes rate: DERIVE:0:u
            //
            // Tx
            sm::make_counter(_queue_name + "_tx_copy_bytes", _stats.tx.good.copy_bytes,
                            sm::description(format("Counts a number of sent bytes that were handled in a non-zero-copy way. Divide this value by a {} to get a portion of data sent using a non-zero-copy flow.", _queue_name + "_tx_bytes"))),
            // Rx
            sm::make_counter(_queue_name + "_rx_copy_bytes", _stats.rx.good.copy_bytes,
                            sm::description(format("Counts a number of received bytes that were handled in a non-zero-copy way. Divide this value by an {} to get a portion of received data handled using a non-zero-copy flow.", _queue_name + "_rx_bytes"))),

            //
            // Non-zero-copy data fragments rate: DERIVE:0:u
            //
            // Tx
            sm::make_counter(_queue_name + "_tx_copy_frags", _stats.tx.good.copy_frags,
                            sm::description(format("Counts a number of sent fragments that were handled in a non-zero-copy way. Divide this value by a {} to get a portion of fragments sent using a non-zero-copy flow.", _queue_name + "_tx_frags"))),
            // Rx
            sm::make_counter(_queue_name + "_rx_copy_frags", _stats.rx.good.copy_frags,
                            sm::description(format("Counts a number of received fragments that were handled in a non-zero-copy way. Divide this value by a {} to get a portion of received fragments handled using a non-zero-copy flow.", _queue_name + "_rx_frags"))),

        });
    }
}
188
189 qp::~qp() {
190 }
191
192 void qp::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
193 assert(!cpu_weights.empty());
194 if ((cpu_weights.size() == 1 && cpu_weights.begin()->first == this_shard_id())) {
195 // special case queue sending to self only, to avoid requiring a hash value
196 return;
197 }
198 register_packet_provider([this] {
199 std::optional<packet> p;
200 if (!_proxy_packetq.empty()) {
201 p = std::move(_proxy_packetq.front());
202 _proxy_packetq.pop_front();
203 }
204 return p;
205 });
206 build_sw_reta(cpu_weights);
207 }
208
209 void qp::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
210 float total_weight = 0;
211 for (auto&& x : cpu_weights) {
212 total_weight += x.second;
213 }
214 float accum = 0;
215 unsigned idx = 0;
216 std::array<uint8_t, 128> reta;
217 for (auto&& entry : cpu_weights) {
218 auto cpu = entry.first;
219 auto weight = entry.second;
220 accum += weight;
221 while (idx < (accum / total_weight * reta.size() - 0.5)) {
222 reta[idx++] = cpu;
223 }
224 }
225 _sw_reta = reta;
226 }
227
228 future<>
229 device::receive(std::function<future<> (packet)> next_packet) {
230 auto sub = _queues[this_shard_id()]->_rx_stream.listen(std::move(next_packet));
231 _queues[this_shard_id()]->rx_start();
232 return sub.done();
233 }
234
// Installs the per-shard queue pointer. _queues keeps only a raw,
// non-owning pointer; ownership of the qp is moved into a reactor
// at_destroy callback so the qp stays alive until reactor shutdown.
void device::set_local_queue(std::unique_ptr<qp> dev) {
    assert(!_queues[this_shard_id()]);
    _queues[this_shard_id()] = dev.get();
    engine().at_destroy([dev = std::move(dev)] {});
}
240
241
// Binds an L3 protocol (identified by its ethertype) to an interface and
// registers its Tx packet provider with that interface.
l3_protocol::l3_protocol(interface* netif, eth_protocol_num proto_num, packet_provider_type func)
    : _netif(netif), _proto_num(proto_num) {
    _netif->register_packet_provider(std::move(func));
}
246
247 future<> l3_protocol::receive(
248 std::function<future<> (packet p, ethernet_address from)> rx_fn,
249 std::function<bool (forward_hash&, packet&, size_t)> forward) {
250 return _netif->register_l3(_proto_num, std::move(rx_fn), std::move(forward));
251 };
252
// Attaches this interface to a device: starts the rx dispatch loop and
// registers a Tx packet provider that polls the registered L3 protocols
// for outgoing packets, prepending the ethernet header to each.
interface::interface(std::shared_ptr<device> dev)
    : _dev(dev)
    , _hw_address(_dev->hw_address())
    , _hw_features(_dev->hw_features()) {
    // FIXME: ignored future
    (void)_dev->receive([this] (packet p) {
        return dispatch_packet(std::move(p));
    });
    dev->local_queue().register_packet_provider([this, idx = 0u] () mutable {
        std::optional<packet> p;
        // Round-robin over the providers: idx persists across calls (it is
        // captured mutably), and each call polls each provider at most
        // once, so no single protocol can starve the others.
        for (size_t i = 0; i < _pkt_providers.size(); i++) {
            auto l3p = _pkt_providers[idx++]();
            if (idx == _pkt_providers.size())
                idx = 0;
            if (l3p) {
                auto l3pv = std::move(l3p.value());
                // Prepend the ethernet header and convert it to network
                // byte order in place before handing the packet down.
                auto eh = l3pv.p.prepend_header<eth_hdr>();
                eh->dst_mac = l3pv.to;
                eh->src_mac = _hw_address;
                eh->eth_proto = uint16_t(l3pv.proto_num);
                *eh = hton(*eh);
                p = std::move(l3pv.p);
                return p;
            }
        }
        return p;
    });
}
281
282 future<>
283 interface::register_l3(eth_protocol_num proto_num,
284 std::function<future<> (packet p, ethernet_address from)> next,
285 std::function<bool (forward_hash&, packet& p, size_t)> forward) {
286 auto i = _proto_map.emplace(std::piecewise_construct, std::make_tuple(uint16_t(proto_num)), std::forward_as_tuple(std::move(forward)));
287 assert(i.second);
288 l3_rx_stream& l3_rx = i.first->second;
289 return l3_rx.packet_stream.listen(std::move(next)).done();
290 }
291
// Maps an RSS hash to a destination shard; delegates to the device.
unsigned interface::hash2cpu(uint32_t hash) {
    return _dev->hash2cpu(hash);
}
295
// Number of hardware queues exposed by the underlying device.
uint16_t interface::hw_queues_count() {
    return _dev->hw_queues_count();
}
299
// RSS hash key used for software toeplitz hashing; delegates to the device.
rss_key_type interface::rss_key() const {
    return _dev->rss_key();
}
303
// Hands a received packet over to another shard. At most 1000 forwarded
// packets may be in flight per source shard; beyond that the packet is
// silently dropped to bound memory usage. free_on_cpu arranges for the
// packet's buffers to be released back on the originating shard.
void interface::forward(unsigned cpuid, packet p) {
    // per-thread in-flight counter (__thread is zero-initialized)
    static __thread unsigned queue_depth;

    if (queue_depth < 1000) {
        queue_depth++;
        auto src_cpu = this_shard_id();
        // FIXME: future is discarded
        (void)smp::submit_to(cpuid, [this, p = std::move(p), src_cpu]() mutable {
            _dev->l2receive(p.free_on_cpu(src_cpu));
        }).then([] {
            // NOTE(review): this continuation presumably runs back on the
            // submitting shard, so it decrements that shard's thread-local
            // counter — confirm against smp::submit_to semantics.
            queue_depth--;
        });
    }
}
318
// Handles one inbound frame: looks up the L3 handler for the frame's
// ethertype, asks the device for the destination shard (based on the
// hardware RSS hash when present, otherwise a software toeplitz hash),
// then either forwards the packet to that shard or strips the ethernet
// header and feeds it into the local protocol stream. Frames with an
// unknown ethertype, or arriving while the stream is busy, are dropped.
future<> interface::dispatch_packet(packet p) {
    auto eh = p.get_header<eth_hdr>();
    if (eh) {
        auto i = _proto_map.find(ntoh(eh->eth_proto));
        if (i != _proto_map.end()) {
            l3_rx_stream& l3 = i->second;
            auto fw = _dev->forward_dst(this_shard_id(), [&p, &l3, this] () {
                // Prefer the hash the NIC already computed.
                auto hwrss = p.rss_hash();
                if (hwrss) {
                    return hwrss.value();
                } else {
                    forward_hash data;
                    if (l3.forward(data, p, sizeof(eth_hdr))) {
                        return toeplitz_hash(rss_key(), data);
                    }
                    return 0u;
                }
            });
            if (fw != this_shard_id()) {
                forward(fw, std::move(p));
            } else {
                auto h = ntoh(*eh);
                auto from = h.src_mac;
                p.trim_front(sizeof(*eh));
                // avoid chaining, since queue length is unlimited
                // drop instead.
                if (l3.ready.available()) {
                    l3.ready = l3.packet_stream.produce(std::move(p), from);
                }
            }
        }
    }
    return make_ready_future<>();
}
353
354 }
355
356 }