1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 /*
3 * This file is open source software, licensed to you under the terms
4 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
5 * distributed with this work for additional information regarding copyright
6 * ownership. You may not use this file except in compliance with the License.
7 *
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 /*
20 * Copyright (C) 2014 Cloudius Systems, Ltd.
21 */
22 /*
23 * Ceph - scalable distributed file system
24 *
25 * Copyright (C) 2015 XSky <haomai@xsky.com>
26 *
27 * Author: Haomai Wang <haomaiwang@gmail.com>
28 *
29 * This is free software; you can redistribute it and/or
30 * modify it under the terms of the GNU Lesser General Public
31 * License version 2.1, as published by the Free Software
32 * Foundation. See file COPYING.
33 *
34 */
35
36 #include <atomic>
37 #include <vector>
38 #include <queue>
39
40 #include <rte_config.h>
41 #include <rte_common.h>
42 #include <rte_eal.h>
43 #include <rte_pci.h>
44 #include <rte_ethdev.h>
45 #include <rte_cycles.h>
46 #include <rte_memzone.h>
47
48 #include "include/page.h"
49 #include "align.h"
50 #include "IP.h"
51 #include "const.h"
52 #include "dpdk_rte.h"
53 #include "DPDK.h"
54 #include "toeplitz.h"
55
56 #include "common/Cycles.h"
57 #include "common/dout.h"
58 #include "common/errno.h"
59 #include "include/assert.h"
60
61 #define dout_subsys ceph_subsys_dpdk
62 #undef dout_prefix
63 #define dout_prefix *_dout << "dpdk "
64
65
66 void* as_cookie(struct rte_pktmbuf_pool_private& p) {
67 return &p;
68 };
69
70 #ifndef MARKER
71 typedef void *MARKER[0]; /**< generic marker for a point in a structure */
72 #endif
73
74 /******************* Net device related constants *****************************/
75 static constexpr uint16_t default_ring_size = 512;
76
77 //
78 // We need 2 times the ring size of buffers because of the way PMDs
79 // refill the ring.
80 //
81 static constexpr uint16_t mbufs_per_queue_rx = 2 * default_ring_size;
82 static constexpr uint16_t rx_gc_thresh = 64;
83
84 //
85 // No need to keep more descriptors in the air than can be sent in a single
86 // rte_eth_tx_burst() call.
87 //
88 static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size;
89
90 static constexpr uint16_t mbuf_cache_size = 512;
91 static constexpr uint16_t mbuf_overhead =
92 sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
93 //
94 // We'll allocate 2K data buffers for an inline case because this would require
95 // a single page per mbuf. If we used 4K data buffers here it would require 2
96 // pages for a single buffer (due to "mbuf_overhead") and this is a much more
97 // demanding memory constraint.
98 //
99 static constexpr size_t inline_mbuf_data_size = 2048;
100
101 //
102 // Size of the data buffer in the non-inline case.
103 //
104 // We may want to change (increase) this value in future, while the
105 // inline_mbuf_data_size value will unlikely change due to reasons described
106 // above.
107 //
108 static constexpr size_t mbuf_data_size = 4096;
109
110 // (INLINE_MBUF_DATA_SIZE(2K)*32 = 64K = Max TSO/LRO size) + 1 mbuf for headers
111 static constexpr uint8_t max_frags = 32 + 1;
112
113 //
114 // Intel's 40G NIC HW limit for a number of fragments in an xmit segment.
115 //
116 // See Chapter 8.4.1 "Transmit Packet in System Memory" of the xl710 devices
117 // spec. for more details.
118 //
119 static constexpr uint8_t i40e_max_xmit_segment_frags = 8;
120
121 //
122 // VMWare's virtual NIC limit for a number of fragments in an xmit segment.
123 //
124 // see drivers/net/vmxnet3/base/vmxnet3_defs.h VMXNET3_MAX_TXD_PER_PKT
125 //
126 static constexpr uint8_t vmxnet3_max_xmit_segment_frags = 16;
127
128 static constexpr uint16_t inline_mbuf_size = inline_mbuf_data_size + mbuf_overhead;
129
130 static size_t huge_page_size = 512 * CEPH_PAGE_SIZE;
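//
// Illustrative sizing (a sketch; the exact values are build-dependent
// assumptions, not guarantees): with RTE_PKTMBUF_HEADROOM = 128 and
// sizeof(struct rte_mbuf) = 128,
//
//   mbuf_overhead      = 128 + 128  = 256 bytes
//   inline_mbuf_size   = 2048 + 256 = 2304 bytes
//   mbufs_per_queue_rx = 2 * 512    = 1024 mbufs per Rx queue
//
// and with 4K pages, huge_page_size = 512 * 4096 = 2 MB.
//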
131
132 uint32_t qp_mempool_obj_size()
133 {
134 uint32_t mp_size = 0;
135 struct rte_mempool_objsz mp_obj_sz = {};
136
137 //
138 // We will align each size to huge page size because DPDK allocates
139 // physically contiguous memory region for each pool object.
140 //
141
142 // Rx
143 mp_size += align_up(rte_mempool_calc_obj_size(mbuf_overhead, 0, &mp_obj_sz)+
144 sizeof(struct rte_pktmbuf_pool_private),
145 huge_page_size);
146
147 //Tx
148 std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz));
149 mp_size += align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0,
150 &mp_obj_sz)+
151 sizeof(struct rte_pktmbuf_pool_private),
152 huge_page_size);
153 return mp_size;
154 }
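//
// Illustrative use (hypothetical caller, not part of this file): the value
// returned above can be used to size the per-queue-pair hugepage reservation,
// e.g.
//
//   size_t dpdk_mem = qp_mempool_obj_size() * num_queue_pairs;
//
// The actual reservation policy lives in the DPDK/EAL setup code.
//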
155
156 static constexpr const char* pktmbuf_pool_name = "dpdk_net_pktmbuf_pool";
157
158 /*
159 * When doing reads from the NIC queues, use this batch size
160 */
161 static constexpr uint8_t packet_read_size = 32;
162 /******************************************************************************/
163
164 int DPDKDevice::init_port_start()
165 {
166 assert(_port_idx < rte_eth_dev_count());
167
168 rte_eth_dev_info_get(_port_idx, &_dev_info);
169
170 //
171 // This is a workaround for a missing handling of a HW limitation in the
172 // DPDK i40e driver. This and all related to _is_i40e_device code should be
173 // removed once this handling is added.
174 //
175 if (std::string("rte_i40evf_pmd") == _dev_info.driver_name ||
176 std::string("rte_i40e_pmd") == _dev_info.driver_name) {
177 ldout(cct, 1) << __func__ << " Device is an Intel's 40G NIC. Enabling 8 fragments hack!" << dendl;
178 _is_i40e_device = true;
179 }
180
181 if (std::string("rte_vmxnet3_pmd") == _dev_info.driver_name) {
182 ldout(cct, 1) << __func__ << " Device is a VMWare Virtual NIC. Enabling 16 fragments hack!" << dendl;
183 _is_vmxnet3_device = true;
184 }
185
186 //
187 // Another workaround: this time for a lack of number of RSS bits.
188 // ixgbe PF NICs support up to 16 RSS queues.
189 // ixgbe VF NICs support up to 4 RSS queues.
190 // i40e PF NICs support up to 64 RSS queues.
191 // i40e VF NICs support up to 16 RSS queues.
192 //
193 if (std::string("rte_ixgbe_pmd") == _dev_info.driver_name) {
194 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
195 } else if (std::string("rte_ixgbevf_pmd") == _dev_info.driver_name) {
196 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)4);
197 } else if (std::string("rte_i40e_pmd") == _dev_info.driver_name) {
198 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)64);
199 } else if (std::string("rte_i40evf_pmd") == _dev_info.driver_name) {
200 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
201 }
202
203 // Clear txq_flags - we want to support all available offload features
204 // except for multi-mempool and refcnt'ing which we don't need
205 _dev_info.default_txconf.txq_flags =
206 ETH_TXQ_FLAGS_NOMULTMEMP | ETH_TXQ_FLAGS_NOREFCOUNT;
207
208 //
209 // Disable features that are not supported by port's HW
210 //
211 if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM)) {
212 _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMUDP;
213 }
214
215 if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
216 _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMTCP;
217 }
218
219 if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_SCTP_CKSUM)) {
220 _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOXSUMSCTP;
221 }
222
223 if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
224 _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
225 }
226
227 if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_VLAN_INSERT)) {
228 _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOVLANOFFL;
229 }
230
231 if (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO)) {
232 _dev_info.default_txconf.txq_flags |= ETH_TXQ_FLAGS_NOMULTSEGS;
233 }
234
235 /* for port configuration all features are off by default */
236 rte_eth_conf port_conf = { 0 };
237
238 ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": max_rx_queues "
239 << _dev_info.max_rx_queues << " max_tx_queues "
240 << _dev_info.max_tx_queues << dendl;
241
242 _num_queues = std::min({_num_queues, _dev_info.max_rx_queues, _dev_info.max_tx_queues});
243
244 ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": using "
245 << _num_queues << " queues" << dendl;
246
247 // Set RSS mode: enable RSS if the device is configured with more than one queue.
248 // Even if the port has a single queue we still want the RSS feature to be
249 // available in order to make the HW calculate the RSS hash for us.
250 if (_num_queues > 1) {
251 if (_dev_info.hash_key_size == 40) {
252 _rss_key = default_rsskey_40bytes;
253 } else if (_dev_info.hash_key_size == 52) {
254 _rss_key = default_rsskey_52bytes;
255 } else if (_dev_info.hash_key_size != 0) {
256 // Unexpected RSS hash key size reported by the device.
257 rte_exit(EXIT_FAILURE,
258 "Port %d: only 40 or 52 bytes RSS hash keys are supported, %d bytes key requested",
259 _port_idx, _dev_info.hash_key_size);
260 } else {
261 _rss_key = default_rsskey_40bytes;
262 _dev_info.hash_key_size = 40;
263 }
264
265 port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
266 port_conf.rx_adv_conf.rss_conf.rss_hf = ETH_RSS_PROTO_MASK;
267 if (_dev_info.hash_key_size) {
268 port_conf.rx_adv_conf.rss_conf.rss_key = const_cast<uint8_t *>(_rss_key.data());
269 port_conf.rx_adv_conf.rss_conf.rss_key_len = _dev_info.hash_key_size;
270 }
271 } else {
272 port_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
273 }
274
275 if (_num_queues > 1) {
276 if (_dev_info.reta_size) {
277 // RETA size should be a power of 2
278 assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
279
280 // Set the RSS table to the correct size
281 _redir_table.resize(_dev_info.reta_size);
282 _rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
283 ldout(cct, 5) << __func__ << " Port " << int(_port_idx)
284 << ": RSS table size is " << _dev_info.reta_size << dendl;
285 } else {
286 // FIXME: same with sw_reta
287 _redir_table.resize(128);
288 _rss_table_bits = std::lround(std::log2(128));
289 }
290 } else {
291 _redir_table.push_back(0);
292 }
293
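//
// For illustration (a sketch, assuming the usual RSS dispatch scheme): with a
// RETA of 128 entries, _rss_table_bits == 7 and an incoming packet is steered
// roughly as
//
//   queue = _redir_table[rss_hash & (_dev_info.reta_size - 1)];
//
// which is why the RETA size is required to be a power of two above.
//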
294 // Set Rx VLAN stripping
295 if (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
296 port_conf.rxmode.hw_vlan_strip = 1;
297 }
298
299 // Enable HW CRC stripping
300 port_conf.rxmode.hw_strip_crc = 1;
301
302 #ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
303 // Enable LRO
304 if (_use_lro && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)) {
305 ldout(cct, 1) << __func__ << " LRO is on" << dendl;
306 port_conf.rxmode.enable_lro = 1;
307 _hw_features.rx_lro = true;
308 } else
309 #endif
310 ldout(cct, 1) << __func__ << " LRO is off" << dendl;
311
312 // Check that the Rx CSUM offload capabilities are either all set together or
313 // all unset. If this assumption breaks we need to rework the logic below by
314 // splitting the csum offload feature bit into separate bits for IPv4 and
315 // TCP.
316 assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
317 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
318 (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
319 !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));
320
321 // Set Rx checksum checking
322 if ((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
323 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
324 ldout(cct, 1) << __func__ << " RX checksum offload supported" << dendl;
325 port_conf.rxmode.hw_ip_checksum = 1;
326 _hw_features.rx_csum_offload = 1;
327 }
328
329 if ((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
330 ldout(cct, 1) << __func__ << " TX ip checksum offload supported" << dendl;
331 _hw_features.tx_csum_ip_offload = 1;
332 }
333
334 // TSO is supported starting from DPDK v1.8
335 if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
336 ldout(cct, 1) << __func__ << " TSO is supported" << dendl;
337 _hw_features.tx_tso = 1;
338 }
339
340 // Check that the Tx TCP CSUM offload capabilities are either all set together
341 // or all unset. If this assumption breaks we need to rework the logic below
342 // by splitting the csum offload feature bit into separate bits
343 // for TCP.
344 assert((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) ||
345 !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM));
346
347 if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM) {
348 ldout(cct, 1) << __func__ << " TX TCP checksum offload supported" << dendl;
349 _hw_features.tx_csum_l4_offload = 1;
350 }
351
352 int retval;
353
354 ldout(cct, 1) << __func__ << " Port " << int(_port_idx) << " init ... " << dendl;
355
356 /*
357 * Standard DPDK port initialisation - config port, then set up
358 * rx and tx rings.
359 */
360 if ((retval = rte_eth_dev_configure(_port_idx, _num_queues, _num_queues,
361 &port_conf)) != 0) {
362 lderr(cct) << __func__ << " failed to configure port " << (int)_port_idx
363 << " rx/tx queues " << _num_queues << " error " << cpp_strerror(retval) << dendl;
364 return retval;
365 }
366
367 //rte_eth_promiscuous_enable(port_num);
368 ldout(cct, 1) << __func__ << " done." << dendl;
369
370 return 0;
371 }
372
373 void DPDKDevice::set_hw_flow_control()
374 {
375 // Read the port's current/default flow control settings
376 struct rte_eth_fc_conf fc_conf;
377 auto ret = rte_eth_dev_flow_ctrl_get(_port_idx, &fc_conf);
378
379 if (ret == -ENOTSUP) {
380 ldout(cct, 1) << __func__ << " port " << int(_port_idx)
381 << ": not support to get hardware flow control settings: " << ret << dendl;
382 goto not_supported;
383 }
384
385 if (ret < 0) {
386 lderr(cct) << __func__ << " port " << int(_port_idx)
387 << ": failed to get hardware flow control settings: " << ret << dendl;
388 ceph_abort();
389 }
390
391 if (_enable_fc) {
392 fc_conf.mode = RTE_FC_FULL;
393 } else {
394 fc_conf.mode = RTE_FC_NONE;
395 }
396
397 ret = rte_eth_dev_flow_ctrl_set(_port_idx, &fc_conf);
398 if (ret == -ENOTSUP) {
399 ldout(cct, 1) << __func__ << " port " << int(_port_idx)
400 << ": not support to set hardware flow control settings: " << ret << dendl;
401 goto not_supported;
402 }
403
404 if (ret < 0) {
405 lderr(cct) << __func__ << " port " << int(_port_idx)
406 << ": failed to set hardware flow control settings: " << ret << dendl;
407 ceph_abort();
408 }
409
410 ldout(cct, 1) << __func__ << " port " << int(_port_idx) << ": HW FC " << _enable_fc << dendl;
411 return;
412
413 not_supported:
414 ldout(cct, 1) << __func__ << " port " << int(_port_idx) << ": changing HW FC settings is not supported" << dendl;
415 }
416
417 int DPDKDevice::init_port_fini()
418 {
419 // Changing FC requires HW reset, so set it before the port is initialized.
420 set_hw_flow_control();
421
422 if (rte_eth_dev_start(_port_idx) != 0) {
423 lderr(cct) << __func__ << " can't start port " << int(_port_idx) << dendl;
424 return -1;
425 }
426
427 if (_num_queues > 1) {
428 if (!rte_eth_dev_filter_supported(_port_idx, RTE_ETH_FILTER_HASH)) {
429 ldout(cct, 5) << __func__ << " Port " << int(_port_idx) << ": HASH FILTER configuration is supported" << dendl;
430
431 // Set up the HW to use the Toeplitz hash function as the RSS hash function
432 struct rte_eth_hash_filter_info info = {};
433
434 info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
435 info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;
436
437 if (rte_eth_dev_filter_ctrl(_port_idx, RTE_ETH_FILTER_HASH,
438 RTE_ETH_FILTER_SET, &info) < 0) {
439 lderr(cct) << __func__ << " cannot set hash function on port " << int(_port_idx) << dendl;
440 return -1;
441 }
442 }
443
444 set_rss_table();
445 }
446
447 // Wait for a link
448 if (check_port_link_status() < 0) {
449 lderr(cct) << __func__ << " port " << int(_port_idx) << " link up failed" << dendl;
450 return -1;
451 }
452
453 ldout(cct, 5) << __func__ << " created DPDK device" << dendl;
454 return 0;
455 }
456
457 void DPDKQueuePair::configure_proxies(const std::map<unsigned, float>& cpu_weights) {
458 assert(!cpu_weights.empty());
459 if (cpu_weights.size() == 1 && cpu_weights.begin()->first == _qid) {
460 // special case queue sending to self only, to avoid requiring a hash value
461 return;
462 }
463 register_packet_provider([this] {
464 Tub<Packet> p;
465 if (!_proxy_packetq.empty()) {
466 p = std::move(_proxy_packetq.front());
467 _proxy_packetq.pop_front();
468 }
469 return p;
470 });
471 build_sw_reta(cpu_weights);
472 }
473
474 void DPDKQueuePair::build_sw_reta(const std::map<unsigned, float>& cpu_weights) {
475 float total_weight = 0;
476 for (auto&& x : cpu_weights) {
477 total_weight += x.second;
478 }
479 float accum = 0;
480 unsigned idx = 0;
481 std::array<uint8_t, 128> reta;
482 for (auto&& entry : cpu_weights) {
483 auto cpu = entry.first;
484 auto weight = entry.second;
485 accum += weight;
486 while (idx < (accum / total_weight * reta.size() - 0.5)) {
487 reta[idx++] = cpu;
488 }
489 }
490 _sw_reta = reta;
491 }
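//
// Illustration of the weighted fill above (hypothetical input): with
// cpu_weights = {{0, 1.0}, {2, 1.0}} the 128-entry software RETA ends up with
// entries [0..63] == 0 and [64..127] == 2, i.e. each CPU receives a share of
// the table proportional to its weight.
//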
492
493
494 bool DPDKQueuePair::init_rx_mbuf_pool()
495 {
496 std::string name = std::string(pktmbuf_pool_name) + std::to_string(_qid) + "_rx";
497
498 // reserve the memory for Rx buffers containers
499 _rx_free_pkts.reserve(mbufs_per_queue_rx);
500 _rx_free_bufs.reserve(mbufs_per_queue_rx);
501
502 _pktmbuf_pool_rx = rte_mempool_lookup(name.c_str());
503 if (!_pktmbuf_pool_rx) {
504 ldout(cct, 1) << __func__ << " Creating Rx mbuf pool '" << name.c_str()
505 << "' [" << mbufs_per_queue_rx << " mbufs] ..."<< dendl;
506
507 //
508 // Don't pass single-producer/single-consumer flags to mbuf create as it
509 // seems faster to use a cache instead.
510 //
511 struct rte_pktmbuf_pool_private roomsz = {};
512 roomsz.mbuf_data_room_size = mbuf_data_size + RTE_PKTMBUF_HEADROOM;
513 _pktmbuf_pool_rx = rte_mempool_create(
514 name.c_str(),
515 mbufs_per_queue_rx, mbuf_overhead,
516 mbuf_cache_size,
517 sizeof(struct rte_pktmbuf_pool_private),
518 rte_pktmbuf_pool_init, as_cookie(roomsz),
519 rte_pktmbuf_init, nullptr,
520 rte_socket_id(), 0);
521 if (!_pktmbuf_pool_rx) {
522 lderr(cct) << __func__ << " Failed to create mempool for rx" << dendl;
523 return false;
524 }
525
526 //
527 // 1) Pull all entries from the pool.
528 // 2) Bind data buffers to each of them.
529 // 3) Return them back to the pool.
530 //
531 for (int i = 0; i < mbufs_per_queue_rx; i++) {
532 rte_mbuf* m = rte_pktmbuf_alloc(_pktmbuf_pool_rx);
533 assert(m);
534 _rx_free_bufs.push_back(m);
535 }
536
537 for (int i = 0; i < cct->_conf->ms_dpdk_rx_buffer_count_per_core; i++) {
538 void* m = rte_malloc(NULL, mbuf_data_size, mbuf_data_size);
539 assert(m);
540 _alloc_bufs.push_back(m);
541 }
542
543 for (auto&& m : _rx_free_bufs) {
544 if (!init_noninline_rx_mbuf(m, mbuf_data_size, _alloc_bufs)) {
545 lderr(cct) << __func__ << " Failed to allocate data buffers for Rx ring. "
546 "Consider increasing the amount of memory." << dendl;
547 return false;
548 }
549 }
550
551 rte_mempool_put_bulk(_pktmbuf_pool_rx, (void**)_rx_free_bufs.data(),
552 _rx_free_bufs.size());
553
554 _rx_free_bufs.clear();
555 if (rte_eth_rx_queue_setup(_dev_port_idx, _qid, default_ring_size,
556 rte_eth_dev_socket_id(_dev_port_idx),
557 _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) {
558 lderr(cct) << __func__ << " cannot initialize rx queue" << dendl;
559 return false;
560 }
561 }
562
563 ldout(cct, 20) << __func__ << " count " << rte_mempool_count(_pktmbuf_pool_rx) << " free count " << rte_mempool_free_count(_pktmbuf_pool_rx) << dendl;
564 return _pktmbuf_pool_rx != nullptr;
565 }
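//
// Layout reminder (derived from the constants at the top of this file): the Rx
// pool above holds mbufs_per_queue_rx (1024) mbuf headers of mbuf_overhead
// bytes each, while the 4096-byte data buffers are drawn from _alloc_bufs,
// which is filled via rte_malloc() with ms_dpdk_rx_buffer_count_per_core
// entries and attached to mbufs by init_noninline_rx_mbuf()/refill_rx_mbuf().
//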
566
567 int DPDKDevice::check_port_link_status()
568 {
569 int count = 0;
570
571 ldout(cct, 20) << __func__ << dendl;
572 const int sleep_time = 100 * 1000;
573 const int max_check_time = 90; /* 9s (90 * 100ms) in total */
574 while (true) {
575 struct rte_eth_link link;
576 memset(&link, 0, sizeof(link));
577 rte_eth_link_get_nowait(_port_idx, &link);
578
579 if (true) {
580 if (link.link_status) {
581 ldout(cct, 5) << __func__ << " done port "
582 << static_cast<unsigned>(_port_idx)
583 << " link Up - speed " << link.link_speed
584 << " Mbps - "
585 << ((link.link_duplex == ETH_LINK_FULL_DUPLEX) ? ("full-duplex") : ("half-duplex"))
586 << dendl;
587 break;
588 } else if (count++ < max_check_time) {
589 ldout(cct, 20) << __func__ << " not ready, continue to wait." << dendl;
590 usleep(sleep_time);
591 } else {
592 lderr(cct) << __func__ << " done port " << int(_port_idx) << " link down" << dendl;
593 return -1;
594 }
595 }
596 }
597 return 0;
598 }
599
600 class C_handle_dev_stats : public EventCallback {
601 DPDKQueuePair *_qp;
602 public:
603 C_handle_dev_stats(DPDKQueuePair *qp): _qp(qp) { }
604 void do_request(int id) {
605 _qp->handle_stats();
606 }
607 };
608
609 DPDKQueuePair::DPDKQueuePair(CephContext *c, EventCenter *cen, DPDKDevice* dev, uint8_t qid)
610 : cct(c), _dev(dev), _dev_port_idx(dev->port_idx()), center(cen), _qid(qid),
611 _tx_poller(this), _rx_gc_poller(this), _tx_buf_factory(c, dev, qid),
612 _tx_gc_poller(this)
613 {
614 if (!init_rx_mbuf_pool()) {
615 lderr(cct) << __func__ << " cannot initialize mbuf pools" << dendl;
616 ceph_abort();
617 }
618
619 static_assert(offsetof(tx_buf, private_end) -
620 offsetof(tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM,
621 "RTE_PKTMBUF_HEADROOM is less than DPDKQueuePair::tx_buf size! "
622 "Increase the headroom size in the DPDK configuration");
623 static_assert(offsetof(tx_buf, _mbuf) == 0,
624 "There is a pad at the beginning of the tx_buf before _mbuf "
625 "field!");
626 static_assert((inline_mbuf_data_size & (inline_mbuf_data_size - 1)) == 0,
627 "inline_mbuf_data_size has to be a power of two!");
628
629 std::string name(std::string("queue") + std::to_string(qid));
630 PerfCountersBuilder plb(cct, name, l_dpdk_qp_first, l_dpdk_qp_last);
631
632 plb.add_u64_counter(l_dpdk_qp_rx_packets, "dpdk_receive_packets", "DPDK received packets");
633 plb.add_u64_counter(l_dpdk_qp_tx_packets, "dpdk_send_packets", "DPDK sent packets");
634 plb.add_u64_counter(l_dpdk_qp_rx_bad_checksum_errors, "dpdk_receive_bad_checksum_errors", "DPDK received bad checksum packets");
635 plb.add_u64_counter(l_dpdk_qp_rx_no_memory_errors, "dpdk_receive_no_memory_errors", "DPDK received no memory packets");
636 plb.add_u64_counter(l_dpdk_qp_rx_bytes, "dpdk_receive_bytes", "DPDK received bytes", NULL, 0, unit_t(BYTES));
637 plb.add_u64_counter(l_dpdk_qp_tx_bytes, "dpdk_send_bytes", "DPDK sent bytes", NULL, 0, unit_t(BYTES));
638 plb.add_u64_counter(l_dpdk_qp_rx_last_bunch, "dpdk_receive_last_bunch", "DPDK last received bunch");
639 plb.add_u64_counter(l_dpdk_qp_tx_last_bunch, "dpdk_send_last_bunch", "DPDK last sent bunch");
640 plb.add_u64_counter(l_dpdk_qp_rx_fragments, "dpdk_receive_fragments", "DPDK received total fragments");
641 plb.add_u64_counter(l_dpdk_qp_tx_fragments, "dpdk_send_fragments", "DPDK sent total fragments");
642 plb.add_u64_counter(l_dpdk_qp_rx_copy_ops, "dpdk_receive_copy_ops", "DPDK received copy operations");
643 plb.add_u64_counter(l_dpdk_qp_tx_copy_ops, "dpdk_send_copy_ops", "DPDK sent copy operations");
644 plb.add_u64_counter(l_dpdk_qp_rx_copy_bytes, "dpdk_receive_copy_bytes", "DPDK received copy bytes", NULL, 0, unit_t(BYTES));
645 plb.add_u64_counter(l_dpdk_qp_tx_copy_bytes, "dpdk_send_copy_bytes", "DPDK sent copy bytes", NULL, 0, unit_t(BYTES));
646 plb.add_u64_counter(l_dpdk_qp_rx_linearize_ops, "dpdk_receive_linearize_ops", "DPDK received linearize operations");
647 plb.add_u64_counter(l_dpdk_qp_tx_linearize_ops, "dpdk_send_linearize_ops", "DPDK sent linearize operations");
648 plb.add_u64_counter(l_dpdk_qp_tx_queue_length, "dpdk_send_queue_length", "DPDK send queue length");
649
650 perf_logger = plb.create_perf_counters();
651 cct->get_perfcounters_collection()->add(perf_logger);
652
653 if (!_qid)
654 device_stat_time_fd = center->create_time_event(1000*1000, new C_handle_dev_stats(this));
655 }
656
657 void DPDKQueuePair::handle_stats()
658 {
659 ldout(cct, 20) << __func__ << " started." << dendl;
660 rte_eth_stats rte_stats = {};
661 int rc = rte_eth_stats_get(_dev_port_idx, &rte_stats);
662
663 if (rc) {
664 ldout(cct, 0) << __func__ << " failed to get port statistics: " << cpp_strerror(rc) << dendl;
665 return;
666 }
667
668 #if RTE_VERSION < RTE_VERSION_NUM(16,7,0,0)
669 _dev->perf_logger->set(l_dpdk_dev_rx_mcast, rte_stats.imcasts);
670 _dev->perf_logger->set(l_dpdk_dev_rx_badcrc_errors, rte_stats.ibadcrc);
671 #endif
672 _dev->perf_logger->set(l_dpdk_dev_rx_dropped_errors, rte_stats.imissed);
673 _dev->perf_logger->set(l_dpdk_dev_rx_nombuf_errors, rte_stats.rx_nombuf);
674
675 _dev->perf_logger->set(l_dpdk_dev_rx_total_errors, rte_stats.ierrors);
676 _dev->perf_logger->set(l_dpdk_dev_tx_total_errors, rte_stats.oerrors);
677 device_stat_time_fd = center->create_time_event(1000*1000, new C_handle_dev_stats(this));
678 }
679
680 bool DPDKQueuePair::poll_tx() {
681 bool nonloopback = !cct->_conf->ms_dpdk_debug_allow_loopback;
682 #ifdef CEPH_PERF_DEV
683 uint64_t start = Cycles::rdtsc();
684 #endif
685 uint32_t total_work = 0;
686 if (_tx_packetq.size() < 16) {
687 // refill send queue from upper layers
688 uint32_t work;
689 do {
690 work = 0;
691 for (auto&& pr : _pkt_providers) {
692 auto p = pr();
693 if (p) {
694 work++;
695 if (likely(nonloopback)) {
696 // ldout(cct, 0) << __func__ << " len: " << p->len() << " frags: " << p->nr_frags() << dendl;
697 _tx_packetq.push_back(std::move(*p));
698 } else {
699 auto th = p->get_header<eth_hdr>(0);
700 if (th->dst_mac == th->src_mac) {
701 _dev->l2receive(_qid, std::move(*p));
702 } else {
703 _tx_packetq.push_back(std::move(*p));
704 }
705 }
706 if (_tx_packetq.size() == 128) {
707 break;
708 }
709 }
710 }
711 total_work += work;
712 } while (work && total_work < 256 && _tx_packetq.size() < 128);
713 }
714 if (!_tx_packetq.empty()) {
715 uint64_t c = send(_tx_packetq);
716 perf_logger->inc(l_dpdk_qp_tx_packets, c);
717 perf_logger->set(l_dpdk_qp_tx_last_bunch, c);
718 #ifdef CEPH_PERF_DEV
719 tx_count += total_work;
720 tx_cycles += Cycles::rdtsc() - start;
721 #endif
722 return true;
723 }
724
725 return false;
726 }
727
728 inline Tub<Packet> DPDKQueuePair::from_mbuf_lro(rte_mbuf* m)
729 {
730 _frags.clear();
731 _bufs.clear();
732
733 for (; m != nullptr; m = m->next) {
734 char* data = rte_pktmbuf_mtod(m, char*);
735
736 _frags.emplace_back(fragment{data, rte_pktmbuf_data_len(m)});
737 _bufs.push_back(data);
738 }
739
740 auto del = std::bind(
741 [this](std::vector<char*> &bufs) {
742 for (auto&& b : bufs) { _alloc_bufs.push_back(b); }
743 }, std::move(_bufs));
744 return Packet(
745 _frags.begin(), _frags.end(), make_deleter(std::move(del)));
746 }
747
748 inline Tub<Packet> DPDKQueuePair::from_mbuf(rte_mbuf* m)
749 {
750 _rx_free_pkts.push_back(m);
751 _num_rx_free_segs += m->nb_segs;
752
753 if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
754 char* data = rte_pktmbuf_mtod(m, char*);
755
756 return Packet(fragment{data, rte_pktmbuf_data_len(m)},
757 make_deleter([this, data] { _alloc_bufs.push_back(data); }));
758 } else {
759 return from_mbuf_lro(m);
760 }
761 }
762
763 inline bool DPDKQueuePair::refill_one_cluster(rte_mbuf* head)
764 {
765 for (; head != nullptr; head = head->next) {
766 if (!refill_rx_mbuf(head, mbuf_data_size, _alloc_bufs)) {
767 //
768 // If we failed to allocate a new buffer - push the rest of the
769 // cluster back to the free_packets list for a later retry.
770 //
771 _rx_free_pkts.push_back(head);
772 return false;
773 }
774 _rx_free_bufs.push_back(head);
775 }
776
777 return true;
778 }
779
780 bool DPDKQueuePair::rx_gc(bool force)
781 {
782 if (_num_rx_free_segs >= rx_gc_thresh || force) {
783 ldout(cct, 10) << __func__ << " free segs " << _num_rx_free_segs
784 << " thresh " << rx_gc_thresh
785 << " free pkts " << _rx_free_pkts.size()
786 << " pool count " << rte_mempool_count(_pktmbuf_pool_rx)
787 << " free pool count " << rte_mempool_free_count(_pktmbuf_pool_rx)
788 << dendl;
789
790 while (!_rx_free_pkts.empty()) {
791 //
792 // Use back() + pop_back() semantics to avoid an extra
793 // _rx_free_pkts.clear() at the end of the function - clear() has a
794 // linear complexity.
795 //
796 auto m = _rx_free_pkts.back();
797 _rx_free_pkts.pop_back();
798
799 if (!refill_one_cluster(m)) {
800 ldout(cct, 1) << __func__ << " get new mbuf failed " << dendl;
801 break;
802 }
803 }
804
805 if (_rx_free_bufs.size()) {
806 rte_mempool_put_bulk(_pktmbuf_pool_rx,
807 (void **)_rx_free_bufs.data(),
808 _rx_free_bufs.size());
809
810 // TODO: assert() in a fast path! Remove me ASAP!
811 assert(_num_rx_free_segs >= _rx_free_bufs.size());
812
813 _num_rx_free_segs -= _rx_free_bufs.size();
814 _rx_free_bufs.clear();
815
816 // TODO: assert() in a fast path! Remove me ASAP!
817 assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
818 (!_rx_free_pkts.empty() && _num_rx_free_segs));
819 }
820 }
821
822 return _num_rx_free_segs >= rx_gc_thresh;
823 }
824
825
826 void DPDKQueuePair::process_packets(
827 struct rte_mbuf **bufs, uint16_t count)
828 {
829 uint64_t nr_frags = 0, bytes = 0;
830
831 for (uint16_t i = 0; i < count; i++) {
832 struct rte_mbuf *m = bufs[i];
833 offload_info oi;
834
835 Tub<Packet> p = from_mbuf(m);
836
837 // Drop the packet if translation above has failed
838 if (!p) {
839 perf_logger->inc(l_dpdk_qp_rx_no_memory_errors);
840 continue;
841 }
842 // ldout(cct, 0) << __func__ << " len " << p->len() << " " << dendl;
843
844 nr_frags += m->nb_segs;
845 bytes += m->pkt_len;
846
847 // Set stripped VLAN value if available
848 if ((_dev->_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) &&
849 (m->ol_flags & PKT_RX_VLAN_PKT)) {
850 oi.vlan_tci = m->vlan_tci;
851 }
852
853 if (_dev->get_hw_features().rx_csum_offload) {
854 if (m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
855 // Packet with bad checksum, just drop it.
856 perf_logger->inc(l_dpdk_qp_rx_bad_checksum_errors);
857 continue;
858 }
859 // Note that when _hw_features.rx_csum_offload is on, the receive
860 // code for ip, tcp and udp will assume they don't need to check
861 // the checksum again, because we did this here.
862 }
863
864 p->set_offload_info(oi);
865 if (m->ol_flags & PKT_RX_RSS_HASH) {
866 p->set_rss_hash(m->hash.rss);
867 }
868
869 _dev->l2receive(_qid, std::move(*p));
870 }
871
872 perf_logger->inc(l_dpdk_qp_rx_packets, count);
873 perf_logger->set(l_dpdk_qp_rx_last_bunch, count);
874 perf_logger->inc(l_dpdk_qp_rx_fragments, nr_frags);
875 perf_logger->inc(l_dpdk_qp_rx_bytes, bytes);
876 }
877
878 bool DPDKQueuePair::poll_rx_once()
879 {
880 struct rte_mbuf *buf[packet_read_size];
881
882 /* read a port */
883 #ifdef CEPH_PERF_DEV
884 uint64_t start = Cycles::rdtsc();
885 #endif
886 uint16_t count = rte_eth_rx_burst(_dev_port_idx, _qid,
887 buf, packet_read_size);
888
889 /* Now process the NIC packets read */
890 if (likely(count > 0)) {
891 process_packets(buf, count);
892 #ifdef CEPH_PERF_DEV
893 rx_cycles = Cycles::rdtsc() - start;
894 rx_count += count;
895 #endif
896 }
897 #ifdef CEPH_PERF_DEV
898 else {
899 if (rx_count > 10000 && tx_count) {
900 ldout(cct, 0) << __func__ << " rx count=" << rx_count << " avg rx=" << Cycles::to_nanoseconds(rx_cycles)/rx_count << "ns "
901 << " tx count=" << tx_count << " avg tx=" << Cycles::to_nanoseconds(tx_cycles)/tx_count << "ns"
902 << dendl;
903 rx_count = rx_cycles = tx_count = tx_cycles = 0;
904 }
905 }
906 #endif
907
908 return count;
909 }
910
911 DPDKQueuePair::tx_buf_factory::tx_buf_factory(CephContext *c,
912 DPDKDevice *dev, uint8_t qid): cct(c)
913 {
914 std::string name = std::string(pktmbuf_pool_name) + std::to_string(qid) + "_tx";
915
916 _pool = rte_mempool_lookup(name.c_str());
917 if (!_pool) {
918 ldout(cct, 0) << __func__ << " Creating Tx mbuf pool '" << name.c_str()
919 << "' [" << mbufs_per_queue_tx << " mbufs] ..." << dendl;
920 //
921 // We are going to push the buffers from the mempool into
922 // the circular_buffer and then poll them from there anyway, so
923 // we prefer to make a mempool non-atomic in this case.
924 //
925 _pool = rte_mempool_create(name.c_str(),
926 mbufs_per_queue_tx, inline_mbuf_size,
927 mbuf_cache_size,
928 sizeof(struct rte_pktmbuf_pool_private),
929 rte_pktmbuf_pool_init, nullptr,
930 rte_pktmbuf_init, nullptr,
931 rte_socket_id(), 0);
932
933 if (!_pool) {
934 lderr(cct) << __func__ << " Failed to create mempool for Tx" << dendl;
935 ceph_abort();
936 }
937 if (rte_eth_tx_queue_setup(dev->port_idx(), qid, default_ring_size,
938 rte_eth_dev_socket_id(dev->port_idx()),
939 dev->def_tx_conf()) < 0) {
940 lderr(cct) << __func__ << " cannot initialize tx queue" << dendl;
941 ceph_abort();
942 }
943 }
944
945 //
946 // Fill the factory with the buffers from the mempool allocated
947 // above.
948 //
949 init_factory();
950 }
951
952 bool DPDKQueuePair::tx_buf::i40e_should_linearize(rte_mbuf *head)
953 {
954 bool is_tso = head->ol_flags & PKT_TX_TCP_SEG;
955
956 // For a non-TSO case: number of fragments should not exceed 8
957 if (!is_tso){
958 return head->nb_segs > i40e_max_xmit_segment_frags;
959 }
960
961 //
962 // For a TSO case each MSS window should not include more than 8
963 // fragments including headers.
964 //
965
966 // Calculate the number of frags containing headers.
967 //
968 // Note: we support neither VLAN nor tunneling thus headers size
969 // accounting is super simple.
970 //
971 size_t headers_size = head->l2_len + head->l3_len + head->l4_len;
972 unsigned hdr_frags = 0;
973 size_t cur_payload_len = 0;
974 rte_mbuf *cur_seg = head;
975
976 while (cur_seg && cur_payload_len < headers_size) {
977 cur_payload_len += cur_seg->data_len;
978 cur_seg = cur_seg->next;
979 hdr_frags++;
980 }
981
982 //
983 // Header fragments will be used for each TSO segment, thus the
984 // maximum number of data segments will be 8 minus the number of
985 // header fragments.
986 //
987 // It's unclear from the spec how the first TSO segment is treated
988 // if the last fragment with headers contains some data bytes:
989 // whether this fragment will be accounted as a single fragment or
990 // as two separate fragments. We prefer to play it safe and assume
991 // that this fragment will be accounted as two separate fragments.
992 //
993 size_t max_win_size = i40e_max_xmit_segment_frags - hdr_frags;
994
995 if (head->nb_segs <= max_win_size) {
996 return false;
997 }
998
999 // Get the data (without headers) part of the first data fragment
1000 size_t prev_frag_data = cur_payload_len - headers_size;
1001 auto mss = head->tso_segsz;
1002
1003 while (cur_seg) {
1004 unsigned frags_in_seg = 0;
1005 size_t cur_seg_size = 0;
1006
1007 if (prev_frag_data) {
1008 cur_seg_size = prev_frag_data;
1009 frags_in_seg++;
1010 prev_frag_data = 0;
1011 }
1012
1013 while (cur_seg_size < mss && cur_seg) {
1014 cur_seg_size += cur_seg->data_len;
1015 cur_seg = cur_seg->next;
1016 frags_in_seg++;
1017
1018 if (frags_in_seg > max_win_size) {
1019 return true;
1020 }
1021 }
1022
1023 if (cur_seg_size > mss) {
1024 prev_frag_data = cur_seg_size - mss;
1025 }
1026 }
1027
1028 return false;
1029 }
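//
// Worked example (a sketch with assumed numbers): for a TSO cluster with
// MSS = 1448 and Ethernet/IPv4/TCP headers fitting in a single fragment,
// hdr_frags == 1 and max_win_size == 8 - 1 == 7. A cluster whose payload
// spreads any single MSS window across more than 7 fragments (e.g. many tiny
// fragments) is reported above as needing linearization.
//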
1030
1031 void DPDKQueuePair::tx_buf::set_cluster_offload_info(const Packet& p, const DPDKQueuePair& qp, rte_mbuf* head)
1032 {
1033 // Handle TCP checksum offload
1034 auto oi = p.offload_info();
1035 if (oi.needs_ip_csum) {
1036 head->ol_flags |= PKT_TX_IP_CKSUM;
1037 // TODO: Take a VLAN header into an account here
1038 head->l2_len = sizeof(struct ether_hdr);
1039 head->l3_len = oi.ip_hdr_len;
1040 }
1041 if (qp.port().get_hw_features().tx_csum_l4_offload) {
1042 if (oi.protocol == ip_protocol_num::tcp) {
1043 head->ol_flags |= PKT_TX_TCP_CKSUM;
1044 // TODO: Take a VLAN header into an account here
1045 head->l2_len = sizeof(struct ether_hdr);
1046 head->l3_len = oi.ip_hdr_len;
1047
1048 if (oi.tso_seg_size) {
1049 assert(oi.needs_ip_csum);
1050 head->ol_flags |= PKT_TX_TCP_SEG;
1051 head->l4_len = oi.tcp_hdr_len;
1052 head->tso_segsz = oi.tso_seg_size;
1053 }
1054 }
1055 }
1056 }
1057
1058 DPDKQueuePair::tx_buf* DPDKQueuePair::tx_buf::from_packet_zc(
1059 CephContext *cct, Packet&& p, DPDKQueuePair& qp)
1060 {
1061 // Too fragmented - linearize
1062 if (p.nr_frags() > max_frags) {
1063 p.linearize();
1064 qp.perf_logger->inc(l_dpdk_qp_tx_linearize_ops);
1065 }
1066
1067 build_mbuf_cluster:
1068 rte_mbuf *head = nullptr, *last_seg = nullptr;
1069 unsigned nsegs = 0;
1070
1071 //
1072 // Create a HEAD of the fragmented packet: check if frag0 has to be
1073 // copied and if yes - send it in a copy way
1074 //
1075 if (!check_frag0(p)) {
1076 if (!copy_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
1077 ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(0).size << dendl;
1078 return nullptr;
1079 }
1080 } else if (!translate_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
1081 ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(0).size << dendl;
1082 return nullptr;
1083 }
1084
1085 unsigned total_nsegs = nsegs;
1086
1087 for (unsigned i = 1; i < p.nr_frags(); i++) {
1088 rte_mbuf *h = nullptr, *new_last_seg = nullptr;
1089 if (!translate_one_frag(qp, p.frag(i), h, new_last_seg, nsegs)) {
1090 ldout(cct, 1) << __func__ << " no available mbuf for " << p.frag(i).size << dendl;
1091 me(head)->recycle();
1092 return nullptr;
1093 }
1094
1095 total_nsegs += nsegs;
1096
1097 // Attach a new buffers' chain to the packet chain
1098 last_seg->next = h;
1099 last_seg = new_last_seg;
1100 }
1101
1102 // Update the HEAD buffer with the packet info
1103 head->pkt_len = p.len();
1104 head->nb_segs = total_nsegs;
1105
1106 set_cluster_offload_info(p, qp, head);
1107
1108 //
1109 // If a packet hasn't been linearized already and the resulting
1110 // cluster requires the linearisation due to HW limitation:
1111 //
1112 // - Recycle the cluster.
1113 // - Linearize the packet.
1114 // - Build the cluster once again
1115 //
1116 if (head->nb_segs > max_frags ||
1117 (p.nr_frags() > 1 && qp.port().is_i40e_device() && i40e_should_linearize(head)) ||
1118 (p.nr_frags() > vmxnet3_max_xmit_segment_frags && qp.port().is_vmxnet3_device())) {
1119 me(head)->recycle();
1120 p.linearize();
1121 qp.perf_logger->inc(l_dpdk_qp_tx_linearize_ops);
1122
1123 goto build_mbuf_cluster;
1124 }
1125
1126 me(last_seg)->set_packet(std::move(p));
1127
1128 return me(head);
1129 }
1130
1131 void DPDKQueuePair::tx_buf::copy_packet_to_cluster(const Packet& p, rte_mbuf* head)
1132 {
1133 rte_mbuf* cur_seg = head;
1134 size_t cur_seg_offset = 0;
1135 unsigned cur_frag_idx = 0;
1136 size_t cur_frag_offset = 0;
1137
1138 while (true) {
1139 size_t to_copy = std::min(p.frag(cur_frag_idx).size - cur_frag_offset,
1140 inline_mbuf_data_size - cur_seg_offset);
1141
1142 memcpy(rte_pktmbuf_mtod_offset(cur_seg, void*, cur_seg_offset),
1143 p.frag(cur_frag_idx).base + cur_frag_offset, to_copy);
1144
1145 cur_frag_offset += to_copy;
1146 cur_seg_offset += to_copy;
1147
1148 if (cur_frag_offset >= p.frag(cur_frag_idx).size) {
1149 ++cur_frag_idx;
1150 if (cur_frag_idx >= p.nr_frags()) {
1151 //
1152 // We are done - set the data size of the last segment
1153 // of the cluster.
1154 //
1155 cur_seg->data_len = cur_seg_offset;
1156 break;
1157 }
1158
1159 cur_frag_offset = 0;
1160 }
1161
1162 if (cur_seg_offset >= inline_mbuf_data_size) {
1163 cur_seg->data_len = inline_mbuf_data_size;
1164 cur_seg = cur_seg->next;
1165 cur_seg_offset = 0;
1166
1167 // FIXME: assert in a fast-path - remove!!!
1168 assert(cur_seg);
1169 }
1170 }
1171 }
1172
1173 DPDKQueuePair::tx_buf* DPDKQueuePair::tx_buf::from_packet_copy(Packet&& p, DPDKQueuePair& qp)
1174 {
1175 // sanity
1176 if (!p.len()) {
1177 return nullptr;
1178 }
1179
1180 /*
1181 * Here we are going to use the fact that the inline data size is a
1182 * power of two.
1183 *
1184 * We will first try to allocate the cluster and only if we are
1185 * successful - we will go and copy the data.
1186 */
1187 auto aligned_len = align_up((size_t)p.len(), inline_mbuf_data_size);
1188 unsigned nsegs = aligned_len / inline_mbuf_data_size;
1189 rte_mbuf *head = nullptr, *last_seg = nullptr;
1190
1191 tx_buf* buf = qp.get_tx_buf();
1192 if (!buf) {
1193 return nullptr;
1194 }
1195
1196 head = buf->rte_mbuf_p();
1197 last_seg = head;
1198 for (unsigned i = 1; i < nsegs; i++) {
1199 buf = qp.get_tx_buf();
1200 if (!buf) {
1201 me(head)->recycle();
1202 return nullptr;
1203 }
1204
1205 last_seg->next = buf->rte_mbuf_p();
1206 last_seg = last_seg->next;
1207 }
1208
1209 //
1210 // If we've got here means that we have succeeded already!
1211 // We only need to copy the data and set the head buffer with the
1212 // relevant info.
1213 //
1214 head->pkt_len = p.len();
1215 head->nb_segs = nsegs;
1216
1217 copy_packet_to_cluster(p, head);
1218 set_cluster_offload_info(p, qp, head);
1219
1220 return me(head);
1221 }
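//
// Illustration of the sizing above (assumed packet length): for p.len() == 5000
// and inline_mbuf_data_size == 2048, aligned_len == 6144 and nsegs == 3, so the
// copy path allocates a three-mbuf cluster before touching any data.
//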
1222
1223 size_t DPDKQueuePair::tx_buf::copy_one_data_buf(
1224 DPDKQueuePair& qp, rte_mbuf*& m, char* data, size_t buf_len)
1225 {
1226 tx_buf* buf = qp.get_tx_buf();
1227 if (!buf) {
1228 return 0;
1229 }
1230
1231 size_t len = std::min(buf_len, inline_mbuf_data_size);
1232
1233 m = buf->rte_mbuf_p();
1234
1235 // mbuf_put()
1236 m->data_len = len;
1237 m->pkt_len = len;
1238
1239 qp.perf_logger->inc(l_dpdk_qp_tx_copy_ops);
1240 qp.perf_logger->inc(l_dpdk_qp_tx_copy_bytes, len);
1241
1242 memcpy(rte_pktmbuf_mtod(m, void*), data, len);
1243
1244 return len;
1245 }
1246
1247 void DPDKDevice::set_rss_table()
1248 {
1249 // always fill our local indirection table.
1250 unsigned i = 0;
1251 for (auto& r : _redir_table) {
1252 r = i++ % _num_queues;
1253 }
1254
1255 if (_dev_info.reta_size == 0)
1256 return;
1257
1258 int reta_conf_size = std::max(1, _dev_info.reta_size / RTE_RETA_GROUP_SIZE);
1259 rte_eth_rss_reta_entry64 reta_conf[reta_conf_size];
1260
1261 // Configure the HW indirection table
1262 i = 0;
1263 for (auto& x : reta_conf) {
1264 x.mask = ~0ULL;
1265 for (auto& r: x.reta) {
1266 r = i++ % _num_queues;
1267 }
1268 }
1269
1270 if (rte_eth_dev_rss_reta_update(_port_idx, reta_conf, _dev_info.reta_size)) {
1271 rte_exit(EXIT_FAILURE, "Port %d: Failed to update an RSS indirection table", _port_idx);
1272 }
1273 }
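//
// Illustration (assuming RTE_RETA_GROUP_SIZE == 64): for reta_size == 128 the
// loop above builds reta_conf_size == 2 groups of 64 entries each, with every
// entry enabled via mask == ~0ULL and queue ids assigned round-robin, e.g. with
// _num_queues == 4 the table reads 0,1,2,3,0,1,2,3,...
//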
1274
1275 /******************************** Interface functions *************************/
1276
1277 std::unique_ptr<DPDKDevice> create_dpdk_net_device(
1278 CephContext *cct,
1279 unsigned cores,
1280 uint8_t port_idx,
1281 bool use_lro,
1282 bool enable_fc)
1283 {
1284 // Check that we have at least one DPDK-able port
1285 if (rte_eth_dev_count() == 0) {
1286 rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
1287 } else {
1288 ldout(cct, 10) << __func__ << " ports number: " << int(rte_eth_dev_count()) << dendl;
1289 }
1290
1291 return std::unique_ptr<DPDKDevice>(
1292 new DPDKDevice(cct, port_idx, cores, use_lro, enable_fc));
1293 }
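//
// Minimal usage sketch (hypothetical caller; the real wiring lives in the DPDK
// stack/worker setup code):
//
//   std::unique_ptr<DPDKDevice> dev =
//       create_dpdk_net_device(cct, /*cores=*/4, /*port_idx=*/0,
//                              /*use_lro=*/true, /*enable_fc=*/true);
//
// The returned DPDKDevice owns the port configuration; per-core DPDKQueuePair
// instances are then created against it, one per Rx/Tx queue.
//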