// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */

#include <atomic>
#include <vector>
#include <queue>

#include <rte_config.h>
#include <rte_common.h>
#include <rte_eal.h>
#include <rte_pci.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_cycles.h>
#include <rte_memzone.h>

#include "include/page.h"
#include "align.h"
#include "IP.h"
#include "const.h"
#include "dpdk_rte.h"
#include "DPDK.h"
#include "toeplitz.h"

#include "common/Cycles.h"
#include "common/dout.h"
#include "common/errno.h"
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_dpdk
#undef dout_prefix
#define dout_prefix *_dout << "dpdk "


void* as_cookie(struct rte_pktmbuf_pool_private& p) {
  return &p;
};

/******************* Net device related constants ****************************/
static constexpr uint16_t default_ring_size = 512;

//
// We need 2 times the ring size of buffers because of the way PMDs
// refill the ring.
//
static constexpr uint16_t mbufs_per_queue_rx = 2 * default_ring_size;
static constexpr uint16_t rx_gc_thresh = 64;

//
// No need to keep more descriptors in the air than can be sent in a single
// rte_eth_tx_burst() call.
//
static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size;

static constexpr uint16_t mbuf_cache_size = 512;
//
// Size of the data buffer in the non-inline case.
//
// We may want to change (increase) this value in the future, while the
// inline_mbuf_data_size value is unlikely to change for the reasons described
// below.
//
static constexpr size_t mbuf_data_size = 4096;

static constexpr uint16_t mbuf_overhead =
                 sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
//
// We'll allocate 2K data buffers for an inline case because this would require
// a single page per mbuf. If we used 4K data buffers here it would require 2
// pages for a single buffer (due to "mbuf_overhead") and this is a much more
// demanding memory constraint.
//
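// (Assuming DPDK's default 128-byte RTE_PKTMBUF_HEADROOM and a two-cache-line
// (128-byte) struct rte_mbuf, the resulting inline_mbuf_size below comes out
// to roughly 2304 bytes, i.e. comfortably within a single 4K page.)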
static constexpr size_t inline_mbuf_data_size = 2048;


// (INLINE_MBUF_DATA_SIZE(2K)*32 = 64K = Max TSO/LRO size) + 1 mbuf for headers
static constexpr uint8_t max_frags = 32 + 1;

//
// Intel's 40G NIC HW limit on the number of fragments in an xmit segment.
//
// See Chapter 8.4.1 "Transmit Packet in System Memory" of the xl710 devices
// spec. for more details.
//
static constexpr uint8_t i40e_max_xmit_segment_frags = 8;

//
// VMware's virtual NIC limit on the number of fragments in an xmit segment.
//
// See drivers/net/vmxnet3/base/vmxnet3_defs.h VMXNET3_MAX_TXD_PER_PKT
//
static constexpr uint8_t vmxnet3_max_xmit_segment_frags = 16;

static constexpr uint16_t inline_mbuf_size = inline_mbuf_data_size + mbuf_overhead;

static size_t huge_page_size = 512 * CEPH_PAGE_SIZE;

uint32_t qp_mempool_obj_size()
{
  uint32_t mp_size = 0;
  struct rte_mempool_objsz mp_obj_sz = {};

  //
  // We will align each size to the huge page size because DPDK allocates
  // a physically contiguous memory region for each pool object.
  //

  // Rx
  mp_size += align_up(rte_mempool_calc_obj_size(mbuf_overhead, 0, &mp_obj_sz)+
                      sizeof(struct rte_pktmbuf_pool_private),
                      huge_page_size);

  // Tx
  std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz));
  mp_size += align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0,
                                                &mp_obj_sz)+
                      sizeof(struct rte_pktmbuf_pool_private),
                      huge_page_size);
  return mp_size;
}

static constexpr const char* pktmbuf_pool_name = "dpdk_net_pktmbuf_pool";

/*
 * When doing reads from the NIC queues, use this batch size
 */
static constexpr uint8_t packet_read_size = 32;
/******************************************************************************/

int DPDKDevice::init_port_start()
{
  ceph_assert(_port_idx < rte_eth_dev_count_avail());

  rte_eth_dev_info_get(_port_idx, &_dev_info);

  //
  // This is a workaround for missing handling of a HW limitation in the
  // DPDK i40e driver. This and all code related to _is_i40e_device should be
  // removed once this handling is added.
  //
  if (std::string("rte_i40evf_pmd") == _dev_info.driver_name ||
      std::string("rte_i40e_pmd") == _dev_info.driver_name) {
    ldout(cct, 1) << __func__ << " Device is an Intel 40G NIC. Enabling 8 fragments hack!" << dendl;
    _is_i40e_device = true;
  }

  if (std::string("rte_vmxnet3_pmd") == _dev_info.driver_name) {
    ldout(cct, 1) << __func__ << " Device is a VMware virtual NIC. Enabling 16 fragments hack!" << dendl;
    _is_vmxnet3_device = true;
  }

  //
  // Another workaround: this time for the limited number of RSS bits.
  // ixgbe PF NICs support up to 16 RSS queues.
  // ixgbe VF NICs support up to 4 RSS queues.
  // i40e PF NICs support up to 64 RSS queues.
  // i40e VF NICs support up to 16 RSS queues.
  //
  if (std::string("rte_ixgbe_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
  } else if (std::string("rte_ixgbevf_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)4);
  } else if (std::string("rte_i40e_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)64);
  } else if (std::string("rte_i40evf_pmd") == _dev_info.driver_name) {
    _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
  }

  // Hardware offload capabilities
  // https://github.com/DPDK/dpdk/blob/v19.05/lib/librte_ethdev/rte_ethdev.h#L993-L1074
  // We want to support all available offload features
  // TODO: the features below are the ones available in DPDK 17.05; newer ones should be added
  const uint64_t tx_offloads_wanted =
    DEV_TX_OFFLOAD_VLAN_INSERT |
    DEV_TX_OFFLOAD_IPV4_CKSUM |
    DEV_TX_OFFLOAD_UDP_CKSUM |
    DEV_TX_OFFLOAD_TCP_CKSUM |
    DEV_TX_OFFLOAD_SCTP_CKSUM |
    DEV_TX_OFFLOAD_TCP_TSO |
    DEV_TX_OFFLOAD_UDP_TSO |
    DEV_TX_OFFLOAD_OUTER_IPV4_CKSUM |
    DEV_TX_OFFLOAD_QINQ_INSERT |
    DEV_TX_OFFLOAD_VXLAN_TNL_TSO |
    DEV_TX_OFFLOAD_GRE_TNL_TSO |
    DEV_TX_OFFLOAD_IPIP_TNL_TSO |
    DEV_TX_OFFLOAD_GENEVE_TNL_TSO |
    DEV_TX_OFFLOAD_MACSEC_INSERT;

  _dev_info.default_txconf.offloads =
    _dev_info.tx_offload_capa & tx_offloads_wanted;

  /* for port configuration all features are off by default */
  rte_eth_conf port_conf = { 0 };

  /* setting tx offloads for port */
  port_conf.txmode.offloads = _dev_info.default_txconf.offloads;

  ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": max_rx_queues "
                << _dev_info.max_rx_queues << " max_tx_queues "
                << _dev_info.max_tx_queues << dendl;

  _num_queues = std::min({_num_queues, _dev_info.max_rx_queues, _dev_info.max_tx_queues});

  ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": using "
                << _num_queues << " queues" << dendl;

  // Set RSS mode: enable RSS if we are configured with more than 1 queue.
  // Even if the port has a single queue we still want the RSS feature to be
  // available in order to make the HW calculate the RSS hash for us.
  if (_num_queues > 1) {
    if (_dev_info.hash_key_size == 40) {
      _rss_key = default_rsskey_40bytes;
    } else if (_dev_info.hash_key_size == 52) {
      _rss_key = default_rsskey_52bytes;
    } else if (_dev_info.hash_key_size != 0) {
      lderr(cct) << "Port " << int(_port_idx)
                 << ": We support only 40- or 52-byte RSS hash keys, "
                 << int(_dev_info.hash_key_size) << " bytes key requested"
                 << dendl;
      return -EINVAL;
    } else {
      _rss_key = default_rsskey_40bytes;
      _dev_info.hash_key_size = 40;
    }

    port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
    /* enable all supported rss offloads */
    port_conf.rx_adv_conf.rss_conf.rss_hf = _dev_info.flow_type_rss_offloads;
    if (_dev_info.hash_key_size) {
      port_conf.rx_adv_conf.rss_conf.rss_key = const_cast<uint8_t *>(_rss_key.data());
      port_conf.rx_adv_conf.rss_conf.rss_key_len = _dev_info.hash_key_size;
    }
  } else {
    port_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
  }

  if (_num_queues > 1) {
    if (_dev_info.reta_size) {
      // RETA size should be a power of 2
      ceph_assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);

      // Set the RSS table to the correct size
      _redir_table.resize(_dev_info.reta_size);
      _rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
      ldout(cct, 5) << __func__ << " Port " << int(_port_idx)
                    << ": RSS table size is " << _dev_info.reta_size << dendl;
    } else {
      // FIXME: same with sw_reta
      _redir_table.resize(128);
      _rss_table_bits = std::lround(std::log2(128));
    }
  } else {
    _redir_table.push_back(0);
  }

  // Set Rx VLAN stripping
  if (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
    port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_VLAN_STRIP;
  }

#ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
  // Enable LRO
  if (_use_lro && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)) {
    ldout(cct, 1) << __func__ << " LRO is on" << dendl;
    port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_TCP_LRO;
    _hw_features.rx_lro = true;
  } else
#endif
    ldout(cct, 1) << __func__ << " LRO is off" << dendl;

  // Check that all CSUM features are either all set together or all unset.
  // If this assumption breaks we need to rework the below logic by splitting
  // the csum offload feature bit into separate bits for IPv4 and TCP.
  ceph_assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
               (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
              (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
               !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));

  // Set Rx checksum checking
  if ((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
      (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
    ldout(cct, 1) << __func__ << " RX checksum offload supported" << dendl;
    port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_CHECKSUM;
    _hw_features.rx_csum_offload = 1;
  }

  if ((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
    ldout(cct, 1) << __func__ << " TX ip checksum offload supported" << dendl;
    _hw_features.tx_csum_ip_offload = 1;
  }

  // TSO is supported starting from DPDK v1.8
  // TSO is broken in some DPDK versions (e.g. dpdk-20.11-3.el8.aarch64); it can
  // be disabled by setting ms_dpdk_enable_tso=false
  if ((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) &&
      cct->_conf.get_val<bool>("ms_dpdk_enable_tso")) {
    ldout(cct, 1) << __func__ << " TSO is supported" << dendl;
    _hw_features.tx_tso = 1;
  }

  // Check that Tx TCP CSUM features are either all set together or all unset.
  // If this assumption breaks we need to rework the below logic by splitting
  // the csum offload feature bit into separate bits for TCP.
  ceph_assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
              !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM));

  if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
    ldout(cct, 1) << __func__ << " TX TCP checksum offload supported" << dendl;
    _hw_features.tx_csum_l4_offload = 1;
  }

  int retval;

  ldout(cct, 1) << __func__ << " Port " << int(_port_idx) << " init ... " << dendl;

  /*
   * Standard DPDK port initialisation - config port, then set up
   * rx and tx rings.
   */
  if ((retval = rte_eth_dev_configure(_port_idx, _num_queues, _num_queues,
                                      &port_conf)) != 0) {
    lderr(cct) << __func__ << " failed to configure port " << (int)_port_idx
               << " rx/tx queues " << _num_queues << " error " << cpp_strerror(retval) << dendl;
    return retval;
  }

  //rte_eth_promiscuous_enable(port_num);
  ldout(cct, 1) << __func__ << " done." << dendl;

  return 0;
}

void DPDKDevice::set_hw_flow_control()
{
  // Read the port's current/default flow control settings
  struct rte_eth_fc_conf fc_conf;
  auto ret = rte_eth_dev_flow_ctrl_get(_port_idx, &fc_conf);

  if (ret == -ENOTSUP) {
    ldout(cct, 1) << __func__ << " port " << int(_port_idx)
                  << ": getting hardware flow control settings is not supported: " << ret << dendl;
    goto not_supported;
  }

  if (ret < 0) {
    lderr(cct) << __func__ << " port " << int(_port_idx)
               << ": failed to get hardware flow control settings: " << ret << dendl;
    ceph_abort();
  }

  if (_enable_fc) {
    fc_conf.mode = RTE_FC_FULL;
  } else {
    fc_conf.mode = RTE_FC_NONE;
  }

  ret = rte_eth_dev_flow_ctrl_set(_port_idx, &fc_conf);
  if (ret == -ENOTSUP) {
    ldout(cct, 1) << __func__ << " port " << int(_port_idx)
                  << ": setting hardware flow control settings is not supported: " << ret << dendl;
    goto not_supported;
  }

  if (ret < 0) {
    lderr(cct) << __func__ << " port " << int(_port_idx)
               << ": failed to set hardware flow control settings: " << ret << dendl;
    ceph_abort();
  }

  ldout(cct, 1) << __func__ << " port " << int(_port_idx) << ": HW FC " << _enable_fc << dendl;
  return;

not_supported:
  ldout(cct, 1) << __func__ << " port " << int(_port_idx) << ": changing HW FC settings is not supported" << dendl;
}

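// Admin socket hook exposing the DPDK PMD counters: the "show_pmd_stats" and
// "show_pmd_xstats" commands registered in init_port_fini() dump them through
// the supplied Formatter.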
class XstatSocketHook : public AdminSocketHook {
  DPDKDevice *dev;
 public:
  explicit XstatSocketHook(DPDKDevice *dev) : dev(dev) {}
  int call(std::string_view prefix, const cmdmap_t& cmdmap,
           Formatter *f,
           std::ostream& ss,
           bufferlist& out) override {
    if (prefix == "show_pmd_stats") {
      dev->nic_stats_dump(f);
    } else if (prefix == "show_pmd_xstats") {
      dev->nic_xstats_dump(f);
    }
    return 0;
  }
};

int DPDKDevice::init_port_fini()
{
  // Changing FC requires HW reset, so set it before the port is initialized.
  set_hw_flow_control();

  if (rte_eth_dev_start(_port_idx) != 0) {
    lderr(cct) << __func__ << " can't start port " << _port_idx << dendl;
    return -1;
  }

  if (_num_queues > 1)
    set_rss_table();

  // Wait for a link
  if (check_port_link_status() < 0) {
    lderr(cct) << __func__ << " port link up failed " << _port_idx << dendl;
    return -1;
  }

  ldout(cct, 5) << __func__ << " created DPDK device" << dendl;
  AdminSocket *admin_socket = cct->get_admin_socket();
  dfx_hook = std::make_unique<XstatSocketHook>(this);
  int r = admin_socket->register_command("show_pmd_stats", dfx_hook.get(),
                                         "show pmd stats statistics");
  ceph_assert(r == 0);
  r = admin_socket->register_command("show_pmd_xstats", dfx_hook.get(),
                                     "show pmd xstats statistics");
  ceph_assert(r == 0);
  return 0;
}

void DPDKDevice::set_rss_table()
{
  struct rte_flow_attr attr;
  struct rte_flow_item pattern[1];
  struct rte_flow_action action[2];
  struct rte_flow_action_rss rss_conf;

  /*
   * set the rule attribute.
   * in this case only ingress packets will be checked.
   */
  memset(&attr, 0, sizeof(struct rte_flow_attr));
  attr.ingress = 1;

  /* the final level must always be of type end */
  pattern[0].type = RTE_FLOW_ITEM_TYPE_END;

  /*
   * create the action sequence.
   * one action only: set the rss hash func to toeplitz.
   */
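  // Spread the redirection table entries round-robin across the Rx queues
  // before handing the table to the RSS flow action below.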
  uint16_t i = 0;
  for (auto& r : _redir_table) {
    r = i++ % _num_queues;
  }
  rss_conf.func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;
  rss_conf.types = ETH_RSS_FRAG_IPV4 | ETH_RSS_NONFRAG_IPV4_TCP;
  rss_conf.queue_num = _num_queues;
  rss_conf.queue = const_cast<uint16_t *>(_redir_table.data());
  rss_conf.key_len = _dev_info.hash_key_size;
  rss_conf.key = const_cast<uint8_t *>(_rss_key.data());
  rss_conf.level = 0;
  action[0].type = RTE_FLOW_ACTION_TYPE_RSS;
  action[0].conf = &rss_conf;
  action[1].type = RTE_FLOW_ACTION_TYPE_END;

  if (rte_flow_validate(_port_idx, &attr, pattern, action, nullptr) == 0)
    _flow = rte_flow_create(_port_idx, &attr, pattern, action, nullptr);
  else
    ldout(cct, 0) << __func__ << " Port " << _port_idx
                  << ": flow rss func configuration is unsupported"
                  << dendl;
}

void DPDKQueuePair::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
  ceph_assert(!cpu_weights.empty());
  if (cpu_weights.size() == 1 && cpu_weights.begin()->first == _qid) {
    // special case queue sending to self only, to avoid requiring a hash value
    return;
  }
  register_packet_provider([this] {
    std::optional<Packet> p;
    if (!_proxy_packetq.empty()) {
      p = std::move(_proxy_packetq.front());
      _proxy_packetq.pop_front();
    }
    return p;
  });
  build_sw_reta(cpu_weights);
}

void DPDKQueuePair::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
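  // Fill the 128-entry software RETA proportionally to the CPU weights: a CPU
  // whose weight is w out of the total gets roughly w/total of the slots
  // (e.g. weights {0:1, 1:3} give CPU 0 about a quarter of the table).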
  float total_weight = 0;
  for (auto&& x : cpu_weights) {
    total_weight += x.second;
  }
  float accum = 0;
  unsigned idx = 0;
  std::array<uint8_t, 128> reta;
  for (auto&& entry : cpu_weights) {
    auto cpu = entry.first;
    auto weight = entry.second;
    accum += weight;
    while (idx < (accum / total_weight * reta.size() - 0.5)) {
      reta[idx++] = cpu;
    }
  }
  _sw_reta = reta;
}


bool DPDKQueuePair::init_rx_mbuf_pool()
{
  std::string name = std::string(pktmbuf_pool_name) + std::to_string(_qid) + "_rx";

  // reserve memory for the Rx buffer containers
  _rx_free_pkts.reserve(mbufs_per_queue_rx);
  _rx_free_bufs.reserve(mbufs_per_queue_rx);

  _pktmbuf_pool_rx = rte_mempool_lookup(name.c_str());
  if (!_pktmbuf_pool_rx) {
    ldout(cct, 1) << __func__ << " Creating Rx mbuf pool '" << name.c_str()
                  << "' [" << mbufs_per_queue_rx << " mbufs] ..." << dendl;

    //
    // Don't pass single-producer/single-consumer flags to mbuf create as it
    // seems faster to use a cache instead.
    //
    struct rte_pktmbuf_pool_private roomsz = {};
    roomsz.mbuf_data_room_size = mbuf_data_size + RTE_PKTMBUF_HEADROOM;
    _pktmbuf_pool_rx = rte_mempool_create(
        name.c_str(),
        mbufs_per_queue_rx, mbuf_overhead + mbuf_data_size,
        mbuf_cache_size,
        sizeof(struct rte_pktmbuf_pool_private),
        rte_pktmbuf_pool_init, as_cookie(roomsz),
        rte_pktmbuf_init, nullptr,
        rte_socket_id(), 0);
    if (!_pktmbuf_pool_rx) {
      lderr(cct) << __func__ << " Failed to create mempool for rx" << dendl;
      return false;
    }

    //
    // Allocate additional data buffers; these are handed out through
    // _alloc_bufs when Rx mbufs are (re)filled.
    //
    int bufs_count = cct->_conf->ms_dpdk_rx_buffer_count_per_core - mbufs_per_queue_rx;
    int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
    std::string mz_name = "rx_buffer_data" + std::to_string(_qid);
    const struct rte_memzone *mz = rte_memzone_reserve_aligned(mz_name.c_str(),
          mbuf_data_size*bufs_count, _pktmbuf_pool_rx->socket_id, mz_flags, mbuf_data_size);
    ceph_assert(mz);
    void* m = mz->addr;
    for (int i = 0; i < bufs_count; i++) {
      ceph_assert(m);
      _alloc_bufs.push_back(m);
      m += mbuf_data_size;
    }

    if (rte_eth_rx_queue_setup(_dev_port_idx, _qid, default_ring_size,
                               rte_eth_dev_socket_id(_dev_port_idx),
                               _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) {
      lderr(cct) << __func__ << " cannot initialize rx queue" << dendl;
      return false;
    }
  }

  return _pktmbuf_pool_rx != nullptr;
}

int DPDKDevice::check_port_link_status()
{
  int count = 0;

  ldout(cct, 20) << __func__ << dendl;
  const int sleep_time = 100 * 1000;
  const int max_check_time = 90; /* 9s (90 * 100ms) in total */
  while (true) {
    struct rte_eth_link link;
    memset(&link, 0, sizeof(link));
    rte_eth_link_get_nowait(_port_idx, &link);

    if (true) {
      if (link.link_status) {
        ldout(cct, 5) << __func__ << " done port "
                      << static_cast<unsigned>(_port_idx)
                      << " link Up - speed " << link.link_speed
                      << " Mbps - "
                      << ((link.link_duplex == ETH_LINK_FULL_DUPLEX) ? ("full-duplex") : ("half-duplex"))
                      << dendl;
        break;
      } else if (count++ < max_check_time) {
        ldout(cct, 20) << __func__ << " not ready, continue to wait." << dendl;
        usleep(sleep_time);
      } else {
        lderr(cct) << __func__ << " done port " << _port_idx << " link down" << dendl;
        return -1;
      }
    }
  }
  return 0;
}

class C_handle_dev_stats : public EventCallback {
  DPDKQueuePair *_qp;
 public:
  C_handle_dev_stats(DPDKQueuePair *qp): _qp(qp) { }
  void do_request(uint64_t id) {
    _qp->handle_stats();
  }
};

DPDKQueuePair::DPDKQueuePair(CephContext *c, EventCenter *cen, DPDKDevice* dev, uint8_t qid)
  : cct(c), _dev(dev), _dev_port_idx(dev->port_idx()), center(cen), _qid(qid),
    _tx_poller(this), _rx_gc_poller(this), _tx_buf_factory(c, dev, qid),
    _tx_gc_poller(this)
{
  if (!init_rx_mbuf_pool()) {
    lderr(cct) << __func__ << " cannot initialize mbuf pools" << dendl;
    ceph_abort();
  }

  static_assert(offsetof(tx_buf, private_end) -
                offsetof(tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM,
                "RTE_PKTMBUF_HEADROOM is less than DPDKQueuePair::tx_buf size! "
                "Increase the headroom size in the DPDK configuration");
  static_assert(offsetof(tx_buf, _mbuf) == 0,
                "There is a pad at the beginning of the tx_buf before _mbuf "
                "field!");
  static_assert((inline_mbuf_data_size & (inline_mbuf_data_size - 1)) == 0,
                "inline_mbuf_data_size has to be a power of two!");

  std::string name(std::string("queue") + std::to_string(qid));
  PerfCountersBuilder plb(cct, name, l_dpdk_qp_first, l_dpdk_qp_last);

  plb.add_u64_counter(l_dpdk_qp_rx_packets, "dpdk_receive_packets", "DPDK received packets");
  plb.add_u64_counter(l_dpdk_qp_tx_packets, "dpdk_send_packets", "DPDK sent packets");
  plb.add_u64_counter(l_dpdk_qp_rx_bad_checksum_errors, "dpdk_receive_bad_checksum_errors", "DPDK received bad checksum packets");
  plb.add_u64_counter(l_dpdk_qp_rx_no_memory_errors, "dpdk_receive_no_memory_errors", "DPDK received no memory packets");
  plb.add_u64_counter(l_dpdk_qp_rx_bytes, "dpdk_receive_bytes", "DPDK received bytes", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_dpdk_qp_tx_bytes, "dpdk_send_bytes", "DPDK sent bytes", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_dpdk_qp_rx_last_bunch, "dpdk_receive_last_bunch", "DPDK last received bunch");
  plb.add_u64_counter(l_dpdk_qp_tx_last_bunch, "dpdk_send_last_bunch", "DPDK last sent bunch");
  plb.add_u64_counter(l_dpdk_qp_rx_fragments, "dpdk_receive_fragments", "DPDK received total fragments");
  plb.add_u64_counter(l_dpdk_qp_tx_fragments, "dpdk_send_fragments", "DPDK sent total fragments");
  plb.add_u64_counter(l_dpdk_qp_rx_copy_ops, "dpdk_receive_copy_ops", "DPDK received copy operations");
  plb.add_u64_counter(l_dpdk_qp_tx_copy_ops, "dpdk_send_copy_ops", "DPDK sent copy operations");
  plb.add_u64_counter(l_dpdk_qp_rx_copy_bytes, "dpdk_receive_copy_bytes", "DPDK received copy bytes", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_dpdk_qp_tx_copy_bytes, "dpdk_send_copy_bytes", "DPDK sent copy bytes", NULL, 0, unit_t(UNIT_BYTES));
  plb.add_u64_counter(l_dpdk_qp_rx_linearize_ops, "dpdk_receive_linearize_ops", "DPDK received linearize operations");
  plb.add_u64_counter(l_dpdk_qp_tx_linearize_ops, "dpdk_send_linearize_ops", "DPDK sent linearize operations");
  plb.add_u64_counter(l_dpdk_qp_tx_queue_length, "dpdk_send_queue_length", "DPDK send queue length");

  perf_logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(perf_logger);

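  // Only queue 0 collects the per-port device statistics; handle_stats()
  // re-arms this 1-second (1000*1000 us) timer after every run.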
  if (!_qid)
    device_stat_time_fd = center->create_time_event(1000*1000, new C_handle_dev_stats(this));
}

void DPDKDevice::nic_stats_dump(Formatter *f)
{
  static uint64_t prev_pkts_rx[RTE_MAX_ETHPORTS];
  static uint64_t prev_pkts_tx[RTE_MAX_ETHPORTS];
  static uint64_t prev_cycles[RTE_MAX_ETHPORTS];
  size_t tx_fragments = 0;
  size_t rx_fragments = 0;
  size_t tx_free_cnt = 0;
  size_t rx_free_cnt = 0;

  for (auto &qp: _queues) {
    tx_fragments += qp->perf_logger->get(l_dpdk_qp_tx_fragments);
    rx_fragments += qp->perf_logger->get(l_dpdk_qp_rx_fragments);
    tx_free_cnt += qp->_tx_buf_factory.ring_size();
    rx_free_cnt += rte_mempool_avail_count(qp->_pktmbuf_pool_rx);
  }
  struct rte_eth_stats stats;
  rte_eth_stats_get(_port_idx, &stats);
  f->open_object_section("RX");
  f->dump_unsigned("in_packets", stats.ipackets);
  f->dump_unsigned("recv_packets", rx_fragments);
  f->dump_unsigned("in_bytes", stats.ibytes);
  f->dump_unsigned("missed", stats.imissed);
  f->dump_unsigned("errors", stats.ierrors);
  f->close_section();

  f->open_object_section("TX");
  f->dump_unsigned("out_packets", stats.opackets);
  f->dump_unsigned("send_packets", tx_fragments);
  f->dump_unsigned("out_bytes", stats.obytes);
  f->dump_unsigned("errors", stats.oerrors);
  f->close_section();

  f->open_object_section("stats");
  f->dump_unsigned("RX_nombuf", stats.rx_nombuf);
  f->dump_unsigned("RX_avail_mbufs", rx_free_cnt);
  f->dump_unsigned("TX_avail_mbufs", tx_free_cnt);

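  // Estimate Rx/Tx packets-per-second from the TSC cycle delta and the
  // packet-count delta accumulated since the previous call to this dump.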
  uint64_t diff_cycles = prev_cycles[_port_idx];
  prev_cycles[_port_idx] = rte_rdtsc();
  if (diff_cycles > 0) {
    diff_cycles = prev_cycles[_port_idx] - diff_cycles;
  }

  uint64_t diff_pkts_rx = (stats.ipackets > prev_pkts_rx[_port_idx]) ?
    (stats.ipackets - prev_pkts_rx[_port_idx]) : 0;
  uint64_t diff_pkts_tx = (stats.opackets > prev_pkts_tx[_port_idx]) ?
    (stats.opackets - prev_pkts_tx[_port_idx]) : 0;
  prev_pkts_rx[_port_idx] = stats.ipackets;
  prev_pkts_tx[_port_idx] = stats.opackets;
  uint64_t mpps_rx = diff_cycles > 0 ? diff_pkts_rx * rte_get_tsc_hz() / diff_cycles : 0;
  uint64_t mpps_tx = diff_cycles > 0 ? diff_pkts_tx * rte_get_tsc_hz() / diff_cycles : 0;
  f->dump_unsigned("Rx_pps", mpps_rx);
  f->dump_unsigned("Tx_pps", mpps_tx);
  f->close_section();
}

void DPDKDevice::nic_xstats_dump(Formatter *f)
{
  // Get count
  int cnt_xstats = rte_eth_xstats_get_names(_port_idx, NULL, 0);
  if (cnt_xstats < 0) {
    ldout(cct, 1) << "Error: Cannot get count of xstats" << dendl;
    return;
  }

  // Get id-name lookup table
  std::vector<struct rte_eth_xstat_name> xstats_names(cnt_xstats);
  if (cnt_xstats != rte_eth_xstats_get_names(_port_idx, xstats_names.data(), cnt_xstats)) {
    ldout(cct, 1) << "Error: Cannot get xstats lookup" << dendl;
    return;
  }

  // Get stats themselves
  std::vector<struct rte_eth_xstat> xstats(cnt_xstats);
  if (cnt_xstats != rte_eth_xstats_get(_port_idx, xstats.data(), cnt_xstats)) {
    ldout(cct, 1) << "Error: Unable to get xstats" << dendl;
    return;
  }
  f->open_object_section("xstats");
  for (int i = 0; i < cnt_xstats; i++) {
    f->dump_unsigned(xstats_names[i].name, xstats[i].value);
  }
  f->close_section();
}

void DPDKQueuePair::handle_stats()
{
  ldout(cct, 20) << __func__ << " started." << dendl;
  rte_eth_stats rte_stats = {};
  int rc = rte_eth_stats_get(_dev_port_idx, &rte_stats);

  if (rc) {
    ldout(cct, 0) << __func__ << " failed to get port statistics: " << cpp_strerror(rc) << dendl;
    return;
  }

#if RTE_VERSION < RTE_VERSION_NUM(16,7,0,0)
  _dev->perf_logger->set(l_dpdk_dev_rx_mcast, rte_stats.imcasts);
  _dev->perf_logger->set(l_dpdk_dev_rx_badcrc_errors, rte_stats.ibadcrc);
#endif
  _dev->perf_logger->set(l_dpdk_dev_rx_dropped_errors, rte_stats.imissed);
  _dev->perf_logger->set(l_dpdk_dev_rx_nombuf_errors, rte_stats.rx_nombuf);

  _dev->perf_logger->set(l_dpdk_dev_rx_total_errors, rte_stats.ierrors);
  _dev->perf_logger->set(l_dpdk_dev_tx_total_errors, rte_stats.oerrors);
  // re-arm the periodic stats timer
  device_stat_time_fd = center->create_time_event(1000*1000, new C_handle_dev_stats(this));
}

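// Refill the Tx queue from the registered packet providers (only attempted
// when fewer than 16 packets are queued; bounded at 128 queued packets and
// 256 pulled per call), then burst whatever is queued via send().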
bool DPDKQueuePair::poll_tx() {
  bool nonloopback = !cct->_conf->ms_dpdk_debug_allow_loopback;
#ifdef CEPH_PERF_DEV
  uint64_t start = Cycles::rdtsc();
#endif
  uint32_t total_work = 0;
  if (_tx_packetq.size() < 16) {
    // refill send queue from upper layers
    uint32_t work;
    do {
      work = 0;
      for (auto&& pr : _pkt_providers) {
        auto p = pr();
        if (p) {
          work++;
          if (likely(nonloopback)) {
            // ldout(cct, 0) << __func__ << " len: " << p->len() << " frags: " << p->nr_frags() << dendl;
            _tx_packetq.push_back(std::move(*p));
          } else {
            auto th = p->get_header<eth_hdr>(0);
            if (th->dst_mac == th->src_mac) {
              _dev->l2receive(_qid, std::move(*p));
            } else {
              _tx_packetq.push_back(std::move(*p));
            }
          }
          if (_tx_packetq.size() == 128) {
            break;
          }
        }
      }
      total_work += work;
    } while (work && total_work < 256 && _tx_packetq.size() < 128);
  }
  if (!_tx_packetq.empty()) {
    uint64_t c = send(_tx_packetq);
    perf_logger->inc(l_dpdk_qp_tx_packets, c);
    perf_logger->set(l_dpdk_qp_tx_last_bunch, c);
#ifdef CEPH_PERF_DEV
    tx_count += total_work;
    tx_cycles += Cycles::rdtsc() - start;
#endif
    return true;
  }

  return false;
}

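// Convert a (possibly multi-segment) mbuf chain into a Packet: each mbuf
// segment becomes one fragment, and the underlying data buffers are returned
// to _alloc_bufs once the Packet's deleter runs.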
inline std::optional<Packet> DPDKQueuePair::from_mbuf_lro(rte_mbuf* m)
{
  _frags.clear();
  _bufs.clear();

  for (; m != nullptr; m = m->next) {
    char* data = rte_pktmbuf_mtod(m, char*);

    _frags.emplace_back(fragment{data, rte_pktmbuf_data_len(m)});
    _bufs.push_back(data);
  }

  auto del = std::bind(
    [this](std::vector<char*> &bufs) {
      for (auto&& b : bufs) { _alloc_bufs.push_back(b); }
    }, std::move(_bufs));
  return Packet(
    _frags.begin(), _frags.end(), make_deleter(std::move(del)));
}

inline std::optional<Packet> DPDKQueuePair::from_mbuf(rte_mbuf* m)
{
  _rx_free_pkts.push_back(m);
  _num_rx_free_segs += m->nb_segs;

  if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
    char* data = rte_pktmbuf_mtod(m, char*);

    return Packet(fragment{data, rte_pktmbuf_data_len(m)},
                  make_deleter([this, data] { _alloc_bufs.push_back(data); }));
  } else {
    return from_mbuf_lro(m);
  }
}

inline bool DPDKQueuePair::refill_one_cluster(rte_mbuf* head)
{
  for (; head != nullptr; head = head->next) {
    if (!refill_rx_mbuf(head, mbuf_data_size, _alloc_bufs)) {
      //
      // If we failed to allocate a new buffer - push the rest of the
      // cluster back to the free_packets list for a later retry.
      //
      _rx_free_pkts.push_back(head);
      return false;
    }
    _rx_free_bufs.push_back(head);
  }

  return true;
}

bool DPDKQueuePair::rx_gc(bool force)
{
  if (_num_rx_free_segs >= rx_gc_thresh || force) {
    ldout(cct, 10) << __func__ << " free segs " << _num_rx_free_segs
                   << " thresh " << rx_gc_thresh
                   << " free pkts " << _rx_free_pkts.size()
                   << dendl;

    while (!_rx_free_pkts.empty()) {
      //
      // Use back() + pop_back() semantics to avoid an extra
      // _rx_free_pkts.clear() at the end of the function - clear() has a
      // linear complexity.
      //
      auto m = _rx_free_pkts.back();
      _rx_free_pkts.pop_back();

      if (!refill_one_cluster(m)) {
        ldout(cct, 1) << __func__ << " get new mbuf failed " << dendl;
        break;
      }
    }
    for (auto&& m : _rx_free_bufs) {
      rte_pktmbuf_prefree_seg(m);
    }

    if (_rx_free_bufs.size()) {
      rte_mempool_put_bulk(_pktmbuf_pool_rx,
                           (void **)_rx_free_bufs.data(),
                           _rx_free_bufs.size());

      // TODO: ceph_assert() in a fast path! Remove me ASAP!
      ceph_assert(_num_rx_free_segs >= _rx_free_bufs.size());

      _num_rx_free_segs -= _rx_free_bufs.size();
      _rx_free_bufs.clear();

      // TODO: ceph_assert() in a fast path! Remove me ASAP!
      ceph_assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
                  (!_rx_free_pkts.empty() && _num_rx_free_segs));
    }
  }

  return _num_rx_free_segs >= rx_gc_thresh;
}


void DPDKQueuePair::process_packets(
    struct rte_mbuf **bufs, uint16_t count)
{
  uint64_t nr_frags = 0, bytes = 0;

  for (uint16_t i = 0; i < count; i++) {
    struct rte_mbuf *m = bufs[i];
    offload_info oi;

    std::optional<Packet> p = from_mbuf(m);

    // Drop the packet if translation above has failed
    if (!p) {
      perf_logger->inc(l_dpdk_qp_rx_no_memory_errors);
      continue;
    }
    // ldout(cct, 0) << __func__ << " len " << p->len() << " " << dendl;

    nr_frags += m->nb_segs;
    bytes += m->pkt_len;

    // Set stripped VLAN value if available
    if ((_dev->_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) &&
        (m->ol_flags & PKT_RX_VLAN_STRIPPED)) {
      oi.vlan_tci = m->vlan_tci;
    }

    if (_dev->get_hw_features().rx_csum_offload) {
      if (m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
        // Packet with bad checksum, just drop it.
        perf_logger->inc(l_dpdk_qp_rx_bad_checksum_errors);
        continue;
      }
      // Note that when _hw_features.rx_csum_offload is on, the receive
      // code for ip, tcp and udp will assume they don't need to check
      // the checksum again, because we did this here.
    }

    p->set_offload_info(oi);
    if (m->ol_flags & PKT_RX_RSS_HASH) {
      p->set_rss_hash(m->hash.rss);
    }

    _dev->l2receive(_qid, std::move(*p));
  }

  perf_logger->inc(l_dpdk_qp_rx_packets, count);
  perf_logger->set(l_dpdk_qp_rx_last_bunch, count);
  perf_logger->inc(l_dpdk_qp_rx_fragments, nr_frags);
  perf_logger->inc(l_dpdk_qp_rx_bytes, bytes);
}

bool DPDKQueuePair::poll_rx_once()
{
  struct rte_mbuf *buf[packet_read_size];

  /* read a port */
#ifdef CEPH_PERF_DEV
  uint64_t start = Cycles::rdtsc();
#endif
  uint16_t count = rte_eth_rx_burst(_dev_port_idx, _qid,
                                    buf, packet_read_size);

  /* Now process the NIC packets read */
  if (likely(count > 0)) {
    process_packets(buf, count);
#ifdef CEPH_PERF_DEV
    rx_cycles = Cycles::rdtsc() - start;
    rx_count += count;
#endif
  }
#ifdef CEPH_PERF_DEV
  else {
    if (rx_count > 10000 && tx_count) {
      ldout(cct, 0) << __func__ << " rx count=" << rx_count << " avg rx=" << Cycles::to_nanoseconds(rx_cycles)/rx_count << "ns "
                    << " tx count=" << tx_count << " avg tx=" << Cycles::to_nanoseconds(tx_cycles)/tx_count << "ns"
                    << dendl;
      rx_count = rx_cycles = tx_count = tx_cycles = 0;
    }
  }
#endif

  return count;
}

DPDKQueuePair::tx_buf_factory::tx_buf_factory(CephContext *c,
                                              DPDKDevice *dev, uint8_t qid): cct(c)
{
  std::string name = std::string(pktmbuf_pool_name) + std::to_string(qid) + "_tx";

  _pool = rte_mempool_lookup(name.c_str());
  if (!_pool) {
    ldout(cct, 0) << __func__ << " Creating Tx mbuf pool '" << name.c_str()
                  << "' [" << mbufs_per_queue_tx << " mbufs] ..." << dendl;
    //
    // We are going to push the buffers from the mempool into
    // the circular_buffer and then poll them from there anyway, so
    // we prefer to make a mempool non-atomic in this case.
    //
    _pool = rte_mempool_create(name.c_str(),
                               mbufs_per_queue_tx, inline_mbuf_size,
                               mbuf_cache_size,
                               sizeof(struct rte_pktmbuf_pool_private),
                               rte_pktmbuf_pool_init, nullptr,
                               rte_pktmbuf_init, nullptr,
                               rte_socket_id(), 0);

    if (!_pool) {
      lderr(cct) << __func__ << " Failed to create mempool for Tx" << dendl;
      ceph_abort();
    }
    if (rte_eth_tx_queue_setup(dev->port_idx(), qid, default_ring_size,
                               rte_eth_dev_socket_id(dev->port_idx()),
                               dev->def_tx_conf()) < 0) {
      lderr(cct) << __func__ << " cannot initialize tx queue" << dendl;
      ceph_abort();
    }
  }

  //
  // Fill the factory with the buffers from the mempool allocated
  // above.
  //
  init_factory();
}

bool DPDKQueuePair::tx_buf::i40e_should_linearize(rte_mbuf *head)
{
  bool is_tso = head->ol_flags & PKT_TX_TCP_SEG;

  // For a non-TSO case: the number of fragments should not exceed 8
  if (!is_tso){
    return head->nb_segs > i40e_max_xmit_segment_frags;
  }

  //
  // For a TSO case each MSS window should not include more than 8
  // fragments including headers.
  //

  // Calculate the number of frags containing headers.
  //
  // Note: we support neither VLAN nor tunneling thus headers size
  // accounting is super simple.
  //
  size_t headers_size = head->l2_len + head->l3_len + head->l4_len;
  unsigned hdr_frags = 0;
  size_t cur_payload_len = 0;
  rte_mbuf *cur_seg = head;

  while (cur_seg && cur_payload_len < headers_size) {
    cur_payload_len += cur_seg->data_len;
    cur_seg = cur_seg->next;
    hdr_frags++;
  }

  //
  // Header fragments will be used for each TSO segment, thus the
  // maximum number of data segments will be 8 minus the number of
  // header fragments.
  //
  // It's unclear from the spec how the first TSO segment is treated
  // if the last fragment with headers contains some data bytes:
  // whether this fragment will be accounted as a single fragment or
  // as two separate fragments. We prefer to play it safe and assume
  // that this fragment will be accounted as two separate fragments.
  //
  size_t max_win_size = i40e_max_xmit_segment_frags - hdr_frags;

  if (head->nb_segs <= max_win_size) {
    return false;
  }

  // Get the data (without headers) part of the first data fragment
  size_t prev_frag_data = cur_payload_len - headers_size;
  auto mss = head->tso_segsz;

  while (cur_seg) {
    unsigned frags_in_seg = 0;
    size_t cur_seg_size = 0;

    if (prev_frag_data) {
      cur_seg_size = prev_frag_data;
      frags_in_seg++;
      prev_frag_data = 0;
    }

    while (cur_seg_size < mss && cur_seg) {
      cur_seg_size += cur_seg->data_len;
      cur_seg = cur_seg->next;
      frags_in_seg++;

      if (frags_in_seg > max_win_size) {
        return true;
      }
    }

    if (cur_seg_size > mss) {
      prev_frag_data = cur_seg_size - mss;
    }
  }

  return false;
}

void DPDKQueuePair::tx_buf::set_cluster_offload_info(const Packet& p, const DPDKQueuePair& qp, rte_mbuf* head)
{
  // Handle TCP checksum offload
  auto oi = p.offload_info();
  if (oi.needs_ip_csum) {
    head->ol_flags |= PKT_TX_IP_CKSUM;
    // TODO: Take a VLAN header into an account here
    head->l2_len = sizeof(struct rte_ether_hdr);
    head->l3_len = oi.ip_hdr_len;
  }
  if (qp.port().get_hw_features().tx_csum_l4_offload) {
    if (oi.protocol == ip_protocol_num::tcp) {
      head->ol_flags |= PKT_TX_TCP_CKSUM;
      // TODO: Take a VLAN header into an account here
      head->l2_len = sizeof(struct rte_ether_hdr);
      head->l3_len = oi.ip_hdr_len;

      if (oi.tso_seg_size) {
        ceph_assert(oi.needs_ip_csum);
        head->ol_flags |= PKT_TX_TCP_SEG;
        head->l4_len = oi.tcp_hdr_len;
        head->tso_segsz = oi.tso_seg_size;
      }
    }
  }
}

DPDKQueuePair::tx_buf* DPDKQueuePair::tx_buf::from_packet_zc(
    CephContext *cct, Packet&& p, DPDKQueuePair& qp)
{
  // Too fragmented - linearize
  if (p.nr_frags() > max_frags) {
    p.linearize();
    qp.perf_logger->inc(l_dpdk_qp_tx_linearize_ops);
  }

build_mbuf_cluster:
  rte_mbuf *head = nullptr, *last_seg = nullptr;
  unsigned nsegs = 0;

  //
  // Create a HEAD of the fragmented packet: check if frag0 has to be
  // copied and if yes - send it in a copy way
  //
  if (!check_frag0(p)) {
    if (!copy_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
      ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(0).size << dendl;
      return nullptr;
    }
  } else if (!translate_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
    ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(0).size << dendl;
    return nullptr;
  }

  unsigned total_nsegs = nsegs;

  for (unsigned i = 1; i < p.nr_frags(); i++) {
    rte_mbuf *h = nullptr, *new_last_seg = nullptr;
    if (!translate_one_frag(qp, p.frag(i), h, new_last_seg, nsegs)) {
      ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(i).size << dendl;
      me(head)->recycle();
      return nullptr;
    }

    total_nsegs += nsegs;

    // Attach a new buffers' chain to the packet chain
    last_seg->next = h;
    last_seg = new_last_seg;
  }

  // Update the HEAD buffer with the packet info
  head->pkt_len = p.len();
  head->nb_segs = total_nsegs;
  // tx_pkt_burst loops until the next pointer is null, so last_seg->next must
  // be null.
  last_seg->next = nullptr;

  set_cluster_offload_info(p, qp, head);

  //
  // If a packet hasn't been linearized already and the resulting
  // cluster requires the linearisation due to HW limitation:
  //
  // - Recycle the cluster.
  // - Linearize the packet.
  // - Build the cluster once again
  //
  if (head->nb_segs > max_frags ||
      (p.nr_frags() > 1 && qp.port().is_i40e_device() && i40e_should_linearize(head)) ||
      (p.nr_frags() > vmxnet3_max_xmit_segment_frags && qp.port().is_vmxnet3_device())) {
    me(head)->recycle();
    p.linearize();
    qp.perf_logger->inc(l_dpdk_qp_tx_linearize_ops);

    goto build_mbuf_cluster;
  }

  me(last_seg)->set_packet(std::move(p));

  return me(head);
}

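// Copy a fragmented Packet into a pre-allocated chain of inline mbufs: the
// packet fragments and the mbuf segments are walked in lockstep, copying
// min(bytes left in the fragment, room left in the segment) at each step.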
void DPDKQueuePair::tx_buf::copy_packet_to_cluster(const Packet& p, rte_mbuf* head)
{
  rte_mbuf* cur_seg = head;
  size_t cur_seg_offset = 0;
  unsigned cur_frag_idx = 0;
  size_t cur_frag_offset = 0;

  while (true) {
    size_t to_copy = std::min(p.frag(cur_frag_idx).size - cur_frag_offset,
                              inline_mbuf_data_size - cur_seg_offset);

    memcpy(rte_pktmbuf_mtod_offset(cur_seg, void*, cur_seg_offset),
           p.frag(cur_frag_idx).base + cur_frag_offset, to_copy);

    cur_frag_offset += to_copy;
    cur_seg_offset += to_copy;

    if (cur_frag_offset >= p.frag(cur_frag_idx).size) {
      ++cur_frag_idx;
      if (cur_frag_idx >= p.nr_frags()) {
        //
        // We are done - set the data size of the last segment
        // of the cluster.
        //
        cur_seg->data_len = cur_seg_offset;
        break;
      }

      cur_frag_offset = 0;
    }

    if (cur_seg_offset >= inline_mbuf_data_size) {
      cur_seg->data_len = inline_mbuf_data_size;
      cur_seg = cur_seg->next;
      cur_seg_offset = 0;

      // FIXME: assert in a fast-path - remove!!!
      ceph_assert(cur_seg);
    }
  }
}

DPDKQueuePair::tx_buf* DPDKQueuePair::tx_buf::from_packet_copy(Packet&& p, DPDKQueuePair& qp)
{
  // sanity
  if (!p.len()) {
    return nullptr;
  }

  /*
   * Here we are going to use the fact that the inline data size is a
   * power of two.
   *
   * We will first try to allocate the cluster and only if we are
   * successful - we will go and copy the data.
   */
  auto aligned_len = align_up((size_t)p.len(), inline_mbuf_data_size);
  unsigned nsegs = aligned_len / inline_mbuf_data_size;
  rte_mbuf *head = nullptr, *last_seg = nullptr;

  tx_buf* buf = qp.get_tx_buf();
  if (!buf) {
    return nullptr;
  }

  head = buf->rte_mbuf_p();
  last_seg = head;
  for (unsigned i = 1; i < nsegs; i++) {
    buf = qp.get_tx_buf();
    if (!buf) {
      me(head)->recycle();
      return nullptr;
    }

    last_seg->next = buf->rte_mbuf_p();
    last_seg = last_seg->next;
  }

  //
  // If we've got here it means that we have already succeeded!
  // We only need to copy the data and set the head buffer with the
  // relevant info.
  //
  head->pkt_len = p.len();
  head->nb_segs = nsegs;
  // tx_pkt_burst loops until the next pointer is null, so last_seg->next must
  // be null.
  last_seg->next = nullptr;

  copy_packet_to_cluster(p, head);
  set_cluster_offload_info(p, qp, head);

  return me(head);
}

size_t DPDKQueuePair::tx_buf::copy_one_data_buf(
    DPDKQueuePair& qp, rte_mbuf*& m, char* data, size_t buf_len)
{
  tx_buf* buf = qp.get_tx_buf();
  if (!buf) {
    return 0;
  }

  size_t len = std::min(buf_len, inline_mbuf_data_size);

  m = buf->rte_mbuf_p();

  // mbuf_put()
  m->data_len = len;
  m->pkt_len = len;

  qp.perf_logger->inc(l_dpdk_qp_tx_copy_ops);
  qp.perf_logger->inc(l_dpdk_qp_tx_copy_bytes, len);

  memcpy(rte_pktmbuf_mtod(m, void*), data, len);

  return len;
}

/******************************** Interface functions ************************/

std::unique_ptr<DPDKDevice> create_dpdk_net_device(
  CephContext *cct,
  unsigned cores,
  uint8_t port_idx,
  bool use_lro,
  bool enable_fc)
{
  // Check that we have at least one DPDK-able port
  if (rte_eth_dev_count_avail() == 0) {
    ceph_assert(false && "No Ethernet ports - bye\n");
  } else {
    ldout(cct, 10) << __func__ << " ports number: " << int(rte_eth_dev_count_avail()) << dendl;
  }

  return std::unique_ptr<DPDKDevice>(
    new DPDKDevice(cct, port_idx, cores, use_lro, enable_fc));
}