2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
23 #include <boost/asio/ip/address_v4.hpp>
24 #include <boost/algorithm/string.hpp>
25 #include <seastar/net/net.hh>
27 #include <seastar/net/toeplitz.hh>
28 #include <seastar/core/metrics.hh>
29 #include <seastar/core/print.hh>
30 #include <seastar/net/inet_address.hh>
34 std::ostream
& operator<<(std::ostream
&os
, ipv4_addr addr
) {
35 fmt_print(os
, "{:d}.{:d}.{:d}.{:d}",
36 (addr
.ip
>> 24) & 0xff,
37 (addr
.ip
>> 16) & 0xff,
38 (addr
.ip
>> 8) & 0xff,
40 return os
<< ":" << addr
.port
;
45 ipv4_addr::ipv4_addr(const std::string
&addr
) {
46 std::vector
<std::string
> items
;
47 boost::split(items
, addr
, boost::is_any_of(":"));
48 if (items
.size() == 1) {
49 ip
= boost::asio::ip::address_v4::from_string(addr
).to_ulong();
51 } else if (items
.size() == 2) {
52 ip
= boost::asio::ip::address_v4::from_string(items
[0]).to_ulong();
53 port
= std::stoul(items
[1]);
55 throw std::invalid_argument("invalid format: " + addr
);
59 ipv4_addr::ipv4_addr(const std::string
&addr
, uint16_t port_
) : ip(boost::asio::ip::address_v4::from_string(addr
).to_ulong()), port(port_
) {}
61 ipv4_addr::ipv4_addr(const net::inet_address
& a
, uint16_t port
)
62 : ipv4_addr(::in_addr(a
), port
)
65 ipv4_addr::ipv4_addr(const socket_address
&sa
)
66 : ipv4_addr(sa
.addr(), sa
.port())
69 ipv4_addr::ipv4_addr(const ::in_addr
& in
, uint16_t p
)
70 : ip(net::ntoh(in
.s_addr
)), port(p
)
// Transmit-side polling fragment. The enclosing function header is not
// visible in this listing (NOTE(review): presumably qp::poll_tx, the function
// the qp constructor installs as its poller -- confirm).
//
// When fewer than 16 packets are queued, the registered packet providers are
// asked for packets, which are appended to _tx_packetq. Refilling continues
// while `work` is set and the queue holds fewer than 128 packets.
77 if (_tx_packetq
.size() < 16) {
78 // refill send queue from upper layers
82 for (auto&& pr
: _pkt_providers
) {
86 _tx_packetq
.push_back(std::move(p
.value()));
87 if (_tx_packetq
.size() == 128) {
92 } while (work
&& _tx_packetq
.size() < 128);
// A non-empty queue is handed to send(); its result is recorded in the
// transmit statistics through update_pkts_bunch().
94 if (!_tx_packetq
.empty()) {
95 _stats
.tx
.good
.update_pkts_bunch(send(_tx_packetq
));
// Constructs a queue pair and registers its metrics.
//
//  register_copy_stats - when true, the *_copy_bytes and *_copy_frags metrics
//                        are registered in addition to the base set.
//  stats_plugin_name   - name of the metrics group the metrics are added to.
//  qid                 - queue id; every metric name is prefixed with "queue"
//                        followed by this id.
//
// A simple reactor poller that calls poll_tx() is installed for the queue.
102 qp::qp(bool register_copy_stats
,
103 const std::string stats_plugin_name
, uint8_t qid
)
104 : _tx_poller(reactor::poller::simple([this] { return poll_tx(); }))
105 , _stats_plugin_name(stats_plugin_name
)
106 , _queue_name(std::string("queue") + std::to_string(qid
))
108 namespace sm
= metrics
;
// Base metrics: registered unconditionally.
110 _metrics
.add_group(_stats_plugin_name
, {
112 // Packets rate: DERIVE:0:u
114 sm::make_derive(_queue_name
+ "_rx_packets", _stats
.rx
.good
.packets
,
115 sm::description("This metric is a receive packet rate for this queue.")),
117 sm::make_derive(_queue_name
+ "_tx_packets", _stats
.tx
.good
.packets
,
118 sm::description("This metric is a transmit packet rate for this queue.")),
120 // Bytes rate: DERIVE:0:U
122 sm::make_derive(_queue_name
+ "_rx_bytes", _stats
.rx
.good
.bytes
,
123 sm::description("This metric is a receive throughput for this queue.")),
125 sm::make_derive(_queue_name
+ "_tx_bytes", _stats
.tx
.good
.bytes
,
126 sm::description("This metric is a transmit throughput for this queue.")),
128 // Queue length: GAUGE:0:U
// Sampled through a callback that reads the current size of _tx_packetq.
131 sm::make_gauge(_queue_name
+ "_tx_packet_queue", [this] { return _tx_packetq
.size(); },
132 sm::description("Holds a number of packets pending to be sent. "
133 "This metric will have high values if the network backend doesn't keep up with the upper layers or if upper layers send big bursts of packets.")),
136 // Linearization counter: DERIVE:0:U
138 sm::make_derive(_queue_name
+ "_xmit_linearized", _stats
.tx
.linearized
,
139 sm::description("Counts a number of linearized Tx packets. High value indicates that we send too fragmented packets.")),
142 // Number of packets in last bunch: GAUGE:0:U
145 sm::make_gauge(_queue_name
+ "_tx_packet_queue_last_bunch", _stats
.tx
.good
.last_bunch
,
146 sm::description(format("Holds a number of packets sent in the bunch. "
147 "A high value in conjunction with a high value of a {} indicates an efficient Tx packets bulking.", _queue_name
+ "_tx_packet_queue"))),
149 sm::make_gauge(_queue_name
+ "_rx_packet_queue_last_bunch", _stats
.rx
.good
.last_bunch
,
150 sm::description("Holds a number of packets received in the last Rx bunch. High value indicates an efficient Rx packets bulking.")),
153 // Fragments rate: DERIVE:0:U
156 sm::make_derive(_queue_name
+ "_tx_frags", _stats
.tx
.good
.nr_frags
,
157 sm::description(format("Counts a number of sent fragments. Divide this value by a {} to get an average number of fragments in a Tx packet.", _queue_name
+ "_tx_packets"))),
159 sm::make_derive(_queue_name
+ "_rx_frags", _stats
.rx
.good
.nr_frags
,
160 sm::description(format("Counts a number of received fragments. Divide this value by a {} to get an average number of fragments in an Rx packet.", _queue_name
+ "_rx_packets"))),
// Copy statistics: registered only when the caller asked for them.
163 if (register_copy_stats
) {
164 _metrics
.add_group(_stats_plugin_name
, {
166 // Non-zero-copy data bytes rate: DERIVE:0:u
169 sm::make_derive(_queue_name
+ "_tx_copy_bytes", _stats
.tx
.good
.copy_bytes
,
170 sm::description(format("Counts a number of sent bytes that were handled in a non-zero-copy way. Divide this value by a {} to get a portion of data sent using a non-zero-copy flow.", _queue_name
+ "_tx_bytes"))),
172 sm::make_derive(_queue_name
+ "_rx_copy_bytes", _stats
.rx
.good
.copy_bytes
,
173 sm::description(format("Counts a number of received bytes that were handled in a non-zero-copy way. Divide this value by an {} to get a portion of received data handled using a non-zero-copy flow.", _queue_name
+ "_rx_bytes"))),
176 // Non-zero-copy data fragments rate: DERIVE:0:u
179 sm::make_derive(_queue_name
+ "_tx_copy_frags", _stats
.tx
.good
.copy_frags
,
180 sm::description(format("Counts a number of sent fragments that were handled in a non-zero-copy way. Divide this value by a {} to get a portion of fragments sent using a non-zero-copy flow.", _queue_name
+ "_tx_frags"))),
182 sm::make_derive(_queue_name
+ "_rx_copy_frags", _stats
.rx
.good
.copy_frags
,
183 sm::description(format("Counts a number of received fragments that were handled in a non-zero-copy way. Divide this value by a {} to get a portion of received fragments handled using a non-zero-copy flow.", _queue_name
+ "_rx_frags"))),
// Configures this queue for the given cpu -> weight map.
//
//  cpu_weights - must not be empty (asserted).
//
// A packet provider is registered that takes packets from the front of
// _proxy_packetq, and build_sw_reta() is called with the weights. Parts of
// this function are not visible in this listing.
// NOTE(review): the "single entry for the current cpu" branch presumably
// returns early -- confirm against the full source.
192 void qp::configure_proxies(const std::map
<unsigned, float>& cpu_weights
) {
193 assert(!cpu_weights
.empty());
194 if ((cpu_weights
.size() == 1 && cpu_weights
.begin()->first
== engine().cpu_id())) {
195 // special case queue sending to self only, to avoid requiring a hash value
198 register_packet_provider([this] {
199 compat::optional
<packet
> p
;
200 if (!_proxy_packetq
.empty()) {
201 p
= std::move(_proxy_packetq
.front());
202 _proxy_packetq
.pop_front();
206 build_sw_reta(cpu_weights
);
// Builds a 128-entry table of uint8_t from the cpu -> weight map.
//
// The weights are summed first; the table is then filled cpu by cpu, each
// cpu's run of entries ending where the accumulated weight share
// (accum / total_weight) of the table size is reached. The statements that
// declare and update `idx` and `accum`, write the entries and store the
// finished table are not visible in this listing.
// NOTE(review): the uint8_t element type suggests cpu ids are expected to
// fit in 8 bits -- confirm.
209 void qp::build_sw_reta(const std::map
<unsigned, float>& cpu_weights
) {
210 float total_weight
= 0;
211 for (auto&& x
: cpu_weights
) {
212 total_weight
+= x
.second
;
216 std::array
<uint8_t, 128> reta
;
217 for (auto&& entry
: cpu_weights
) {
218 auto cpu
= entry
.first
;
219 auto weight
= entry
.second
;
// The 0.5 offset shifts the proportional boundary by half an entry.
221 while (idx
< (accum
/ total_weight
* reta
.size() - 0.5)) {
// Subscribes next_packet to the rx stream of the queue that belongs to the
// calling cpu (indexed by engine().cpu_id()) and then calls rx_start() on
// that queue.
//
// The return type and the return statement are not visible in this listing.
// NOTE(review): presumably the subscription `sub` is returned -- confirm.
229 device::receive(std::function
<future
<> (packet
)> next_packet
) {
230 auto sub
= _queues
[engine().cpu_id()]->_rx_stream
.listen(std::move(next_packet
));
231 _queues
[engine().cpu_id()]->rx_start();
235 void device::set_local_queue(std::unique_ptr
<qp
> dev
) {
236 assert(!_queues
[engine().cpu_id()]);
237 _queues
[engine().cpu_id()] = dev
.get();
238 engine().at_destroy([dev
= std::move(dev
)] {});
242 l3_protocol::l3_protocol(interface
* netif
, eth_protocol_num proto_num
, packet_provider_type func
)
243 : _netif(netif
), _proto_num(proto_num
) {
244 _netif
->register_packet_provider(std::move(func
));
247 future
<> l3_protocol::receive(
248 std::function
<future
<> (packet p
, ethernet_address from
)> rx_fn
,
249 std::function
<bool (forward_hash
&, packet
&, size_t)> forward
) {
250 return _netif
->register_l3(_proto_num
, std::move(rx_fn
), std::move(forward
));
// Constructs an interface on top of the given device.
//
// The hardware address and hardware features are read from the device once
// and cached. dispatch_packet() is registered as the device's receive
// callback, and a packet provider is registered on the device's local queue.
// That provider polls the registered L3 packet providers, at most one pass
// over all of them per call, resuming at the index `idx` remembered between
// calls; a packet obtained this way gets an ethernet header prepended
// (destination from the L3 packet, source = this interface's address).
// Parts of this constructor, including the first member initializer, are not
// visible in this listing.
253 interface::interface(std::shared_ptr
<device
> dev
)
255 , _hw_address(_dev
->hw_address())
256 , _hw_features(_dev
->hw_features()) {
257 // FIXME: ignored future
258 (void)_dev
->receive([this] (packet p
) {
259 return dispatch_packet(std::move(p
));
261 dev
->local_queue().register_packet_provider([this, idx
= 0u] () mutable {
262 compat::optional
<packet
> p
;
263 for (size_t i
= 0; i
< _pkt_providers
.size(); i
++) {
264 auto l3p
= _pkt_providers
[idx
++]();
// NOTE(review): presumably idx wraps back to 0 here -- the statement guarded
// by this condition is not visible.
265 if (idx
== _pkt_providers
.size())
268 auto l3pv
= std::move(l3p
.value());
269 auto eh
= l3pv
.p
.prepend_header
<eth_hdr
>();
270 eh
->dst_mac
= l3pv
.to
;
271 eh
->src_mac
= _hw_address
;
272 eh
->eth_proto
= uint16_t(l3pv
.proto_num
);
274 p
= std::move(l3pv
.p
);
// Registers an L3 protocol with this interface.
//
//  proto_num - ethernet protocol number, used (as uint16_t) as the key in
//              _proto_map
//  next      - listener attached to the new entry's packet stream
//  forward   - used to construct the new map entry
//
// Returns the done() future of the subscription created by listen().
// The return type line and one statement between the emplace and the use of
// its result are not visible in this listing.
283 interface::register_l3(eth_protocol_num proto_num
,
284 std::function
<future
<> (packet p
, ethernet_address from
)> next
,
285 std::function
<bool (forward_hash
&, packet
& p
, size_t)> forward
) {
286 auto i
= _proto_map
.emplace(std::piecewise_construct
, std::make_tuple(uint16_t(proto_num
)), std::forward_as_tuple(std::move(forward
)));
288 l3_rx_stream
& l3_rx
= i
.first
->second
;
289 return l3_rx
.packet_stream
.listen(std::move(next
)).done();
292 unsigned interface::hash2cpu(uint32_t hash
) {
293 return _dev
->hash2cpu(hash
);
296 uint16_t interface::hw_queues_count() {
297 return _dev
->hw_queues_count();
300 rss_key_type
interface::rss_key() const {
301 return _dev
->rss_key();
// Hands packet p over to cpu `cpuid`.
//
// A task is submitted to the target cpu with smp::submit_to(); there the
// packet is passed to _dev->l2receive() after free_on_cpu(src_cpu) has been
// applied to it, src_cpu being the cpu that called forward().
// NOTE(review): presumably free_on_cpu makes the packet's buffers get
// released on the originating cpu -- confirm.
//
// Forwarding only happens while the per-thread counter queue_depth is below
// 1000. The statements that update the counter and handle the over-limit
// case are not visible in this listing.
304 void interface::forward(unsigned cpuid
, packet p
) {
305 static __thread
unsigned queue_depth
;
307 if (queue_depth
< 1000) {
309 auto src_cpu
= engine().cpu_id();
310 // FIXME: future is discarded
311 (void)smp::submit_to(cpuid
, [this, p
= std::move(p
), src_cpu
]() mutable {
312 _dev
->l2receive(p
.free_on_cpu(src_cpu
));
// Dispatches a received ethernet frame to the L3 protocol registered for its
// eth_proto field.
//
// The protocol is looked up in _proto_map. For a registered protocol the
// device is asked, through forward_dst(), which cpu should handle the packet;
// the callback given to forward_dst() supplies a hash, either the value from
// p.rss_hash() or a toeplitz hash over data collected by the protocol's
// forward callback. A packet destined for another cpu is passed to forward();
// otherwise the ethernet header is trimmed and the packet is produced into
// the protocol's packet stream together with the source MAC address.
// Several statements of this function are not visible in this listing.
//
// The visible return statement yields an already-resolved future.
319 future
<> interface::dispatch_packet(packet p
) {
320 auto eh
= p
.get_header
<eth_hdr
>();
322 auto i
= _proto_map
.find(ntoh(eh
->eth_proto
));
323 if (i
!= _proto_map
.end()) {
324 l3_rx_stream
& l3
= i
->second
;
325 auto fw
= _dev
->forward_dst(engine().cpu_id(), [&p
, &l3
, this] () {
326 auto hwrss
= p
.rss_hash();
328 return hwrss
.value();
331 if (l3
.forward(data
, p
, sizeof(eth_hdr
))) {
332 return toeplitz_hash(rss_key(), data
);
337 if (fw
!= engine().cpu_id()) {
338 forward(fw
, std::move(p
));
341 auto from
= h
.src_mac
;
342 p
.trim_front(sizeof(*eh
));
343 // avoid chaining, since queue length is unlimited
// A new packet is only produced once the previous produce() future is
// available.
345 if (l3
.ready
.available()) {
346 l3
.ready
= l3
.packet_stream
.produce(std::move(p
), from
);
351 return make_ready_future
<>();