1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21#ifdef SEASTAR_HAVE_DPDK
22
23#include <cinttypes>
24#include <seastar/core/posix.hh>
25#include "core/vla.hh"
26#include <seastar/core/reactor.hh>
27#include <seastar/net/virtio-interface.hh>
28#include <seastar/core/stream.hh>
29#include <seastar/core/circular_buffer.hh>
30#include <seastar/core/align.hh>
31#include <seastar/core/sstring.hh>
32#include <seastar/core/memory.hh>
33#include <seastar/core/metrics.hh>
34#include <seastar/util/function_input_iterator.hh>
35#include <seastar/util/transform_iterator.hh>
36#include <atomic>
37#include <vector>
38#include <queue>
39#include <seastar/util/std-compat.hh>
40#include <boost/preprocessor.hpp>
41#include <seastar/net/ip.hh>
42#include <seastar/net/const.hh>
43#include <seastar/core/dpdk_rte.hh>
44#include <seastar/net/dpdk.hh>
45#include <seastar/net/toeplitz.hh>
46#include <seastar/net/native-stack.hh>
47
48#include <getopt.h>
49#include <malloc.h>
50
51#include <cinttypes>
52#include <rte_config.h>
53#include <rte_common.h>
54#include <rte_eal.h>
55#include <rte_pci.h>
56#include <rte_ethdev.h>
57#include <rte_cycles.h>
58#include <rte_memzone.h>
59#include <rte_vfio.h>
60
61#if RTE_VERSION <= RTE_VERSION_NUM(2,0,0,16)
62
63static
64inline
65char*
66rte_mbuf_to_baddr(rte_mbuf* mbuf) {
67 return reinterpret_cast<char*>(RTE_MBUF_TO_BADDR(mbuf));
68}
69
70void* as_cookie(struct rte_pktmbuf_pool_private& p) {
71 return reinterpret_cast<void*>(uint64_t(p.mbuf_data_room_size));
72};
73
74#else
75
76void* as_cookie(struct rte_pktmbuf_pool_private& p) {
77 return &p;
78};
79
80#endif
81
82#ifndef MARKER
83typedef void *MARKER[0]; /**< generic marker for a point in a structure */
84#endif
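// Clarifying note (not in the original sources): MARKER is a zero-size
// array member type; tx_buf below uses it (private_start / private_end)
// purely to mark byte positions inside the struct without occupying space.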
85
86// Calculate maximum amount of memory required to store given number of objects
87static size_t
88get_mempool_xmem_size(uint32_t elt_num, size_t total_elt_sz, uint32_t pg_shift)
89{
90 size_t obj_per_page, pg_num, pg_sz;
91
92 if (total_elt_sz == 0) {
93 return 0;
94 }
95
96 if (pg_shift == 0) {
97 return total_elt_sz * elt_num;
98 }
99
100 pg_sz = (size_t)1 << pg_shift;
101 obj_per_page = pg_sz / total_elt_sz;
102 if (obj_per_page == 0) {
103 return RTE_ALIGN_CEIL(total_elt_sz, pg_sz) * elt_num;
104 }
105
106 pg_num = (elt_num + obj_per_page - 1) / obj_per_page;
107 return pg_num << pg_shift;
108}
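// Worked example (illustrative numbers only): with 4 KiB pages
// (pg_shift == 12), total_elt_sz == 2176 and elt_num == 1024, one page fits
// 4096 / 2176 == 1 object, so pg_num == 1024 and the function returns
// 1024 << 12 == 4 MiB.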
109
110using namespace seastar::net;
111
112namespace seastar {
113
114namespace dpdk {
115
116/******************* Net device related constants *****************************/
117static constexpr uint16_t default_ring_size = 512;
118
119//
120// We need 2 times the ring size of buffers because of the way PMDs
121// refill the ring.
122//
123static constexpr uint16_t mbufs_per_queue_rx = 2 * default_ring_size;
124static constexpr uint16_t rx_gc_thresh = 64;
125
126//
127// No need to keep more descriptors in the air than can be sent in a single
128// rte_eth_tx_burst() call.
129//
130static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size;
131
132static constexpr uint16_t mbuf_cache_size = 512;
133static constexpr uint16_t mbuf_overhead =
134 sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
135//
136// We'll allocate 2K data buffers for an inline case because this would require
137// a single page per mbuf. If we used 4K data buffers here it would require 2
138// pages for a single buffer (due to "mbuf_overhead") and this is a much more
139// demanding memory constraint.
140//
141static constexpr size_t inline_mbuf_data_size = 2048;
142
143//
144// Size of the data buffer in the non-inline case.
145//
146// We may want to change (increase) this value in future, while the
147// inline_mbuf_data_size value will unlikely change due to reasons described
148// above.
149//
150static constexpr size_t mbuf_data_size = 2048;
151
152// (INLINE_MBUF_DATA_SIZE(2K)*32 = 64K = Max TSO/LRO size) + 1 mbuf for headers
153static constexpr uint8_t max_frags = 32 + 1;
154
155//
156// Intel's 40G NIC HW limit on the number of fragments in an xmit segment.
157//
158// See Chapter 8.4.1 "Transmit Packet in System Memory" of the xl710 devices
159// spec. for more details.
160//
161static constexpr uint8_t i40e_max_xmit_segment_frags = 8;
162
163//
164// VMWare's virtual NIC limit on the number of fragments in an xmit segment.
165//
166// see drivers/net/vmxnet3/base/vmxnet3_defs.h VMXNET3_MAX_TXD_PER_PKT
167//
168static constexpr uint8_t vmxnet3_max_xmit_segment_frags = 16;
169
170static constexpr uint16_t inline_mbuf_size =
171 inline_mbuf_data_size + mbuf_overhead;
172
173uint32_t qp_mempool_obj_size(bool hugetlbfs_membackend)
174{
175 uint32_t mp_size = 0;
176 struct rte_mempool_objsz mp_obj_sz = {};
177
178 //
179 // We will align each size to huge page size because DPDK allocates
180 // physically contiguous memory region for each pool object.
181 //
182
183 // Rx
184 if (hugetlbfs_membackend) {
185 mp_size +=
186 align_up(rte_mempool_calc_obj_size(mbuf_overhead, 0, &mp_obj_sz)+
187 sizeof(struct rte_pktmbuf_pool_private),
188 memory::huge_page_size);
189 } else {
190 mp_size +=
191 align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0, &mp_obj_sz)+
192 sizeof(struct rte_pktmbuf_pool_private),
193 memory::huge_page_size);
194 }
195 //Tx
196 std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz));
197 mp_size += align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0,
198 &mp_obj_sz)+
199 sizeof(struct rte_pktmbuf_pool_private),
200 memory::huge_page_size);
201 return mp_size;
202}
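// Usage sketch (purely illustrative, not code from this file): a caller
// sizing per-queue memory might do
//
//   size_t per_qp = qp_mempool_obj_size(true /* hugetlbfs backend */);
//
// and treat the result as an upper bound, in bytes, on the Rx+Tx mempool
// object memory one queue pair will pin (each component is rounded up to
// the huge page size).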
203
204static constexpr const char* pktmbuf_pool_name = "dpdk_pktmbuf_pool";
205
206/*
207 * When doing reads from the NIC queues, use this batch size
208 */
209static constexpr uint8_t packet_read_size = 32;
210/******************************************************************************/
211
212struct port_stats {
213 port_stats() : rx{}, tx{} {}
214
215 struct {
216 struct {
217 uint64_t mcast; // number of received multicast packets
218 uint64_t pause_xon; // number of received PAUSE XON frames
219 uint64_t pause_xoff; // number of received PAUSE XOFF frames
220 } good;
221
222 struct {
223 uint64_t dropped; // missed packets (e.g. full FIFO)
224 uint64_t crc; // packets with CRC error
225 uint64_t len; // packets with a bad length
226 uint64_t total; // total number of erroneous received packets
227 } bad;
228 } rx;
229
230 struct {
231 struct {
232 uint64_t pause_xon; // number of sent PAUSE XON frames
233 uint64_t pause_xoff; // number of sent PAUSE XOFF frames
234 } good;
235
236 struct {
237 uint64_t total; // total number of failed transmitted packets
238 } bad;
239 } tx;
240};
241
242#define XSTATS_ID_LIST \
243 (rx_multicast_packets) \
244 (rx_xon_packets) \
245 (rx_xoff_packets) \
246 (rx_crc_errors) \
247 (rx_length_errors) \
248 (rx_undersize_errors) \
249 (rx_oversize_errors) \
250 (tx_xon_packets) \
251 (tx_xoff_packets)
252
253class dpdk_xstats {
254public:
255 dpdk_xstats(uint16_t port_id)
256 : _port_id(port_id)
257 {
258 }
259
260 ~dpdk_xstats()
261 {
262 if (_xstats)
263 delete[] _xstats;
264 if (_xstat_names)
265 delete[] _xstat_names;
266 }
267
268 enum xstat_id {
269 BOOST_PP_SEQ_ENUM(XSTATS_ID_LIST)
270 };
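// For reference (expansion shown for clarity, not present in the original):
// BOOST_PP_SEQ_ENUM(XSTATS_ID_LIST) expands to
// rx_multicast_packets, rx_xon_packets, rx_xoff_packets, ...
// so every enumerator name matches the DPDK xstat name it is later looked
// up by in get_offset_by_name()/id_to_str().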
271
272
273 void start() {
274 _len = rte_eth_xstats_get_names(_port_id, NULL, 0);
275 _xstats = new rte_eth_xstat[_len];
276 _xstat_names = new rte_eth_xstat_name[_len];
277 update_xstats();
278 update_xstat_names();
279 update_offsets();
280 }
281
282 void update_xstats() {
283 auto len = rte_eth_xstats_get(_port_id, _xstats, _len);
284 assert(len == _len);
285 }
286
287 uint64_t get_value(const xstat_id id) {
288 auto off = _offsets[static_cast<int>(id)];
289 if (off == -1) {
290 return 0;
291 }
292 return _xstats[off].value;
293 }
294
295private:
296 uint16_t _port_id;
297 int _len;
298 struct rte_eth_xstat *_xstats = nullptr;
299 struct rte_eth_xstat_name *_xstat_names = nullptr;
300 int _offsets[BOOST_PP_SEQ_SIZE(XSTATS_ID_LIST)];
301
302 static const sstring id_to_str(const xstat_id id) {
303#define ENUM_TO_STR(r, data, elem) \
304 if (id == elem) \
305 return BOOST_PP_STRINGIZE(elem);
306
307 BOOST_PP_SEQ_FOR_EACH(ENUM_TO_STR, ~, XSTATS_ID_LIST)
308 return "";
309 }
310
311 int get_offset_by_name(const xstat_id id, const int len) {
312 for (int i = 0; i < len; i++) {
313 if (id_to_str(id) == _xstat_names[i].name)
314 return i;
315 }
316 return -1;
317 }
318
319 void update_xstat_names() {
320 auto len = rte_eth_xstats_get_names(_port_id, _xstat_names, _len);
321 assert(len == _len);
322 }
323
324 void update_offsets() {
325#define FIND_OFFSET(r, data, elem) \
326 _offsets[static_cast<int>(elem)] = \
327 get_offset_by_name(elem, _len);
328
329 BOOST_PP_SEQ_FOR_EACH(FIND_OFFSET, ~, XSTATS_ID_LIST)
330 }
331};
332
333class dpdk_device : public device {
334 uint16_t _port_idx;
335 uint16_t _num_queues;
336 net::hw_features _hw_features;
337 uint16_t _queues_ready = 0;
338 unsigned _home_cpu;
339 bool _use_lro;
340 bool _enable_fc;
341 std::vector<uint8_t> _redir_table;
342 rss_key_type _rss_key;
343 port_stats _stats;
344 timer<> _stats_collector;
345 const std::string _stats_plugin_name;
346 const std::string _stats_plugin_inst;
347 seastar::metrics::metric_groups _metrics;
348 bool _is_i40e_device = false;
349 bool _is_vmxnet3_device = false;
350 dpdk_xstats _xstats;
351
352public:
353 rte_eth_dev_info _dev_info = {};
354 promise<> _link_ready_promise;
355
356private:
357 /**
358 * Port initialization consists of 3 main stages:
359 * 1) General port initialization which ends with a call to
360 * rte_eth_dev_configure() where we request the needed number of Rx and
361 * Tx queues.
362 * 2) Individual queues initialization. This is done in the constructor of
363 * dpdk_qp class. In particular the memory pools for queues are allocated
364 * in this stage.
365 * 3) The final stage of the initialization which starts with the call of
366 * rte_eth_dev_start() after which the port becomes fully functional. We
367 * will also wait for a link to get up in this stage.
368 */
369
370
371 /**
372 * First stage of the port initialization.
373 *
374 * @return 0 in case of success and an appropriate error code in case of an
375 * error.
376 */
377 int init_port_start();
378
379 /**
380 * The final stage of a port initialization.
381 * @note Must be called *after* all queues from stage (2) have been
382 * initialized.
383 */
384 void init_port_fini();
385
386 /**
387 * Check the link status of our port for up to 9s, and print the result.
388 */
389 void check_port_link_status();
390
391 /**
392 * Configures the HW Flow Control
393 */
394 void set_hw_flow_control();
395
396public:
397 dpdk_device(uint16_t port_idx, uint16_t num_queues, bool use_lro,
398 bool enable_fc)
399 : _port_idx(port_idx)
400 , _num_queues(num_queues)
401 , _home_cpu(this_shard_id())
402 , _use_lro(use_lro)
403 , _enable_fc(enable_fc)
404 , _stats_plugin_name("network")
405 , _stats_plugin_inst(std::string("port") + std::to_string(_port_idx))
406 , _xstats(port_idx)
407 {
408
409 /* now initialise the port we will use */
410 int ret = init_port_start();
411 if (ret != 0) {
412 rte_exit(EXIT_FAILURE, "Cannot initialise port %u\n", _port_idx);
413 }
414
415 // Register port statistics pollers
416 namespace sm = seastar::metrics;
417 _metrics.add_group(_stats_plugin_name, {
418 // Rx Good
419 sm::make_counter("rx_multicast", _stats.rx.good.mcast,
420 sm::description("Counts a number of received multicast packets."), {sm::shard_label(_stats_plugin_inst)}),
421 // Rx Errors
422 sm::make_counter("rx_crc_errors", _stats.rx.bad.crc,
423 sm::description("Counts a number of received packets with a bad CRC value. "
424 "A non-zero value of this metric usually indicates a HW problem, e.g. a bad cable."), {sm::shard_label(_stats_plugin_inst)}),
425
426 sm::make_counter("rx_dropped", _stats.rx.bad.dropped,
427 sm::description("Counts a number of dropped received packets. "
428 "A non-zero value of this counter indicated the overflow of ingress HW buffers. "
429 "This usually happens because of a rate of a sender on the other side of the link is higher than we can process as a receiver."), {sm::shard_label(_stats_plugin_inst)}),
430
431 sm::make_counter("rx_bad_length_errors", _stats.rx.bad.len,
432 sm::description("Counts a number of received packets with a bad length value. "
433 "A non-zero value of this metric usually indicates a HW issue: e.g. bad cable."), {sm::shard_label(_stats_plugin_inst)}),
434 // Coupled counters:
435 // Good
436 sm::make_counter("rx_pause_xon", _stats.rx.good.pause_xon,
437 sm::description("Counts a number of received PAUSE XON frames (PAUSE frame with a quanta of zero). "
438 "When PAUSE XON frame is received our port may resume sending L2 frames. "
439 "PAUSE XON frames are sent to resume sending that was previously paused with a PAUSE XOFF frame. If ingress "
440 "buffer falls below the low watermark threshold before the timeout configured in the original PAUSE XOFF frame the receiver may decide to send PAUSE XON frame. "
441 "A non-zero value of this metric may mean that our sender is bursty and that the spikes overwhelm the receiver on the other side of the link."), {sm::shard_label(_stats_plugin_inst)}),
442
443 sm::make_counter("tx_pause_xon", _stats.tx.good.pause_xon,
444 sm::description("Counts a number of sent PAUSE XON frames (L2 flow control frames). "
445 "A non-zero value of this metric indicates that our ingress path doesn't keep up with the rate of a sender on the other side of the link. "
446 "Note that if a sender port respects PAUSE frames this will prevent it from sending from ALL its egress queues because L2 flow control is defined "
447 "on a per-link resolution."), {sm::shard_label(_stats_plugin_inst)}),
448
449 sm::make_counter("rx_pause_xoff", _stats.rx.good.pause_xoff,
450 sm::description("Counts a number of received PAUSE XOFF frames. "
451 "A non-zero value of this metric indicates that our egress overwhelms the receiver on the other side of the link and it has to send PAUSE frames to make us stop sending. "
452 "Note that if our port respects PAUSE frames a reception of a PAUSE XOFF frame will cause ALL egress queues of this port to stop sending."), {sm::shard_label(_stats_plugin_inst)}),
453
454 sm::make_counter("tx_pause_xoff", _stats.tx.good.pause_xoff,
455 sm::description("Counts a number of sent PAUSE XOFF frames. "
456 "A non-zero value of this metric indicates that our ingress path (SW and HW) doesn't keep up with the rate of a sender on the other side of the link and as a result "
457 "our ingress HW buffers overflow."), {sm::shard_label(_stats_plugin_inst)}),
458 // Errors
459 sm::make_counter("rx_errors", _stats.rx.bad.total,
460 sm::description("Counts the total number of ingress errors: CRC errors, bad length errors, etc."), {sm::shard_label(_stats_plugin_inst)}),
461
462 sm::make_counter("tx_errors", _stats.tx.bad.total,
463 sm::description("Counts a total number of egress errors. A non-zero value usually indicates a problem with the HW or the SW driver."), {sm::shard_label(_stats_plugin_inst)}),
464 });
465 }
466
467 ~dpdk_device() {
468 _stats_collector.cancel();
469 }
470
471 ethernet_address hw_address() override {
472 struct ether_addr mac;
473 rte_eth_macaddr_get(_port_idx, &mac);
474
475 return mac.addr_bytes;
476 }
477 net::hw_features hw_features() override {
478 return _hw_features;
479 }
480
481 net::hw_features& hw_features_ref() { return _hw_features; }
482
483 const rte_eth_rxconf* def_rx_conf() const {
484 return &_dev_info.default_rxconf;
485 }
486
487 const rte_eth_txconf* def_tx_conf() const {
488 return &_dev_info.default_txconf;
489 }
490
491 /**
492 * Set the RSS table in the device and store it in the internal vector.
493 */
494 void set_rss_table();
495
496 virtual uint16_t hw_queues_count() override { return _num_queues; }
497 virtual future<> link_ready() override { return _link_ready_promise.get_future(); }
498 virtual std::unique_ptr<qp> init_local_queue(const program_options::option_group& opts, uint16_t qid) override;
499 virtual unsigned hash2qid(uint32_t hash) override {
500 assert(_redir_table.size());
501 return _redir_table[hash & (_redir_table.size() - 1)];
502 }
503 uint16_t port_idx() { return _port_idx; }
504 bool is_i40e_device() const {
505 return _is_i40e_device;
506 }
507 bool is_vmxnet3_device() const {
508 return _is_vmxnet3_device;
509 }
510
511 virtual rss_key_type rss_key() const override { return _rss_key; }
512};
513
514template <bool HugetlbfsMemBackend>
515class dpdk_qp : public net::qp {
516 class tx_buf_factory;
517
518 class tx_buf {
519 friend class dpdk_qp;
520 public:
521 static tx_buf* me(rte_mbuf* mbuf) {
522 return reinterpret_cast<tx_buf*>(mbuf);
523 }
524
525 private:
526 /**
527 * Checks if the original packet of a given cluster should be linearized
528 * due to HW limitations.
529 *
530 * @param head head of a cluster to check
531 *
532 * @return TRUE if a packet should be linearized.
533 */
534 static bool i40e_should_linearize(rte_mbuf *head) {
535 bool is_tso = head->ol_flags & PKT_TX_TCP_SEG;
536
537 // For a non-TSO case: number of fragments should not exceed 8
538 if (!is_tso){
539 return head->nb_segs > i40e_max_xmit_segment_frags;
540 }
541
542 //
543 // For a TSO case each MSS window should not include more than 8
544 // fragments including headers.
545 //
546
547 // Calculate the number of frags containing headers.
548 //
549 // Note: we support neither VLAN nor tunneling thus headers size
550 // accounting is super simple.
551 //
552 size_t headers_size = head->l2_len + head->l3_len + head->l4_len;
553 unsigned hdr_frags = 0;
554 size_t cur_payload_len = 0;
555 rte_mbuf *cur_seg = head;
556
557 while (cur_seg && cur_payload_len < headers_size) {
558 cur_payload_len += cur_seg->data_len;
559 cur_seg = cur_seg->next;
560 hdr_frags++;
561 }
562
563 //
564 // Header fragments will be used for each TSO segment, thus the
565 // maximum number of data segments will be 8 minus the number of
566 // header fragments.
567 //
568 // It's unclear from the spec how the first TSO segment is treated
569 // if the last fragment with headers contains some data bytes:
570 // whether this fragment will be accounted as a single fragment or
571 // as two separate fragments. We prefer to play it safe and assume
572 // that this fragment will be accounted as two separate fragments.
573 //
574 size_t max_win_size = i40e_max_xmit_segment_frags - hdr_frags;
575
576 if (head->nb_segs <= max_win_size) {
577 return false;
578 }
579
580 // Get the data (without headers) part of the first data fragment
581 size_t prev_frag_data = cur_payload_len - headers_size;
582 auto mss = head->tso_segsz;
583
584 while (cur_seg) {
585 unsigned frags_in_seg = 0;
586 size_t cur_seg_size = 0;
587
588 if (prev_frag_data) {
589 cur_seg_size = prev_frag_data;
590 frags_in_seg++;
591 prev_frag_data = 0;
592 }
593
594 while (cur_seg_size < mss && cur_seg) {
595 cur_seg_size += cur_seg->data_len;
596 cur_seg = cur_seg->next;
597 frags_in_seg++;
598
599 if (frags_in_seg > max_win_size) {
600 return true;
601 }
602 }
603
604 if (cur_seg_size > mss) {
605 prev_frag_data = cur_seg_size - mss;
606 }
607 }
608
609 return false;
610 }
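// Illustrative walk-through (hypothetical sizes, added for clarity): with
// l2+l3+l4 headers totalling 82 bytes that all sit in a single 128-byte
// first fragment, hdr_frags == 1 and max_win_size == 8 - 1 == 7, so the
// cluster is linearized only if some MSS window would span more than 7
// fragments.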
611
612 /**
613 * Sets the offload info in the head buffer of an rte_mbufs cluster.
614 *
615 * @param p an original packet the cluster is built for
616 * @param qp QP handle
617 * @param head a head of an rte_mbufs cluster
618 */
619 static void set_cluster_offload_info(const packet& p, const dpdk_qp& qp, rte_mbuf* head) {
620 // Handle TCP checksum offload
621 auto oi = p.offload_info();
622 if (oi.needs_ip_csum) {
623 head->ol_flags |= PKT_TX_IP_CKSUM;
624 // TODO: Take a VLAN header into an account here
625 head->l2_len = sizeof(struct ether_hdr);
626 head->l3_len = oi.ip_hdr_len;
627 }
628 if (qp.port().hw_features().tx_csum_l4_offload) {
629 if (oi.protocol == ip_protocol_num::tcp) {
630 head->ol_flags |= PKT_TX_TCP_CKSUM;
631 // TODO: Take a VLAN header into an account here
632 head->l2_len = sizeof(struct ether_hdr);
633 head->l3_len = oi.ip_hdr_len;
634
635 if (oi.tso_seg_size) {
636 assert(oi.needs_ip_csum);
637 head->ol_flags |= PKT_TX_TCP_SEG;
638 head->l4_len = oi.tcp_hdr_len;
639 head->tso_segsz = oi.tso_seg_size;
640 }
641 } else if (oi.protocol == ip_protocol_num::udp) {
642 head->ol_flags |= PKT_TX_UDP_CKSUM;
643 // TODO: Take a VLAN header into an account here
644 head->l2_len = sizeof(struct ether_hdr);
645 head->l3_len = oi.ip_hdr_len;
646 }
647 }
648 }
649
650 /**
651 * Creates a tx_buf cluster representing a given packet in a "zero-copy"
652 * way.
653 *
654 * @param p packet to translate
655 * @param qp dpdk_qp handle
656 *
657 * @return the HEAD tx_buf of the cluster or nullptr in case of a
658 * failure
659 */
660 static tx_buf* from_packet_zc(packet&& p, dpdk_qp& qp) {
661
662 // Too fragmented - linearize
663 if (p.nr_frags() > max_frags) {
664 p.linearize();
665 ++qp._stats.tx.linearized;
666 }
667
668build_mbuf_cluster:
669 rte_mbuf *head = nullptr, *last_seg = nullptr;
670 unsigned nsegs = 0;
671
672 // Create a HEAD of the fragmented packet
673 if (!translate_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
674 return nullptr;
675 }
676
677 unsigned total_nsegs = nsegs;
678
679 for (unsigned i = 1; i < p.nr_frags(); i++) {
680 rte_mbuf *h = nullptr, *new_last_seg = nullptr;
681 if (!translate_one_frag(qp, p.frag(i), h, new_last_seg, nsegs)) {
682 me(head)->recycle();
683 return nullptr;
684 }
685
686 total_nsegs += nsegs;
687
688 // Attach a new buffers' chain to the packet chain
689 last_seg->next = h;
690 last_seg = new_last_seg;
691 }
692
693 // Update the HEAD buffer with the packet info
694 head->pkt_len = p.len();
695 head->nb_segs = total_nsegs;
696
697 set_cluster_offload_info(p, qp, head);
698
699 //
700 // If a packet hasn't been linearized already and the resulting
701 // cluster requires the linearisation due to HW limitation:
702 //
703 // - Recycle the cluster.
704 // - Linearize the packet.
705 // - Build the cluster once again
706 //
707 if (head->nb_segs > max_frags ||
708 (p.nr_frags() > 1 && qp.port().is_i40e_device() && i40e_should_linearize(head)) ||
709 (p.nr_frags() > vmxnet3_max_xmit_segment_frags && qp.port().is_vmxnet3_device())) {
710 me(head)->recycle();
711 p.linearize();
712 ++qp._stats.tx.linearized;
713
714 goto build_mbuf_cluster;
715 }
716
717 me(last_seg)->set_packet(std::move(p));
718
719 return me(head);
720 }
721
722 /**
723 * Copy the contents of the "packet" into the given cluster of
724 * rte_mbuf's.
725 *
726 * @note Size of the cluster has to be big enough to accommodate all the
727 * contents of the given packet.
728 *
729 * @param p packet to copy
730 * @param head head of the rte_mbuf's cluster
731 */
732 static void copy_packet_to_cluster(const packet& p, rte_mbuf* head) {
733 rte_mbuf* cur_seg = head;
734 size_t cur_seg_offset = 0;
735 unsigned cur_frag_idx = 0;
736 size_t cur_frag_offset = 0;
737
738 while (true) {
739 size_t to_copy = std::min(p.frag(cur_frag_idx).size - cur_frag_offset,
740 inline_mbuf_data_size - cur_seg_offset);
741
742 memcpy(rte_pktmbuf_mtod_offset(cur_seg, void*, cur_seg_offset),
743 p.frag(cur_frag_idx).base + cur_frag_offset, to_copy);
744
745 cur_frag_offset += to_copy;
746 cur_seg_offset += to_copy;
747
748 if (cur_frag_offset >= p.frag(cur_frag_idx).size) {
749 ++cur_frag_idx;
750 if (cur_frag_idx >= p.nr_frags()) {
751 //
752 // We are done - set the data size of the last segment
753 // of the cluster.
754 //
755 cur_seg->data_len = cur_seg_offset;
756 break;
757 }
758
759 cur_frag_offset = 0;
760 }
761
762 if (cur_seg_offset >= inline_mbuf_data_size) {
763 cur_seg->data_len = inline_mbuf_data_size;
764 cur_seg = cur_seg->next;
765 cur_seg_offset = 0;
766
767 // FIXME: assert in a fast-path - remove!!!
768 assert(cur_seg);
769 }
770 }
771 }
772
773 /**
774 * Creates a tx_buf cluster representing a given packet in a "copy" way.
775 *
776 * @param p packet to translate
777 * @param qp dpdk_qp handle
778 *
779 * @return the HEAD tx_buf of the cluster or nullptr in case of a
780 * failure
781 */
782 static tx_buf* from_packet_copy(packet&& p, dpdk_qp& qp) {
783 // sanity
784 if (!p.len()) {
785 return nullptr;
786 }
787
788 /*
789 * Here we are going to use the fact that the inline data size is a
790 * power of two.
791 *
792 * We will first try to allocate the cluster and only if we are
793 * successful - we will go and copy the data.
794 */
795 auto aligned_len = align_up((size_t)p.len(), inline_mbuf_data_size);
796 unsigned nsegs = aligned_len / inline_mbuf_data_size;
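// For instance (hypothetical length, added for illustration): p.len() == 5000
// with 2048-byte inline buffers gives aligned_len == 6144 and nsegs == 3.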
797 rte_mbuf *head = nullptr, *last_seg = nullptr;
798
799 tx_buf* buf = qp.get_tx_buf();
800 if (!buf) {
801 return nullptr;
802 }
803
804 head = buf->rte_mbuf_p();
805 last_seg = head;
806 for (unsigned i = 1; i < nsegs; i++) {
807 buf = qp.get_tx_buf();
808 if (!buf) {
809 me(head)->recycle();
810 return nullptr;
811 }
812
813 last_seg->next = buf->rte_mbuf_p();
814 last_seg = last_seg->next;
815 }
816
817 //
818 // If we've got here, it means that we have already succeeded!
819 // We only need to copy the data and set the head buffer with the
820 // relevant info.
821 //
822 head->pkt_len = p.len();
823 head->nb_segs = nsegs;
824
825 copy_packet_to_cluster(p, head);
826 set_cluster_offload_info(p, qp, head);
827
828 return me(head);
829 }
830
831 /**
832 * Generic handling of a single net::fragment, shared by the zero-copy and copy paths.
833 *
834 * @param do_one_buf Functor responsible for a single rte_mbuf
835 * handling
836 * @param qp dpdk_qp handle (in)
837 * @param frag Fragment to copy (in)
838 * @param head Head of the cluster (out)
839 * @param last_seg Last segment of the cluster (out)
840 * @param nsegs Number of segments in the cluster (out)
841 *
842 * @return TRUE in case of success
843 */
844 template <class DoOneBufFunc>
845 static bool do_one_frag(DoOneBufFunc do_one_buf, dpdk_qp& qp,
846 fragment& frag, rte_mbuf*& head,
847 rte_mbuf*& last_seg, unsigned& nsegs) {
848 size_t len, left_to_set = frag.size;
849 char* base = frag.base;
850
851 rte_mbuf* m;
852
853 // TODO: assert() in a fast path! Remove me ASAP!
854 assert(frag.size);
855
856 // Create a HEAD of mbufs' cluster and set the first bytes into it
857 len = do_one_buf(qp, head, base, left_to_set);
858 if (!len) {
859 return false;
860 }
861
862 left_to_set -= len;
863 base += len;
864 nsegs = 1;
865
866 //
867 // Set the rest of the data into the new mbufs and chain them to
868 // the cluster.
869 //
870 rte_mbuf* prev_seg = head;
871 while (left_to_set) {
872 len = do_one_buf(qp, m, base, left_to_set);
873 if (!len) {
874 me(head)->recycle();
875 return false;
876 }
877
878 left_to_set -= len;
879 base += len;
880 nsegs++;
881
882 prev_seg->next = m;
883 prev_seg = m;
884 }
885
886 // Return the last mbuf in the cluster
887 last_seg = prev_seg;
888
889 return true;
890 }
891
892 /**
893 * Zero-copy handling of a single net::fragment.
894 *
895 * @param qp dpdk_qp handle (in)
896 * @param frag Fragment to copy (in)
897 * @param head Head of the cluster (out)
898 * @param last_seg Last segment of the cluster (out)
899 * @param nsegs Number of segments in the cluster (out)
900 *
901 * @return TRUE in case of success
902 */
903 static bool translate_one_frag(dpdk_qp& qp, fragment& frag,
904 rte_mbuf*& head, rte_mbuf*& last_seg,
905 unsigned& nsegs) {
906 return do_one_frag(set_one_data_buf, qp, frag, head,
907 last_seg, nsegs);
908 }
909
910 /**
911 * Copies one net::fragment into the cluster of rte_mbuf's.
912 *
913 * @param qp dpdk_qp handle (in)
914 * @param frag Fragment to copy (in)
915 * @param head Head of the cluster (out)
916 * @param last_seg Last segment of the cluster (out)
917 * @param nsegs Number of segments in the cluster (out)
918 *
919 * We return the "last_seg" to avoid traversing the cluster in order to get
920 * it.
921 *
922 * @return TRUE in case of success
923 */
924 static bool copy_one_frag(dpdk_qp& qp, fragment& frag,
925 rte_mbuf*& head, rte_mbuf*& last_seg,
926 unsigned& nsegs) {
927 return do_one_frag(copy_one_data_buf, qp, frag, head,
928 last_seg, nsegs);
929 }
930
931 /**
932 * Allocates a single rte_mbuf and sets it to point to a given data
933 * buffer.
934 *
935 * @param qp dpdk_qp handle (in)
936 * @param m New allocated rte_mbuf (out)
937 * @param va virtual address of a data buffer (in)
938 * @param buf_len length of the data to copy (in)
939 *
940 * @return The actual number of bytes that has been set in the mbuf
941 */
942 static size_t set_one_data_buf(
943 dpdk_qp& qp, rte_mbuf*& m, char* va, size_t buf_len) {
944 static constexpr size_t max_frag_len = 15 * 1024; // 15K
945
946 //
947 // Currently we break a buffer on a 15K boundary because 82599
948 // devices have a 15.5K limitation on a maximum single fragment
949 // size.
950 //
951 rte_iova_t iova = rte_mem_virt2iova(va);
952
953 if (iova == RTE_BAD_IOVA) {
954 return copy_one_data_buf(qp, m, va, buf_len);
955 }
956
957 tx_buf* buf = qp.get_tx_buf();
958 if (!buf) {
959 return 0;
960 }
961
962 size_t len = std::min(buf_len, max_frag_len);
963
964 buf->set_zc_info(va, iova, len);
965 m = buf->rte_mbuf_p();
966
967 return len;
968 }
969
970 /**
971 * Allocates a single rte_mbuf and copies a given data into it.
972 *
973 * @param qp dpdk_qp handle (in)
974 * @param m New allocated rte_mbuf (out)
975 * @param data Data to copy from (in)
976 * @param buf_len length of the data to copy (in)
977 *
978 * @return The actual number of bytes that has been copied
979 */
980 static size_t copy_one_data_buf(
981 dpdk_qp& qp, rte_mbuf*& m, char* data, size_t buf_len)
982 {
983 tx_buf* buf = qp.get_tx_buf();
984 if (!buf) {
985 return 0;
986 }
987
988 size_t len = std::min(buf_len, inline_mbuf_data_size);
989
990 m = buf->rte_mbuf_p();
991
992 // mbuf_put()
993 m->data_len = len;
994 m->pkt_len = len;
995
996 qp._stats.tx.good.update_copy_stats(1, len);
997
998 memcpy(rte_pktmbuf_mtod(m, void*), data, len);
999
1000 return len;
1001 }
1002
1003 public:
1004 tx_buf(tx_buf_factory& fc) : _fc(fc) {
1005
1006 _buf_iova = _mbuf.buf_iova;
1007 _data_off = _mbuf.data_off;
1008 }
1009
1010 rte_mbuf* rte_mbuf_p() { return &_mbuf; }
1011
1012 void set_zc_info(void* va, rte_iova_t iova, size_t len) {
1013 // mbuf_put()
1014 _mbuf.data_len = len;
1015 _mbuf.pkt_len = len;
1016
1017 // Set the mbuf to point to our data
1018 _mbuf.buf_addr = va;
1019 _mbuf.buf_iova = iova;
1020 _mbuf.data_off = 0;
1021 _is_zc = true;
1022 }
1023
1024 void reset_zc() {
1025 //
1026 // If this mbuf was the last in a cluster and contains an
1027 // original packet object then call the destructor of the
1028 // original packet object.
1029 //
1030 if (_p) {
1031 //
1032 // Reset the std::optional. This in particular is going
1033 // to call the "packet"'s destructor and reset the
1034 // "optional" state to "nonengaged".
1035 //
1036 _p = std::nullopt;
1037
1038 } else if (!_is_zc) {
1039 return;
1040 }
1041
1042 // Restore the rte_mbuf fields we trashed in set_zc_info()
1043 _mbuf.buf_iova = _buf_iova;
1044 _mbuf.buf_addr = rte_mbuf_to_baddr(&_mbuf);
1045 _mbuf.data_off = _data_off;
1046
1047 _is_zc = false;
1048 }
1049
1050 void recycle() {
1051 struct rte_mbuf *m = &_mbuf, *m_next;
1052
1053 while (m != nullptr) {
1054 m_next = m->next;
1055 rte_pktmbuf_reset(m);
1056 _fc.put(me(m));
1057 m = m_next;
1058 }
1059 }
1060
1061 void set_packet(packet&& p) {
1062 _p = std::move(p);
1063 }
1064
1065 private:
1066 struct rte_mbuf _mbuf;
1067 MARKER private_start;
1068 std::optional<packet> _p;
1069 rte_iova_t _buf_iova;
1070 uint16_t _data_off;
1071 // TRUE if underlying mbuf has been used in the zero-copy flow
1072 bool _is_zc = false;
1073 // buffers' factory the buffer came from
1074 tx_buf_factory& _fc;
1075 MARKER private_end;
1076 };
1077
1078 class tx_buf_factory {
1079 //
1080 // Number of buffers to free in each GC iteration:
1081 // We want as many buffers as possible to be allocated from the
1082 // mempool.
1083 //
1084 // On the other hand if there is no Tx for some time we want the
1085 // completions to be eventually handled. Thus we choose the smallest
1086 // possible packet count here.
1087 //
1088 static constexpr int gc_count = 1;
1089 public:
1090 tx_buf_factory(uint16_t qid) {
1091 using namespace memory;
1092
1093 sstring name = sstring(pktmbuf_pool_name) + to_sstring(qid) + "_tx";
1094 printf("Creating Tx mbuf pool '%s' [%u mbufs] ...\n",
1095 name.c_str(), mbufs_per_queue_tx);
1096
1097 if (HugetlbfsMemBackend) {
1098 size_t xmem_size;
1099
1100 _xmem.reset(dpdk_qp::alloc_mempool_xmem(mbufs_per_queue_tx,
1101 inline_mbuf_size,
1102 xmem_size));
1103 if (!_xmem.get()) {
1104 printf("Can't allocate a memory for Tx buffers\n");
1105 exit(1);
1106 }
1107
1108 //
1109 // We are going to push the buffers from the mempool into
1110 // the circular_buffer and then poll them from there anyway, so
1111 // we prefer to make a mempool non-atomic in this case.
1112 //
1113 _pool =
1114 rte_mempool_create_empty(name.c_str(),
1115 mbufs_per_queue_tx,
1116 inline_mbuf_size,
1117 mbuf_cache_size,
1118 sizeof(struct rte_pktmbuf_pool_private),
1119 rte_socket_id(), 0);
1120 if (_pool) {
1121 rte_pktmbuf_pool_init(_pool, nullptr);
1122
1123 if (rte_mempool_populate_virt(_pool, (char*)(_xmem.get()),
1124 xmem_size, page_size,
1125 nullptr, nullptr) <= 0) {
1126 printf("Failed to populate mempool for Tx\n");
1127 exit(1);
1128 }
1129
1130 rte_mempool_obj_iter(_pool, rte_pktmbuf_init, nullptr);
1131 }
1132
1133 } else {
1134 _pool =
1135 rte_mempool_create(name.c_str(),
1136 mbufs_per_queue_tx, inline_mbuf_size,
1137 mbuf_cache_size,
1138 sizeof(struct rte_pktmbuf_pool_private),
1139 rte_pktmbuf_pool_init, nullptr,
1140 rte_pktmbuf_init, nullptr,
1141 rte_socket_id(), 0);
1142 }
1143
1144 if (!_pool) {
1145 printf("Failed to create mempool for Tx\n");
1146 exit(1);
1147 }
1148
1149 //
1150 // Fill the factory with the buffers from the mempool allocated
1151 // above.
1152 //
1153 init_factory();
1154 }
1155
1156 /**
1157 * @note Should not be called if there are no free tx_buf's
1158 *
1159 * @return a free tx_buf object
1160 */
1161 tx_buf* get() {
1162 // Take completed from the HW first
1163 tx_buf *pkt = get_one_completed();
1164 if (pkt) {
1165 if (HugetlbfsMemBackend) {
1166 pkt->reset_zc();
1167 }
1168
1169 return pkt;
1170 }
1171
1172 //
1173 // If there are no completed at the moment - take from the
1174 // factory's cache.
1175 //
1176 if (_ring.empty()) {
1177 return nullptr;
1178 }
1179
1180 pkt = _ring.back();
1181 _ring.pop_back();
1182
1183 return pkt;
1184 }
1185
1186 void put(tx_buf* buf) {
1187 if (HugetlbfsMemBackend) {
1188 buf->reset_zc();
1189 }
1190 _ring.push_back(buf);
1191 }
1192
1193 bool gc() {
1194 for (int cnt = 0; cnt < gc_count; ++cnt) {
1195 auto tx_buf_p = get_one_completed();
1196 if (!tx_buf_p) {
1197 return false;
1198 }
1199
1200 put(tx_buf_p);
1201 }
1202
1203 return true;
1204 }
1205 private:
1206 /**
1207 * Fill the mbufs circular buffer: after this the _pool will become
1208 * empty. We will use it to catch the completed buffers:
1209 *
1210 * - Underlying PMD drivers will "free" the mbufs once they are
1211 * completed.
1212 * - We will poll the _pktmbuf_pool_tx till it's empty and release
1213 * all the buffers from the freed mbufs.
1214 */
1215 void init_factory() {
1216 while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) {
1217 _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this});
1218 }
1219 }
1220
1221 /**
1222 * PMD puts the completed buffers back into the mempool they have
1223 * originally come from.
1224 *
1225 * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call
1226 * rte_pktmbuf_reset() here again.
1227 *
1228 * @return a single tx_buf that has been completed by HW.
1229 */
1230 tx_buf* get_one_completed() {
1231 return tx_buf::me(rte_pktmbuf_alloc(_pool));
1232 }
1233
1234 private:
1235 std::vector<tx_buf*> _ring;
1236 rte_mempool* _pool = nullptr;
1237 std::unique_ptr<void, free_deleter> _xmem;
1238 };
1239
1240public:
1241 explicit dpdk_qp(dpdk_device* dev, uint16_t qid,
1242 const std::string stats_plugin_name);
1243
1244 virtual void rx_start() override;
1245 virtual future<> send(packet p) override {
1246 abort();
1247 }
1248 virtual ~dpdk_qp() {}
1249
1250 virtual uint32_t send(circular_buffer<packet>& pb) override {
1251 if (HugetlbfsMemBackend) {
1252 // Zero-copy send
1253 return _send(pb, [&] (packet&& p) {
1254 return tx_buf::from_packet_zc(std::move(p), *this);
1255 });
1256 } else {
1257 // "Copy"-send
1258 return _send(pb, [&](packet&& p) {
1259 return tx_buf::from_packet_copy(std::move(p), *this);
1260 });
1261 }
1262 }
1263
1264 dpdk_device& port() const { return *_dev; }
1265 tx_buf* get_tx_buf() { return _tx_buf_factory.get(); }
1266private:
1267
1268 template <class Func>
1269 uint32_t _send(circular_buffer<packet>& pb, Func packet_to_tx_buf_p) {
1270 if (_tx_burst.size() == 0) {
1271 for (auto&& p : pb) {
1272 // TODO: assert() in a fast path! Remove me ASAP!
1273 assert(p.len());
1274
1275 tx_buf* buf = packet_to_tx_buf_p(std::move(p));
1276 if (!buf) {
1277 break;
1278 }
1279
1280 _tx_burst.push_back(buf->rte_mbuf_p());
1281 }
1282 }
1283
1284 uint16_t sent = rte_eth_tx_burst(_dev->port_idx(), _qid,
1285 _tx_burst.data() + _tx_burst_idx,
1286 _tx_burst.size() - _tx_burst_idx);
1287
1288 uint64_t nr_frags = 0, bytes = 0;
1289
1290 for (int i = 0; i < sent; i++) {
1291 rte_mbuf* m = _tx_burst[_tx_burst_idx + i];
1292 bytes += m->pkt_len;
1293 nr_frags += m->nb_segs;
1294 pb.pop_front();
1295 }
1296
1297 _stats.tx.good.update_frags_stats(nr_frags, bytes);
1298
1299 _tx_burst_idx += sent;
1300
1301 if (_tx_burst_idx == _tx_burst.size()) {
1302 _tx_burst_idx = 0;
1303 _tx_burst.clear();
1304 }
1305
1306 return sent;
1307 }
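// Burst bookkeeping example (illustrative): if 10 mbufs were queued in
// _tx_burst and rte_eth_tx_burst() accepted only 7, those 7 packets are
// popped from `pb`, _tx_burst_idx becomes 7, and the remaining 3 mbufs are
// retried on the next call before any new packets are translated (the
// burst vector is refilled only when it is empty).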
1308
1309 /**
1310 * Allocate a new data buffer and set the mbuf to point to it.
1311 *
1312 * Do some DPDK hacks to work with the PMD: it assumes that buf_addr
1313 * points to a headroom of RTE_PKTMBUF_HEADROOM bytes placed before the
1314 * actual data buffer.
1315 *
1316 * @param m mbuf to update
1317 */
1318 static bool refill_rx_mbuf(rte_mbuf* m, size_t size = mbuf_data_size) {
1319 char* data;
1320
1321 if (posix_memalign((void**)&data, size, size)) {
1322 return false;
1323 }
1324
1325 rte_iova_t iova = rte_mem_virt2iova(data);
1326
1327 //
1328 // Set the mbuf to point to our data.
1329 //
1330 // Do some DPDK hacks to work with the PMD: it assumes that buf_addr
1331 // points to a headroom of RTE_PKTMBUF_HEADROOM bytes placed before
1332 // the actual data buffer.
1333 //
1334 m->buf_addr = data - RTE_PKTMBUF_HEADROOM;
1335 m->buf_iova = iova - RTE_PKTMBUF_HEADROOM;
1336 return true;
1337 }
1338
1339 static bool init_noninline_rx_mbuf(rte_mbuf* m,
1340 size_t size = mbuf_data_size) {
1341 if (!refill_rx_mbuf(m, size)) {
1342 return false;
1343 }
1344 // The below fields stay constant during the execution.
1345 m->buf_len = size + RTE_PKTMBUF_HEADROOM;
1346 m->data_off = RTE_PKTMBUF_HEADROOM;
1347 return true;
1348 }
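// Putting the two helpers above together (illustrative, assuming the common
// default RTE_PKTMBUF_HEADROOM of 128 bytes): for a 2048-byte data buffer
// posix_memalign() returns `data`, buf_addr becomes data - 128, data_off
// becomes 128 and buf_len becomes 2048 + 128, so the payload still starts
// exactly at `data`.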
1349
1350 bool init_rx_mbuf_pool();
1351 bool map_dma();
1352 bool rx_gc();
1353 bool refill_one_cluster(rte_mbuf* head);
1354
1355 /**
1356 * Allocates a memory chunk to accommodate the given number of buffers of
1357 * the given size and fills a vector with underlying physical pages.
1358 *
1359 * The chunk is going to be used as an external memory buffer of the DPDK
1360 * memory pool.
1361 *
1362 * The chunk size is calculated using the get_mempool_xmem_size() function.
1363 *
1364 * @param num_bufs Number of buffers (in)
1365 * @param buf_sz Size of each buffer (in)
1366 * @param xmem_size Size of allocated memory chunk (out)
1367 *
1368 * @return a virtual address of the allocated memory chunk or nullptr in
1369 * case of a failure.
1370 */
1371 static void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz,
1372 size_t& xmem_size);
1373
1374 /**
1375 * Polls for a burst of incoming packets. This function will not block and
1376 * will immediately return after processing all available packets.
1377 *
1378 */
1379 bool poll_rx_once();
1380
1381 /**
1382 * Translates an rte_mbuf's into net::packet and feeds them to _rx_stream.
1383 *
1384 * @param bufs An array of received rte_mbuf's
1385 * @param count Number of buffers in the bufs[]
1386 */
1387 void process_packets(struct rte_mbuf **bufs, uint16_t count);
1388
1389 /**
1390 * Translate rte_mbuf into the "packet".
1391 * @param m mbuf to translate
1392 *
1393 * @return a "optional" object representing the newly received data if in an
1394 * "engaged" state or an error if in a "disengaged" state.
1395 */
1396 std::optional<packet> from_mbuf(rte_mbuf* m);
1397
1398 /**
1399 * Transform an LRO rte_mbuf cluster into the "packet" object.
1400 * @param m HEAD of the mbufs' cluster to transform
1401 *
1402 * @return a "optional" object representing the newly received LRO packet if
1403 * in an "engaged" state or an error if in a "disengaged" state.
1404 */
1405 std::optional<packet> from_mbuf_lro(rte_mbuf* m);
1406
1407private:
1408 dpdk_device* _dev;
1409 uint16_t _qid;
1410 rte_mempool *_pktmbuf_pool_rx;
1411 std::vector<rte_mbuf*> _rx_free_pkts;
1412 std::vector<rte_mbuf*> _rx_free_bufs;
1413 std::vector<fragment> _frags;
1414 std::vector<char*> _bufs;
1415 size_t _num_rx_free_segs = 0;
1416 reactor::poller _rx_gc_poller;
1417 std::unique_ptr<void, free_deleter> _rx_xmem;
1418 tx_buf_factory _tx_buf_factory;
1419 std::optional<reactor::poller> _rx_poller;
1420 reactor::poller _tx_gc_poller;
1421 std::vector<rte_mbuf*> _tx_burst;
1422 uint16_t _tx_burst_idx = 0;
1423 static constexpr phys_addr_t page_mask = ~(memory::page_size - 1);
1424};
1425
1426int dpdk_device::init_port_start()
1427{
1428 assert(_port_idx < rte_eth_dev_count_avail());
1429
1430 rte_eth_dev_info_get(_port_idx, &_dev_info);
1431
1432 //
1433 // This is a workaround for a missing handling of a HW limitation in the
1434 // DPDK i40e driver. This and all related to _is_i40e_device code should be
1435 // removed once this handling is added.
1436 //
1437 if (sstring("rte_i40evf_pmd") == _dev_info.driver_name ||
1438 sstring("rte_i40e_pmd") == _dev_info.driver_name) {
1439 printf("Device is an Intel's 40G NIC. Enabling 8 fragments hack!\n");
1440 _is_i40e_device = true;
1441 }
1442
1443 if (std::string("rte_vmxnet3_pmd") == _dev_info.driver_name) {
1444 printf("Device is a VMWare Virtual NIC. Enabling 16 fragments hack!\n");
1445 _is_vmxnet3_device = true;
1446 }
1447
1448 //
1449 // Another workaround: this time for a lack of number of RSS bits.
1450 // ixgbe PF NICs support up to 16 RSS queues.
1451 // ixgbe VF NICs support up to 4 RSS queues.
1452 // i40e PF NICs support up to 64 RSS queues.
1453 // i40e VF NICs support up to 16 RSS queues.
1454 //
1455 if (sstring("rte_ixgbe_pmd") == _dev_info.driver_name) {
1456 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
1457 } else if (sstring("rte_ixgbevf_pmd") == _dev_info.driver_name) {
1458 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)4);
1459 } else if (sstring("rte_i40e_pmd") == _dev_info.driver_name) {
1460 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)64);
1461 } else if (sstring("rte_i40evf_pmd") == _dev_info.driver_name) {
1462 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
1463 }
1464
1465 // Hardware offload capabilities
1466 // https://github.com/DPDK/dpdk/blob/v19.05/lib/librte_ethdev/rte_ethdev.h#L993-L1074
1467
1468 // We want to support all available offload features
1469 // TODO: below features are implemented in 17.05, should support new ones
1470 const uint64_t tx_offloads_wanted =
1471 DEV_TX_OFFLOAD_VLAN_INSERT |
1472 DEV_TX_OFFLOAD_IPV4_CKSUM |
1473 DEV_TX_OFFLOAD_UDP_CKSUM |
1474 DEV_TX_OFFLOAD_TCP_CKSUM |
1475 DEV_TX_OFFLOAD_SCTP_CKSUM |
1476 DEV_TX_OFFLOAD_TCP_TSO |
1477 DEV_TX_OFFLOAD_UDP_TSO |
1478 DEV_TX_OFFLOAD_OUTER_IPV4_CKSUM |
1479 DEV_TX_OFFLOAD_QINQ_INSERT |
1480 DEV_TX_OFFLOAD_VXLAN_TNL_TSO |
1481 DEV_TX_OFFLOAD_GRE_TNL_TSO |
1482 DEV_TX_OFFLOAD_IPIP_TNL_TSO |
1483 DEV_TX_OFFLOAD_GENEVE_TNL_TSO |
1484 DEV_TX_OFFLOAD_MACSEC_INSERT;
1485
1486 _dev_info.default_txconf.offloads =
1487 _dev_info.tx_offload_capa & tx_offloads_wanted;
1488
1489 /* for port configuration all features are off by default */
1490 rte_eth_conf port_conf = { 0 };
1491
1492 /* setting tx offloads for port */
1493 port_conf.txmode.offloads = _dev_info.default_txconf.offloads;
1494
1495 printf("Port %d: max_rx_queues %d max_tx_queues %d\n",
1496 _port_idx, _dev_info.max_rx_queues, _dev_info.max_tx_queues);
1497
1498 _num_queues = std::min({_num_queues, _dev_info.max_rx_queues, _dev_info.max_tx_queues});
1499
1500 printf("Port %d: using %d %s\n", _port_idx, _num_queues,
1501 (_num_queues > 1) ? "queues" : "queue");
1502
1503 // Set RSS mode: enable RSS if seastar is configured with more than 1 CPU.
1504 // Even if port has a single queue we still want the RSS feature to be
1505 // available in order to make HW calculate RSS hash for us.
1506 if (smp::count > 1) {
1507 if (_dev_info.hash_key_size == 40) {
1508 _rss_key = default_rsskey_40bytes;
1509 } else if (_dev_info.hash_key_size == 52) {
1510 _rss_key = default_rsskey_52bytes;
1511 } else if (_dev_info.hash_key_size != 0) {
1512 // WTF?!!
1513 rte_exit(EXIT_FAILURE,
1514 "Port %d: We support only 40 or 52 bytes RSS hash keys, %d bytes key requested",
1515 _port_idx, _dev_info.hash_key_size);
1516 } else {
1517 _rss_key = default_rsskey_40bytes;
1518 _dev_info.hash_key_size = 40;
1519 }
1520
1521 port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
1522 /* enable all supported rss offloads */
1523 port_conf.rx_adv_conf.rss_conf.rss_hf = _dev_info.flow_type_rss_offloads;
1524 if (_dev_info.hash_key_size) {
1525 port_conf.rx_adv_conf.rss_conf.rss_key = const_cast<uint8_t *>(_rss_key.data());
1526 port_conf.rx_adv_conf.rss_conf.rss_key_len = _dev_info.hash_key_size;
1527 }
1528 } else {
1529 port_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
1530 }
1531
1532 if (_num_queues > 1) {
1533 if (_dev_info.reta_size) {
1534 // RETA size should be a power of 2
1535 assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
1536
1537 // Set the RSS table to the correct size
1538 _redir_table.resize(_dev_info.reta_size);
1539 _rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
1540 printf("Port %d: RSS table size is %d\n",
1541 _port_idx, _dev_info.reta_size);
1542 } else {
1543 _rss_table_bits = std::lround(std::log2(_dev_info.max_rx_queues));
1544 }
1545 } else {
1546 _redir_table.push_back(0);
1547 }
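// For example (hypothetical RETA size): with reta_size == 128 the
// redirection table holds 128 entries, _rss_table_bits == 7, and
// hash2qid() above indexes it with (hash & 127).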
1548
1549 // Set Rx VLAN stripping
1550 if (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
1551 port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_VLAN_STRIP;
1552 }
1553
1554#ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
1555 // Enable LRO
1556 if (_use_lro && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)) {
1557 printf("LRO is on\n");
1558 port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_TCP_LRO;
1559 _hw_features.rx_lro = true;
1560 } else
1561#endif
1562 printf("LRO is off\n");
1563
1564 // Check that all CSUM features are either all set all together or not set
1565 // all together. If this assumption breaks we need to rework the below logic
1566 // by splitting the csum offload feature bit into separate bits for IPv4,
1567 // TCP and UDP.
1568 assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
1569 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
1570 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
1571 (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
1572 !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
1573 !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));
1574
1575 // Set Rx checksum checking
1576 if ( (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
1577 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
1578 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
1579 printf("RX checksum offload supported\n");
1580 port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_CHECKSUM;
1581 _hw_features.rx_csum_offload = 1;
1582 }
1583
1584 if ((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
1585 printf("TX ip checksum offload supported\n");
1586 _hw_features.tx_csum_ip_offload = 1;
1587 }
1588
1589 // TSO is supported starting from DPDK v1.8
1590 if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
1591 printf("TSO is supported\n");
1592 _hw_features.tx_tso = 1;
1593 }
1594
1595 // There is no UFO support in the PMDs yet.
1596#if 0
1597 if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO) {
1598 printf("UFO is supported\n");
1599 _hw_features.tx_ufo = 1;
1600 }
1601#endif
1602
1603 // Check that Tx TCP and UDP CSUM features are either all set all together
1604 // or not set all together. If this assumption breaks we need to rework the
1605 // below logic by splitting the csum offload feature bit into separate bits
1606 // for TCP and UDP.
1607 assert(((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
1608 (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) ||
1609 (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
1610 !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)));
1611
1612 if ( (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
1613 (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
1614 printf("TX TCP&UDP checksum offload supported\n");
1615 _hw_features.tx_csum_l4_offload = 1;
1616 }
1617
1618 int retval;
1619
1620 printf("Port %u init ... ", _port_idx);
1621 fflush(stdout);
1622
1623 /*
1624 * Standard DPDK port initialisation - config port, then set up
1625 * rx and tx rings.
1626 */
1627 if ((retval = rte_eth_dev_configure(_port_idx, _num_queues, _num_queues,
1628 &port_conf)) != 0) {
1629 return retval;
1630 }
1631
1632 //rte_eth_promiscuous_enable(port_num);
1633 printf("done: \n");
1634
1635 return 0;
1636}
1637
1638void dpdk_device::set_hw_flow_control()
1639{
1640 // Read the port's current/default flow control settings
1641 struct rte_eth_fc_conf fc_conf;
1642 auto ret = rte_eth_dev_flow_ctrl_get(_port_idx, &fc_conf);
1643
1644 if (ret == -ENOTSUP) {
1645 goto not_supported;
1646 }
1647
1648 if (ret < 0) {
1649 rte_exit(EXIT_FAILURE, "Port %u: failed to get hardware flow control settings: (error %d)\n", _port_idx, ret);
1650 }
1651
1652 if (_enable_fc) {
1653 fc_conf.mode = RTE_FC_FULL;
1654 } else {
1655 fc_conf.mode = RTE_FC_NONE;
1656 }
1657
1658 ret = rte_eth_dev_flow_ctrl_set(_port_idx, &fc_conf);
1659 if (ret == -ENOTSUP) {
1660 goto not_supported;
1661 }
1662
1663 if (ret < 0) {
1664 rte_exit(EXIT_FAILURE, "Port %u: failed to set hardware flow control (error %d)\n", _port_idx, ret);
1665 }
1666
1667 printf("Port %u: %s HW FC\n", _port_idx,
1668 (_enable_fc ? "Enabling" : "Disabling"));
1669 return;
1670
1671not_supported:
1672 printf("Port %u: Changing HW FC settings is not supported\n", _port_idx);
1673}
1674
1675void dpdk_device::init_port_fini()
1676{
1677 // Changing FC requires HW reset, so set it before the port is initialized.
1678 set_hw_flow_control();
1679
1680 if (rte_eth_dev_start(_port_idx) < 0) {
1681 rte_exit(EXIT_FAILURE, "Cannot start port %d\n", _port_idx);
1682 }
1683
1684 /* need to defer initializing xstats since NIC-specific xstat entries
1685 show up only after port initialization */
1686 _xstats.start();
1687
1688 _stats_collector.set_callback([&] {
1689 rte_eth_stats rte_stats = {};
1690 int rc = rte_eth_stats_get(_port_idx, &rte_stats);
1691
1692 if (rc) {
1693 printf("Failed to get port statistics: %s\n", strerror(rc));
1694 }
1695
1696 _stats.rx.good.mcast =
1697 _xstats.get_value(dpdk_xstats::xstat_id::rx_multicast_packets);
1698 _stats.rx.good.pause_xon =
1699 _xstats.get_value(dpdk_xstats::xstat_id::rx_xon_packets);
1700 _stats.rx.good.pause_xoff =
1701 _xstats.get_value(dpdk_xstats::xstat_id::rx_xoff_packets);
1702
1703 _stats.rx.bad.crc =
1704 _xstats.get_value(dpdk_xstats::xstat_id::rx_crc_errors);
1705 _stats.rx.bad.len =
1706 _xstats.get_value(dpdk_xstats::xstat_id::rx_length_errors) +
1707 _xstats.get_value(dpdk_xstats::xstat_id::rx_undersize_errors) +
1708 _xstats.get_value(dpdk_xstats::xstat_id::rx_oversize_errors);
1709 _stats.rx.bad.total = rte_stats.ierrors;
1710
1711 _stats.tx.good.pause_xon =
1712 _xstats.get_value(dpdk_xstats::xstat_id::tx_xon_packets);
1713 _stats.tx.good.pause_xoff =
1714 _xstats.get_value(dpdk_xstats::xstat_id::tx_xoff_packets);
1715
1716 _stats.tx.bad.total = rte_stats.oerrors;
1717 });
1718
1719 // TODO: replace deprecated filter api with generic flow api
1720 #pragma GCC diagnostic push
1721 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1722 if (_num_queues > 1) {
1723 if (!rte_eth_dev_filter_supported(_port_idx, RTE_ETH_FILTER_HASH)) {
1724 printf("Port %d: HASH FILTER configuration is supported\n", _port_idx);
1725
1726 // Set up the HW to use the TOEPLITZ hash function as the RSS hash function
1727 struct rte_eth_hash_filter_info info = {};
1728
1729 info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
1730 info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;
1731
1732 if (rte_eth_dev_filter_ctrl(_port_idx, RTE_ETH_FILTER_HASH,
1733 RTE_ETH_FILTER_SET, &info) < 0) {
1734 rte_exit(EXIT_FAILURE, "Cannot set hash function on a port %d\n", _port_idx);
1735 }
1736 }
1737
1738 set_rss_table();
1739 }
1740 #pragma GCC diagnostic pop
1741
1742 // Wait for a link
1743 check_port_link_status();
1744
1745 printf("Created DPDK device\n");
1746}
1747
1748template <bool HugetlbfsMemBackend>
1749void* dpdk_qp<HugetlbfsMemBackend>::alloc_mempool_xmem(
1750 uint16_t num_bufs, uint16_t buf_sz, size_t& xmem_size)
1751{
1752 using namespace memory;
1753 char* xmem;
1754 struct rte_mempool_objsz mp_obj_sz = {};
1755
1756 rte_mempool_calc_obj_size(buf_sz, 0, &mp_obj_sz);
1757
1758 xmem_size =
1759 get_mempool_xmem_size(num_bufs,
1760 mp_obj_sz.elt_size + mp_obj_sz.header_size +
1761 mp_obj_sz.trailer_size,
1762 page_bits);
1763
1764 // Aligning to 2M causes further failures in small allocations.
1765 // TODO: Check why - and fix.
1766 if (posix_memalign((void**)&xmem, page_size, xmem_size)) {
1767 printf("Can't allocate %ld bytes aligned to %ld\n",
1768 xmem_size, page_size);
1769 return nullptr;
1770 }
1771
1772 return xmem;
1773}
1774
1775template <bool HugetlbfsMemBackend>
1776bool dpdk_qp<HugetlbfsMemBackend>::init_rx_mbuf_pool()
1777{
1778 using namespace memory;
1779 sstring name = sstring(pktmbuf_pool_name) + to_sstring(_qid) + "_rx";
1780
1781 printf("Creating Rx mbuf pool '%s' [%u mbufs] ...\n",
1782 name.c_str(), mbufs_per_queue_rx);
1783
1784 //
1785 // With a hugetlbfs memory backend the memory is pinned and we can perform
1786 // a virt2phys translation ourselves. We can therefore back the DPDK pools
1787 // with externally allocated memory and significantly reduce the amount of
1788 // memory DPDK needs in this case.
1789 //
1790 if (HugetlbfsMemBackend) {
1791 size_t xmem_size;
1792
1793 _rx_xmem.reset(alloc_mempool_xmem(mbufs_per_queue_rx, mbuf_overhead,
1794 xmem_size));
1795 if (!_rx_xmem.get()) {
1796 printf("Can't allocate memory for Rx buffers\n");
1797 return false;
1798 }
1799
1800 //
1801 // Don't pass single-producer/single-consumer flags to mbuf create as it
1802 // seems faster to use a cache instead.
1803 //
1804 struct rte_pktmbuf_pool_private roomsz = {};
1805 roomsz.mbuf_data_room_size = mbuf_data_size + RTE_PKTMBUF_HEADROOM;
1806 _pktmbuf_pool_rx =
1807 rte_mempool_create_empty(name.c_str(),
1808 mbufs_per_queue_rx, mbuf_overhead,
1809 mbuf_cache_size,
1810 sizeof(struct rte_pktmbuf_pool_private),
1811 rte_socket_id(), 0);
1812 if (!_pktmbuf_pool_rx) {
1813 printf("Failed to create mempool for Rx\n");
1814 exit(1);
1815 }
1816
1817 rte_pktmbuf_pool_init(_pktmbuf_pool_rx, as_cookie(roomsz));
1818
1819 if (rte_mempool_populate_virt(_pktmbuf_pool_rx,
1820 (char*)(_rx_xmem.get()), xmem_size,
1821 page_size,
1822 nullptr, nullptr) < 0) {
1823 printf("Failed to populate mempool for Rx\n");
1824 exit(1);
1825 }
1826
1827 rte_mempool_obj_iter(_pktmbuf_pool_rx, rte_pktmbuf_init, nullptr);
1828
1829 // Reserve memory for the Rx buffer containers.
1830 _rx_free_pkts.reserve(mbufs_per_queue_rx);
1831 _rx_free_bufs.reserve(mbufs_per_queue_rx);
1832
1833 //
1834 // 1) Pull all entries from the pool.
1835 // 2) Bind data buffers to each of them.
1836 // 3) Return them back to the pool.
1837 //
1838 for (int i = 0; i < mbufs_per_queue_rx; i++) {
1839 rte_mbuf* m = rte_pktmbuf_alloc(_pktmbuf_pool_rx);
1840 assert(m);
1841 _rx_free_bufs.push_back(m);
1842 }
1843
1844 for (auto&& m : _rx_free_bufs) {
1845 if (!init_noninline_rx_mbuf(m)) {
1846 printf("Failed to allocate data buffers for Rx ring. "
1847 "Consider increasing the amount of memory.\n");
1848 exit(1);
1849 }
1850 }
1851
1852 rte_mempool_put_bulk(_pktmbuf_pool_rx, (void**)_rx_free_bufs.data(),
1853 _rx_free_bufs.size());
1854
1855 _rx_free_bufs.clear();
1856 } else {
1857 struct rte_pktmbuf_pool_private roomsz = {};
1858 roomsz.mbuf_data_room_size = inline_mbuf_data_size + RTE_PKTMBUF_HEADROOM;
1859 _pktmbuf_pool_rx =
1860 rte_mempool_create(name.c_str(),
1861 mbufs_per_queue_rx, inline_mbuf_size,
1862 mbuf_cache_size,
1863 sizeof(struct rte_pktmbuf_pool_private),
1864 rte_pktmbuf_pool_init, as_cookie(roomsz),
1865 rte_pktmbuf_init, nullptr,
1866 rte_socket_id(), 0);
1867 }
1868
1869 return _pktmbuf_pool_rx != nullptr;
1870}
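// Illustrative sketch (not part of the original source): when no external
// memory is involved, an equivalent Rx pool can be created with DPDK's
// convenience helper instead of rte_mempool_create() plus the explicit
// pktmbuf init callbacks used above. The name and all sizes are placeholders.
[[maybe_unused]] static rte_mempool* make_rx_pool_sketch(uint16_t qid) {
    char name[RTE_MEMPOOL_NAMESIZE];
    snprintf(name, sizeof(name), "sketch_rx_pool_%u", unsigned(qid));
    return rte_pktmbuf_pool_create(name,
                                   4096,                       // mbufs in the pool
                                   128,                        // per-lcore cache
                                   0,                          // per-mbuf private area
                                   RTE_MBUF_DEFAULT_BUF_SIZE,  // data room + headroom
                                   rte_socket_id());
}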
1871
1872// Map DMA address explicitly.
1873// XXX: does NOT work with Mellanox NICs as they use IB libs instead of VFIO.
1874template <bool HugetlbfsMemBackend>
1875bool dpdk_qp<HugetlbfsMemBackend>::map_dma()
1876{
1877 auto m = memory::get_memory_layout();
1878 rte_iova_t iova = rte_mem_virt2iova((const void*)m.start);
1879
1880 return rte_vfio_dma_map(m.start, iova, m.end - m.start) == 0;
1881}
1882
1883void dpdk_device::check_port_link_status()
1884{
1885 using namespace std::literals::chrono_literals;
1886 int count = 0;
1887 constexpr auto check_interval = 100ms;
1888
1889 std::cout << "\nChecking link status " << std::endl;
1890 auto t = new timer<>;
1891 t->set_callback([this, count, t] () mutable {
1892 const int max_check_time = 90; /* 9s (90 * 100ms) in total */
1893 struct rte_eth_link link;
1894 memset(&link, 0, sizeof(link));
1895 rte_eth_link_get_nowait(_port_idx, &link);
1896
1897 if (link.link_status) {
1898 std::cout <<
1899 "done\nPort " << static_cast<unsigned>(_port_idx) <<
1900 " Link Up - speed " << link.link_speed <<
1901 " Mbps - " << ((link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1902 ("full-duplex") : ("half-duplex")) <<
1903 std::endl;
1904 _link_ready_promise.set_value();
1905
1906 // We may start collecting statistics only after the Link is UP.
1907 _stats_collector.arm_periodic(2s);
1908 } else if (count++ < max_check_time) {
1909 std::cout << "." << std::flush;
1910 return;
1911 } else {
1912 std::cout << "done\nPort " << _port_idx << " Link Down" << std::endl;
1913 }
1914 t->cancel();
1915 delete t;
1916 });
1917 t->arm_periodic(check_interval);
1918}
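// Illustrative sketch (not part of the original source): the same 9 second
// wait written as a plain blocking loop. The timer-based version above is
// what the driver actually uses so the reactor keeps running while the link
// trains; a standalone DPDK program could simply poll like this.
[[maybe_unused]] static bool wait_for_link_blocking_sketch(uint16_t port_id) {
    for (int i = 0; i < 90; i++) {        // 90 probes, 100ms apart
        rte_eth_link link = {};
        rte_eth_link_get_nowait(port_id, &link);
        if (link.link_status) {
            return true;                  // link is up
        }
        rte_delay_ms(100);
    }
    return false;                         // link stayed down
}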
1919
1920 // This function uses offsetof with non-POD types.
1921#pragma GCC diagnostic push
1922#pragma GCC diagnostic ignored "-Winvalid-offsetof"
1923
1924template <bool HugetlbfsMemBackend>
1925 dpdk_qp<HugetlbfsMemBackend>::dpdk_qp(dpdk_device* dev, uint16_t qid,
1926 const std::string stats_plugin_name)
1927 : qp(true, stats_plugin_name, qid), _dev(dev), _qid(qid),
1928 _rx_gc_poller(reactor::poller::simple([&] { return rx_gc(); })),
1929 _tx_buf_factory(qid),
1930 _tx_gc_poller(reactor::poller::simple([&] { return _tx_buf_factory.gc(); }))
1931{
1932 if (!init_rx_mbuf_pool()) {
1933 rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n");
1934 }
1935
1936 if (HugetlbfsMemBackend && !map_dma()) {
1937 rte_exit(EXIT_FAILURE, "Cannot map DMA\n");
1938 }
1939
1940 static_assert(offsetof(class tx_buf, private_end) -
1941 offsetof(class tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM,
1942 "RTE_PKTMBUF_HEADROOM is less than dpdk_qp::tx_buf size! "
1943 "Increase the headroom size in the DPDK configuration");
1944 static_assert(offsetof(class tx_buf, _mbuf) == 0,
1945 "There is a pad at the beginning of the tx_buf before _mbuf "
1946 "field!");
1947 static_assert((inline_mbuf_data_size & (inline_mbuf_data_size - 1)) == 0,
1948 "inline_mbuf_data_size has to be a power of two!");
1949
1950 if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, default_ring_size,
1951 rte_eth_dev_socket_id(_dev->port_idx()),
1952 _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) {
1953 rte_exit(EXIT_FAILURE, "Cannot initialize rx queue\n");
1954 }
1955
1956 if (rte_eth_tx_queue_setup(_dev->port_idx(), _qid, default_ring_size,
1957 rte_eth_dev_socket_id(_dev->port_idx()), _dev->def_tx_conf()) < 0) {
1958 rte_exit(EXIT_FAILURE, "Cannot initialize tx queue\n");
1959 }
1960
1961 // Register error statistics: Rx total and checksum errors
1962 namespace sm = seastar::metrics;
1963 _metrics.add_group(_stats_plugin_name, {
1964 sm::make_counter(_queue_name + "_rx_csum_errors", _stats.rx.bad.csum,
1965 sm::description("Counts the number of packets received by this queue that have a bad CSUM value. "
1966 "A non-zero value of this metric usually indicates a HW issue, e.g. a bad cable.")),
1967
1968 sm::make_counter(_queue_name + "_rx_errors", _stats.rx.bad.total,
1969 sm::description("Counts the total number of errors in the ingress path for this queue: CSUM errors, etc.")),
1970
1971 sm::make_counter(_queue_name + "_rx_no_memory_errors", _stats.rx.bad.no_mem,
1972 sm::description("Counts the number of ingress packets received by this HW queue but dropped by the SW due to low memory. "
1973 "A non-zero value indicates that seastar doesn't have enough memory to handle the packet reception or the memory is too fragmented.")),
1974 });
1975}
1976
1977#pragma GCC diagnostic pop
1978
1979template <bool HugetlbfsMemBackend>
1980void dpdk_qp<HugetlbfsMemBackend>::rx_start() {
1981 _rx_poller = reactor::poller::simple([&] { return poll_rx_once(); });
1982}
1983
1984template<>
1985 inline std::optional<packet>
1986dpdk_qp<false>::from_mbuf_lro(rte_mbuf* m)
1987{
1988 //
1989 // Try to allocate a buffer for the whole packet's data.
1990 // If we fail - construct the packet from mbufs.
1991 // If we succeed - copy the data into this buffer, create a packet based on
1992 // this buffer and return the mbuf to its pool.
1993 //
1994 auto pkt_len = rte_pktmbuf_pkt_len(m);
1995 char* buf = (char*)malloc(pkt_len);
1996 if (buf) {
1997 // Copy the contents of the packet into the buffer we've just allocated
1998 size_t offset = 0;
1999 for (rte_mbuf* m1 = m; m1 != nullptr; m1 = m1->next) {
2000 char* data = rte_pktmbuf_mtod(m1, char*);
2001 auto len = rte_pktmbuf_data_len(m1);
2002
2003 rte_memcpy(buf + offset, data, len);
2004 offset += len;
2005 }
2006
2007 rte_pktmbuf_free(m);
2008
2009 return packet(fragment{buf, pkt_len}, make_free_deleter(buf));
2010 }
2011
2012 // Drop if allocation failed
2013 rte_pktmbuf_free(m);
2014
2015 return std::nullopt;
2016}
2017
2018template<>
2019 inline std::optional<packet>
2020dpdk_qp<false>::from_mbuf(rte_mbuf* m)
2021{
2022 if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
2023 //
2024 // Try to allocate a buffer for the packet's data. If the allocation
2025 // fails - drop the packet. If it succeeds - copy the data into this
2026 // buffer, create a packet based on this buffer and return the mbuf to
2027 // its pool.
2028 //
2029 auto len = rte_pktmbuf_data_len(m);
2030 char* buf = (char*)malloc(len);
2031
2032 if (!buf) {
2033 // Drop if allocation failed
2034 rte_pktmbuf_free(m);
2035
2036 return std::nullopt;
2037 } else {
2038 rte_memcpy(buf, rte_pktmbuf_mtod(m, char*), len);
2039 rte_pktmbuf_free(m);
2040
2041 return packet(fragment{buf, len}, make_free_deleter(buf));
2042 }
2043 } else {
2044 return from_mbuf_lro(m);
2045 }
2046}
2047
2048template<>
2049 inline std::optional<packet>
2050dpdk_qp<true>::from_mbuf_lro(rte_mbuf* m)
2051{
2052 _frags.clear();
2053 _bufs.clear();
2054
2055 for (; m != nullptr; m = m->next) {
2056 char* data = rte_pktmbuf_mtod(m, char*);
2057
2058 _frags.emplace_back(fragment{data, rte_pktmbuf_data_len(m)});
2059 _bufs.push_back(data);
2060 }
2061
2062 return packet(_frags.begin(), _frags.end(),
2063 make_deleter(deleter(),
2064 [bufs_vec = std::move(_bufs)] {
2065 for (auto&& b : bufs_vec) {
2066 free(b);
2067 }
2068 }));
2069}
2070
2071template<>
2072 inline std::optional<packet> dpdk_qp<true>::from_mbuf(rte_mbuf* m)
2073{
2074 _rx_free_pkts.push_back(m);
2075 _num_rx_free_segs += m->nb_segs;
2076
2077 if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
2078 char* data = rte_pktmbuf_mtod(m, char*);
2079
2080 return packet(fragment{data, rte_pktmbuf_data_len(m)},
2081 make_free_deleter(data));
2082 } else {
2083 return from_mbuf_lro(m);
2084 }
2085}
2086
2087template <bool HugetlbfsMemBackend>
2088inline bool dpdk_qp<HugetlbfsMemBackend>::refill_one_cluster(rte_mbuf* head)
2089{
2090 for (; head != nullptr; head = head->next) {
2091 if (!refill_rx_mbuf(head)) {
2092 //
2093 // If we failed to allocate a new buffer - push the rest of the
2094 // cluster back to the free_packets list for a later retry.
2095 //
2096 _rx_free_pkts.push_back(head);
2097 return false;
2098 }
2099 _rx_free_bufs.push_back(head);
2100 }
2101
2102 return true;
2103}
2104
2105template <bool HugetlbfsMemBackend>
2106bool dpdk_qp<HugetlbfsMemBackend>::rx_gc()
2107{
2108 if (_num_rx_free_segs >= rx_gc_thresh) {
2109 while (!_rx_free_pkts.empty()) {
2110 //
2111 // Use back() + pop_back() semantics to avoid an extra
2112 // _rx_free_pkts.clear() at the end of the function - clear() has a
2113 // linear complexity.
2114 //
2115 auto m = _rx_free_pkts.back();
2116 _rx_free_pkts.pop_back();
2117
2118 if (!refill_one_cluster(m)) {
2119 break;
2120 }
2121 }
2122
2123 if (_rx_free_bufs.size()) {
2124 rte_mempool_put_bulk(_pktmbuf_pool_rx,
2125 (void **)_rx_free_bufs.data(),
2126 _rx_free_bufs.size());
2127
2128 // TODO: assert() in a fast path! Remove me ASAP!
2129 assert(_num_rx_free_segs >= _rx_free_bufs.size());
2130
2131 _num_rx_free_segs -= _rx_free_bufs.size();
2132 _rx_free_bufs.clear();
2133
2134 // TODO: assert() in a fast path! Remove me ASAP!
2135 assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
2136 (!_rx_free_pkts.empty() && _num_rx_free_segs));
2137 }
2138 }
2139
2140 return _num_rx_free_segs >= rx_gc_thresh;
2141}
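// Illustrative sketch (not part of the original source): the batching idea
// behind rx_gc() in isolation. Instead of freeing mbufs one by one, finished
// buffers are accumulated and handed back to the pool in a single bulk call,
// amortizing the cost of touching the mempool ring.
[[maybe_unused]] static void bulk_return_mbufs_sketch(rte_mempool* pool,
                                                      rte_mbuf** bufs,
                                                      unsigned count) {
    if (count) {
        // Assumes every mbuf in bufs[] came from 'pool' and is no longer
        // referenced, as is the case for the Rx buffers recycled above.
        rte_mempool_put_bulk(pool, reinterpret_cast<void**>(bufs), count);
    }
}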
2142
2143
2144template <bool HugetlbfsMemBackend>
2145void dpdk_qp<HugetlbfsMemBackend>::process_packets(
2146 struct rte_mbuf **bufs, uint16_t count)
2147{
2148 uint64_t nr_frags = 0, bytes = 0;
2149
2150 for (uint16_t i = 0; i < count; i++) {
2151 struct rte_mbuf *m = bufs[i];
2152 offload_info oi;
2153
2154 std::optional<packet> p = from_mbuf(m);
2155
2156 // Drop the packet if the translation above failed
2157 if (!p) {
2158 _stats.rx.bad.inc_no_mem();
2159 continue;
2160 }
2161
2162 nr_frags += m->nb_segs;
2163 bytes += m->pkt_len;
2164
2165 // Set stripped VLAN value if available
2166 if ((m->ol_flags & PKT_RX_VLAN_STRIPPED) &&
2167 (m->ol_flags & PKT_RX_VLAN)) {
2168
2169 oi.vlan_tci = m->vlan_tci;
2170 }
2171
2172 if (_dev->hw_features().rx_csum_offload) {
2173 if (m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
2174 // Packet with bad checksum, just drop it.
2175 _stats.rx.bad.inc_csum_err();
2176 continue;
2177 }
2178 // Note that when _hw_features.rx_csum_offload is on, the receive
2179 // code for ip, tcp and udp will assume they don't need to check
2180 // the checksum again, because we did this here.
2181 }
2182
2183 (*p).set_offload_info(oi);
2184 if (m->ol_flags & PKT_RX_RSS_HASH) {
2185 (*p).set_rss_hash(m->hash.rss);
2186 }
2187
2188 _dev->l2receive(std::move(*p));
2189 }
2190
2191 _stats.rx.good.update_pkts_bunch(count);
2192 _stats.rx.good.update_frags_stats(nr_frags, bytes);
2193
2194 if (!HugetlbfsMemBackend) {
2195 _stats.rx.good.copy_frags = _stats.rx.good.nr_frags;
2196 _stats.rx.good.copy_bytes = _stats.rx.good.bytes;
2197 }
2198}
2199
2200template <bool HugetlbfsMemBackend>
2201bool dpdk_qp<HugetlbfsMemBackend>::poll_rx_once()
2202{
2203 struct rte_mbuf *buf[packet_read_size];
2204
2205 /* read a burst of packets from the port */
2206 uint16_t rx_count = rte_eth_rx_burst(_dev->port_idx(), _qid,
2207 buf, packet_read_size);
2208
2209 /* Now process the NIC packets read */
2210 if (likely(rx_count > 0)) {
2211 process_packets(buf, rx_count);
2212 }
2213
2214 return rx_count;
2215}
2216
2217void dpdk_device::set_rss_table()
2218{
2219 if (_dev_info.reta_size == 0)
2220 return;
2221
2222 int reta_conf_size =
2223 std::max(1, _dev_info.reta_size / RTE_RETA_GROUP_SIZE);
2224 std::vector<rte_eth_rss_reta_entry64> reta_conf(reta_conf_size);
2225
2226 // Configure the HW indirection table
2227 unsigned i = 0;
2228 for (auto& x : reta_conf) {
2229 x.mask = ~0ULL;
2230 for (auto& r: x.reta) {
2231 r = i++ % _num_queues;
2232 }
2233 }
2234
2235 if (rte_eth_dev_rss_reta_update(_port_idx, reta_conf.data(), _dev_info.reta_size)) {
2236 rte_exit(EXIT_FAILURE, "Port %d: Failed to update an RSS indirection table", _port_idx);
2237 }
2238
2239 // Fill our local indirection table. Do it in a separate loop to keep things simple.
2240 i = 0;
2241 for (auto& r : _redir_table) {
2242 r = i++ % _num_queues;
2243 }
2244}
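// Illustrative sketch (not part of the original source): how a redirection
// table filled like the one above resolves an RSS hash to a queue index. The
// lookup below is simplified - real NICs index the RETA with the low-order
// bits of the hash - but with the modulo placement used above any indexing
// scheme maps entry i to queue i % _num_queues, so traffic is still spread
// round-robin across the queues.
[[maybe_unused]] static unsigned queue_for_hash_sketch(const std::vector<unsigned>& redir_table,
                                                       uint32_t rss_hash) {
    return redir_table[rss_hash % redir_table.size()];
}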
2245
2246 std::unique_ptr<qp> dpdk_device::init_local_queue(const program_options::option_group& opts, uint16_t qid) {
2247 auto net_opts = dynamic_cast<const net::native_stack_options*>(&opts);
2248 assert(net_opts);
2249
2250 std::unique_ptr<qp> qp;
2251 if (net_opts->_hugepages) {
2252 qp = std::make_unique<dpdk_qp<true>>(this, qid,
2253 _stats_plugin_name + "-" + _stats_plugin_inst);
2254 } else {
2255 qp = std::make_unique<dpdk_qp<false>>(this, qid,
2256 _stats_plugin_name + "-" + _stats_plugin_inst);
2257 }
2258
2259 // FIXME: future is discarded
2260 (void)smp::submit_to(_home_cpu, [this] () mutable {
2261 if (++_queues_ready == _num_queues) {
2262 init_port_fini();
2263 }
2264 });
2265 return qp;
2266}
2267} // namespace dpdk
2268
2269/******************************** Interface functions *************************/
2270
2271std::unique_ptr<net::device> create_dpdk_net_device(
2272 uint16_t port_idx,
2273 uint16_t num_queues,
2274 bool use_lro,
2275 bool enable_fc)
2276{
2277 static bool called = false;
2278
2279 assert(!called);
2280 assert(dpdk::eal::initialized);
2281
2282 called = true;
2283
2284 // Check that we have at least one DPDK-able port
2285 if (rte_eth_dev_count_avail() == 0) {
2286 rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
2287 } else {
2288 printf("number of ports: %d\n", rte_eth_dev_count_avail());
2289 }
2290
2291 return std::make_unique<dpdk::dpdk_device>(port_idx, num_queues, use_lro,
2292 enable_fc);
2293}
2294
2295std::unique_ptr<net::device> create_dpdk_net_device(
2296 const hw_config& hw_cfg)
2297{
2298 return create_dpdk_net_device(*hw_cfg.port_index, smp::count, hw_cfg.lro, hw_cfg.hw_fc);
2299}
2300
2301 }
2302
2303#else
2304#include <seastar/net/dpdk.hh>
2305#endif // SEASTAR_HAVE_DPDK
2306
2307 namespace seastar::net {
2308
2309dpdk_options::dpdk_options(program_options::option_group* parent_group)
2310#ifdef SEASTAR_HAVE_DPDK
2311 : program_options::option_group(parent_group, "DPDK net options")
2312 , dpdk_port_index(*this, "dpdk-port-index",
2313 0,
2314 "DPDK Port Index")
2315 , hw_fc(*this, "hw-fc",
2316 "on",
2317 "Enable HW Flow Control (on / off)")
2318#else
2319 : program_options::option_group(parent_group, "DPDK net options", program_options::unused{})
2320 , dpdk_port_index(*this, "dpdk-port-index", program_options::unused{})
2321 , hw_fc(*this, "hw-fc", program_options::unused{})
2322#endif
2323#if 0
2324 opts.add_options()
2325 ("csum-offload",
2326 boost::program_options::value<std::string>()->default_value("on"),
2327 "Enable checksum offload feature (on / off)")
2328 ("tso",
2329 boost::program_options::value<std::string>()->default_value("on"),
2330 "Enable TCP segment offload feature (on / off)")
2331 ("ufo",
2332 boost::program_options::value<std::string>()->default_value("on"),
2333 "Enable UDP fragmentation offload feature (on / off)")
2334 ;
2335#endif
2336 {
2337}
2338
2339}