2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 #ifdef SEASTAR_HAVE_DPDK
24 #include <seastar/core/posix.hh>
25 #include "core/vla.hh"
26 #include <seastar/core/reactor.hh>
27 #include <seastar/net/virtio-interface.hh>
28 #include <seastar/core/stream.hh>
29 #include <seastar/core/circular_buffer.hh>
30 #include <seastar/core/align.hh>
31 #include <seastar/core/sstring.hh>
32 #include <seastar/core/memory.hh>
33 #include <seastar/core/metrics.hh>
34 #include <seastar/util/function_input_iterator.hh>
35 #include <seastar/util/transform_iterator.hh>
39 #include <seastar/util/std-compat.hh>
40 #include <boost/preprocessor.hpp>
41 #include <seastar/net/ip.hh>
42 #include <seastar/net/const.hh>
43 #include <seastar/core/dpdk_rte.hh>
44 #include <seastar/net/dpdk.hh>
45 #include <seastar/net/toeplitz.hh>
51 #include <rte_config.h>
52 #include <rte_common.h>
55 #include <rte_ethdev.h>
56 #include <rte_cycles.h>
57 #include <rte_memzone.h>
60 #if RTE_VERSION <= RTE_VERSION_NUM(2,0,0,16)
65 rte_mbuf_to_baddr(rte_mbuf
* mbuf
) {
66 return reinterpret_cast<char*>(RTE_MBUF_TO_BADDR(mbuf
));
69 void* as_cookie(struct rte_pktmbuf_pool_private
& p
) {
70 return reinterpret_cast<void*>(uint64_t(p
.mbuf_data_room_size
));
// Newer DPDK versions take a pointer to the pool-private structure itself as
// the init-callback cookie.
void* as_cookie(struct rte_pktmbuf_pool_private& p) {
    return &p;
}
// Zero-sized array type used to mark a position inside a structure without
// contributing any storage (zero-length arrays are a GNU extension).
typedef void    *MARKER[0];   /**< generic marker for a point in a structure */
85 // Calculate maximum amount of memory required to store given number of objects
87 get_mempool_xmem_size(uint32_t elt_num
, size_t total_elt_sz
, uint32_t pg_shift
)
89 size_t obj_per_page
, pg_num
, pg_sz
;
91 if (total_elt_sz
== 0) {
96 return total_elt_sz
* elt_num
;
99 pg_sz
= (size_t)1 << pg_shift
;
100 obj_per_page
= pg_sz
/ total_elt_sz
;
101 if (obj_per_page
== 0) {
102 return RTE_ALIGN_CEIL(total_elt_sz
, pg_sz
) * elt_num
;
105 pg_num
= (elt_num
+ obj_per_page
- 1) / obj_per_page
;
106 return pg_num
<< pg_shift
;
using namespace seastar::net;

/******************* Net device related constants *****************************/

// Default size (in descriptors) of the Rx/Tx rings.
static constexpr uint16_t default_ring_size      = 512;

//
// We need 2 times the ring size of buffers because of the way PMDs
// manage the ring buffers. -- NOTE(review): continuation of the original
// comment is not visible here; confirm against upstream.
//
static constexpr uint16_t mbufs_per_queue_rx     = 2 * default_ring_size;
// Rx GC threshold (in buffers) -- presumably the point at which freed Rx
// buffers are collected; confirm at the use site.
static constexpr uint16_t rx_gc_thresh           = 64;

//
// No need to keep more descriptors in the air than can be sent in a single
// rte_eth_tx_burst() call.
//
static constexpr uint16_t mbufs_per_queue_tx     = 2 * default_ring_size;

// mbuf cache size (presumably the per-lcore mempool cache; confirm at the
// mempool creation site).
static constexpr uint16_t mbuf_cache_size        = 512;
// Fixed per-mbuf overhead: rte_mbuf metadata plus the headroom.
static constexpr uint16_t mbuf_overhead          =
                                 sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;

//
// We'll allocate 2K data buffers for an inline case because this would require
// a single page per mbuf. If we used 4K data buffers here it would require 2
// pages for a single buffer (due to "mbuf_overhead") and this is a much more
// demanding memory constraint.
//
static constexpr size_t   inline_mbuf_data_size  = 2048;

//
// Size of the data buffer in the non-inline case.
//
// We may want to change (increase) this value in future, while the
// inline_mbuf_data_size value will unlikely change due to reasons described
// above.
//
static constexpr size_t   mbuf_data_size         = 2048;

// (INLINE_MBUF_DATA_SIZE(2K)*32 = 64K = Max TSO/LRO size) + 1 mbuf for headers
static constexpr uint8_t  max_frags              = 32 + 1;

//
// Intel's 40G NIC HW limit for a number of fragments in an xmit segment.
//
// See Chapter 8.4.1 "Transmit Packet in System Memory" of the xl710 devices
// spec. for more details.
//
static constexpr uint8_t  i40e_max_xmit_segment_frags = 8;

//
// VMWare's virtual NIC limit for a number of fragments in an xmit segment.
//
// see drivers/net/vmxnet3/base/vmxnet3_defs.h VMXNET3_MAX_TXD_PER_PKT
//
static constexpr uint8_t  vmxnet3_max_xmit_segment_frags = 16;

// Total size of an "inline" mbuf: data room plus metadata/headroom overhead.
static constexpr uint16_t inline_mbuf_size       =
                                        inline_mbuf_data_size + mbuf_overhead;
172 uint32_t qp_mempool_obj_size(bool hugetlbfs_membackend
)
174 uint32_t mp_size
= 0;
175 struct rte_mempool_objsz mp_obj_sz
= {};
178 // We will align each size to huge page size because DPDK allocates
179 // physically contiguous memory region for each pool object.
183 if (hugetlbfs_membackend
) {
185 align_up(rte_mempool_calc_obj_size(mbuf_overhead
, 0, &mp_obj_sz
)+
186 sizeof(struct rte_pktmbuf_pool_private
),
187 memory::huge_page_size
);
190 align_up(rte_mempool_calc_obj_size(inline_mbuf_size
, 0, &mp_obj_sz
)+
191 sizeof(struct rte_pktmbuf_pool_private
),
192 memory::huge_page_size
);
195 std::memset(&mp_obj_sz
, 0, sizeof(mp_obj_sz
));
196 mp_size
+= align_up(rte_mempool_calc_obj_size(inline_mbuf_size
, 0,
198 sizeof(struct rte_pktmbuf_pool_private
),
199 memory::huge_page_size
);
// Name prefix used when creating the per-queue packet mbuf pools.
static constexpr const char* pktmbuf_pool_name   = "dpdk_pktmbuf_pool";

/*
 * When doing reads from the NIC queues, use this batch size
 */
static constexpr uint8_t packet_read_size        = 32;
/******************************************************************************/
212 port_stats() : rx
{}, tx
{} {}
216 uint64_t mcast
; // number of received multicast packets
217 uint64_t pause_xon
; // number of received PAUSE XON frames
218 uint64_t pause_xoff
; // number of received PAUSE XOFF frames
222 uint64_t dropped
; // missed packets (e.g. full FIFO)
223 uint64_t crc
; // packets with CRC error
224 uint64_t len
; // packets with a bad length
225 uint64_t total
; // total number of erroneous received packets
231 uint64_t pause_xon
; // number of sent PAUSE XON frames
232 uint64_t pause_xoff
; // number of sent PAUSE XOFF frames
236 uint64_t total
; // total number of failed transmitted packets
241 #define XSTATS_ID_LIST \
242 (rx_multicast_packets) \
247 (rx_undersize_errors) \
248 (rx_oversize_errors) \
254 dpdk_xstats(uint16_t port_id
)
268 BOOST_PP_SEQ_ENUM(XSTATS_ID_LIST
)
273 _len
= rte_eth_xstats_get_names(_port_id
, NULL
, 0);
274 _xstats
= new rte_eth_xstat
[_len
];
275 _xstat_names
= new rte_eth_xstat_name
[_len
];
277 update_xstat_names();
281 void update_xstats() {
282 auto len
= rte_eth_xstats_get(_port_id
, _xstats
, _len
);
286 uint64_t get_value(const xstat_id id
) {
287 auto off
= _offsets
[static_cast<int>(id
)];
291 return _xstats
[off
].value
;
297 struct rte_eth_xstat
*_xstats
= nullptr;
298 struct rte_eth_xstat_name
*_xstat_names
= nullptr;
299 int _offsets
[BOOST_PP_SEQ_SIZE(XSTATS_ID_LIST
)];
301 static const sstring
id_to_str(const xstat_id id
) {
302 #define ENUM_TO_STR(r, data, elem) \
304 return BOOST_PP_STRINGIZE(elem);
306 BOOST_PP_SEQ_FOR_EACH(ENUM_TO_STR
, ~, XSTATS_ID_LIST
)
310 int get_offset_by_name(const xstat_id id
, const int len
) {
311 for (int i
= 0; i
< len
; i
++) {
312 if (id_to_str(id
) == _xstat_names
[i
].name
)
318 void update_xstat_names() {
319 auto len
= rte_eth_xstats_get_names(_port_id
, _xstat_names
, _len
);
323 void update_offsets() {
324 #define FIND_OFFSET(r, data, elem) \
325 _offsets[static_cast<int>(elem)] = \
326 get_offset_by_name(elem, _len);
328 BOOST_PP_SEQ_FOR_EACH(FIND_OFFSET
, ~, XSTATS_ID_LIST
)
332 class dpdk_device
: public device
{
334 uint16_t _num_queues
;
335 net::hw_features _hw_features
;
336 uint16_t _queues_ready
= 0;
340 std::vector
<uint8_t> _redir_table
;
341 rss_key_type _rss_key
;
343 timer
<> _stats_collector
;
344 const std::string _stats_plugin_name
;
345 const std::string _stats_plugin_inst
;
346 seastar::metrics::metric_groups _metrics
;
347 bool _is_i40e_device
= false;
348 bool _is_vmxnet3_device
= false;
352 rte_eth_dev_info _dev_info
= {};
353 promise
<> _link_ready_promise
;
357 * Port initialization consists of 3 main stages:
358 * 1) General port initialization which ends with a call to
359 * rte_eth_dev_configure() where we request the needed number of Rx and
361 * 2) Individual queues initialization. This is done in the constructor of
362 * dpdk_qp class. In particular the memory pools for queues are allocated
364 * 3) The final stage of the initialization which starts with the call of
365 * rte_eth_dev_start() after which the port becomes fully functional. We
366 * will also wait for a link to get up in this stage.
371 * First stage of the port initialization.
373 * @return 0 in case of success and an appropriate error code in case of an
376 int init_port_start();
379 * The final stage of a port initialization.
380 * @note Must be called *after* all queues from stage (2) have been
383 void init_port_fini();
386 * Check the link status of out port in up to 9s, and print them finally.
388 void check_port_link_status();
391 * Configures the HW Flow Control
393 void set_hw_flow_control();
396 dpdk_device(uint16_t port_idx
, uint16_t num_queues
, bool use_lro
,
398 : _port_idx(port_idx
)
399 , _num_queues(num_queues
)
400 , _home_cpu(this_shard_id())
402 , _enable_fc(enable_fc
)
403 , _stats_plugin_name("network")
404 , _stats_plugin_inst(std::string("port") + std::to_string(_port_idx
))
408 /* now initialise the port we will use */
409 int ret
= init_port_start();
411 rte_exit(EXIT_FAILURE
, "Cannot initialise port %u\n", _port_idx
);
414 // Register port statistics pollers
415 namespace sm
= seastar::metrics
;
416 _metrics
.add_group(_stats_plugin_name
, {
418 sm::make_derive("rx_multicast", _stats
.rx
.good
.mcast
,
419 sm::description("Counts a number of received multicast packets."), {sm::shard_label(_stats_plugin_inst
)}),
421 sm::make_derive("rx_crc_errors", _stats
.rx
.bad
.crc
,
422 sm::description("Counts a number of received packets with a bad CRC value. "
423 "A non-zero value of this metric usually indicates a HW problem, e.g. a bad cable."), {sm::shard_label(_stats_plugin_inst
)}),
425 sm::make_derive("rx_dropped", _stats
.rx
.bad
.dropped
,
426 sm::description("Counts a number of dropped received packets. "
427 "A non-zero value of this counter indicated the overflow of ingress HW buffers. "
428 "This usually happens because of a rate of a sender on the other side of the link is higher than we can process as a receiver."), {sm::shard_label(_stats_plugin_inst
)}),
430 sm::make_derive("rx_bad_length_errors", _stats
.rx
.bad
.len
,
431 sm::description("Counts a number of received packets with a bad length value. "
432 "A non-zero value of this metric usually indicates a HW issue: e.g. bad cable."), {sm::shard_label(_stats_plugin_inst
)}),
435 sm::make_derive("rx_pause_xon", _stats
.rx
.good
.pause_xon
,
436 sm::description("Counts a number of received PAUSE XON frames (PAUSE frame with a quanta of zero). "
437 "When PAUSE XON frame is received our port may resume sending L2 frames. "
438 "PAUSE XON frames are sent to resume sending that was previously paused with a PAUSE XOFF frame. If ingress "
439 "buffer falls below the low watermark threshold before the timeout configured in the original PAUSE XOFF frame the receiver may decide to send PAUSE XON frame. "
440 "A non-zero value of this metric may mean that our sender is bursty and that the spikes overwhelm the receiver on the other side of the link."), {sm::shard_label(_stats_plugin_inst
)}),
442 sm::make_derive("tx_pause_xon", _stats
.tx
.good
.pause_xon
,
443 sm::description("Counts a number of sent PAUSE XON frames (L2 flow control frames). "
444 "A non-zero value of this metric indicates that our ingress path doesn't keep up with the rate of a sender on the other side of the link. "
445 "Note that if a sender port respects PAUSE frames this will prevent it from sending from ALL its egress queues because L2 flow control is defined "
446 "on a per-link resolution."), {sm::shard_label(_stats_plugin_inst
)}),
448 sm::make_derive("rx_pause_xoff", _stats
.rx
.good
.pause_xoff
,
449 sm::description("Counts a number of received PAUSE XOFF frames. "
450 "A non-zero value of this metric indicates that our egress overwhelms the receiver on the other side of the link and it has to send PAUSE frames to make us stop sending. "
451 "Note that if our port respects PAUSE frames a reception of a PAUSE XOFF frame will cause ALL egress queues of this port to stop sending."), {sm::shard_label(_stats_plugin_inst
)}),
453 sm::make_derive("tx_pause_xoff", _stats
.tx
.good
.pause_xoff
,
454 sm::description("Counts a number of sent PAUSE XOFF frames. "
455 "A non-zero value of this metric indicates that our ingress path (SW and HW) doesn't keep up with the rate of a sender on the other side of the link and as a result "
456 "our ingress HW buffers overflow."), {sm::shard_label(_stats_plugin_inst
)}),
458 sm::make_derive("rx_errors", _stats
.rx
.bad
.total
,
459 sm::description("Counts the total number of ingress errors: CRC errors, bad length errors, etc."), {sm::shard_label(_stats_plugin_inst
)}),
461 sm::make_derive("tx_errors", _stats
.tx
.bad
.total
,
462 sm::description("Counts a total number of egress errors. A non-zero value usually indicated a problem with a HW or a SW driver."), {sm::shard_label(_stats_plugin_inst
)}),
467 _stats_collector
.cancel();
470 ethernet_address
hw_address() override
{
471 struct ether_addr mac
;
472 rte_eth_macaddr_get(_port_idx
, &mac
);
474 return mac
.addr_bytes
;
476 net::hw_features
hw_features() override
{
480 net::hw_features
& hw_features_ref() { return _hw_features
; }
482 const rte_eth_rxconf
* def_rx_conf() const {
483 return &_dev_info
.default_rxconf
;
486 const rte_eth_txconf
* def_tx_conf() const {
487 return &_dev_info
.default_txconf
;
491 * Set the RSS table in the device and store it in the internal vector.
493 void set_rss_table();
495 virtual uint16_t hw_queues_count() override
{ return _num_queues
; }
496 virtual future
<> link_ready() override
{ return _link_ready_promise
.get_future(); }
497 virtual std::unique_ptr
<qp
> init_local_queue(boost::program_options::variables_map opts
, uint16_t qid
) override
;
498 virtual unsigned hash2qid(uint32_t hash
) override
{
499 assert(_redir_table
.size());
500 return _redir_table
[hash
& (_redir_table
.size() - 1)];
502 uint16_t port_idx() { return _port_idx
; }
503 bool is_i40e_device() const {
504 return _is_i40e_device
;
506 bool is_vmxnet3_device() const {
507 return _is_vmxnet3_device
;
510 virtual rss_key_type
rss_key() const override
{ return _rss_key
; }
513 template <bool HugetlbfsMemBackend
>
514 class dpdk_qp
: public net::qp
{
515 class tx_buf_factory
;
518 friend class dpdk_qp
;
520 static tx_buf
* me(rte_mbuf
* mbuf
) {
521 return reinterpret_cast<tx_buf
*>(mbuf
);
526 * Checks if the original packet of a given cluster should be linearized
527 * due to HW limitations.
529 * @param head head of a cluster to check
531 * @return TRUE if a packet should be linearized.
533 static bool i40e_should_linearize(rte_mbuf
*head
) {
534 bool is_tso
= head
->ol_flags
& PKT_TX_TCP_SEG
;
536 // For a non-TSO case: number of fragments should not exceed 8
538 return head
->nb_segs
> i40e_max_xmit_segment_frags
;
542 // For a TSO case each MSS window should not include more than 8
543 // fragments including headers.
546 // Calculate the number of frags containing headers.
548 // Note: we support neither VLAN nor tunneling thus headers size
549 // accounting is super simple.
551 size_t headers_size
= head
->l2_len
+ head
->l3_len
+ head
->l4_len
;
552 unsigned hdr_frags
= 0;
553 size_t cur_payload_len
= 0;
554 rte_mbuf
*cur_seg
= head
;
556 while (cur_seg
&& cur_payload_len
< headers_size
) {
557 cur_payload_len
+= cur_seg
->data_len
;
558 cur_seg
= cur_seg
->next
;
563 // Header fragments will be used for each TSO segment, thus the
564 // maximum number of data segments will be 8 minus the number of
567 // It's unclear from the spec how the first TSO segment is treated
568 // if the last fragment with headers contains some data bytes:
569 // whether this fragment will be accounted as a single fragment or
570 // as two separate fragments. We prefer to play it safe and assume
571 // that this fragment will be accounted as two separate fragments.
573 size_t max_win_size
= i40e_max_xmit_segment_frags
- hdr_frags
;
575 if (head
->nb_segs
<= max_win_size
) {
579 // Get the data (without headers) part of the first data fragment
580 size_t prev_frag_data
= cur_payload_len
- headers_size
;
581 auto mss
= head
->tso_segsz
;
584 unsigned frags_in_seg
= 0;
585 size_t cur_seg_size
= 0;
587 if (prev_frag_data
) {
588 cur_seg_size
= prev_frag_data
;
593 while (cur_seg_size
< mss
&& cur_seg
) {
594 cur_seg_size
+= cur_seg
->data_len
;
595 cur_seg
= cur_seg
->next
;
598 if (frags_in_seg
> max_win_size
) {
603 if (cur_seg_size
> mss
) {
604 prev_frag_data
= cur_seg_size
- mss
;
612 * Sets the offload info in the head buffer of an rte_mbufs cluster.
614 * @param p an original packet the cluster is built for
615 * @param qp QP handle
616 * @param head a head of an rte_mbufs cluster
618 static void set_cluster_offload_info(const packet
& p
, const dpdk_qp
& qp
, rte_mbuf
* head
) {
619 // Handle TCP checksum offload
620 auto oi
= p
.offload_info();
621 if (oi
.needs_ip_csum
) {
622 head
->ol_flags
|= PKT_TX_IP_CKSUM
;
623 // TODO: Take a VLAN header into an account here
624 head
->l2_len
= sizeof(struct ether_hdr
);
625 head
->l3_len
= oi
.ip_hdr_len
;
627 if (qp
.port().hw_features().tx_csum_l4_offload
) {
628 if (oi
.protocol
== ip_protocol_num::tcp
) {
629 head
->ol_flags
|= PKT_TX_TCP_CKSUM
;
630 // TODO: Take a VLAN header into an account here
631 head
->l2_len
= sizeof(struct ether_hdr
);
632 head
->l3_len
= oi
.ip_hdr_len
;
634 if (oi
.tso_seg_size
) {
635 assert(oi
.needs_ip_csum
);
636 head
->ol_flags
|= PKT_TX_TCP_SEG
;
637 head
->l4_len
= oi
.tcp_hdr_len
;
638 head
->tso_segsz
= oi
.tso_seg_size
;
640 } else if (oi
.protocol
== ip_protocol_num::udp
) {
641 head
->ol_flags
|= PKT_TX_UDP_CKSUM
;
642 // TODO: Take a VLAN header into an account here
643 head
->l2_len
= sizeof(struct ether_hdr
);
644 head
->l3_len
= oi
.ip_hdr_len
;
650 * Creates a tx_buf cluster representing a given packet in a "zero-copy"
653 * @param p packet to translate
654 * @param qp dpdk_qp handle
656 * @return the HEAD tx_buf of the cluster or nullptr in case of a
659 static tx_buf
* from_packet_zc(packet
&& p
, dpdk_qp
& qp
) {
661 // Too fragmented - linearize
662 if (p
.nr_frags() > max_frags
) {
664 ++qp
._stats
.tx
.linearized
;
668 rte_mbuf
*head
= nullptr, *last_seg
= nullptr;
671 // Create a HEAD of the fragmented packet
672 if (!translate_one_frag(qp
, p
.frag(0), head
, last_seg
, nsegs
)) {
676 unsigned total_nsegs
= nsegs
;
678 for (unsigned i
= 1; i
< p
.nr_frags(); i
++) {
679 rte_mbuf
*h
= nullptr, *new_last_seg
= nullptr;
680 if (!translate_one_frag(qp
, p
.frag(i
), h
, new_last_seg
, nsegs
)) {
685 total_nsegs
+= nsegs
;
687 // Attach a new buffers' chain to the packet chain
689 last_seg
= new_last_seg
;
692 // Update the HEAD buffer with the packet info
693 head
->pkt_len
= p
.len();
694 head
->nb_segs
= total_nsegs
;
696 set_cluster_offload_info(p
, qp
, head
);
699 // If a packet hasn't been linearized already and the resulting
700 // cluster requires the linearisation due to HW limitation:
702 // - Recycle the cluster.
703 // - Linearize the packet.
704 // - Build the cluster once again
706 if (head
->nb_segs
> max_frags
||
707 (p
.nr_frags() > 1 && qp
.port().is_i40e_device() && i40e_should_linearize(head
)) ||
708 (p
.nr_frags() > vmxnet3_max_xmit_segment_frags
&& qp
.port().is_vmxnet3_device())) {
711 ++qp
._stats
.tx
.linearized
;
713 goto build_mbuf_cluster
;
716 me(last_seg
)->set_packet(std::move(p
));
722 * Copy the contents of the "packet" into the given cluster of
725 * @note Size of the cluster has to be big enough to accommodate all the
726 * contents of the given packet.
728 * @param p packet to copy
729 * @param head head of the rte_mbuf's cluster
731 static void copy_packet_to_cluster(const packet
& p
, rte_mbuf
* head
) {
732 rte_mbuf
* cur_seg
= head
;
733 size_t cur_seg_offset
= 0;
734 unsigned cur_frag_idx
= 0;
735 size_t cur_frag_offset
= 0;
738 size_t to_copy
= std::min(p
.frag(cur_frag_idx
).size
- cur_frag_offset
,
739 inline_mbuf_data_size
- cur_seg_offset
);
741 memcpy(rte_pktmbuf_mtod_offset(cur_seg
, void*, cur_seg_offset
),
742 p
.frag(cur_frag_idx
).base
+ cur_frag_offset
, to_copy
);
744 cur_frag_offset
+= to_copy
;
745 cur_seg_offset
+= to_copy
;
747 if (cur_frag_offset
>= p
.frag(cur_frag_idx
).size
) {
749 if (cur_frag_idx
>= p
.nr_frags()) {
751 // We are done - set the data size of the last segment
754 cur_seg
->data_len
= cur_seg_offset
;
761 if (cur_seg_offset
>= inline_mbuf_data_size
) {
762 cur_seg
->data_len
= inline_mbuf_data_size
;
763 cur_seg
= cur_seg
->next
;
766 // FIXME: assert in a fast-path - remove!!!
773 * Creates a tx_buf cluster representing a given packet in a "copy" way.
775 * @param p packet to translate
776 * @param qp dpdk_qp handle
778 * @return the HEAD tx_buf of the cluster or nullptr in case of a
781 static tx_buf
* from_packet_copy(packet
&& p
, dpdk_qp
& qp
) {
788 * Here we are going to use the fact that the inline data size is a
791 * We will first try to allocate the cluster and only if we are
792 * successful - we will go and copy the data.
794 auto aligned_len
= align_up((size_t)p
.len(), inline_mbuf_data_size
);
795 unsigned nsegs
= aligned_len
/ inline_mbuf_data_size
;
796 rte_mbuf
*head
= nullptr, *last_seg
= nullptr;
798 tx_buf
* buf
= qp
.get_tx_buf();
803 head
= buf
->rte_mbuf_p();
805 for (unsigned i
= 1; i
< nsegs
; i
++) {
806 buf
= qp
.get_tx_buf();
812 last_seg
->next
= buf
->rte_mbuf_p();
813 last_seg
= last_seg
->next
;
817 // If we've got here means that we have succeeded already!
818 // We only need to copy the data and set the head buffer with the
821 head
->pkt_len
= p
.len();
822 head
->nb_segs
= nsegs
;
824 copy_packet_to_cluster(p
, head
);
825 set_cluster_offload_info(p
, qp
, head
);
831 * Zero-copy handling of a single net::fragment.
833 * @param do_one_buf Functor responsible for a single rte_mbuf
835 * @param qp dpdk_qp handle (in)
836 * @param frag Fragment to copy (in)
837 * @param head Head of the cluster (out)
838 * @param last_seg Last segment of the cluster (out)
839 * @param nsegs Number of segments in the cluster (out)
841 * @return TRUE in case of success
843 template <class DoOneBufFunc
>
844 static bool do_one_frag(DoOneBufFunc do_one_buf
, dpdk_qp
& qp
,
845 fragment
& frag
, rte_mbuf
*& head
,
846 rte_mbuf
*& last_seg
, unsigned& nsegs
) {
847 size_t len
, left_to_set
= frag
.size
;
848 char* base
= frag
.base
;
852 // TODO: assert() in a fast path! Remove me ASAP!
855 // Create a HEAD of mbufs' cluster and set the first bytes into it
856 len
= do_one_buf(qp
, head
, base
, left_to_set
);
866 // Set the rest of the data into the new mbufs and chain them to
869 rte_mbuf
* prev_seg
= head
;
870 while (left_to_set
) {
871 len
= do_one_buf(qp
, m
, base
, left_to_set
);
885 // Return the last mbuf in the cluster
892 * Zero-copy handling of a single net::fragment.
894 * @param qp dpdk_qp handle (in)
895 * @param frag Fragment to copy (in)
896 * @param head Head of the cluster (out)
897 * @param last_seg Last segment of the cluster (out)
898 * @param nsegs Number of segments in the cluster (out)
900 * @return TRUE in case of success
902 static bool translate_one_frag(dpdk_qp
& qp
, fragment
& frag
,
903 rte_mbuf
*& head
, rte_mbuf
*& last_seg
,
905 return do_one_frag(set_one_data_buf
, qp
, frag
, head
,
910 * Copies one net::fragment into the cluster of rte_mbuf's.
912 * @param qp dpdk_qp handle (in)
913 * @param frag Fragment to copy (in)
914 * @param head Head of the cluster (out)
915 * @param last_seg Last segment of the cluster (out)
916 * @param nsegs Number of segments in the cluster (out)
918 * We return the "last_seg" to avoid traversing the cluster in order to get
921 * @return TRUE in case of success
923 static bool copy_one_frag(dpdk_qp
& qp
, fragment
& frag
,
924 rte_mbuf
*& head
, rte_mbuf
*& last_seg
,
926 return do_one_frag(copy_one_data_buf
, qp
, frag
, head
,
931 * Allocates a single rte_mbuf and sets it to point to a given data
934 * @param qp dpdk_qp handle (in)
935 * @param m New allocated rte_mbuf (out)
936 * @param va virtual address of a data buffer (in)
937 * @param buf_len length of the data to copy (in)
939 * @return The actual number of bytes that has been set in the mbuf
941 static size_t set_one_data_buf(
942 dpdk_qp
& qp
, rte_mbuf
*& m
, char* va
, size_t buf_len
) {
943 static constexpr size_t max_frag_len
= 15 * 1024; // 15K
946 // Currently we break a buffer on a 15K boundary because 82599
947 // devices have a 15.5K limitation on a maximum single fragment
950 rte_iova_t iova
= rte_mem_virt2iova(va
);
952 if (iova
== RTE_BAD_IOVA
) {
953 return copy_one_data_buf(qp
, m
, va
, buf_len
);
956 tx_buf
* buf
= qp
.get_tx_buf();
961 size_t len
= std::min(buf_len
, max_frag_len
);
963 buf
->set_zc_info(va
, iova
, len
);
964 m
= buf
->rte_mbuf_p();
970 * Allocates a single rte_mbuf and copies a given data into it.
972 * @param qp dpdk_qp handle (in)
973 * @param m New allocated rte_mbuf (out)
974 * @param data Data to copy from (in)
975 * @param buf_len length of the data to copy (in)
977 * @return The actual number of bytes that has been copied
979 static size_t copy_one_data_buf(
980 dpdk_qp
& qp
, rte_mbuf
*& m
, char* data
, size_t buf_len
)
982 tx_buf
* buf
= qp
.get_tx_buf();
987 size_t len
= std::min(buf_len
, inline_mbuf_data_size
);
989 m
= buf
->rte_mbuf_p();
995 qp
._stats
.tx
.good
.update_copy_stats(1, len
);
997 memcpy(rte_pktmbuf_mtod(m
, void*), data
, len
);
1003 tx_buf(tx_buf_factory
& fc
) : _fc(fc
) {
1005 _buf_iova
= _mbuf
.buf_iova
;
1006 _data_off
= _mbuf
.data_off
;
1009 rte_mbuf
* rte_mbuf_p() { return &_mbuf
; }
1011 void set_zc_info(void* va
, rte_iova_t iova
, size_t len
) {
1013 _mbuf
.data_len
= len
;
1014 _mbuf
.pkt_len
= len
;
1016 // Set the mbuf to point to our data
1017 _mbuf
.buf_addr
= va
;
1018 _mbuf
.buf_iova
= iova
;
1025 // If this mbuf was the last in a cluster and contains an
1026 // original packet object then call the destructor of the
1027 // original packet object.
1031 // Reset the std::optional. This in particular is going
1032 // to call the "packet"'s destructor and reset the
1033 // "optional" state to "nonengaged".
1037 } else if (!_is_zc
) {
1041 // Restore the rte_mbuf fields we trashed in set_zc_info()
1042 _mbuf
.buf_iova
= _buf_iova
;
1043 _mbuf
.buf_addr
= rte_mbuf_to_baddr(&_mbuf
);
1044 _mbuf
.data_off
= _data_off
;
1050 struct rte_mbuf
*m
= &_mbuf
, *m_next
;
1052 while (m
!= nullptr) {
1054 rte_pktmbuf_reset(m
);
1060 void set_packet(packet
&& p
) {
    // Must be the first member: me() casts between rte_mbuf* and tx_buf*.
    struct rte_mbuf _mbuf;
    // Marks the start of the tx_buf-private part of the object -- presumably
    // used to size/clear the private area; confirm at the use site.
    MARKER private_start;
    // Holds the original packet in the zero-copy flow; destroyed when the
    // HW is done with the buffer.
    std::optional<packet> _p;
    // Original buf_iova saved by the constructor so it can be restored
    // after a zero-copy send.
    rte_iova_t _buf_iova;
    // TRUE if underlying mbuf has been used in the zero-copy flow
    bool _is_zc = false;
    // buffers' factory the buffer came from
    tx_buf_factory& _fc;
1077 class tx_buf_factory
{
1079 // Number of buffers to free in each GC iteration:
1080 // We want the buffers to be allocated from the mempool as many as
1083 // On the other hand if there is no Tx for some time we want the
1084 // completions to be eventually handled. Thus we choose the smallest
1085 // possible packets count number here.
1087 static constexpr int gc_count
= 1;
1089 tx_buf_factory(uint16_t qid
) {
1090 using namespace memory
;
1092 sstring name
= sstring(pktmbuf_pool_name
) + to_sstring(qid
) + "_tx";
1093 printf("Creating Tx mbuf pool '%s' [%u mbufs] ...\n",
1094 name
.c_str(), mbufs_per_queue_tx
);
1096 if (HugetlbfsMemBackend
) {
1099 _xmem
.reset(dpdk_qp::alloc_mempool_xmem(mbufs_per_queue_tx
,
1103 printf("Can't allocate a memory for Tx buffers\n");
1108 // We are going to push the buffers from the mempool into
1109 // the circular_buffer and then poll them from there anyway, so
1110 // we prefer to make a mempool non-atomic in this case.
1113 rte_mempool_create_empty(name
.c_str(),
1117 sizeof(struct rte_pktmbuf_pool_private
),
1118 rte_socket_id(), 0);
1120 rte_pktmbuf_pool_init(_pool
, nullptr);
1122 if (rte_mempool_populate_virt(_pool
, (char*)(_xmem
.get()),
1123 xmem_size
, page_size
,
1124 nullptr, nullptr) <= 0) {
1125 printf("Failed to populate mempool for Tx\n");
1129 rte_mempool_obj_iter(_pool
, rte_pktmbuf_init
, nullptr);
1134 rte_mempool_create(name
.c_str(),
1135 mbufs_per_queue_tx
, inline_mbuf_size
,
1137 sizeof(struct rte_pktmbuf_pool_private
),
1138 rte_pktmbuf_pool_init
, nullptr,
1139 rte_pktmbuf_init
, nullptr,
1140 rte_socket_id(), 0);
1144 printf("Failed to create mempool for Tx\n");
1149 // Fill the factory with the buffers from the mempool allocated
1156 * @note Should not be called if there are no free tx_buf's
1158 * @return a free tx_buf object
1161 // Take completed from the HW first
1162 tx_buf
*pkt
= get_one_completed();
1164 if (HugetlbfsMemBackend
) {
1172 // If there are no completed at the moment - take from the
1175 if (_ring
.empty()) {
1185 void put(tx_buf
* buf
) {
1186 if (HugetlbfsMemBackend
) {
1189 _ring
.push_back(buf
);
1193 for (int cnt
= 0; cnt
< gc_count
; ++cnt
) {
1194 auto tx_buf_p
= get_one_completed();
1206 * Fill the mbufs circular buffer: after this the _pool will become
1207 * empty. We will use it to catch the completed buffers:
1209 * - Underlying PMD drivers will "free" the mbufs once they are
1211 * - We will poll the _pktmbuf_pool_tx till it's empty and release
1212 * all the buffers from the freed mbufs.
1214 void init_factory() {
1215 while (rte_mbuf
* mbuf
= rte_pktmbuf_alloc(_pool
)) {
1216 _ring
.push_back(new(tx_buf::me(mbuf
)) tx_buf
{*this});
1221 * PMD puts the completed buffers back into the mempool they have
1222 * originally come from.
1224 * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call
1225 * rte_pktmbuf_reset() here again.
1227 * @return a single tx_buf that has been completed by HW.
1229 tx_buf
* get_one_completed() {
1230 return tx_buf::me(rte_pktmbuf_alloc(_pool
));
    // Free tx_buf's ready to be handed out.
    std::vector<tx_buf*> _ring;
    // Underlying DPDK mempool; also serves as the completion channel since
    // PMDs "free" sent mbufs back into it (see get_one_completed()).
    rte_mempool* _pool = nullptr;
    // External memory backing the mempool (hugetlbfs backend case).
    std::unique_ptr<void, free_deleter> _xmem;
1240 explicit dpdk_qp(dpdk_device
* dev
, uint16_t qid
,
1241 const std::string stats_plugin_name
);
1243 virtual void rx_start() override
;
1244 virtual future
<> send(packet p
) override
{
1247 virtual ~dpdk_qp() {}
1249 virtual uint32_t send(circular_buffer
<packet
>& pb
) override
{
1250 if (HugetlbfsMemBackend
) {
1252 return _send(pb
, [&] (packet
&& p
) {
1253 return tx_buf::from_packet_zc(std::move(p
), *this);
1257 return _send(pb
, [&](packet
&& p
) {
1258 return tx_buf::from_packet_copy(std::move(p
), *this);
1263 dpdk_device
& port() const { return *_dev
; }
1264 tx_buf
* get_tx_buf() { return _tx_buf_factory
.get(); }
1267 template <class Func
>
1268 uint32_t _send(circular_buffer
<packet
>& pb
, Func packet_to_tx_buf_p
) {
1269 if (_tx_burst
.size() == 0) {
1270 for (auto&& p
: pb
) {
1271 // TODO: assert() in a fast path! Remove me ASAP!
1274 tx_buf
* buf
= packet_to_tx_buf_p(std::move(p
));
1279 _tx_burst
.push_back(buf
->rte_mbuf_p());
1283 uint16_t sent
= rte_eth_tx_burst(_dev
->port_idx(), _qid
,
1284 _tx_burst
.data() + _tx_burst_idx
,
1285 _tx_burst
.size() - _tx_burst_idx
);
1287 uint64_t nr_frags
= 0, bytes
= 0;
1289 for (int i
= 0; i
< sent
; i
++) {
1290 rte_mbuf
* m
= _tx_burst
[_tx_burst_idx
+ i
];
1291 bytes
+= m
->pkt_len
;
1292 nr_frags
+= m
->nb_segs
;
1296 _stats
.tx
.good
.update_frags_stats(nr_frags
, bytes
);
1298 _tx_burst_idx
+= sent
;
1300 if (_tx_burst_idx
== _tx_burst
.size()) {
1309 * Allocate a new data buffer and set the mbuf to point to it.
1311 * Do some DPDK hacks to work on PMD: it assumes that the buf_addr
1312 * points to the private data of RTE_PKTMBUF_HEADROOM before the actual
1315 * @param m mbuf to update
1317 static bool refill_rx_mbuf(rte_mbuf
* m
, size_t size
= mbuf_data_size
) {
1320 if (posix_memalign((void**)&data
, size
, size
)) {
1324 rte_iova_t iova
= rte_mem_virt2iova(data
);
1327 // Set the mbuf to point to our data.
1329 // Do some DPDK hacks to work on PMD: it assumes that the buf_addr
1330 // points to the private data of RTE_PKTMBUF_HEADROOM before the
1331 // actual data buffer.
1333 m
->buf_addr
= data
- RTE_PKTMBUF_HEADROOM
;
1334 m
->buf_iova
= iova
- RTE_PKTMBUF_HEADROOM
;
1338 static bool init_noninline_rx_mbuf(rte_mbuf
* m
,
1339 size_t size
= mbuf_data_size
) {
1340 if (!refill_rx_mbuf(m
, size
)) {
1343 // The below fields stay constant during the execution.
1344 m
->buf_len
= size
+ RTE_PKTMBUF_HEADROOM
;
1345 m
->data_off
= RTE_PKTMBUF_HEADROOM
;
1349 bool init_rx_mbuf_pool();
1352 bool refill_one_cluster(rte_mbuf
* head
);
1355 * Allocates a memory chunk to accommodate the given number of buffers of
1356 * the given size and fills a vector with underlying physical pages.
1358 * The chunk is going to be used as an external memory buffer of the DPDK
1361 * The chunk size if calculated using get_mempool_xmem_size() function.
1363 * @param num_bufs Number of buffers (in)
1364 * @param buf_sz Size of each buffer (in)
1365 * @param xmem_size Size of allocated memory chunk (out)
1367 * @return a virtual address of the allocated memory chunk or nullptr in
1368 * case of a failure.
1370 static void* alloc_mempool_xmem(uint16_t num_bufs
, uint16_t buf_sz
,
1374 * Polls for a burst of incoming packets. This function will not block and
1375 * will immediately return after processing all available packets.
1378 bool poll_rx_once();
1381 * Translates an rte_mbuf's into net::packet and feeds them to _rx_stream.
1383 * @param bufs An array of received rte_mbuf's
1384 * @param count Number of buffers in the bufs[]
1386 void process_packets(struct rte_mbuf
**bufs
, uint16_t count
);
1389 * Translate rte_mbuf into the "packet".
1390 * @param m mbuf to translate
1392 * @return a "optional" object representing the newly received data if in an
1393 * "engaged" state or an error if in a "disengaged" state.
1395 std::optional
<packet
> from_mbuf(rte_mbuf
* m
);
1398 * Transform an LRO rte_mbuf cluster into the "packet" object.
1399 * @param m HEAD of the mbufs' cluster to transform
1401 * @return a "optional" object representing the newly received LRO packet if
1402 * in an "engaged" state or an error if in a "disengaged" state.
1404 std::optional
<packet
> from_mbuf_lro(rte_mbuf
* m
);
1409 rte_mempool
*_pktmbuf_pool_rx
;
1410 std::vector
<rte_mbuf
*> _rx_free_pkts
;
1411 std::vector
<rte_mbuf
*> _rx_free_bufs
;
1412 std::vector
<fragment
> _frags
;
1413 std::vector
<char*> _bufs
;
1414 size_t _num_rx_free_segs
= 0;
1415 reactor::poller _rx_gc_poller
;
1416 std::unique_ptr
<void, free_deleter
> _rx_xmem
;
1417 tx_buf_factory _tx_buf_factory
;
1418 std::optional
<reactor::poller
> _rx_poller
;
1419 reactor::poller _tx_gc_poller
;
1420 std::vector
<rte_mbuf
*> _tx_burst
;
1421 uint16_t _tx_burst_idx
= 0;
1422 static constexpr phys_addr_t page_mask
= ~(memory::page_size
- 1);
// One-time per-port initialization: queries device info, applies
// driver-specific workarounds, selects Tx/Rx offloads and the RSS
// configuration, and finally calls rte_eth_dev_configure().
1425 int dpdk_device::init_port_start()
1427 assert(_port_idx
< rte_eth_dev_count_avail());
1429 rte_eth_dev_info_get(_port_idx
, &_dev_info
);
1432 // This is a workaround for a missing handling of a HW limitation in the
1433 // DPDK i40e driver. This and all related to _is_i40e_device code should be
1434 // removed once this handling is added.
1436 if (sstring("rte_i40evf_pmd") == _dev_info
.driver_name
||
1437 sstring("rte_i40e_pmd") == _dev_info
.driver_name
) {
1438 printf("Device is an Intel's 40G NIC. Enabling 8 fragments hack!\n");
1439 _is_i40e_device
= true;
1442 if (std::string("rte_vmxnet3_pmd") == _dev_info
.driver_name
) {
1443 printf("Device is a VMWare Virtual NIC. Enabling 16 fragments hack!\n");
1444 _is_vmxnet3_device
= true;
1448 // Another workaround: this time for a lack of number of RSS bits.
1449 // ixgbe PF NICs support up to 16 RSS queues.
1450 // ixgbe VF NICs support up to 4 RSS queues.
1451 // i40e PF NICs support up to 64 RSS queues.
1452 // i40e VF NICs support up to 16 RSS queues.
1454 if (sstring("rte_ixgbe_pmd") == _dev_info
.driver_name
) {
1455 _dev_info
.max_rx_queues
= std::min(_dev_info
.max_rx_queues
, (uint16_t)16);
1456 } else if (sstring("rte_ixgbevf_pmd") == _dev_info
.driver_name
) {
1457 _dev_info
.max_rx_queues
= std::min(_dev_info
.max_rx_queues
, (uint16_t)4);
1458 } else if (sstring("rte_i40e_pmd") == _dev_info
.driver_name
) {
1459 _dev_info
.max_rx_queues
= std::min(_dev_info
.max_rx_queues
, (uint16_t)64);
1460 } else if (sstring("rte_i40evf_pmd") == _dev_info
.driver_name
) {
1461 _dev_info
.max_rx_queues
= std::min(_dev_info
.max_rx_queues
, (uint16_t)16);
1464 // Hardware offload capabilities
1465 // https://github.com/DPDK/dpdk/blob/v19.05/lib/librte_ethdev/rte_ethdev.h#L993-L1074
1467 // We want to support all available offload features
1468 // TODO: below features are implemented in 17.05, should support new ones
1469 const uint64_t tx_offloads_wanted
=
1470 DEV_TX_OFFLOAD_VLAN_INSERT
|
1471 DEV_TX_OFFLOAD_IPV4_CKSUM
|
1472 DEV_TX_OFFLOAD_UDP_CKSUM
|
1473 DEV_TX_OFFLOAD_TCP_CKSUM
|
1474 DEV_TX_OFFLOAD_SCTP_CKSUM
|
1475 DEV_TX_OFFLOAD_TCP_TSO
|
1476 DEV_TX_OFFLOAD_UDP_TSO
|
1477 DEV_TX_OFFLOAD_OUTER_IPV4_CKSUM
|
1478 DEV_TX_OFFLOAD_QINQ_INSERT
|
1479 DEV_TX_OFFLOAD_VXLAN_TNL_TSO
|
1480 DEV_TX_OFFLOAD_GRE_TNL_TSO
|
1481 DEV_TX_OFFLOAD_IPIP_TNL_TSO
|
1482 DEV_TX_OFFLOAD_GENEVE_TNL_TSO
|
1483 DEV_TX_OFFLOAD_MACSEC_INSERT
;
// Request only the offloads the device actually supports.
1485 _dev_info
.default_txconf
.offloads
=
1486 _dev_info
.tx_offload_capa
& tx_offloads_wanted
;
1488 /* for port configuration all features are off by default */
1489 rte_eth_conf port_conf
= { 0 };
1491 /* setting tx offloads for port */
1492 port_conf
.txmode
.offloads
= _dev_info
.default_txconf
.offloads
;
1494 printf("Port %d: max_rx_queues %d max_tx_queues %d\n",
1495 _port_idx
, _dev_info
.max_rx_queues
, _dev_info
.max_tx_queues
);
// Clamp the requested queue count to what the HW supports in both
// directions.
1497 _num_queues
= std::min({_num_queues
, _dev_info
.max_rx_queues
, _dev_info
.max_tx_queues
});
1499 printf("Port %d: using %d %s\n", _port_idx
, _num_queues
,
1500 (_num_queues
> 1) ? "queues" : "queue");
1502 // Set RSS mode: enable RSS if seastar is configured with more than 1 CPU.
1503 // Even if port has a single queue we still want the RSS feature to be
1504 // available in order to make HW calculate RSS hash for us.
1505 if (smp::count
> 1) {
1506 if (_dev_info
.hash_key_size
== 40) {
1507 _rss_key
= default_rsskey_40bytes
;
1508 } else if (_dev_info
.hash_key_size
== 52) {
1509 _rss_key
= default_rsskey_52bytes
;
1510 } else if (_dev_info
.hash_key_size
!= 0) {
1512 rte_exit(EXIT_FAILURE
,
1513 "Port %d: We support only 40 or 52 bytes RSS hash keys, %d bytes key requested",
1514 _port_idx
, _dev_info
.hash_key_size
);
// hash_key_size == 0: the PMD did not report a key size — fall back to
// the 40-byte default key.
1516 _rss_key
= default_rsskey_40bytes
;
1517 _dev_info
.hash_key_size
= 40;
1520 port_conf
.rxmode
.mq_mode
= ETH_MQ_RX_RSS
;
1521 /* enable all supported rss offloads */
1522 port_conf
.rx_adv_conf
.rss_conf
.rss_hf
= _dev_info
.flow_type_rss_offloads
;
1523 if (_dev_info
.hash_key_size
) {
1524 port_conf
.rx_adv_conf
.rss_conf
.rss_key
= const_cast<uint8_t *>(_rss_key
.data());
1525 port_conf
.rx_adv_conf
.rss_conf
.rss_key_len
= _dev_info
.hash_key_size
;
1528 port_conf
.rxmode
.mq_mode
= ETH_MQ_RX_NONE
;
// Size the software redirection table from the HW RETA when available,
// otherwise from max_rx_queues.
1531 if (_num_queues
> 1) {
1532 if (_dev_info
.reta_size
) {
1533 // RETA size should be a power of 2
1534 assert((_dev_info
.reta_size
& (_dev_info
.reta_size
- 1)) == 0);
1536 // Set the RSS table to the correct size
1537 _redir_table
.resize(_dev_info
.reta_size
);
1538 _rss_table_bits
= std::lround(std::log2(_dev_info
.reta_size
));
1539 printf("Port %d: RSS table size is %d\n",
1540 _port_idx
, _dev_info
.reta_size
);
1542 _rss_table_bits
= std::lround(std::log2(_dev_info
.max_rx_queues
));
1545 _redir_table
.push_back(0);
1548 // Set Rx VLAN stripping
1549 if (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_VLAN_STRIP
) {
1550 port_conf
.rxmode
.offloads
|= DEV_RX_OFFLOAD_VLAN_STRIP
;
1553 #ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
1555 if (_use_lro
&& (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_TCP_LRO
)) {
1556 printf("LRO is on\n");
1557 port_conf
.rxmode
.offloads
|= DEV_RX_OFFLOAD_TCP_LRO
;
1558 _hw_features
.rx_lro
= true;
1561 printf("LRO is off\n");
1563 // Check that all CSUM features are either all set all together or not set
1564 // all together. If this assumption breaks we need to rework the below logic
1565 // by splitting the csum offload feature bit into separate bits for IPv4,
1567 assert(((_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_IPV4_CKSUM
) &&
1568 (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_UDP_CKSUM
) &&
1569 (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_TCP_CKSUM
)) ||
1570 (!(_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_IPV4_CKSUM
) &&
1571 !(_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_UDP_CKSUM
) &&
1572 !(_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_TCP_CKSUM
)));
1574 // Set Rx checksum checking
1575 if ( (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_IPV4_CKSUM
) &&
1576 (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_UDP_CKSUM
) &&
1577 (_dev_info
.rx_offload_capa
& DEV_RX_OFFLOAD_TCP_CKSUM
)) {
1578 printf("RX checksum offload supported\n");
1579 port_conf
.rxmode
.offloads
|= DEV_RX_OFFLOAD_CHECKSUM
;
1580 _hw_features
.rx_csum_offload
= 1;
1583 if ((_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_IPV4_CKSUM
)) {
1584 printf("TX ip checksum offload supported\n");
1585 _hw_features
.tx_csum_ip_offload
= 1;
1588 // TSO is supported starting from DPDK v1.8
1589 if (_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_TCP_TSO
) {
1590 printf("TSO is supported\n");
1591 _hw_features
.tx_tso
= 1;
1594 // There is no UFO support in the PMDs yet.
1596 if (_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_UDP_TSO
) {
1597 printf("UFO is supported\n");
1598 _hw_features
.tx_ufo
= 1;
1602 // Check that Tx TCP and UDP CSUM features are either all set all together
1603 // or not set all together. If this assumption breaks we need to rework the
1604 // below logic by splitting the csum offload feature bit into separate bits
1606 assert(((_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_UDP_CKSUM
) &&
1607 (_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_TCP_CKSUM
)) ||
1608 (!(_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_UDP_CKSUM
) &&
1609 !(_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_TCP_CKSUM
)));
1611 if ( (_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_UDP_CKSUM
) &&
1612 (_dev_info
.tx_offload_capa
& DEV_TX_OFFLOAD_TCP_CKSUM
)) {
1613 printf("TX TCP&UDP checksum offload supported\n");
1614 _hw_features
.tx_csum_l4_offload
= 1;
1619 printf("Port %u init ... ", _port_idx
);
1623 * Standard DPDK port initialisation - config port, then set up
1626 if ((retval
= rte_eth_dev_configure(_port_idx
, _num_queues
, _num_queues
,
1627 &port_conf
)) != 0) {
1631 //rte_eth_promiscuous_enable(port_num);
// Reads the port's current HW flow-control settings and forces the mode
// to RTE_FC_FULL or RTE_FC_NONE according to _enable_fc. ENOTSUP from the
// PMD is tolerated (prints a notice); any other error is fatal.
1637 void dpdk_device::set_hw_flow_control()
1639 // Read the port's current/default flow control settings
1640 struct rte_eth_fc_conf fc_conf
;
1641 auto ret
= rte_eth_dev_flow_ctrl_get(_port_idx
, &fc_conf
);
1643 if (ret
== -ENOTSUP
) {
1648 rte_exit(EXIT_FAILURE
, "Port %u: failed to get hardware flow control settings: (error %d)\n", _port_idx
, ret
);
1652 fc_conf
.mode
= RTE_FC_FULL
;
1654 fc_conf
.mode
= RTE_FC_NONE
;
1657 ret
= rte_eth_dev_flow_ctrl_set(_port_idx
, &fc_conf
);
1658 if (ret
== -ENOTSUP
) {
1663 rte_exit(EXIT_FAILURE
, "Port %u: failed to set hardware flow control (error %d)\n", _port_idx
, ret
);
1666 printf("Port %u: %s HW FC\n", _port_idx
,
1667 (_enable_fc
? "Enabling" : "Disabling"));
// Reached via the ENOTSUP paths above (goto target lines are missing from
// this chunk — TODO confirm against the full file).
1671 printf("Port %u: Changing HW FC settings is not supported\n", _port_idx
);
// Second-stage port initialization: applies flow control, starts the port,
// installs the xstats-based statistics callback, configures the Toeplitz
// RSS hash function (deprecated filter API) and kicks off the link-status
// check.
1674 void dpdk_device::init_port_fini()
1676 // Changing FC requires HW reset, so set it before the port is initialized.
1677 set_hw_flow_control();
1679 if (rte_eth_dev_start(_port_idx
) < 0) {
1680 rte_exit(EXIT_FAILURE
, "Cannot start port %d\n", _port_idx
);
1683 /* need to defer initialize xstats since NIC specific xstat entries
1684 show up only after port initialization */
// Periodic collector: merges rte_eth_stats totals with per-xstat counters.
1687 _stats_collector
.set_callback([&] {
1688 rte_eth_stats rte_stats
= {};
1689 int rc
= rte_eth_stats_get(_port_idx
, &rte_stats
);
1692 printf("Failed to get port statistics: %s\n", strerror(rc
));
1695 _stats
.rx
.good
.mcast
=
1696 _xstats
.get_value(dpdk_xstats::xstat_id::rx_multicast_packets
);
1697 _stats
.rx
.good
.pause_xon
=
1698 _xstats
.get_value(dpdk_xstats::xstat_id::rx_xon_packets
);
1699 _stats
.rx
.good
.pause_xoff
=
1700 _xstats
.get_value(dpdk_xstats::xstat_id::rx_xoff_packets
);
1703 _xstats
.get_value(dpdk_xstats::xstat_id::rx_crc_errors
);
1705 _xstats
.get_value(dpdk_xstats::xstat_id::rx_length_errors
) +
1706 _xstats
.get_value(dpdk_xstats::xstat_id::rx_undersize_errors
) +
1707 _xstats
.get_value(dpdk_xstats::xstat_id::rx_oversize_errors
);
1708 _stats
.rx
.bad
.total
= rte_stats
.ierrors
;
1710 _stats
.tx
.good
.pause_xon
=
1711 _xstats
.get_value(dpdk_xstats::xstat_id::tx_xon_packets
);
1712 _stats
.tx
.good
.pause_xoff
=
1713 _xstats
.get_value(dpdk_xstats::xstat_id::tx_xoff_packets
);
1715 _stats
.tx
.bad
.total
= rte_stats
.oerrors
;
1718 // TODO: replace deprecated filter api with generic flow api
1719 #pragma GCC diagnostic push
1720 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1721 if (_num_queues
> 1) {
1722 if (!rte_eth_dev_filter_supported(_port_idx
, RTE_ETH_FILTER_HASH
)) {
1723 printf("Port %d: HASH FILTER configuration is supported\n", _port_idx
);
1725 // Setup HW to use the TOEPLITZ hash function as an RSS hash function
1726 struct rte_eth_hash_filter_info info
= {};
1728 info
.info_type
= RTE_ETH_HASH_FILTER_GLOBAL_CONFIG
;
1729 info
.info
.global_conf
.hash_func
= RTE_ETH_HASH_FUNCTION_TOEPLITZ
;
1731 if (rte_eth_dev_filter_ctrl(_port_idx
, RTE_ETH_FILTER_HASH
,
1732 RTE_ETH_FILTER_SET
, &info
) < 0) {
1733 rte_exit(EXIT_FAILURE
, "Cannot set hash function on a port %d\n", _port_idx
);
1739 #pragma GCC diagnostic pop
1742 check_port_link_status();
1744 printf("Created DPDK device\n");
// Allocates a page-aligned memory chunk sized (via get_mempool_xmem_size)
// to hold num_bufs mempool objects of buf_sz bytes each; xmem_size receives
// the chunk size. Returns the chunk address, or nullptr on failure (return
// statements are outside this mangled chunk — TODO confirm).
1747 template <bool HugetlbfsMemBackend
>
1748 void* dpdk_qp
<HugetlbfsMemBackend
>::alloc_mempool_xmem(
1749 uint16_t num_bufs
, uint16_t buf_sz
, size_t& xmem_size
)
1751 using namespace memory
;
1753 struct rte_mempool_objsz mp_obj_sz
= {};
1755 rte_mempool_calc_obj_size(buf_sz
, 0, &mp_obj_sz
);
// Total per-object footprint = element + mempool header + trailer.
1758 get_mempool_xmem_size(num_bufs
,
1759 mp_obj_sz
.elt_size
+ mp_obj_sz
.header_size
+
1760 mp_obj_sz
.trailer_size
,
1763 // Aligning to 2M causes the further failure in small allocations.
1764 // TODO: Check why - and fix.
// NOTE(review): the 'xmem' pointer declaration is missing from this chunk.
1765 if (posix_memalign((void**)&xmem
, page_size
, xmem_size
)) {
1766 printf("Can't allocate %ld bytes aligned to %ld\n",
1767 xmem_size
, page_size
);
// Creates the per-queue Rx mbuf pool. With the hugetlbfs backend the pool
// is populated from an externally allocated chunk and each mbuf gets a
// separately allocated data buffer; otherwise a regular inline-buffer
// rte_mempool_create() is used. Returns true when _pktmbuf_pool_rx is set.
1774 template <bool HugetlbfsMemBackend
>
1775 bool dpdk_qp
<HugetlbfsMemBackend
>::init_rx_mbuf_pool()
1777 using namespace memory
;
1778 sstring name
= sstring(pktmbuf_pool_name
) + to_sstring(_qid
) + "_rx";
1780 printf("Creating Rx mbuf pool '%s' [%u mbufs] ...\n",
1781 name
.c_str(), mbufs_per_queue_rx
);
1784 // If we have a hugetlbfs memory backend we may perform a virt2phys
1785 // translation and memory is "pinned". Therefore we may provide an external
1786 // memory for DPDK pools and this way significantly reduce the memory needed
1787 // for the DPDK in this case.
1789 if (HugetlbfsMemBackend
) {
1792 _rx_xmem
.reset(alloc_mempool_xmem(mbufs_per_queue_rx
, mbuf_overhead
,
1794 if (!_rx_xmem
.get()) {
1795 printf("Can't allocate a memory for Rx buffers\n");
1800 // Don't pass single-producer/single-consumer flags to mbuf create as it
1801 // seems faster to use a cache instead.
1803 struct rte_pktmbuf_pool_private roomsz
= {};
1804 roomsz
.mbuf_data_room_size
= mbuf_data_size
+ RTE_PKTMBUF_HEADROOM
;
// NOTE(review): the assignment of the created pool to _pktmbuf_pool_rx
// is missing from this chunk; the null check below implies it.
1806 rte_mempool_create_empty(name
.c_str(),
1807 mbufs_per_queue_rx
, mbuf_overhead
,
1809 sizeof(struct rte_pktmbuf_pool_private
),
1810 rte_socket_id(), 0);
1811 if (!_pktmbuf_pool_rx
) {
1812 printf("Failed to create mempool for Rx\n");
1816 rte_pktmbuf_pool_init(_pktmbuf_pool_rx
, as_cookie(roomsz
));
1818 if (rte_mempool_populate_virt(_pktmbuf_pool_rx
,
1819 (char*)(_rx_xmem
.get()), xmem_size
,
1821 nullptr, nullptr) < 0) {
1822 printf("Failed to populate mempool for Rx\n");
1826 rte_mempool_obj_iter(_pktmbuf_pool_rx
, rte_pktmbuf_init
, nullptr);
1828 // reserve the memory for Rx buffers containers
1829 _rx_free_pkts
.reserve(mbufs_per_queue_rx
);
1830 _rx_free_bufs
.reserve(mbufs_per_queue_rx
);
1833 // 1) Pull all entries from the pool.
1834 // 2) Bind data buffers to each of them.
1835 // 3) Return them back to the pool.
1837 for (int i
= 0; i
< mbufs_per_queue_rx
; i
++) {
1838 rte_mbuf
* m
= rte_pktmbuf_alloc(_pktmbuf_pool_rx
);
1840 _rx_free_bufs
.push_back(m
);
1843 for (auto&& m
: _rx_free_bufs
) {
1844 if (!init_noninline_rx_mbuf(m
)) {
1845 printf("Failed to allocate data buffers for Rx ring. "
1846 "Consider increasing the amount of memory.\n");
1851 rte_mempool_put_bulk(_pktmbuf_pool_rx
, (void**)_rx_free_bufs
.data(),
1852 _rx_free_bufs
.size());
1854 _rx_free_bufs
.clear();
// Non-hugetlbfs path: plain pool with inline data buffers.
1856 struct rte_pktmbuf_pool_private roomsz
= {};
1857 roomsz
.mbuf_data_room_size
= inline_mbuf_data_size
+ RTE_PKTMBUF_HEADROOM
;
1859 rte_mempool_create(name
.c_str(),
1860 mbufs_per_queue_rx
, inline_mbuf_size
,
1862 sizeof(struct rte_pktmbuf_pool_private
),
1863 rte_pktmbuf_pool_init
, as_cookie(roomsz
),
1864 rte_pktmbuf_init
, nullptr,
1865 rte_socket_id(), 0);
1868 return _pktmbuf_pool_rx
!= nullptr;
1871 // Map DMA address explicitly.
1872 // XXX: does NOT work with Mellanox NICs as they use IB libs instead of VFIO.
1873 template <bool HugetlbfsMemBackend
>
1874 bool dpdk_qp
<HugetlbfsMemBackend
>::map_dma()
1876 auto m
= memory::get_memory_layout();
1877 rte_iova_t iova
= rte_mem_virt2iova((const void*)m
.start
);
1879 return rte_vfio_dma_map(m
.start
, iova
, m
.end
- m
.start
) == 0;
// Polls the port link state every 100ms from a heap-allocated timer; when
// the link comes up it fulfills _link_ready_promise and starts the periodic
// stats collector, otherwise it keeps printing dots for up to 9 seconds.
1882 void dpdk_device::check_port_link_status()
1884 using namespace std::literals::chrono_literals
;
1886 constexpr auto check_interval
= 100ms
;
1888 std::cout
<< "\nChecking link status " << std::endl
;
// NOTE(review): 't' is intentionally leaked / captured raw here; the
// 'count' declaration is missing from this chunk — TODO confirm.
1889 auto t
= new timer
<>;
1890 t
->set_callback([this, count
, t
] () mutable {
1891 const int max_check_time
= 90; /* 9s (90 * 100ms) in total */
1892 struct rte_eth_link link
;
1893 memset(&link
, 0, sizeof(link
));
1894 rte_eth_link_get_nowait(_port_idx
, &link
);
1896 if (link
.link_status
) {
1898 "done\nPort " << static_cast<unsigned>(_port_idx
) <<
1899 " Link Up - speed " << link
.link_speed
<<
1900 " Mbps - " << ((link
.link_duplex
== ETH_LINK_FULL_DUPLEX
) ?
1901 ("full-duplex") : ("half-duplex\n")) <<
1903 _link_ready_promise
.set_value();
1905 // We may start collecting statistics only after the Link is UP.
1906 _stats_collector
.arm_periodic(2s
);
1907 } else if (count
++ < max_check_time
) {
1908 std::cout
<< "." << std::flush
;
1911 std::cout
<< "done\nPort " << _port_idx
<< " Link Down" << std::endl
;
1916 t
->arm_periodic(check_interval
);
1919 // This function uses offsetof with non POD types.
1920 #pragma GCC diagnostic push
1921 #pragma GCC diagnostic ignored "-Winvalid-offsetof"
// Queue-pair constructor: builds the Rx mbuf pool, maps DMA for the
// hugetlbfs backend, validates tx_buf layout invariants at compile time,
// sets up the HW Rx/Tx queues and registers the error metrics.
1923 template <bool HugetlbfsMemBackend
>
1924 dpdk_qp
<HugetlbfsMemBackend
>::dpdk_qp(dpdk_device
* dev
, uint16_t qid
,
1925 const std::string stats_plugin_name
)
1926 : qp(true, stats_plugin_name
, qid
), _dev(dev
), _qid(qid
),
1927 _rx_gc_poller(reactor::poller::simple([&] { return rx_gc(); })),
1928 _tx_buf_factory(qid
),
1929 _tx_gc_poller(reactor::poller::simple([&] { return _tx_buf_factory
.gc(); }))
1931 if (!init_rx_mbuf_pool()) {
1932 rte_exit(EXIT_FAILURE
, "Cannot initialize mbuf pools\n");
1935 if (HugetlbfsMemBackend
&& !map_dma()) {
1936 rte_exit(EXIT_FAILURE
, "Cannot map DMA\n");
// Compile-time layout checks: the tx_buf private data must fit into the
// mbuf headroom and _mbuf must be the first tx_buf member.
1939 static_assert(offsetof(class tx_buf
, private_end
) -
1940 offsetof(class tx_buf
, private_start
) <= RTE_PKTMBUF_HEADROOM
,
1941 "RTE_PKTMBUF_HEADROOM is less than dpdk_qp::tx_buf size! "
1942 "Increase the headroom size in the DPDK configuration");
1943 static_assert(offsetof(class tx_buf
, _mbuf
) == 0,
1944 "There is a pad at the beginning of the tx_buf before _mbuf "
1946 static_assert((inline_mbuf_data_size
& (inline_mbuf_data_size
- 1)) == 0,
1947 "inline_mbuf_data_size has to be a power of two!");
1949 if (rte_eth_rx_queue_setup(_dev
->port_idx(), _qid
, default_ring_size
,
1950 rte_eth_dev_socket_id(_dev
->port_idx()),
1951 _dev
->def_rx_conf(), _pktmbuf_pool_rx
) < 0) {
1952 rte_exit(EXIT_FAILURE
, "Cannot initialize rx queue\n");
1955 if (rte_eth_tx_queue_setup(_dev
->port_idx(), _qid
, default_ring_size
,
1956 rte_eth_dev_socket_id(_dev
->port_idx()), _dev
->def_tx_conf()) < 0) {
1957 rte_exit(EXIT_FAILURE
, "Cannot initialize tx queue\n");
1960 // Register error statistics: Rx total and checksum errors
1961 namespace sm
= seastar::metrics
;
1962 _metrics
.add_group(_stats_plugin_name
, {
1963 sm::make_derive(_queue_name
+ "_rx_csum_errors", _stats
.rx
.bad
.csum
,
1964 sm::description("Counts a number of packets received by this queue that have a bad CSUM value. "
1965 "A non-zero value of this metric usually indicates a HW issue, e.g. a bad cable.")),
1967 sm::make_derive(_queue_name
+ "_rx_errors", _stats
.rx
.bad
.total
,
1968 sm::description("Counts a total number of errors in the ingress path for this queue: CSUM errors, etc.")),
1970 sm::make_derive(_queue_name
+ "_rx_no_memory_errors", _stats
.rx
.bad
.no_mem
,
1971 sm::description("Counts a number of ingress packets received by this HW queue but dropped by the SW due to low memory. "
1972 "A non-zero value indicates that seastar doesn't have enough memory to handle the packet reception or the memory is too fragmented.")),
1976 #pragma GCC diagnostic pop
1978 template <bool HugetlbfsMemBackend
>
1979 void dpdk_qp
<HugetlbfsMemBackend
>::rx_start() {
1980 _rx_poller
= reactor::poller::simple([&] { return poll_rx_once(); });
// Non-hugetlbfs (copying) LRO path: linearize a whole mbuf cluster into one
// malloc'ed buffer, free the cluster, and wrap the buffer in a packet.
1984 inline std::optional
<packet
>
1985 dpdk_qp
<false>::from_mbuf_lro(rte_mbuf
* m
)
1988 // Try to allocate a buffer for the whole packet's data.
1989 // If we fail - construct the packet from mbufs.
1990 // If we succeed - copy the data into this buffer, create a packet based on
1991 // this buffer and return the mbuf to its pool.
1993 auto pkt_len
= rte_pktmbuf_pkt_len(m
);
1994 char* buf
= (char*)malloc(pkt_len
);
1996 // Copy the contents of the packet into the buffer we've just allocated
// NOTE(review): the 'offset' accumulator declaration/advance lines are
// missing from this chunk — TODO confirm against the full file.
1998 for (rte_mbuf
* m1
= m
; m1
!= nullptr; m1
= m1
->next
) {
1999 char* data
= rte_pktmbuf_mtod(m1
, char*);
2000 auto len
= rte_pktmbuf_data_len(m1
);
2002 rte_memcpy(buf
+ offset
, data
, len
);
2006 rte_pktmbuf_free(m
);
2008 return packet(fragment
{buf
, pkt_len
}, make_free_deleter(buf
));
2011 // Drop if allocation failed
2012 rte_pktmbuf_free(m
);
2014 return std::nullopt
;
// Non-hugetlbfs (copying) Rx path: copy a contiguous mbuf's data into a
// malloc'ed buffer and return the mbuf to its pool; clusters are delegated
// to from_mbuf_lro(). Returns nullopt when the allocation fails.
2018 inline std::optional
<packet
>
2019 dpdk_qp
<false>::from_mbuf(rte_mbuf
* m
)
2021 if (!_dev
->hw_features_ref().rx_lro
|| rte_pktmbuf_is_contiguous(m
)) {
2023 // Try to allocate a buffer for packet's data. If we fail - give the
2024 // application an mbuf itself. If we succeed - copy the data into this
2025 // buffer, create a packet based on this buffer and return the mbuf to
2028 auto len
= rte_pktmbuf_data_len(m
);
2029 char* buf
= (char*)malloc(len
);
2032 // Drop if allocation failed
2033 rte_pktmbuf_free(m
);
2035 return std::nullopt
;
2037 rte_memcpy(buf
, rte_pktmbuf_mtod(m
, char*), len
);
2038 rte_pktmbuf_free(m
);
2040 return packet(fragment
{buf
, len
}, make_free_deleter(buf
));
2043 return from_mbuf_lro(m
);
// Hugetlbfs (zero-copy) LRO path: build a multi-fragment packet directly on
// the mbufs' data buffers; the deleter frees the collected buffers when the
// packet is destroyed.
2048 inline std::optional
<packet
>
2049 dpdk_qp
<true>::from_mbuf_lro(rte_mbuf
* m
)
2054 for (; m
!= nullptr; m
= m
->next
) {
2055 char* data
= rte_pktmbuf_mtod(m
, char*);
2057 _frags
.emplace_back(fragment
{data
, rte_pktmbuf_data_len(m
)});
2058 _bufs
.push_back(data
);
// Move the collected buffer pointers into the deleter so they are released
// together with the packet.
2061 return packet(_frags
.begin(), _frags
.end(),
2062 make_deleter(deleter(),
2063 [bufs_vec
= std::move(_bufs
)] {
2064 for (auto&& b
: bufs_vec
) {
// Hugetlbfs (zero-copy) Rx path: queue the mbuf (cluster) for the Rx GC to
// refill later, then hand its data buffer(s) to the application without
// copying; clusters are delegated to from_mbuf_lro().
2071 inline std::optional
<packet
> dpdk_qp
<true>::from_mbuf(rte_mbuf
* m
)
2073 _rx_free_pkts
.push_back(m
);
2074 _num_rx_free_segs
+= m
->nb_segs
;
2076 if (!_dev
->hw_features_ref().rx_lro
|| rte_pktmbuf_is_contiguous(m
)) {
2077 char* data
= rte_pktmbuf_mtod(m
, char*);
2079 return packet(fragment
{data
, rte_pktmbuf_data_len(m
)},
2080 make_free_deleter(data
));
2082 return from_mbuf_lro(m
);
// Re-binds a fresh data buffer to every mbuf in the cluster. On allocation
// failure the remaining sub-cluster is pushed back to _rx_free_pkts for a
// later retry; successfully refilled mbufs go to _rx_free_bufs. (The
// return statements are missing from this mangled chunk — TODO confirm.)
2086 template <bool HugetlbfsMemBackend
>
2087 inline bool dpdk_qp
<HugetlbfsMemBackend
>::refill_one_cluster(rte_mbuf
* head
)
2089 for (; head
!= nullptr; head
= head
->next
) {
2090 if (!refill_rx_mbuf(head
)) {
2092 // If we failed to allocate a new buffer - push the rest of the
2093 // cluster back to the free_packets list for a later retry.
2095 _rx_free_pkts
.push_back(head
);
2098 _rx_free_bufs
.push_back(head
);
// Rx garbage collector (runs as a reactor poller): once enough segments
// are pending, refills the queued clusters and returns the refilled mbufs
// to the Rx mempool in bulk. Returns true while more GC work remains.
2104 template <bool HugetlbfsMemBackend
>
2105 bool dpdk_qp
<HugetlbfsMemBackend
>::rx_gc()
2107 if (_num_rx_free_segs
>= rx_gc_thresh
) {
2108 while (!_rx_free_pkts
.empty()) {
2110 // Use back() + pop_back() semantics to avoid an extra
2111 // _rx_free_pkts.clear() at the end of the function - clear() has a
2112 // linear complexity.
2114 auto m
= _rx_free_pkts
.back();
2115 _rx_free_pkts
.pop_back();
2117 if (!refill_one_cluster(m
)) {
2122 if (_rx_free_bufs
.size()) {
2123 rte_mempool_put_bulk(_pktmbuf_pool_rx
,
2124 (void **)_rx_free_bufs
.data(),
2125 _rx_free_bufs
.size());
2127 // TODO: assert() in a fast path! Remove me ASAP!
2128 assert(_num_rx_free_segs
>= _rx_free_bufs
.size());
2130 _num_rx_free_segs
-= _rx_free_bufs
.size();
2131 _rx_free_bufs
.clear();
2133 // TODO: assert() in a fast path! Remove me ASAP!
// Invariant: pending packets and the pending-segments counter are either
// both zero or both non-zero.
2134 assert((_rx_free_pkts
.empty() && !_num_rx_free_segs
) ||
2135 (!_rx_free_pkts
.empty() && _num_rx_free_segs
));
2139 return _num_rx_free_segs
>= rx_gc_thresh
;
// Converts a burst of received mbufs into net::packet objects, applies
// stripped-VLAN and RSS-hash offload info, drops bad-checksum packets when
// HW Rx checksum offload is on, feeds good packets to the device's L2
// receive path and updates Rx statistics.
2143 template <bool HugetlbfsMemBackend
>
2144 void dpdk_qp
<HugetlbfsMemBackend
>::process_packets(
2145 struct rte_mbuf
**bufs
, uint16_t count
)
2147 uint64_t nr_frags
= 0, bytes
= 0;
2149 for (uint16_t i
= 0; i
< count
; i
++) {
2150 struct rte_mbuf
*m
= bufs
[i
];
2153 std::optional
<packet
> p
= from_mbuf(m
);
2155 // Drop the packet if translation above has failed
2157 _stats
.rx
.bad
.inc_no_mem();
2161 nr_frags
+= m
->nb_segs
;
2162 bytes
+= m
->pkt_len
;
2164 // Set stripped VLAN value if available
// NOTE(review): the declaration of 'oi' (offload_info) is missing from
// this chunk — TODO confirm against the full file.
2165 if ((m
->ol_flags
& PKT_RX_VLAN_STRIPPED
) &&
2166 (m
->ol_flags
& PKT_RX_VLAN
)) {
2168 oi
.vlan_tci
= m
->vlan_tci
;
2171 if (_dev
->hw_features().rx_csum_offload
) {
2172 if (m
->ol_flags
& (PKT_RX_IP_CKSUM_BAD
| PKT_RX_L4_CKSUM_BAD
)) {
2173 // Packet with bad checksum, just drop it.
2174 _stats
.rx
.bad
.inc_csum_err();
2177 // Note that when _hw_features.rx_csum_offload is on, the receive
2178 // code for ip, tcp and udp will assume they don't need to check
2179 // the checksum again, because we did this here.
2182 (*p
).set_offload_info(oi
);
2183 if (m
->ol_flags
& PKT_RX_RSS_HASH
) {
2184 (*p
).set_rss_hash(m
->hash
.rss
);
2187 _dev
->l2receive(std::move(*p
));
2190 _stats
.rx
.good
.update_pkts_bunch(count
);
2191 _stats
.rx
.good
.update_frags_stats(nr_frags
, bytes
);
// Without the hugetlbfs backend every received byte was copied, so the
// copy counters mirror the totals.
2193 if (!HugetlbfsMemBackend
) {
2194 _stats
.rx
.good
.copy_frags
= _stats
.rx
.good
.nr_frags
;
2195 _stats
.rx
.good
.copy_bytes
= _stats
.rx
.good
.bytes
;
// Single non-blocking Rx poll: reads up to packet_read_size mbufs from the
// HW queue and processes them. (The final return is missing from this
// mangled chunk — presumably it reports whether anything was received;
// TODO confirm against the full file.)
2199 template <bool HugetlbfsMemBackend
>
2200 bool dpdk_qp
<HugetlbfsMemBackend
>::poll_rx_once()
2202 struct rte_mbuf
*buf
[packet_read_size
];
2205 uint16_t rx_count
= rte_eth_rx_burst(_dev
->port_idx(), _qid
,
2206 buf
, packet_read_size
);
2208 /* Now process the NIC packets read */
2209 if (likely(rx_count
> 0)) {
2210 process_packets(buf
, rx_count
);
// Programs the HW RSS indirection table (round-robin over the queues) and
// mirrors the same mapping into the local _redir_table. No-op when the
// device reports no RETA.
2216 void dpdk_device::set_rss_table()
2218 if (_dev_info
.reta_size
== 0)
2221 int reta_conf_size
=
2222 std::max(1, _dev_info
.reta_size
/ RTE_RETA_GROUP_SIZE
);
2223 std::vector
<rte_eth_rss_reta_entry64
> reta_conf(reta_conf_size
);
2225 // Configure the HW indirection table
// NOTE(review): the declaration/reset of the counter 'i' is missing from
// this chunk — TODO confirm against the full file.
2227 for (auto& x
: reta_conf
) {
2229 for (auto& r
: x
.reta
) {
2230 r
= i
++ % _num_queues
;
2234 if (rte_eth_dev_rss_reta_update(_port_idx
, reta_conf
.data(), _dev_info
.reta_size
)) {
2235 rte_exit(EXIT_FAILURE
, "Port %d: Failed to update an RSS indirection table", _port_idx
);
2238 // Fill our local indirection table. Make it in a separate loop to keep things simple.
2240 for (auto& r
: _redir_table
) {
2241 r
= i
++ % _num_queues
;
// Creates the per-shard queue pair: the hugetlbfs-backed (zero-copy)
// variant when "--hugepages" was given, the copying variant otherwise;
// then notifies the home CPU so device init can finish once every queue
// is ready.
2245 std::unique_ptr
<qp
> dpdk_device::init_local_queue(boost::program_options::variables_map opts
, uint16_t qid
) {
2247 std::unique_ptr
<qp
> qp
;
2248 if (opts
.count("hugepages")) {
2249 qp
= std::make_unique
<dpdk_qp
<true>>(this, qid
,
2250 _stats_plugin_name
+ "-" + _stats_plugin_inst
);
2252 qp
= std::make_unique
<dpdk_qp
<false>>(this, qid
,
2253 _stats_plugin_name
+ "-" + _stats_plugin_inst
);
2256 // FIXME: future is discarded
2257 (void)smp::submit_to(_home_cpu
, [this] () mutable {
2258 if (++_queues_ready
== _num_queues
) {
2266 /******************************** Interface functions *************************/
// Factory for the DPDK net device. Must be called exactly once and only
// after dpdk::eal has been initialized; aborts when no DPDK-capable port
// is available.
2268 std::unique_ptr
<net::device
> create_dpdk_net_device(
2270 uint16_t num_queues
,
2274 static bool called
= false;
2277 assert(dpdk::eal::initialized
);
2281 // Check that we have at least one DPDK-able port
2282 if (rte_eth_dev_count_avail() == 0) {
2283 rte_exit(EXIT_FAILURE
, "No Ethernet ports - bye\n");
2285 printf("ports number: %d\n", rte_eth_dev_count_avail());
2288 return std::make_unique
<dpdk::dpdk_device
>(port_idx
, num_queues
, use_lro
,
// Convenience overload: pulls port index, LRO and HW flow-control settings
// out of an hw_config and uses one queue per shard.
2292 std::unique_ptr
<net::device
> create_dpdk_net_device(
2293 const hw_config
& hw_cfg
)
2295 return create_dpdk_net_device(*hw_cfg
.port_index
, smp::count
, hw_cfg
.lro
, hw_cfg
.hw_fc
);
// Command-line options of the DPDK backend (port index, HW flow control
// and the offload on/off switches).
2299 boost::program_options::options_description
2300 get_dpdk_net_options_description()
2302 boost::program_options::options_description
opts(
2303 "DPDK net options");
2307 boost::program_options::value
<unsigned>()->default_value(0),
2312 boost::program_options::value
<std::string
>()->default_value("on"),
2313 "Enable HW Flow Control (on / off)");
2317 boost::program_options::value
<std::string
>()->default_value("on"),
2318 "Enable checksum offload feature (on / off)")
2320 boost::program_options::value
<std::string
>()->default_value("on"),
2321 "Enable TCP segment offload feature (on / off)")
2323 boost::program_options::value
<std::string
>()->default_value("on"),
2324 "Enable UDP fragmentation offload feature (on / off)")
2332 #endif // SEASTAR_HAVE_DPDK