// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
#include <rte_config.h>
#include <rte_common.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_memzone.h>

#include "include/page.h"
#include "common/Cycles.h"
#include "common/dout.h"
#include "common/errno.h"
#include "include/assert.h"

#define dout_subsys ceph_subsys_dpdk
#define dout_prefix *_dout << "dpdk "
void* as_cookie(struct rte_pktmbuf_pool_private& p) {
  return &p;
}

typedef void *MARKER[0]; /**< generic marker for a point in a structure */
/******************* Net device related constants *****************************/
static constexpr uint16_t default_ring_size = 512;

// We need 2 times the ring size of buffers because of the way PMDs
// refill the ring.
static constexpr uint16_t mbufs_per_queue_rx = 2 * default_ring_size;
static constexpr uint16_t rx_gc_thresh = 64;

// No need to keep more descriptors in the air than can be sent in a single
// rte_eth_tx_burst() call.
static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size;

static constexpr uint16_t mbuf_cache_size = 512;
static constexpr uint16_t mbuf_overhead =
    sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;

// We'll allocate 2K data buffers for an inline case because this would require
// a single page per mbuf. If we used 4K data buffers here it would require 2
// pages for a single buffer (due to "mbuf_overhead") and this is a much more
// demanding memory constraint.
static constexpr size_t inline_mbuf_data_size = 2048;

// Size of the data buffer in the non-inline case.
//
// We may want to change (increase) this value in the future, while the
// inline_mbuf_data_size value is unlikely to change due to the reasons
// described above.
static constexpr size_t mbuf_data_size = 4096;

// (INLINE_MBUF_DATA_SIZE(2K)*32 = 64K = Max TSO/LRO size) + 1 mbuf for headers
static constexpr uint8_t max_frags = 32 + 1;

//
// Intel's 40G NIC HW limit for the number of fragments in an xmit segment.
//
// See Chapter 8.4.1 "Transmit Packet in System Memory" of the xl710 devices
// spec. for more details.
//
static constexpr uint8_t i40e_max_xmit_segment_frags = 8;

//
// VMware's virtual NIC limit for the number of fragments in an xmit segment.
//
// See drivers/net/vmxnet3/base/vmxnet3_defs.h VMXNET3_MAX_TXD_PER_PKT
//
static constexpr uint8_t vmxnet3_max_xmit_segment_frags = 16;

static constexpr uint16_t inline_mbuf_size = inline_mbuf_data_size + mbuf_overhead;

static size_t huge_page_size = 512 * CEPH_PAGE_SIZE;
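//
// Rough sizing sketch (illustrative only, not used by the code): with a
// typical RTE_PKTMBUF_HEADROOM of 128 bytes and an rte_mbuf descriptor of
// roughly 128 bytes, mbuf_overhead is on the order of 256 bytes, so an
// inline mbuf (inline_mbuf_size = 2048 + mbuf_overhead) still fits in a
// single 4K page, while a 4K data buffer plus the same overhead would spill
// onto a second page. The exact numbers depend on the DPDK build.
//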
uint32_t qp_mempool_obj_size()
{
  uint32_t mp_size = 0;
  struct rte_mempool_objsz mp_obj_sz = {};

  //
  // We will align each size to huge page size because DPDK allocates
  // physically contiguous memory region for each pool object.
  //

  // Rx
  mp_size += align_up(rte_mempool_calc_obj_size(mbuf_overhead, 0, &mp_obj_sz)+
                      sizeof(struct rte_pktmbuf_pool_private),
                      huge_page_size);

  // Tx
  std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz));
  mp_size += align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0,
                                                &mp_obj_sz)+
                      sizeof(struct rte_pktmbuf_pool_private),
                      huge_page_size);
  return mp_size;
}
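//
// Worked example (a sketch, assuming 2MB huge pages, i.e. CEPH_PAGE_SIZE of
// 4K so huge_page_size == 512 * 4K == 2MB): each per-queue contribution
// above is rounded up to a multiple of 2MB by align_up(), so the value
// returned by qp_mempool_obj_size() grows in whole huge pages rather than
// in object-sized steps.
//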
static constexpr const char* pktmbuf_pool_name = "dpdk_net_pktmbuf_pool";

/*
 * When doing reads from the NIC queues, use this batch size
 */
static constexpr uint8_t packet_read_size = 32;
/******************************************************************************/
int DPDKDevice::init_port_start()
{
  assert(_port_idx < rte_eth_dev_count());

  rte_eth_dev_info_get(_port_idx, &_dev_info);

  //
  // This is a workaround for a missing handling of a HW limitation in the
  // DPDK i40e driver. This, and all the code related to _is_i40e_device,
  // should be removed once this handling is added.
  //
  if (std::string("rte_i40evf_pmd") == _dev_info.driver_name ||
      std::string("rte_i40e_pmd") == _dev_info.driver_name) {
    ldout(cct, 1) << __func__ << " Device is an Intel 40G NIC. Enabling 8 fragments hack!" << dendl;
    _is_i40e_device = true;
  }

  if (std::string("rte_vmxnet3_pmd") == _dev_info.driver_name) {
    ldout(cct, 1) << __func__ << " Device is a VMware virtual NIC. Enabling 16 fragments hack!" << dendl;
    _is_vmxnet3_device = true;
  }
  // Another workaround: this time for the limited number of RSS bits.
  // ixgbe PF NICs support up to 16 RSS queues.
  // ixgbe VF NICs support up to 4 RSS queues.
  // i40e PF NICs support up to 64 RSS queues.
  // i40e VF NICs support up to 16 RSS queues.
  if (std::string("rte_ixgbe_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
  } else if (std::string("rte_ixgbevf_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)4);
  } else if (std::string("rte_i40e_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)64);
  } else if (std::string("rte_i40evf_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
  }
  // Clear txq_flags - we want to support all available offload features
  // except for multi-mempool and refcnt'ing, which we don't need.
  _dev_info.default_txconf.txq_flags =
      ETH_TXQ_FLAGS_NOMULTMEMP | ETH_TXQ_FLAGS_NOREFCOUNT;

  //
  // Disable features that are not supported by the port's HW.
  //
  if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
    _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
  }

  if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
    _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
  }

  if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
    _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
  }

  if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
    _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
  }
  if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO)) {
    _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
  }
  /* for port configuration all features are off by default */
  rte_eth_conf port_conf = { 0 };

  ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": max_rx_queues "
                << _dev_info.max_rx_queues << " max_tx_queues "
                << _dev_info.max_tx_queues << dendl;

  _num_queues = std::min({_num_queues, _dev_info.max_rx_queues, _dev_info.max_tx_queues});

  ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": using "
                << _num_queues << " queues" << dendl;
  // Set RSS mode: enable RSS if more than one Rx queue is configured.
  // Even if the port has a single queue we still want the RSS feature to be
  // available in order to make the HW calculate the RSS hash for us.
  if (_num_queues > 1) {
    if (_dev_info.hash_key_size == 40) {
      _rss_key = default_rsskey_40bytes;
    } else if (_dev_info.hash_key_size == 52) {
      _rss_key = default_rsskey_52bytes;
    } else if (_dev_info.hash_key_size != 0) {
      rte_exit(EXIT_FAILURE,
               "Port %d: We support only 40 or 52 bytes RSS hash keys, %d bytes key requested",
               _port_idx, _dev_info.hash_key_size);
    } else {
      _rss_key = default_rsskey_40bytes;
      _dev_info.hash_key_size = 40;
    }

    port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
    port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
    if (_dev_info.hash_key_size) {
      port_conf.rx_adv_conf.rss_conf.rss_key = const_cast<uint8_t *>(_rss_key.data());
      port_conf.rx_adv_conf.rss_conf.rss_key_len = _dev_info.hash_key_size;
    }
  } else {
    port_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
  }
  if (_num_queues > 1) {
    if (_dev_info.reta_size) {
      // RETA size should be a power of 2
      assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);

      // Set the RSS table to the correct size
      _redir_table.resize(_dev_info.reta_size);
      _rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
      ldout(cct, 5) << __func__ << " Port " << int(_port_idx)
                    << ": RSS table size is " << _dev_info.reta_size << dendl;
    } else {
      // FIXME: same with sw_reta
      _redir_table.resize(128);
      _rss_table_bits = std::lround(std::log2(128));
    }
  } else {
    _redir_table.push_back(0);
  }
  // Set Rx VLAN stripping
  if (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
    port_conf.rxmode.hw_vlan_strip = 1;
  }

  // Enable HW CRC stripping
  port_conf.rxmode.hw_strip_crc = 1;
#ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
  // Enable LRO
  if (_use_lro && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)) {
    ldout(cct, 1) << __func__ << " LRO is on" << dendl;
    port_conf.rxmode.enable_lro = 1;
    _hw_features.rx_lro = true;
  } else
#endif
  {
    ldout(cct, 1) << __func__ << " LRO is off" << dendl;
  }
  // Check that all CSUM features are either all set together or not set
  // together. If this assumption breaks we need to rework the logic below
  // by splitting the csum offload feature bit into separate bits for IPv4,
  // TCP and UDP.
  assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
          (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
         (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
          !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));

  // Set Rx checksum checking
  if ((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
      (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
    ldout(cct, 1) << __func__ << " RX checksum offload supported" << dendl;
    port_conf.rxmode.hw_ip_checksum = 1;
    _hw_features.rx_csum_offload = 1;
  }

  if ((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
    ldout(cct, 1) << __func__ << " TX ip checksum offload supported" << dendl;
    _hw_features.tx_csum_ip_offload = 1;
  }
  // TSO is supported starting from DPDK v1.8
  if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
    ldout(cct, 1) << __func__ << " TSO is supported" << dendl;
    _hw_features.tx_tso = 1;
  }

  // Check that Tx TCP CSUM features are either all set together or not set
  // together. If this assumption breaks we need to rework the logic below
  // by splitting the csum offload feature bit into separate bits.
  assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
         !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM));

  if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
    ldout(cct, 1) << __func__ << " TX TCP checksum offload supported" << dendl;
    _hw_features.tx_csum_l4_offload = 1;
  }
  int retval;

  ldout(cct, 1) << __func__ << " Port " << int(_port_idx) << " init ... " << dendl;

  /*
   * Standard DPDK port initialisation - config port, then set up
   * rx and tx rings.
   */
  if ((retval = rte_eth_dev_configure(_port_idx, _num_queues, _num_queues,
                                      &port_conf)) != 0) {
    lderr(cct) << __func__ << " failed to configure port " << (int)_port_idx
               << " rx/tx queues " << _num_queues << " error " << cpp_strerror(retval) << dendl;
    return retval;
  }

  //rte_eth_promiscuous_enable(port_num);
  ldout(cct, 1) << __func__ << " done." << dendl;

  return 0;
}
void DPDKDevice::set_hw_flow_control()
{
  // Read the port's current/default flow control settings
  struct rte_eth_fc_conf fc_conf;
  auto ret = rte_eth_dev_flow_ctrl_get(_port_idx, &fc_conf);

  if (ret == -ENOTSUP) {
    ldout(cct, 1) << __func__ << " port " << int(_port_idx)
                  << ": getting hardware flow control settings is not supported: " << ret << dendl;
    goto not_supported;
  }

  if (ret < 0) {
    lderr(cct) << __func__ << " port " << int(_port_idx)
               << ": failed to get hardware flow control settings: " << ret << dendl;
    goto not_supported;
  }

  if (_enable_fc) {
    fc_conf.mode = RTE_FC_FULL;
  } else {
    fc_conf.mode = RTE_FC_NONE;
  }

  ret = rte_eth_dev_flow_ctrl_set(_port_idx, &fc_conf);
  if (ret == -ENOTSUP) {
    ldout(cct, 1) << __func__ << " port " << int(_port_idx)
                  << ": setting hardware flow control settings is not supported: " << ret << dendl;
    goto not_supported;
  }

  if (ret < 0) {
    lderr(cct) << __func__ << " port " << int(_port_idx)
               << ": failed to set hardware flow control settings: " << ret << dendl;
    goto not_supported;
  }

  ldout(cct, 1) << __func__ << " port " << int(_port_idx) << ": HW FC " << _enable_fc << dendl;
  return;

 not_supported:
  ldout(cct, 1) << __func__ << " port " << int(_port_idx) << ": changing HW FC settings is not supported" << dendl;
}
int DPDKDevice::init_port_fini()
{
  // Changing FC requires an HW reset, so set it before the port is initialized.
  set_hw_flow_control();

  if (rte_eth_dev_start(_port_idx) != 0) {
    lderr(cct) << __func__ << " can't start port " << _port_idx << dendl;
    return -1;
  }

  if (_num_queues > 1) {
    if (!rte_eth_dev_filter_supported(_port_idx, RTE_ETH_FILTER_HASH)) {
      ldout(cct, 5) << __func__ << " Port " << _port_idx << ": HASH FILTER configuration is supported" << dendl;

      // Set up the HW to use the TOEPLITZ hash function as an RSS hash function
      struct rte_eth_hash_filter_info info = {};

      info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
      info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;

      if (rte_eth_dev_filter_ctrl(_port_idx, RTE_ETH_FILTER_HASH,
                                  RTE_ETH_FILTER_SET, &info) < 0) {
        lderr(cct) << __func__ << " cannot set hash function on port " << _port_idx << dendl;
        return -1;
      }
    }

    set_rss_table();
  }

  // Wait for the link to come up
  if (check_port_link_status() < 0) {
    lderr(cct) << __func__ << " port link up failed " << _port_idx << dendl;
    return -1;
  }

  ldout(cct, 5) << __func__ << " created DPDK device" << dendl;
  return 0;
}
void DPDKQueuePair::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
  assert(!cpu_weights.empty());
  if (cpu_weights.size() == 1 && cpu_weights.begin()->first == _qid) {
    // special case queue sending to self only, to avoid requiring a hash value
    return;
  }
  register_packet_provider([this] {
    Tub<Packet> p;
    if (!_proxy_packetq.empty()) {
      p = std::move(_proxy_packetq.front());
      _proxy_packetq.pop_front();
    }
    return p;
  });
  build_sw_reta(cpu_weights);
}
void DPDKQueuePair::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
  float total_weight = 0;
  for (auto&& x : cpu_weights) {
    total_weight += x.second;
  }
  float accum = 0;
  unsigned idx = 0;
  std::array<uint8_t, 128> reta;
  for (auto&& entry : cpu_weights) {
    auto cpu = entry.first;
    auto weight = entry.second;
    accum += weight;
    while (idx < (accum / total_weight * reta.size() - 0.5)) {
      reta[idx++] = cpu;
    }
  }
  _sw_reta = reta;
}
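//
// Illustrative example (a sketch, not executed here): with cpu_weights of
// {0: 1.0, 1: 3.0} and a 128-entry table, accum/total_weight reaches 0.25
// after cpu 0 and 1.0 after cpu 1, so roughly the first 32 entries map to
// queue 0 and the remaining 96 entries map to queue 1.
//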
bool DPDKQueuePair::init_rx_mbuf_pool()
{
  std::string name = std::string(pktmbuf_pool_name) + std::to_string(_qid) + "_rx";

  // reserve the memory for Rx buffers containers
  _rx_free_pkts.reserve(mbufs_per_queue_rx);
  _rx_free_bufs.reserve(mbufs_per_queue_rx);

  _pktmbuf_pool_rx = rte_mempool_lookup(name.c_str());
  if (!_pktmbuf_pool_rx) {
    ldout(cct, 1) << __func__ << " Creating Rx mbuf pool '" << name.c_str()
                  << "' [" << mbufs_per_queue_rx << " mbufs] ..." << dendl;

    //
    // Don't pass single-producer/single-consumer flags to mbuf create as it
    // seems faster to use a cache instead.
    //
    struct rte_pktmbuf_pool_private roomsz = {};
    roomsz.mbuf_data_room_size = mbuf_data_size + RTE_PKTMBUF_HEADROOM;
    _pktmbuf_pool_rx = rte_mempool_create(
        name.c_str(),
        mbufs_per_queue_rx, mbuf_overhead,
        mbuf_cache_size,
        sizeof(struct rte_pktmbuf_pool_private),
        rte_pktmbuf_pool_init, as_cookie(roomsz),
        rte_pktmbuf_init, nullptr,
        rte_socket_id(), 0);
    if (!_pktmbuf_pool_rx) {
      lderr(cct) << __func__ << " Failed to create mempool for rx" << dendl;
      return false;
    }

    //
    // 1) Pull all entries from the pool.
    // 2) Bind data buffers to each of them.
    // 3) Return them back to the pool.
    //
    for (int i = 0; i < mbufs_per_queue_rx; i++) {
      rte_mbuf* m = rte_pktmbuf_alloc(_pktmbuf_pool_rx);
      assert(m);
      _rx_free_bufs.push_back(m);
    }

    for (int i = 0; i < cct->_conf->ms_dpdk_rx_buffer_count_per_core; i++) {
      void* m = rte_malloc(NULL, mbuf_data_size, mbuf_data_size);
      assert(m);
      _alloc_bufs.push_back(m);
    }

    for (auto&& m : _rx_free_bufs) {
      if (!init_noninline_rx_mbuf(m, mbuf_data_size, _alloc_bufs)) {
        lderr(cct) << __func__ << " Failed to allocate data buffers for Rx ring. "
            "Consider increasing the amount of memory." << dendl;
        return false;
      }
    }

    rte_mempool_put_bulk(_pktmbuf_pool_rx, (void**)_rx_free_bufs.data(),
                         _rx_free_bufs.size());
    _rx_free_bufs.clear();

    if (rte_eth_rx_queue_setup(_dev_port_idx, _qid, default_ring_size,
                               rte_eth_dev_socket_id(_dev_port_idx),
                               _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) {
      lderr(cct) << __func__ << " cannot initialize rx queue" << dendl;
      return false;
    }
  }

  ldout(cct, 20) << __func__ << " count " << rte_mempool_count(_pktmbuf_pool_rx)
                 << " free count " << rte_mempool_free_count(_pktmbuf_pool_rx) << dendl;
  return _pktmbuf_pool_rx != nullptr;
}
int DPDKDevice::check_port_link_status()
{
  int count = 0;

  ldout(cct, 20) << __func__ << dendl;
  const int sleep_time = 100 * 1000;
  const int max_check_time = 90;  /* 9s (90 * 100ms) in total */

  while (true) {
    struct rte_eth_link link;
    memset(&link, 0, sizeof(link));
    rte_eth_link_get_nowait(_port_idx, &link);

    if (link.link_status) {
      ldout(cct, 5) << __func__ << " done port "
                    << static_cast<unsigned>(_port_idx)
                    << " link Up - speed " << link.link_speed
                    << " Mbps - "
                    << ((link.link_duplex == ETH_LINK_FULL_DUPLEX) ? ("full-duplex") : ("half-duplex"))
                    << dendl;
      break;
    } else if (count++ < max_check_time) {
      ldout(cct, 20) << __func__ << " not ready, continue to wait." << dendl;
      usleep(sleep_time);
    } else {
      lderr(cct) << __func__ << " done port " << _port_idx << " link down" << dendl;
      return -1;
    }
  }
  return 0;
}
class C_handle_dev_stats : public EventCallback {
  DPDKQueuePair *_qp;
 public:
  C_handle_dev_stats(DPDKQueuePair *qp): _qp(qp) { }
  void do_request(int id) {
    _qp->handle_stats();
  }
};
DPDKQueuePair::DPDKQueuePair(CephContext *c, EventCenter *cen, DPDKDevice* dev, uint8_t qid)
  : cct(c), _dev(dev), _dev_port_idx(dev->port_idx()), center(cen), _qid(qid),
    _tx_poller(this), _rx_gc_poller(this), _tx_buf_factory(c, dev, qid),
    _tx_gc_poller(this)
{
  if (!init_rx_mbuf_pool()) {
    lderr(cct) << __func__ << " cannot initialize mbuf pools" << dendl;
  }

  static_assert(offsetof(tx_buf, private_end) -
                offsetof(tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM,
                "RTE_PKTMBUF_HEADROOM is less than DPDKQueuePair::tx_buf size! "
                "Increase the headroom size in the DPDK configuration");
  static_assert(offsetof(tx_buf, _mbuf) == 0,
                "There is a pad at the beginning of the tx_buf before _mbuf "
                "field!");
  static_assert((inline_mbuf_data_size & (inline_mbuf_data_size - 1)) == 0,
                "inline_mbuf_data_size has to be a power of two!");

  std::string name(std::string("queue") + std::to_string(qid));
  PerfCountersBuilder plb(cct, name, l_dpdk_qp_first, l_dpdk_qp_last);

  plb.add_u64_counter(l_dpdk_qp_rx_packets, "dpdk_receive_packets", "DPDK received packets");
  plb.add_u64_counter(l_dpdk_qp_tx_packets, "dpdk_send_packets", "DPDK sent packets");
  plb.add_u64_counter(l_dpdk_qp_rx_bad_checksum_errors, "dpdk_receive_bad_checksum_errors", "DPDK received bad checksum packets");
  plb.add_u64_counter(l_dpdk_qp_rx_no_memory_errors, "dpdk_receive_no_memory_errors", "DPDK received no memory packets");
  plb.add_u64_counter(l_dpdk_qp_rx_bytes, "dpdk_receive_bytes", "DPDK received bytes");
  plb.add_u64_counter(l_dpdk_qp_tx_bytes, "dpdk_send_bytes", "DPDK sent bytes");
  plb.add_u64_counter(l_dpdk_qp_rx_last_bunch, "dpdk_receive_last_bunch", "DPDK last received bunch");
  plb.add_u64_counter(l_dpdk_qp_tx_last_bunch, "dpdk_send_last_bunch", "DPDK last sent bunch");
  plb.add_u64_counter(l_dpdk_qp_rx_fragments, "dpdk_receive_fragments", "DPDK received total fragments");
  plb.add_u64_counter(l_dpdk_qp_tx_fragments, "dpdk_send_fragments", "DPDK sent total fragments");
  plb.add_u64_counter(l_dpdk_qp_rx_copy_ops, "dpdk_receive_copy_ops", "DPDK received copy operations");
  plb.add_u64_counter(l_dpdk_qp_tx_copy_ops, "dpdk_send_copy_ops", "DPDK sent copy operations");
  plb.add_u64_counter(l_dpdk_qp_rx_copy_bytes, "dpdk_receive_copy_bytes", "DPDK received copy bytes");
  plb.add_u64_counter(l_dpdk_qp_tx_copy_bytes, "dpdk_send_copy_bytes", "DPDK sent copy bytes");
  plb.add_u64_counter(l_dpdk_qp_rx_linearize_ops, "dpdk_receive_linearize_ops", "DPDK received linearize operations");
  plb.add_u64_counter(l_dpdk_qp_tx_linearize_ops, "dpdk_send_linearize_ops", "DPDK sent linearize operations");
  plb.add_u64_counter(l_dpdk_qp_tx_queue_length, "dpdk_send_queue_length", "DPDK send queue length");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);

  device_stat_time_fd = center->create_time_event(1000*1000, new C_handle_dev_stats(this));
}
void DPDKQueuePair::handle_stats()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  rte_eth_stats rte_stats = {};
  int rc = rte_eth_stats_get(_dev_port_idx, &rte_stats);
  if (rc) {
    ldout(cct, 0) << __func__ << " failed to get port statistics: " << cpp_strerror(rc) << dendl;
    return;
  }

#if RTE_VERSION < RTE_VERSION_NUM(16,7,0,0)
  _dev->perf_logger->set(l_dpdk_dev_rx_mcast, rte_stats.imcasts);
  _dev->perf_logger->set(l_dpdk_dev_rx_badcrc_errors, rte_stats.ibadcrc);
#endif
  _dev->perf_logger->set(l_dpdk_dev_rx_dropped_errors, rte_stats.imissed);
  _dev->perf_logger->set(l_dpdk_dev_rx_nombuf_errors, rte_stats.rx_nombuf);

  _dev->perf_logger->set(l_dpdk_dev_rx_total_errors, rte_stats.ierrors);
  _dev->perf_logger->set(l_dpdk_dev_tx_total_errors, rte_stats.oerrors);
  device_stat_time_fd = center->create_time_event(1000*1000, new C_handle_dev_stats(this));
}
bool DPDKQueuePair::poll_tx() {
  bool nonloopback = !cct->_conf->ms_dpdk_debug_allow_loopback;
  uint64_t start = Cycles::rdtsc();
  uint32_t total_work = 0;
  if (_tx_packetq.size() < 16) {
    // refill send queue from upper layers
    uint32_t work;
    do {
      work = 0;
      for (auto&& pr : _pkt_providers) {
        auto p = pr();
        if (p) {
          work++;
          if (likely(nonloopback)) {
            // ldout(cct, 0) << __func__ << " len: " << p->len() << " frags: " << p->nr_frags() << dendl;
            _tx_packetq.push_back(std::move(*p));
          } else {
            auto th = p->get_header<eth_hdr>(0);
            if (th->dst_mac == th->src_mac) {
              _dev->l2receive(_qid, std::move(*p));
            } else {
              _tx_packetq.push_back(std::move(*p));
            }
          }
          if (_tx_packetq.size() == 128) {
            break;
          }
        }
      }
      total_work += work;
    } while (work && total_work < 256 && _tx_packetq.size() < 128);
  }
  if (!_tx_packetq.empty()) {
    uint64_t c = send(_tx_packetq);
    perf_logger->inc(l_dpdk_qp_tx_packets, c);
    perf_logger->set(l_dpdk_qp_tx_last_bunch, c);
    tx_count += total_work;
    tx_cycles += Cycles::rdtsc() - start;
    return true;
  }
  return false;
}
<Packet
> DPDKQueuePair::from_mbuf_lro(rte_mbuf
* m
)
733 for (; m
!= nullptr; m
= m
->next
) {
734 char* data
= rte_pktmbuf_mtod(m
, char*);
736 _frags
.emplace_back(fragment
{data
, rte_pktmbuf_data_len(m
)});
737 _bufs
.push_back(data
);
740 auto del
= std::bind(
741 [this](std::vector
<char*> &bufs
) {
742 for (auto&& b
: bufs
) { _alloc_bufs
.push_back(b
); }
743 }, std::move(_bufs
));
745 _frags
.begin(), _frags
.end(), make_deleter(std::move(del
)));
inline Tub<Packet> DPDKQueuePair::from_mbuf(rte_mbuf* m)
{
  _rx_free_pkts.push_back(m);
  _num_rx_free_segs += m->nb_segs;

  if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
    char* data = rte_pktmbuf_mtod(m, char*);

    return Packet(fragment{data, rte_pktmbuf_data_len(m)},
                  make_deleter([this, data] { _alloc_bufs.push_back(data); }));
  } else {
    return from_mbuf_lro(m);
  }
}
inline bool DPDKQueuePair::refill_one_cluster(rte_mbuf* head)
{
  for (; head != nullptr; head = head->next) {
    if (!refill_rx_mbuf(head, mbuf_data_size, _alloc_bufs)) {
      //
      // If we failed to allocate a new buffer - push the rest of the
      // cluster back to the free_packets list for a later retry.
      //
      _rx_free_pkts.push_back(head);
      return false;
    }
    _rx_free_bufs.push_back(head);
  }

  return true;
}
bool DPDKQueuePair::rx_gc(bool force)
{
  if (_num_rx_free_segs >= rx_gc_thresh || force) {
    ldout(cct, 10) << __func__ << " free segs " << _num_rx_free_segs
                   << " thresh " << rx_gc_thresh
                   << " free pkts " << _rx_free_pkts.size()
                   << " pool count " << rte_mempool_count(_pktmbuf_pool_rx)
                   << " free pool count " << rte_mempool_free_count(_pktmbuf_pool_rx)
                   << dendl;

    while (!_rx_free_pkts.empty()) {
      //
      // Use back() + pop_back() semantics to avoid an extra
      // _rx_free_pkts.clear() at the end of the function - clear() has a
      // linear complexity.
      //
      auto m = _rx_free_pkts.back();
      _rx_free_pkts.pop_back();

      if (!refill_one_cluster(m)) {
        ldout(cct, 1) << __func__ << " get new mbuf failed " << dendl;
        break;
      }
    }

    if (_rx_free_bufs.size()) {
      rte_mempool_put_bulk(_pktmbuf_pool_rx,
                           (void **)_rx_free_bufs.data(),
                           _rx_free_bufs.size());

      // TODO: assert() in a fast path! Remove me ASAP!
      assert(_num_rx_free_segs >= _rx_free_bufs.size());

      _num_rx_free_segs -= _rx_free_bufs.size();
      _rx_free_bufs.clear();

      // TODO: assert() in a fast path! Remove me ASAP!
      assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
             (!_rx_free_pkts.empty() && _num_rx_free_segs));
    }
  }

  return _num_rx_free_segs >= rx_gc_thresh;
}
void DPDKQueuePair::process_packets(
    struct rte_mbuf **bufs, uint16_t count)
{
  uint64_t nr_frags = 0, bytes = 0;

  for (uint16_t i = 0; i < count; i++) {
    struct rte_mbuf *m = bufs[i];
    offload_info oi;

    Tub<Packet> p = from_mbuf(m);

    // Drop the packet if the translation above has failed
    if (!p) {
      perf_logger->inc(l_dpdk_qp_rx_no_memory_errors);
      continue;
    }
    // ldout(cct, 0) << __func__ << " len " << p->len() << " " << dendl;

    nr_frags += m->nb_segs;
    bytes += m->pkt_len;

    // Set the stripped VLAN value if available
    if ((_dev->_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) &&
        (m->ol_flags & PKT_RX_VLAN_PKT)) {
      oi.vlan_tci = m->vlan_tci;
    }

    if (_dev->get_hw_features().rx_csum_offload) {
      if (m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
        // Packet with bad checksum, just drop it.
        perf_logger->inc(l_dpdk_qp_rx_bad_checksum_errors);
        continue;
      }
      // Note that when _hw_features.rx_csum_offload is on, the receive
      // code for ip, tcp and udp will assume they don't need to check
      // the checksum again, because we did this here.
    }

    p->set_offload_info(oi);
    if (m->ol_flags & PKT_RX_RSS_HASH) {
      p->set_rss_hash(m->hash.rss);
    }

    _dev->l2receive(_qid, std::move(*p));
  }

  perf_logger->inc(l_dpdk_qp_rx_packets, count);
  perf_logger->set(l_dpdk_qp_rx_last_bunch, count);
  perf_logger->inc(l_dpdk_qp_rx_fragments, nr_frags);
  perf_logger->inc(l_dpdk_qp_rx_bytes, bytes);
}
bool DPDKQueuePair::poll_rx_once()
{
  struct rte_mbuf *buf[packet_read_size];

  uint64_t start = Cycles::rdtsc();
  uint16_t count = rte_eth_rx_burst(_dev_port_idx, _qid,
                                    buf, packet_read_size);

  /* Now process the NIC packets read */
  if (likely(count > 0)) {
    process_packets(buf, count);
    rx_cycles = Cycles::rdtsc() - start;
    rx_count += count;

    if (rx_count > 10000 && tx_count) {
      ldout(cct, 0) << __func__ << " rx count=" << rx_count << " avg rx=" << Cycles::to_nanoseconds(rx_cycles)/rx_count << "ns "
                    << " tx count=" << tx_count << " avg tx=" << Cycles::to_nanoseconds(tx_cycles)/tx_count << "ns"
                    << dendl;
      rx_count = rx_cycles = tx_count = tx_cycles = 0;
    }
  }

  return count;
}
DPDKQueuePair::tx_buf_factory::tx_buf_factory(CephContext *c,
                                              DPDKDevice *dev, uint8_t qid): cct(c)
{
  std::string name = std::string(pktmbuf_pool_name) + std::to_string(qid) + "_tx";

  _pool = rte_mempool_lookup(name.c_str());
  if (!_pool) {
    ldout(cct, 0) << __func__ << " Creating Tx mbuf pool '" << name.c_str()
                  << "' [" << mbufs_per_queue_tx << " mbufs] ..." << dendl;

    //
    // We are going to push the buffers from the mempool into
    // the circular_buffer and then poll them from there anyway, so
    // we prefer to make a mempool non-atomic in this case.
    //
    _pool = rte_mempool_create(name.c_str(),
                               mbufs_per_queue_tx, inline_mbuf_size,
                               mbuf_cache_size,
                               sizeof(struct rte_pktmbuf_pool_private),
                               rte_pktmbuf_pool_init, nullptr,
                               rte_pktmbuf_init, nullptr,
                               rte_socket_id(), 0);
    if (!_pool) {
      lderr(cct) << __func__ << " Failed to create mempool for Tx" << dendl;
    }
    if (rte_eth_tx_queue_setup(dev->port_idx(), qid, default_ring_size,
                               rte_eth_dev_socket_id(dev->port_idx()),
                               dev->def_tx_conf()) < 0) {
      lderr(cct) << __func__ << " cannot initialize tx queue" << dendl;
    }
  }

  //
  // Fill the factory with the buffers from the mempool allocated above.
  //
}
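//
// Informational note (a sketch of the DPDK mempool options involved, not a
// statement about this particular build): a mempool can be made
// single-producer/single-consumer with the MEMPOOL_F_SP_PUT and
// MEMPOOL_F_SC_GET flags, which bypass the multi-producer/multi-consumer
// ring paths; alternatively a non-zero per-lcore cache (mbuf_cache_size
// above) keeps most gets/puts off the shared ring entirely.
//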
bool DPDKQueuePair::tx_buf::i40e_should_linearize(rte_mbuf *head)
{
  bool is_tso = head->ol_flags & PKT_TX_TCP_SEG;

  // For a non-TSO case: the number of fragments should not exceed 8
  if (!is_tso) {
    return head->nb_segs > i40e_max_xmit_segment_frags;
  }

  //
  // For a TSO case each MSS window should not include more than 8
  // fragments including headers.
  //

  // Calculate the number of frags containing headers.
  //
  // Note: we support neither VLAN nor tunneling, thus headers size
  // accounting is super simple.
  //
  size_t headers_size = head->l2_len + head->l3_len + head->l4_len;
  unsigned hdr_frags = 0;
  size_t cur_payload_len = 0;
  rte_mbuf *cur_seg = head;

  while (cur_seg && cur_payload_len < headers_size) {
    cur_payload_len += cur_seg->data_len;
    cur_seg = cur_seg->next;
    hdr_frags++;
  }

  //
  // Header fragments will be used for each TSO segment, thus the
  // maximum number of data segments will be 8 minus the number of
  // header fragments.
  //
  // It's unclear from the spec how the first TSO segment is treated
  // if the last fragment with headers contains some data bytes:
  // whether this fragment will be accounted as a single fragment or
  // as two separate fragments. We prefer to play it safe and assume
  // that this fragment will be accounted as two separate fragments.
  //
  size_t max_win_size = i40e_max_xmit_segment_frags - hdr_frags;

  if (head->nb_segs <= max_win_size) {
    return false;
  }

  // Get the data (without headers) part of the first data fragment
  size_t prev_frag_data = cur_payload_len - headers_size;
  auto mss = head->tso_segsz;

  while (cur_seg) {
    unsigned frags_in_seg = 0;
    size_t cur_seg_size = 0;

    if (prev_frag_data) {
      cur_seg_size = prev_frag_data;
      frags_in_seg++;
      prev_frag_data = 0;
    }

    while (cur_seg_size < mss && cur_seg) {
      cur_seg_size += cur_seg->data_len;
      cur_seg = cur_seg->next;
      frags_in_seg++;
    }

    if (frags_in_seg > max_win_size) {
      return true;
    }

    if (cur_seg_size > mss) {
      prev_frag_data = cur_seg_size - mss;
    }
  }

  return false;
}
void DPDKQueuePair::tx_buf::set_cluster_offload_info(const Packet& p, const DPDKQueuePair& qp, rte_mbuf* head)
{
  // Handle TCP checksum offload
  auto oi = p.offload_info();
  if (oi.needs_ip_csum) {
    head->ol_flags |= PKT_TX_IP_CKSUM;
    // TODO: Take a VLAN header into account here
    head->l2_len = sizeof(struct ether_hdr);
    head->l3_len = oi.ip_hdr_len;
  }

  if (qp.port().get_hw_features().tx_csum_l4_offload) {
    if (oi.protocol == ip_protocol_num::tcp) {
      head->ol_flags |= PKT_TX_TCP_CKSUM;
      // TODO: Take a VLAN header into account here
      head->l2_len = sizeof(struct ether_hdr);
      head->l3_len = oi.ip_hdr_len;

      if (oi.tso_seg_size) {
        assert(oi.needs_ip_csum);
        head->ol_flags |= PKT_TX_TCP_SEG;
        head->l4_len = oi.tcp_hdr_len;
        head->tso_segsz = oi.tso_seg_size;
      }
    }
  }
}
DPDKQueuePair::tx_buf* DPDKQueuePair::tx_buf::from_packet_zc(
    CephContext *cct, Packet&& p, DPDKQueuePair& qp)
{
  // Too fragmented - linearize
  if (p.nr_frags() > max_frags) {
    p.linearize();
    qp.perf_logger->inc(l_dpdk_qp_tx_linearize_ops);
  }

 build_mbuf_cluster:
  rte_mbuf *head = nullptr, *last_seg = nullptr;
  unsigned nsegs = 0;

  //
  // Create a HEAD of the fragmented packet: check if frag0 has to be
  // copied and if yes - send it in a copy way
  //
  if (!check_frag0(p)) {
    if (!copy_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
      ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(0).size << dendl;
      return nullptr;
    }
  } else if (!translate_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
    ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(0).size << dendl;
    return nullptr;
  }

  unsigned total_nsegs = nsegs;

  for (unsigned i = 1; i < p.nr_frags(); i++) {
    rte_mbuf *h = nullptr, *new_last_seg = nullptr;
    if (!translate_one_frag(qp, p.frag(i), h, new_last_seg, nsegs)) {
      ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(i).size << dendl;
      me(head)->recycle();
      return nullptr;
    }

    total_nsegs += nsegs;

    // Attach the new buffers' chain to the packet chain
    last_seg->next = h;
    last_seg = new_last_seg;
  }

  // Update the HEAD buffer with the packet info
  head->pkt_len = p.len();
  head->nb_segs = total_nsegs;

  set_cluster_offload_info(p, qp, head);

  //
  // If the packet hasn't been linearized already and the resulting
  // cluster requires linearisation due to an HW limitation:
  //
  //    - Recycle the cluster.
  //    - Linearize the packet.
  //    - Build the cluster once again.
  //
  if (head->nb_segs > max_frags ||
      (p.nr_frags() > 1 && qp.port().is_i40e_device() && i40e_should_linearize(head)) ||
      (p.nr_frags() > vmxnet3_max_xmit_segment_frags && qp.port().is_vmxnet3_device())) {
    me(head)->recycle();
    p.linearize();
    qp.perf_logger->inc(l_dpdk_qp_tx_linearize_ops);

    goto build_mbuf_cluster;
  }

  me(last_seg)->set_packet(std::move(p));

  return me(head);
}
void DPDKQueuePair::tx_buf::copy_packet_to_cluster(const Packet& p, rte_mbuf* head)
{
  rte_mbuf* cur_seg = head;
  size_t cur_seg_offset = 0;
  unsigned cur_frag_idx = 0;
  size_t cur_frag_offset = 0;

  while (true) {
    size_t to_copy = std::min(p.frag(cur_frag_idx).size - cur_frag_offset,
                              inline_mbuf_data_size - cur_seg_offset);

    memcpy(rte_pktmbuf_mtod_offset(cur_seg, void*, cur_seg_offset),
           p.frag(cur_frag_idx).base + cur_frag_offset, to_copy);

    cur_frag_offset += to_copy;
    cur_seg_offset += to_copy;

    if (cur_frag_offset >= p.frag(cur_frag_idx).size) {
      cur_frag_idx++;
      if (cur_frag_idx >= p.nr_frags()) {
        //
        // We are done - set the data size of the last segment
        // of the cluster.
        //
        cur_seg->data_len = cur_seg_offset;
        break;
      }

      cur_frag_offset = 0;
    }

    if (cur_seg_offset >= inline_mbuf_data_size) {
      cur_seg->data_len = inline_mbuf_data_size;
      cur_seg = cur_seg->next;
      cur_seg_offset = 0;

      // FIXME: assert in a fast-path - remove!!!
      assert(cur_seg);
    }
  }
}
DPDKQueuePair::tx_buf* DPDKQueuePair::tx_buf::from_packet_copy(Packet&& p, DPDKQueuePair& qp)
{
  /*
   * Here we are going to use the fact that the inline data size is a
   * power of two.
   *
   * We will first try to allocate the cluster and only if we are
   * successful - we will go and copy the data.
   */
  auto aligned_len = align_up((size_t)p.len(), inline_mbuf_data_size);
  unsigned nsegs = aligned_len / inline_mbuf_data_size;
  rte_mbuf *head = nullptr, *last_seg = nullptr;

  tx_buf* buf = qp.get_tx_buf();
  if (!buf) {
    return nullptr;
  }

  head = buf->rte_mbuf_p();
  last_seg = head;
  for (unsigned i = 1; i < nsegs; i++) {
    buf = qp.get_tx_buf();
    if (!buf) {
      me(head)->recycle();
      return nullptr;
    }

    last_seg->next = buf->rte_mbuf_p();
    last_seg = last_seg->next;
  }

  //
  // If we've got here it means that we have already succeeded!
  // We only need to copy the data and set the head buffer with the
  // relevant info.
  //
  head->pkt_len = p.len();
  head->nb_segs = nsegs;

  copy_packet_to_cluster(p, head);
  set_cluster_offload_info(p, qp, head);

  return me(head);
}
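//
// Worked example (illustrative only): for p.len() == 5000 and
// inline_mbuf_data_size == 2048, align_up() yields aligned_len == 6144, so
// nsegs == 3 inline mbufs are reserved up front before any data is copied.
//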
size_t DPDKQueuePair::tx_buf::copy_one_data_buf(
    DPDKQueuePair& qp, rte_mbuf*& m, char* data, size_t buf_len)
{
  tx_buf* buf = qp.get_tx_buf();
  if (!buf) {
    return 0;
  }

  size_t len = std::min(buf_len, inline_mbuf_data_size);

  m = buf->rte_mbuf_p();
  m->data_len = len;
  m->pkt_len = len;

  qp.perf_logger->inc(l_dpdk_qp_tx_copy_ops);
  qp.perf_logger->inc(l_dpdk_qp_tx_copy_bytes, len);

  memcpy(rte_pktmbuf_mtod(m, void*), data, len);

  return len;
}
void DPDKDevice::set_rss_table()
{
  // always fill our local indirection table.
  unsigned i = 0;
  for (auto& r : _redir_table) {
    r = i++ % _num_queues;
  }

  if (_dev_info.reta_size == 0)
    return;

  int reta_conf_size = std::max(1, _dev_info.reta_size / RTE_RETA_GROUP_SIZE);
  rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];

  // Configure the HW indirection table
  i = 0;
  for (auto& x : reta_conf) {
    x.mask = ~0ULL;
    for (auto& r : x.reta) {
      r = i++ % _num_queues;
    }
  }

  if (rte_eth_dev_rss_reta_update(_port_idx, reta_conf, _dev_info.reta_size)) {
    rte_exit(EXIT_FAILURE, "Port %d: Failed to update an RSS indirection table", _port_idx);
  }
}
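//
// Illustrative example (a sketch): with _dev_info.reta_size == 128 and
// RTE_RETA_GROUP_SIZE == 64, reta_conf holds 2 groups of 64 entries each;
// for _num_queues == 4 the entries simply cycle 0,1,2,3,0,1,2,3,... so the
// hardware spreads flows evenly across the four Rx queues.
//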
/******************************** Interface functions *************************/

std::unique_ptr<DPDKDevice> create_dpdk_net_device(
    CephContext *cct,
    unsigned cores,
    uint8_t port_idx,
    bool use_lro,
    bool enable_fc)
{
  // Check that we have at least one DPDK-able port
  if (rte_eth_dev_count() == 0) {
    rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
  } else {
    ldout(cct, 10) << __func__ << " ports number: " << int(rte_eth_dev_count()) << dendl;
  }

  return std::unique_ptr<DPDKDevice>(
      new DPDKDevice(cct, port_idx, cores, use_lro, enable_fc));
}