1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2014 Cloudius Systems, Ltd.
20 */
21 #ifdef SEASTAR_HAVE_DPDK
22
23 #include <cinttypes>
24 #include <seastar/core/posix.hh>
25 #include "core/vla.hh"
26 #include <seastar/core/reactor.hh>
27 #include <seastar/net/virtio-interface.hh>
28 #include <seastar/core/stream.hh>
29 #include <seastar/core/circular_buffer.hh>
30 #include <seastar/core/align.hh>
31 #include <seastar/core/sstring.hh>
32 #include <seastar/core/memory.hh>
33 #include <seastar/core/metrics.hh>
34 #include <seastar/util/function_input_iterator.hh>
35 #include <seastar/util/transform_iterator.hh>
36 #include <atomic>
37 #include <vector>
38 #include <queue>
39 #include <seastar/util/std-compat.hh>
40 #include <boost/preprocessor.hpp>
41 #include <seastar/net/ip.hh>
42 #include <seastar/net/const.hh>
43 #include <seastar/core/dpdk_rte.hh>
44 #include <seastar/net/dpdk.hh>
45 #include <seastar/net/toeplitz.hh>
46
47 #include <getopt.h>
48 #include <malloc.h>
49
50 #include <cinttypes>
51 #include <rte_config.h>
52 #include <rte_common.h>
53 #include <rte_eal.h>
54 #include <rte_pci.h>
55 #include <rte_ethdev.h>
56 #include <rte_cycles.h>
57 #include <rte_memzone.h>
58 #include <rte_vfio.h>
59
60 #if RTE_VERSION <= RTE_VERSION_NUM(2,0,0,16)
61
62 static
63 inline
64 char*
65 rte_mbuf_to_baddr(rte_mbuf* mbuf) {
66 return reinterpret_cast<char*>(RTE_MBUF_TO_BADDR(mbuf));
67 }
68
69 void* as_cookie(struct rte_pktmbuf_pool_private& p) {
70 return reinterpret_cast<void*>(uint64_t(p.mbuf_data_room_size));
71 };
72
73 #else
74
75 void* as_cookie(struct rte_pktmbuf_pool_private& p) {
76 return &p;
77 };
78
79 #endif
80
81 #ifndef MARKER
82 typedef void *MARKER[0]; /**< generic marker for a point in a structure */
83 #endif
84
85 // Calculate the maximum amount of memory required to store a given number of objects
86 static size_t
87 get_mempool_xmem_size(uint32_t elt_num, size_t total_elt_sz, uint32_t pg_shift)
88 {
89 size_t obj_per_page, pg_num, pg_sz;
90
91 if (total_elt_sz == 0) {
92 return 0;
93 }
94
95 if (pg_shift == 0) {
96 return total_elt_sz * elt_num;
97 }
98
99 pg_sz = (size_t)1 << pg_shift;
100 obj_per_page = pg_sz / total_elt_sz;
101 if (obj_per_page == 0) {
102 return RTE_ALIGN_CEIL(total_elt_sz, pg_sz) * elt_num;
103 }
104
105 pg_num = (elt_num + obj_per_page - 1) / obj_per_page;
106 return pg_num << pg_shift;
107 }
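//
// Worked example (illustrative only; the numbers below are chosen for the
// example, not taken from this driver): with elt_num = 1024 objects of
// total_elt_sz = 2304 bytes and pg_shift = 12 (4 KB pages):
//
//   obj_per_page = 4096 / 2304        = 1
//   pg_num       = (1024 + 1 - 1) / 1 = 1024
//   result       = 1024 << 12         = 4 MB
//
// i.e. every object ends up occupying a full page because two 2304-byte
// objects do not fit into a single 4 KB page.
//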
108
109 using namespace seastar::net;
110
111 namespace seastar {
112
113 namespace dpdk {
114
115 /******************* Net device related constants *****************************/
116 static constexpr uint16_t default_ring_size = 512;
117
118 //
119 // We need 2 times the ring size of buffers because of the way PMDs
120 // refill the ring.
121 //
122 static constexpr uint16_t mbufs_per_queue_rx = 2 * default_ring_size;
123 static constexpr uint16_t rx_gc_thresh = 64;
124
125 //
126 // No need to keep more descriptors in the air than can be sent in a single
127 // rte_eth_tx_burst() call.
128 //
129 static constexpr uint16_t mbufs_per_queue_tx = 2 * default_ring_size;
130
131 static constexpr uint16_t mbuf_cache_size = 512;
132 static constexpr uint16_t mbuf_overhead =
133 sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM;
134 //
135 // We'll allocate 2K data buffers for an inline case because this would require
136 // a single page per mbuf. If we used 4K data buffers here it would require 2
137 // pages for a single buffer (due to "mbuf_overhead") and this is a much more
138 // demanding memory constraint.
139 //
140 static constexpr size_t inline_mbuf_data_size = 2048;
141
142 //
143 // Size of the data buffer in the non-inline case.
144 //
145 // We may want to change (increase) this value in future, while the
146 // inline_mbuf_data_size value will unlikely change due to reasons described
147 // above.
148 //
149 static constexpr size_t mbuf_data_size = 2048;
150
151 // (INLINE_MBUF_DATA_SIZE(2K)*32 = 64K = Max TSO/LRO size) + 1 mbuf for headers
152 static constexpr uint8_t max_frags = 32 + 1;
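// A compile-time sanity check of the relation above (an illustrative addition,
// not part of the original source): the 32 data mbufs of inline_mbuf_data_size
// bytes must cover the 64K maximum TSO/LRO payload.
static_assert((max_frags - 1) * inline_mbuf_data_size >= 64 * 1024,
              "max_frags must cover the maximum TSO/LRO payload");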
153
154 //
155 // Intel's 40G NIC HW limit on the number of fragments in an xmit segment.
156 //
157 // See Chapter 8.4.1 "Transmit Packet in System Memory" of the xl710 devices
158 // spec. for more details.
159 //
160 static constexpr uint8_t i40e_max_xmit_segment_frags = 8;
161
162 //
163 // VMware's virtual NIC limit on the number of fragments in an xmit segment.
164 //
165 // see drivers/net/vmxnet3/base/vmxnet3_defs.h VMXNET3_MAX_TXD_PER_PKT
166 //
167 static constexpr uint8_t vmxnet3_max_xmit_segment_frags = 16;
168
169 static constexpr uint16_t inline_mbuf_size =
170 inline_mbuf_data_size + mbuf_overhead;
171
172 uint32_t qp_mempool_obj_size(bool hugetlbfs_membackend)
173 {
174 uint32_t mp_size = 0;
175 struct rte_mempool_objsz mp_obj_sz = {};
176
177 //
178 // We will align each size to the huge page size because DPDK allocates a
179 // physically contiguous memory region for each pool object.
180 //
181
182 // Rx
183 if (hugetlbfs_membackend) {
184 mp_size +=
185 align_up(rte_mempool_calc_obj_size(mbuf_overhead, 0, &mp_obj_sz)+
186 sizeof(struct rte_pktmbuf_pool_private),
187 memory::huge_page_size);
188 } else {
189 mp_size +=
190 align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0, &mp_obj_sz)+
191 sizeof(struct rte_pktmbuf_pool_private),
192 memory::huge_page_size);
193 }
194 //Tx
195 std::memset(&mp_obj_sz, 0, sizeof(mp_obj_sz));
196 mp_size += align_up(rte_mempool_calc_obj_size(inline_mbuf_size, 0,
197 &mp_obj_sz)+
198 sizeof(struct rte_pktmbuf_pool_private),
199 memory::huge_page_size);
200 return mp_size;
201 }
202
203 static constexpr const char* pktmbuf_pool_name = "dpdk_pktmbuf_pool";
204
205 /*
206 * When doing reads from the NIC queues, use this batch size
207 */
208 static constexpr uint8_t packet_read_size = 32;
209 /******************************************************************************/
210
211 struct port_stats {
212 port_stats() : rx{}, tx{} {}
213
214 struct {
215 struct {
216 uint64_t mcast; // number of received multicast packets
217 uint64_t pause_xon; // number of received PAUSE XON frames
218 uint64_t pause_xoff; // number of received PAUSE XOFF frames
219 } good;
220
221 struct {
222 uint64_t dropped; // missed packets (e.g. full FIFO)
223 uint64_t crc; // packets with CRC error
224 uint64_t len; // packets with a bad length
225 uint64_t total; // total number of erroneous received packets
226 } bad;
227 } rx;
228
229 struct {
230 struct {
231 uint64_t pause_xon; // number of sent PAUSE XON frames
232 uint64_t pause_xoff; // number of sent PAUSE XOFF frames
233 } good;
234
235 struct {
236 uint64_t total; // total number of failed transmitted packets
237 } bad;
238 } tx;
239 };
240
241 #define XSTATS_ID_LIST \
242 (rx_multicast_packets) \
243 (rx_xon_packets) \
244 (rx_xoff_packets) \
245 (rx_crc_errors) \
246 (rx_length_errors) \
247 (rx_undersize_errors) \
248 (rx_oversize_errors) \
249 (tx_xon_packets) \
250 (tx_xoff_packets)
251
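//
// The Boost.Preprocessor sequence above drives both the enum and the
// name/offset lookup tables below. For example,
//
//   BOOST_PP_SEQ_ENUM(XSTATS_ID_LIST)
//
// expands to the comma-separated list
//
//   rx_multicast_packets, rx_xon_packets, rx_xoff_packets, ...
//
// so adding a new xstat only requires appending one (name) entry to the list.
//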
252 class dpdk_xstats {
253 public:
254 dpdk_xstats(uint16_t port_id)
255 : _port_id(port_id)
256 {
257 }
258
259 ~dpdk_xstats()
260 {
261 if (_xstats)
262 delete[] _xstats;
263 if (_xstat_names)
264 delete[] _xstat_names;
265 }
266
267 enum xstat_id {
268 BOOST_PP_SEQ_ENUM(XSTATS_ID_LIST)
269 };
270
271
272 void start() {
273 _len = rte_eth_xstats_get_names(_port_id, NULL, 0);
274 _xstats = new rte_eth_xstat[_len];
275 _xstat_names = new rte_eth_xstat_name[_len];
276 update_xstats();
277 update_xstat_names();
278 update_offsets();
279 }
280
281 void update_xstats() {
282 auto len = rte_eth_xstats_get(_port_id, _xstats, _len);
283 assert(len == _len);
284 }
285
286 uint64_t get_value(const xstat_id id) {
287 auto off = _offsets[static_cast<int>(id)];
288 if (off == -1) {
289 return 0;
290 }
291 return _xstats[off].value;
292 }
293
294 private:
295 uint16_t _port_id;
296 int _len;
297 struct rte_eth_xstat *_xstats = nullptr;
298 struct rte_eth_xstat_name *_xstat_names = nullptr;
299 int _offsets[BOOST_PP_SEQ_SIZE(XSTATS_ID_LIST)];
300
301 static const sstring id_to_str(const xstat_id id) {
302 #define ENUM_TO_STR(r, data, elem) \
303 if (id == elem) \
304 return BOOST_PP_STRINGIZE(elem);
305
306 BOOST_PP_SEQ_FOR_EACH(ENUM_TO_STR, ~, XSTATS_ID_LIST)
307 return "";
308 }
309
310 int get_offset_by_name(const xstat_id id, const int len) {
311 for (int i = 0; i < len; i++) {
312 if (id_to_str(id) == _xstat_names[i].name)
313 return i;
314 }
315 return -1;
316 }
317
318 void update_xstat_names() {
319 auto len = rte_eth_xstats_get_names(_port_id, _xstat_names, _len);
320 assert(len == _len);
321 }
322
323 void update_offsets() {
324 #define FIND_OFFSET(r, data, elem) \
325 _offsets[static_cast<int>(elem)] = \
326 get_offset_by_name(elem, _len);
327
328 BOOST_PP_SEQ_FOR_EACH(FIND_OFFSET, ~, XSTATS_ID_LIST)
329 }
330 };
331
332 class dpdk_device : public device {
333 uint16_t _port_idx;
334 uint16_t _num_queues;
335 net::hw_features _hw_features;
336 uint16_t _queues_ready = 0;
337 unsigned _home_cpu;
338 bool _use_lro;
339 bool _enable_fc;
340 std::vector<uint8_t> _redir_table;
341 rss_key_type _rss_key;
342 port_stats _stats;
343 timer<> _stats_collector;
344 const std::string _stats_plugin_name;
345 const std::string _stats_plugin_inst;
346 seastar::metrics::metric_groups _metrics;
347 bool _is_i40e_device = false;
348 bool _is_vmxnet3_device = false;
349 dpdk_xstats _xstats;
350
351 public:
352 rte_eth_dev_info _dev_info = {};
353 promise<> _link_ready_promise;
354
355 private:
356 /**
357 * Port initialization consists of 3 main stages:
358 * 1) General port initialization which ends with a call to
359 * rte_eth_dev_configure() where we request the needed number of Rx and
360 * Tx queues.
361 * 2) Individual queues initialization. This is done in the constructor of
362 * dpdk_qp class. In particular the memory pools for queues are allocated
363 * in this stage.
364 * 3) The final stage of the initialization which starts with the call of
365 * rte_eth_dev_start() after which the port becomes fully functional. We
366 * will also wait for a link to get up in this stage.
367 */
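
// In call-order terms the three stages above roughly correspond to the
// following sequence (a simplified sketch, not the literal code of this file):
//
//   rte_eth_dev_configure(port, nr_rx_q, nr_tx_q, &port_conf);   // stage 1
//   for each queue:                                              // stage 2
//       rte_eth_rx_queue_setup(...); rte_eth_tx_queue_setup(...);
//   rte_eth_dev_start(port);                                     // stage 3
//   wait for the link to come up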
368
369
370 /**
371 * First stage of the port initialization.
372 *
373 * @return 0 in case of success and an appropriate error code in case of an
374 * error.
375 */
376 int init_port_start();
377
378 /**
379 * The final stage of a port initialization.
380 * @note Must be called *after* all queues from stage (2) have been
381 * initialized.
382 */
383 void init_port_fini();
384
385 /**
386      * Check the link status of our port for up to 9 seconds and print the result.
387 */
388 void check_port_link_status();
389
390 /**
391 * Configures the HW Flow Control
392 */
393 void set_hw_flow_control();
394
395 public:
396 dpdk_device(uint16_t port_idx, uint16_t num_queues, bool use_lro,
397 bool enable_fc)
398 : _port_idx(port_idx)
399 , _num_queues(num_queues)
400 , _home_cpu(this_shard_id())
401 , _use_lro(use_lro)
402 , _enable_fc(enable_fc)
403 , _stats_plugin_name("network")
404 , _stats_plugin_inst(std::string("port") + std::to_string(_port_idx))
405 , _xstats(port_idx)
406 {
407
408 /* now initialise the port we will use */
409 int ret = init_port_start();
410 if (ret != 0) {
411 rte_exit(EXIT_FAILURE, "Cannot initialise port %u\n", _port_idx);
412 }
413
414 // Register port statistics pollers
415 namespace sm = seastar::metrics;
416 _metrics.add_group(_stats_plugin_name, {
417 // Rx Good
418 sm::make_derive("rx_multicast", _stats.rx.good.mcast,
419 sm::description("Counts a number of received multicast packets."), {sm::shard_label(_stats_plugin_inst)}),
420 // Rx Errors
421 sm::make_derive("rx_crc_errors", _stats.rx.bad.crc,
422 sm::description("Counts a number of received packets with a bad CRC value. "
423 "A non-zero value of this metric usually indicates a HW problem, e.g. a bad cable."), {sm::shard_label(_stats_plugin_inst)}),
424
425 sm::make_derive("rx_dropped", _stats.rx.bad.dropped,
426 sm::description("Counts a number of dropped received packets. "
427 "A non-zero value of this counter indicated the overflow of ingress HW buffers. "
428 "This usually happens because of a rate of a sender on the other side of the link is higher than we can process as a receiver."), {sm::shard_label(_stats_plugin_inst)}),
429
430 sm::make_derive("rx_bad_length_errors", _stats.rx.bad.len,
431 sm::description("Counts a number of received packets with a bad length value. "
432 "A non-zero value of this metric usually indicates a HW issue: e.g. bad cable."), {sm::shard_label(_stats_plugin_inst)}),
433 // Coupled counters:
434 // Good
435 sm::make_derive("rx_pause_xon", _stats.rx.good.pause_xon,
436 sm::description("Counts a number of received PAUSE XON frames (PAUSE frame with a quanta of zero). "
437 "When PAUSE XON frame is received our port may resume sending L2 frames. "
438 "PAUSE XON frames are sent to resume sending that was previously paused with a PAUSE XOFF frame. If ingress "
439 "buffer falls below the low watermark threshold before the timeout configured in the original PAUSE XOFF frame the receiver may decide to send PAUSE XON frame. "
440 "A non-zero value of this metric may mean that our sender is bursty and that the spikes overwhelm the receiver on the other side of the link."), {sm::shard_label(_stats_plugin_inst)}),
441
442 sm::make_derive("tx_pause_xon", _stats.tx.good.pause_xon,
443 sm::description("Counts a number of sent PAUSE XON frames (L2 flow control frames). "
444 "A non-zero value of this metric indicates that our ingress path doesn't keep up with the rate of a sender on the other side of the link. "
445 "Note that if a sender port respects PAUSE frames this will prevent it from sending from ALL its egress queues because L2 flow control is defined "
446 "on a per-link resolution."), {sm::shard_label(_stats_plugin_inst)}),
447
448 sm::make_derive("rx_pause_xoff", _stats.rx.good.pause_xoff,
449 sm::description("Counts a number of received PAUSE XOFF frames. "
450 "A non-zero value of this metric indicates that our egress overwhelms the receiver on the other side of the link and it has to send PAUSE frames to make us stop sending. "
451 "Note that if our port respects PAUSE frames a reception of a PAUSE XOFF frame will cause ALL egress queues of this port to stop sending."), {sm::shard_label(_stats_plugin_inst)}),
452
453 sm::make_derive("tx_pause_xoff", _stats.tx.good.pause_xoff,
454 sm::description("Counts a number of sent PAUSE XOFF frames. "
455 "A non-zero value of this metric indicates that our ingress path (SW and HW) doesn't keep up with the rate of a sender on the other side of the link and as a result "
456 "our ingress HW buffers overflow."), {sm::shard_label(_stats_plugin_inst)}),
457 // Errors
458 sm::make_derive("rx_errors", _stats.rx.bad.total,
459 sm::description("Counts the total number of ingress errors: CRC errors, bad length errors, etc."), {sm::shard_label(_stats_plugin_inst)}),
460
461 sm::make_derive("tx_errors", _stats.tx.bad.total,
462                         sm::description("Counts the total number of egress errors. A non-zero value usually indicates a problem with the HW or the SW driver."), {sm::shard_label(_stats_plugin_inst)}),
463 });
464 }
465
466 ~dpdk_device() {
467 _stats_collector.cancel();
468 }
469
470 ethernet_address hw_address() override {
471 struct ether_addr mac;
472 rte_eth_macaddr_get(_port_idx, &mac);
473
474 return mac.addr_bytes;
475 }
476 net::hw_features hw_features() override {
477 return _hw_features;
478 }
479
480 net::hw_features& hw_features_ref() { return _hw_features; }
481
482 const rte_eth_rxconf* def_rx_conf() const {
483 return &_dev_info.default_rxconf;
484 }
485
486 const rte_eth_txconf* def_tx_conf() const {
487 return &_dev_info.default_txconf;
488 }
489
490 /**
491 * Set the RSS table in the device and store it in the internal vector.
492 */
493 void set_rss_table();
494
495 virtual uint16_t hw_queues_count() override { return _num_queues; }
496 virtual future<> link_ready() override { return _link_ready_promise.get_future(); }
497 virtual std::unique_ptr<qp> init_local_queue(boost::program_options::variables_map opts, uint16_t qid) override;
498 virtual unsigned hash2qid(uint32_t hash) override {
499 assert(_redir_table.size());
500 return _redir_table[hash & (_redir_table.size() - 1)];
501 }
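    // Example (illustrative numbers): with a redirection table of 128 entries,
    // a packet whose RSS hash is 0x2a6f indexes entry 0x2a6f & 0x7f == 0x6f,
    // and the value stored there selects the destination queue/CPU.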
502 uint16_t port_idx() { return _port_idx; }
503 bool is_i40e_device() const {
504 return _is_i40e_device;
505 }
506 bool is_vmxnet3_device() const {
507 return _is_vmxnet3_device;
508 }
509
510 virtual rss_key_type rss_key() const override { return _rss_key; }
511 };
512
513 template <bool HugetlbfsMemBackend>
514 class dpdk_qp : public net::qp {
515 class tx_buf_factory;
516
517 class tx_buf {
518 friend class dpdk_qp;
519 public:
520 static tx_buf* me(rte_mbuf* mbuf) {
521 return reinterpret_cast<tx_buf*>(mbuf);
522 }
523
524 private:
525 /**
526 * Checks if the original packet of a given cluster should be linearized
527 * due to HW limitations.
528 *
529 * @param head head of a cluster to check
530 *
531 * @return TRUE if a packet should be linearized.
532 */
533 static bool i40e_should_linearize(rte_mbuf *head) {
534 bool is_tso = head->ol_flags & PKT_TX_TCP_SEG;
535
536 // For a non-TSO case: number of fragments should not exceed 8
537 if (!is_tso){
538 return head->nb_segs > i40e_max_xmit_segment_frags;
539 }
540
541 //
542 // For a TSO case each MSS window should not include more than 8
543 // fragments including headers.
544 //
545
546 // Calculate the number of frags containing headers.
547 //
548 // Note: we support neither VLAN nor tunneling thus headers size
549 // accounting is super simple.
550 //
551 size_t headers_size = head->l2_len + head->l3_len + head->l4_len;
552 unsigned hdr_frags = 0;
553 size_t cur_payload_len = 0;
554 rte_mbuf *cur_seg = head;
555
556 while (cur_seg && cur_payload_len < headers_size) {
557 cur_payload_len += cur_seg->data_len;
558 cur_seg = cur_seg->next;
559 hdr_frags++;
560 }
561
562 //
563 // Header fragments will be used for each TSO segment, thus the
564 // maximum number of data segments will be 8 minus the number of
565 // header fragments.
566 //
567 // It's unclear from the spec how the first TSO segment is treated
568 // if the last fragment with headers contains some data bytes:
569 // whether this fragment will be accounted as a single fragment or
570 // as two separate fragments. We prefer to play it safe and assume
571 // that this fragment will be accounted as two separate fragments.
572 //
573 size_t max_win_size = i40e_max_xmit_segment_frags - hdr_frags;
574
575 if (head->nb_segs <= max_win_size) {
576 return false;
577 }
578
579 // Get the data (without headers) part of the first data fragment
580 size_t prev_frag_data = cur_payload_len - headers_size;
581 auto mss = head->tso_segsz;
582
583 while (cur_seg) {
584 unsigned frags_in_seg = 0;
585 size_t cur_seg_size = 0;
586
587 if (prev_frag_data) {
588 cur_seg_size = prev_frag_data;
589 frags_in_seg++;
590 prev_frag_data = 0;
591 }
592
593 while (cur_seg_size < mss && cur_seg) {
594 cur_seg_size += cur_seg->data_len;
595 cur_seg = cur_seg->next;
596 frags_in_seg++;
597
598 if (frags_in_seg > max_win_size) {
599 return true;
600 }
601 }
602
603 if (cur_seg_size > mss) {
604 prev_frag_data = cur_seg_size - mss;
605 }
606 }
607
608 return false;
609 }
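
        /*
         * Worked example (hypothetical sizes, for illustration only): a TSO
         * packet with l2+l3+l4 headers of 66 bytes carried in one 128-byte
         * fragment gives hdr_frags = 1, so max_win_size = 8 - 1 = 7. If the
         * payload then arrives as 256-byte fragments and tso_segsz is 1448,
         * a single MSS window needs ceil(1448 / 256) = 6 fragments, which
         * fits; with 128-byte payload fragments it would need 12 and the
         * packet would be linearized.
         */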
610
611 /**
612 * Sets the offload info in the head buffer of an rte_mbufs cluster.
613 *
614 * @param p an original packet the cluster is built for
615 * @param qp QP handle
616 * @param head a head of an rte_mbufs cluster
617 */
618 static void set_cluster_offload_info(const packet& p, const dpdk_qp& qp, rte_mbuf* head) {
619 // Handle TCP checksum offload
620 auto oi = p.offload_info();
621 if (oi.needs_ip_csum) {
622 head->ol_flags |= PKT_TX_IP_CKSUM;
623 // TODO: Take a VLAN header into an account here
624 head->l2_len = sizeof(struct ether_hdr);
625 head->l3_len = oi.ip_hdr_len;
626 }
627 if (qp.port().hw_features().tx_csum_l4_offload) {
628 if (oi.protocol == ip_protocol_num::tcp) {
629 head->ol_flags |= PKT_TX_TCP_CKSUM;
630 // TODO: Take a VLAN header into an account here
631 head->l2_len = sizeof(struct ether_hdr);
632 head->l3_len = oi.ip_hdr_len;
633
634 if (oi.tso_seg_size) {
635 assert(oi.needs_ip_csum);
636 head->ol_flags |= PKT_TX_TCP_SEG;
637 head->l4_len = oi.tcp_hdr_len;
638 head->tso_segsz = oi.tso_seg_size;
639 }
640 } else if (oi.protocol == ip_protocol_num::udp) {
641 head->ol_flags |= PKT_TX_UDP_CKSUM;
642 // TODO: Take a VLAN header into an account here
643 head->l2_len = sizeof(struct ether_hdr);
644 head->l3_len = oi.ip_hdr_len;
645 }
646 }
647 }
648
649 /**
650 * Creates a tx_buf cluster representing a given packet in a "zero-copy"
651 * way.
652 *
653 * @param p packet to translate
654 * @param qp dpdk_qp handle
655 *
656 * @return the HEAD tx_buf of the cluster or nullptr in case of a
657 * failure
658 */
659 static tx_buf* from_packet_zc(packet&& p, dpdk_qp& qp) {
660
661 // Too fragmented - linearize
662 if (p.nr_frags() > max_frags) {
663 p.linearize();
664 ++qp._stats.tx.linearized;
665 }
666
667 build_mbuf_cluster:
668 rte_mbuf *head = nullptr, *last_seg = nullptr;
669 unsigned nsegs = 0;
670
671 // Create a HEAD of the fragmented packet
672 if (!translate_one_frag(qp, p.frag(0), head, last_seg, nsegs)) {
673 return nullptr;
674 }
675
676 unsigned total_nsegs = nsegs;
677
678 for (unsigned i = 1; i < p.nr_frags(); i++) {
679 rte_mbuf *h = nullptr, *new_last_seg = nullptr;
680 if (!translate_one_frag(qp, p.frag(i), h, new_last_seg, nsegs)) {
681 me(head)->recycle();
682 return nullptr;
683 }
684
685 total_nsegs += nsegs;
686
687 // Attach a new buffers' chain to the packet chain
688 last_seg->next = h;
689 last_seg = new_last_seg;
690 }
691
692 // Update the HEAD buffer with the packet info
693 head->pkt_len = p.len();
694 head->nb_segs = total_nsegs;
695
696 set_cluster_offload_info(p, qp, head);
697
698 //
699 // If a packet hasn't been linearized already and the resulting
700 // cluster requires the linearisation due to HW limitation:
701 //
702 // - Recycle the cluster.
703 // - Linearize the packet.
704 // - Build the cluster once again
705 //
706 if (head->nb_segs > max_frags ||
707 (p.nr_frags() > 1 && qp.port().is_i40e_device() && i40e_should_linearize(head)) ||
708 (p.nr_frags() > vmxnet3_max_xmit_segment_frags && qp.port().is_vmxnet3_device())) {
709 me(head)->recycle();
710 p.linearize();
711 ++qp._stats.tx.linearized;
712
713 goto build_mbuf_cluster;
714 }
715
716 me(last_seg)->set_packet(std::move(p));
717
718 return me(head);
719 }
720
721 /**
722 * Copy the contents of the "packet" into the given cluster of
723 * rte_mbuf's.
724 *
725 * @note Size of the cluster has to be big enough to accommodate all the
726 * contents of the given packet.
727 *
728 * @param p packet to copy
729 * @param head head of the rte_mbuf's cluster
730 */
731 static void copy_packet_to_cluster(const packet& p, rte_mbuf* head) {
732 rte_mbuf* cur_seg = head;
733 size_t cur_seg_offset = 0;
734 unsigned cur_frag_idx = 0;
735 size_t cur_frag_offset = 0;
736
737 while (true) {
738 size_t to_copy = std::min(p.frag(cur_frag_idx).size - cur_frag_offset,
739 inline_mbuf_data_size - cur_seg_offset);
740
741 memcpy(rte_pktmbuf_mtod_offset(cur_seg, void*, cur_seg_offset),
742 p.frag(cur_frag_idx).base + cur_frag_offset, to_copy);
743
744 cur_frag_offset += to_copy;
745 cur_seg_offset += to_copy;
746
747 if (cur_frag_offset >= p.frag(cur_frag_idx).size) {
748 ++cur_frag_idx;
749 if (cur_frag_idx >= p.nr_frags()) {
750 //
751 // We are done - set the data size of the last segment
752 // of the cluster.
753 //
754 cur_seg->data_len = cur_seg_offset;
755 break;
756 }
757
758 cur_frag_offset = 0;
759 }
760
761 if (cur_seg_offset >= inline_mbuf_data_size) {
762 cur_seg->data_len = inline_mbuf_data_size;
763 cur_seg = cur_seg->next;
764 cur_seg_offset = 0;
765
766 // FIXME: assert in a fast-path - remove!!!
767 assert(cur_seg);
768 }
769 }
770 }
771
772 /**
773 * Creates a tx_buf cluster representing a given packet in a "copy" way.
774 *
775 * @param p packet to translate
776 * @param qp dpdk_qp handle
777 *
778 * @return the HEAD tx_buf of the cluster or nullptr in case of a
779 * failure
780 */
781 static tx_buf* from_packet_copy(packet&& p, dpdk_qp& qp) {
782 // sanity
783 if (!p.len()) {
784 return nullptr;
785 }
786
787 /*
788 * Here we are going to use the fact that the inline data size is a
789 * power of two.
790 *
791 * We will first try to allocate the cluster and only if we are
792 * successful - we will go and copy the data.
793 */
794 auto aligned_len = align_up((size_t)p.len(), inline_mbuf_data_size);
795 unsigned nsegs = aligned_len / inline_mbuf_data_size;
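            // For example (illustrative sizes): p.len() == 5000 with a
            // 2048-byte inline data size gives aligned_len == 6144 and
            // nsegs == 3.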
796 rte_mbuf *head = nullptr, *last_seg = nullptr;
797
798 tx_buf* buf = qp.get_tx_buf();
799 if (!buf) {
800 return nullptr;
801 }
802
803 head = buf->rte_mbuf_p();
804 last_seg = head;
805 for (unsigned i = 1; i < nsegs; i++) {
806 buf = qp.get_tx_buf();
807 if (!buf) {
808 me(head)->recycle();
809 return nullptr;
810 }
811
812 last_seg->next = buf->rte_mbuf_p();
813 last_seg = last_seg->next;
814 }
815
816 //
817             // If we've got here, it means we have already succeeded!
818 // We only need to copy the data and set the head buffer with the
819 // relevant info.
820 //
821 head->pkt_len = p.len();
822 head->nb_segs = nsegs;
823
824 copy_packet_to_cluster(p, head);
825 set_cluster_offload_info(p, qp, head);
826
827 return me(head);
828 }
829
830 /**
831 * Zero-copy handling of a single net::fragment.
832 *
833 * @param do_one_buf Functor responsible for a single rte_mbuf
834 * handling
835 * @param qp dpdk_qp handle (in)
836 * @param frag Fragment to copy (in)
837 * @param head Head of the cluster (out)
838 * @param last_seg Last segment of the cluster (out)
839 * @param nsegs Number of segments in the cluster (out)
840 *
841 * @return TRUE in case of success
842 */
843 template <class DoOneBufFunc>
844 static bool do_one_frag(DoOneBufFunc do_one_buf, dpdk_qp& qp,
845 fragment& frag, rte_mbuf*& head,
846 rte_mbuf*& last_seg, unsigned& nsegs) {
847 size_t len, left_to_set = frag.size;
848 char* base = frag.base;
849
850 rte_mbuf* m;
851
852 // TODO: assert() in a fast path! Remove me ASAP!
853 assert(frag.size);
854
855 // Create a HEAD of mbufs' cluster and set the first bytes into it
856 len = do_one_buf(qp, head, base, left_to_set);
857 if (!len) {
858 return false;
859 }
860
861 left_to_set -= len;
862 base += len;
863 nsegs = 1;
864
865 //
866 // Set the rest of the data into the new mbufs and chain them to
867 // the cluster.
868 //
869 rte_mbuf* prev_seg = head;
870 while (left_to_set) {
871 len = do_one_buf(qp, m, base, left_to_set);
872 if (!len) {
873 me(head)->recycle();
874 return false;
875 }
876
877 left_to_set -= len;
878 base += len;
879 nsegs++;
880
881 prev_seg->next = m;
882 prev_seg = m;
883 }
884
885 // Return the last mbuf in the cluster
886 last_seg = prev_seg;
887
888 return true;
889 }
890
891 /**
892 * Zero-copy handling of a single net::fragment.
893 *
894 * @param qp dpdk_qp handle (in)
895 * @param frag Fragment to copy (in)
896 * @param head Head of the cluster (out)
897 * @param last_seg Last segment of the cluster (out)
898 * @param nsegs Number of segments in the cluster (out)
899 *
900 * @return TRUE in case of success
901 */
902 static bool translate_one_frag(dpdk_qp& qp, fragment& frag,
903 rte_mbuf*& head, rte_mbuf*& last_seg,
904 unsigned& nsegs) {
905 return do_one_frag(set_one_data_buf, qp, frag, head,
906 last_seg, nsegs);
907 }
908
909 /**
910 * Copies one net::fragment into the cluster of rte_mbuf's.
911 *
912 * @param qp dpdk_qp handle (in)
913 * @param frag Fragment to copy (in)
914 * @param head Head of the cluster (out)
915 * @param last_seg Last segment of the cluster (out)
916 * @param nsegs Number of segments in the cluster (out)
917 *
918 * We return the "last_seg" to avoid traversing the cluster in order to get
919 * it.
920 *
921 * @return TRUE in case of success
922 */
923 static bool copy_one_frag(dpdk_qp& qp, fragment& frag,
924 rte_mbuf*& head, rte_mbuf*& last_seg,
925 unsigned& nsegs) {
926 return do_one_frag(copy_one_data_buf, qp, frag, head,
927 last_seg, nsegs);
928 }
929
930 /**
931 * Allocates a single rte_mbuf and sets it to point to a given data
932 * buffer.
933 *
934 * @param qp dpdk_qp handle (in)
935 * @param m New allocated rte_mbuf (out)
936 * @param va virtual address of a data buffer (in)
937 * @param buf_len length of the data to copy (in)
938 *
939          * @return The actual number of bytes that have been set in the mbuf
940 */
941 static size_t set_one_data_buf(
942 dpdk_qp& qp, rte_mbuf*& m, char* va, size_t buf_len) {
943 static constexpr size_t max_frag_len = 15 * 1024; // 15K
944
945 //
946 // Currently we break a buffer on a 15K boundary because 82599
947 // devices have a 15.5K limitation on a maximum single fragment
948 // size.
949 //
950 rte_iova_t iova = rte_mem_virt2iova(va);
951
952 if (iova == RTE_BAD_IOVA) {
953 return copy_one_data_buf(qp, m, va, buf_len);
954 }
955
956 tx_buf* buf = qp.get_tx_buf();
957 if (!buf) {
958 return 0;
959 }
960
961 size_t len = std::min(buf_len, max_frag_len);
962
963 buf->set_zc_info(va, iova, len);
964 m = buf->rte_mbuf_p();
965
966 return len;
967 }
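
        // For example (illustrative sizes): a 40 KB fragment whose memory maps
        // to valid IOVAs is split here into 15 KB + 15 KB + 10 KB zero-copy
        // mbufs, staying under the 15.5 KB per-fragment limit of 82599 NICs.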
968
969 /**
970 * Allocates a single rte_mbuf and copies a given data into it.
971 *
972 * @param qp dpdk_qp handle (in)
973 * @param m New allocated rte_mbuf (out)
974 * @param data Data to copy from (in)
975 * @param buf_len length of the data to copy (in)
976 *
977          * @return The actual number of bytes that have been copied
978 */
979 static size_t copy_one_data_buf(
980 dpdk_qp& qp, rte_mbuf*& m, char* data, size_t buf_len)
981 {
982 tx_buf* buf = qp.get_tx_buf();
983 if (!buf) {
984 return 0;
985 }
986
987 size_t len = std::min(buf_len, inline_mbuf_data_size);
988
989 m = buf->rte_mbuf_p();
990
991 // mbuf_put()
992 m->data_len = len;
993 m->pkt_len = len;
994
995 qp._stats.tx.good.update_copy_stats(1, len);
996
997 memcpy(rte_pktmbuf_mtod(m, void*), data, len);
998
999 return len;
1000 }
1001
1002 public:
1003 tx_buf(tx_buf_factory& fc) : _fc(fc) {
1004
1005 _buf_iova = _mbuf.buf_iova;
1006 _data_off = _mbuf.data_off;
1007 }
1008
1009 rte_mbuf* rte_mbuf_p() { return &_mbuf; }
1010
1011 void set_zc_info(void* va, rte_iova_t iova, size_t len) {
1012 // mbuf_put()
1013 _mbuf.data_len = len;
1014 _mbuf.pkt_len = len;
1015
1016 // Set the mbuf to point to our data
1017 _mbuf.buf_addr = va;
1018 _mbuf.buf_iova = iova;
1019 _mbuf.data_off = 0;
1020 _is_zc = true;
1021 }
1022
1023 void reset_zc() {
1024 //
1025 // If this mbuf was the last in a cluster and contains an
1026 // original packet object then call the destructor of the
1027 // original packet object.
1028 //
1029 if (_p) {
1030 //
1031 // Reset the std::optional. This in particular is going
1032 // to call the "packet"'s destructor and reset the
1033 // "optional" state to "nonengaged".
1034 //
1035 _p = std::nullopt;
1036
1037 } else if (!_is_zc) {
1038 return;
1039 }
1040
1041 // Restore the rte_mbuf fields we trashed in set_zc_info()
1042 _mbuf.buf_iova = _buf_iova;
1043 _mbuf.buf_addr = rte_mbuf_to_baddr(&_mbuf);
1044 _mbuf.data_off = _data_off;
1045
1046 _is_zc = false;
1047 }
1048
1049 void recycle() {
1050 struct rte_mbuf *m = &_mbuf, *m_next;
1051
1052 while (m != nullptr) {
1053 m_next = m->next;
1054 rte_pktmbuf_reset(m);
1055 _fc.put(me(m));
1056 m = m_next;
1057 }
1058 }
1059
1060 void set_packet(packet&& p) {
1061 _p = std::move(p);
1062 }
1063
1064 private:
1065 struct rte_mbuf _mbuf;
1066 MARKER private_start;
1067 std::optional<packet> _p;
1068 rte_iova_t _buf_iova;
1069 uint16_t _data_off;
1070 // TRUE if underlying mbuf has been used in the zero-copy flow
1071 bool _is_zc = false;
1072 // buffers' factory the buffer came from
1073 tx_buf_factory& _fc;
1074 MARKER private_end;
1075 };
1076
1077 class tx_buf_factory {
1078 //
1079 // Number of buffers to free in each GC iteration:
1080 // We want the buffers to be allocated from the mempool as many as
1081 // possible.
1082 //
1083 // On the other hand if there is no Tx for some time we want the
1084 // completions to be eventually handled. Thus we choose the smallest
1085 // possible packets count number here.
1086 //
1087 static constexpr int gc_count = 1;
1088 public:
1089 tx_buf_factory(uint16_t qid) {
1090 using namespace memory;
1091
1092 sstring name = sstring(pktmbuf_pool_name) + to_sstring(qid) + "_tx";
1093 printf("Creating Tx mbuf pool '%s' [%u mbufs] ...\n",
1094 name.c_str(), mbufs_per_queue_tx);
1095
1096 if (HugetlbfsMemBackend) {
1097 size_t xmem_size;
1098
1099 _xmem.reset(dpdk_qp::alloc_mempool_xmem(mbufs_per_queue_tx,
1100 inline_mbuf_size,
1101 xmem_size));
1102 if (!_xmem.get()) {
1103 printf("Can't allocate a memory for Tx buffers\n");
1104 exit(1);
1105 }
1106
1107 //
1108 // We are going to push the buffers from the mempool into
1109 // the circular_buffer and then poll them from there anyway, so
1110                 // we prefer to make the mempool non-atomic in this case.
1111 //
1112 _pool =
1113 rte_mempool_create_empty(name.c_str(),
1114 mbufs_per_queue_tx,
1115 inline_mbuf_size,
1116 mbuf_cache_size,
1117 sizeof(struct rte_pktmbuf_pool_private),
1118 rte_socket_id(), 0);
1119 if (_pool) {
1120 rte_pktmbuf_pool_init(_pool, nullptr);
1121
1122 if (rte_mempool_populate_virt(_pool, (char*)(_xmem.get()),
1123 xmem_size, page_size,
1124 nullptr, nullptr) <= 0) {
1125 printf("Failed to populate mempool for Tx\n");
1126 exit(1);
1127 }
1128
1129 rte_mempool_obj_iter(_pool, rte_pktmbuf_init, nullptr);
1130 }
1131
1132 } else {
1133 _pool =
1134 rte_mempool_create(name.c_str(),
1135 mbufs_per_queue_tx, inline_mbuf_size,
1136 mbuf_cache_size,
1137 sizeof(struct rte_pktmbuf_pool_private),
1138 rte_pktmbuf_pool_init, nullptr,
1139 rte_pktmbuf_init, nullptr,
1140 rte_socket_id(), 0);
1141 }
1142
1143 if (!_pool) {
1144 printf("Failed to create mempool for Tx\n");
1145 exit(1);
1146 }
1147
1148 //
1149 // Fill the factory with the buffers from the mempool allocated
1150 // above.
1151 //
1152 init_factory();
1153 }
1154
1155 /**
1156 * @note Should not be called if there are no free tx_buf's
1157 *
1158 * @return a free tx_buf object
1159 */
1160 tx_buf* get() {
1161 // Take completed from the HW first
1162 tx_buf *pkt = get_one_completed();
1163 if (pkt) {
1164 if (HugetlbfsMemBackend) {
1165 pkt->reset_zc();
1166 }
1167
1168 return pkt;
1169 }
1170
1171 //
1172 // If there are no completed at the moment - take from the
1173 // factory's cache.
1174 //
1175 if (_ring.empty()) {
1176 return nullptr;
1177 }
1178
1179 pkt = _ring.back();
1180 _ring.pop_back();
1181
1182 return pkt;
1183 }
1184
1185 void put(tx_buf* buf) {
1186 if (HugetlbfsMemBackend) {
1187 buf->reset_zc();
1188 }
1189 _ring.push_back(buf);
1190 }
1191
1192 bool gc() {
1193 for (int cnt = 0; cnt < gc_count; ++cnt) {
1194 auto tx_buf_p = get_one_completed();
1195 if (!tx_buf_p) {
1196 return false;
1197 }
1198
1199 put(tx_buf_p);
1200 }
1201
1202 return true;
1203 }
1204 private:
1205 /**
1206 * Fill the mbufs circular buffer: after this the _pool will become
1207 * empty. We will use it to catch the completed buffers:
1208 *
1209 * - Underlying PMD drivers will "free" the mbufs once they are
1210 * completed.
1211 * - We will poll the _pktmbuf_pool_tx till it's empty and release
1212 * all the buffers from the freed mbufs.
1213 */
1214 void init_factory() {
1215 while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) {
1216 _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this});
1217 }
1218 }
1219
1220 /**
1221 * PMD puts the completed buffers back into the mempool they have
1222 * originally come from.
1223 *
1224 * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call
1225 * rte_pktmbuf_reset() here again.
1226 *
1227 * @return a single tx_buf that has been completed by HW.
1228 */
1229 tx_buf* get_one_completed() {
1230 return tx_buf::me(rte_pktmbuf_alloc(_pool));
1231 }
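
        /*
         * The Tx completion flow, in short (a sketch of the mechanism above,
         * not additional code):
         *
         *   rte_eth_tx_burst(...)       // PMD takes ownership of the mbufs
         *   ... HW transmits ...
         *   PMD frees completed mbufs   // they land back in _pool
         *   get_one_completed()         // rte_pktmbuf_alloc(_pool) != NULL
         *                               // <=> a Tx completion is available
         */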
1232
1233 private:
1234 std::vector<tx_buf*> _ring;
1235 rte_mempool* _pool = nullptr;
1236 std::unique_ptr<void, free_deleter> _xmem;
1237 };
1238
1239 public:
1240 explicit dpdk_qp(dpdk_device* dev, uint16_t qid,
1241 const std::string stats_plugin_name);
1242
1243 virtual void rx_start() override;
1244 virtual future<> send(packet p) override {
1245 abort();
1246 }
1247 virtual ~dpdk_qp() {}
1248
1249 virtual uint32_t send(circular_buffer<packet>& pb) override {
1250 if (HugetlbfsMemBackend) {
1251 // Zero-copy send
1252 return _send(pb, [&] (packet&& p) {
1253 return tx_buf::from_packet_zc(std::move(p), *this);
1254 });
1255 } else {
1256 // "Copy"-send
1257 return _send(pb, [&](packet&& p) {
1258 return tx_buf::from_packet_copy(std::move(p), *this);
1259 });
1260 }
1261 }
1262
1263 dpdk_device& port() const { return *_dev; }
1264 tx_buf* get_tx_buf() { return _tx_buf_factory.get(); }
1265 private:
1266
1267 template <class Func>
1268 uint32_t _send(circular_buffer<packet>& pb, Func packet_to_tx_buf_p) {
1269 if (_tx_burst.size() == 0) {
1270 for (auto&& p : pb) {
1271 // TODO: assert() in a fast path! Remove me ASAP!
1272 assert(p.len());
1273
1274 tx_buf* buf = packet_to_tx_buf_p(std::move(p));
1275 if (!buf) {
1276 break;
1277 }
1278
1279 _tx_burst.push_back(buf->rte_mbuf_p());
1280 }
1281 }
1282
1283 uint16_t sent = rte_eth_tx_burst(_dev->port_idx(), _qid,
1284 _tx_burst.data() + _tx_burst_idx,
1285 _tx_burst.size() - _tx_burst_idx);
1286
1287 uint64_t nr_frags = 0, bytes = 0;
1288
1289 for (int i = 0; i < sent; i++) {
1290 rte_mbuf* m = _tx_burst[_tx_burst_idx + i];
1291 bytes += m->pkt_len;
1292 nr_frags += m->nb_segs;
1293 pb.pop_front();
1294 }
1295
1296 _stats.tx.good.update_frags_stats(nr_frags, bytes);
1297
1298 _tx_burst_idx += sent;
1299
1300 if (_tx_burst_idx == _tx_burst.size()) {
1301 _tx_burst_idx = 0;
1302 _tx_burst.clear();
1303 }
1304
1305 return sent;
1306 }
1307
1308 /**
1309 * Allocate a new data buffer and set the mbuf to point to it.
1310 *
1311      * Do some DPDK hacks to keep the PMD happy: it assumes that buf_addr
1312      * points to RTE_PKTMBUF_HEADROOM bytes of private headroom that precede
1313      * the actual data buffer.
1314 *
1315 * @param m mbuf to update
1316 */
1317 static bool refill_rx_mbuf(rte_mbuf* m, size_t size = mbuf_data_size) {
1318 char* data;
1319
1320 if (posix_memalign((void**)&data, size, size)) {
1321 return false;
1322 }
1323
1324 rte_iova_t iova = rte_mem_virt2iova(data);
1325
1326 //
1327 // Set the mbuf to point to our data.
1328 //
1329         // Do some DPDK hacks to keep the PMD happy: it assumes that
1330         // buf_addr points to RTE_PKTMBUF_HEADROOM bytes of private
1331         // headroom that precede the actual data buffer.
1332 //
1333 m->buf_addr = data - RTE_PKTMBUF_HEADROOM;
1334 m->buf_iova = iova - RTE_PKTMBUF_HEADROOM;
1335 return true;
1336 }
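
    // For example (illustrative addresses, and assuming the common 128-byte
    // RTE_PKTMBUF_HEADROOM): if posix_memalign() returns data == 0x7f0000001000,
    // then buf_addr becomes 0x7f0000000f80, so that data_off ==
    // RTE_PKTMBUF_HEADROOM makes the mbuf's data pointer land exactly on our
    // buffer again.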
1337
1338 static bool init_noninline_rx_mbuf(rte_mbuf* m,
1339 size_t size = mbuf_data_size) {
1340 if (!refill_rx_mbuf(m, size)) {
1341 return false;
1342 }
1343 // The below fields stay constant during the execution.
1344 m->buf_len = size + RTE_PKTMBUF_HEADROOM;
1345 m->data_off = RTE_PKTMBUF_HEADROOM;
1346 return true;
1347 }
1348
1349 bool init_rx_mbuf_pool();
1350 bool map_dma();
1351 bool rx_gc();
1352 bool refill_one_cluster(rte_mbuf* head);
1353
1354 /**
1355 * Allocates a memory chunk to accommodate the given number of buffers of
1356 * the given size and fills a vector with underlying physical pages.
1357 *
1358 * The chunk is going to be used as an external memory buffer of the DPDK
1359 * memory pool.
1360 *
1361      * The chunk size is calculated using the get_mempool_xmem_size() function.
1362 *
1363 * @param num_bufs Number of buffers (in)
1364 * @param buf_sz Size of each buffer (in)
1365 * @param xmem_size Size of allocated memory chunk (out)
1366 *
1367 * @return a virtual address of the allocated memory chunk or nullptr in
1368 * case of a failure.
1369 */
1370 static void* alloc_mempool_xmem(uint16_t num_bufs, uint16_t buf_sz,
1371 size_t& xmem_size);
1372
1373 /**
1374 * Polls for a burst of incoming packets. This function will not block and
1375 * will immediately return after processing all available packets.
1376 *
1377 */
1378 bool poll_rx_once();
1379
1380 /**
1381 * Translates an rte_mbuf's into net::packet and feeds them to _rx_stream.
1382 *
1383 * @param bufs An array of received rte_mbuf's
1384 * @param count Number of buffers in the bufs[]
1385 */
1386 void process_packets(struct rte_mbuf **bufs, uint16_t count);
1387
1388 /**
1389 * Translate rte_mbuf into the "packet".
1390 * @param m mbuf to translate
1391 *
1392      * @return an "optional" object representing the newly received data if in an
1393 * "engaged" state or an error if in a "disengaged" state.
1394 */
1395 std::optional<packet> from_mbuf(rte_mbuf* m);
1396
1397 /**
1398 * Transform an LRO rte_mbuf cluster into the "packet" object.
1399 * @param m HEAD of the mbufs' cluster to transform
1400 *
1401      * @return an "optional" object representing the newly received LRO packet if
1402 * in an "engaged" state or an error if in a "disengaged" state.
1403 */
1404 std::optional<packet> from_mbuf_lro(rte_mbuf* m);
1405
1406 private:
1407 dpdk_device* _dev;
1408 uint16_t _qid;
1409 rte_mempool *_pktmbuf_pool_rx;
1410 std::vector<rte_mbuf*> _rx_free_pkts;
1411 std::vector<rte_mbuf*> _rx_free_bufs;
1412 std::vector<fragment> _frags;
1413 std::vector<char*> _bufs;
1414 size_t _num_rx_free_segs = 0;
1415 reactor::poller _rx_gc_poller;
1416 std::unique_ptr<void, free_deleter> _rx_xmem;
1417 tx_buf_factory _tx_buf_factory;
1418 std::optional<reactor::poller> _rx_poller;
1419 reactor::poller _tx_gc_poller;
1420 std::vector<rte_mbuf*> _tx_burst;
1421 uint16_t _tx_burst_idx = 0;
1422 static constexpr phys_addr_t page_mask = ~(memory::page_size - 1);
1423 };
1424
1425 int dpdk_device::init_port_start()
1426 {
1427 assert(_port_idx < rte_eth_dev_count_avail());
1428
1429 rte_eth_dev_info_get(_port_idx, &_dev_info);
1430
1431 //
1432 // This is a workaround for a missing handling of a HW limitation in the
1433     // DPDK i40e driver. This, and all code related to _is_i40e_device, should be
1434 // removed once this handling is added.
1435 //
1436 if (sstring("rte_i40evf_pmd") == _dev_info.driver_name ||
1437 sstring("rte_i40e_pmd") == _dev_info.driver_name) {
1438 printf("Device is an Intel's 40G NIC. Enabling 8 fragments hack!\n");
1439 _is_i40e_device = true;
1440 }
1441
1442 if (std::string("rte_vmxnet3_pmd") == _dev_info.driver_name) {
1443 printf("Device is a VMWare Virtual NIC. Enabling 16 fragments hack!\n");
1444 _is_vmxnet3_device = true;
1445 }
1446
1447 //
1448 // Another workaround: this time for a lack of number of RSS bits.
1449 // ixgbe PF NICs support up to 16 RSS queues.
1450 // ixgbe VF NICs support up to 4 RSS queues.
1451 // i40e PF NICs support up to 64 RSS queues.
1452 // i40e VF NICs support up to 16 RSS queues.
1453 //
1454 if (sstring("rte_ixgbe_pmd") == _dev_info.driver_name) {
1455 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
1456 } else if (sstring("rte_ixgbevf_pmd") == _dev_info.driver_name) {
1457 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)4);
1458 } else if (sstring("rte_i40e_pmd") == _dev_info.driver_name) {
1459 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)64);
1460 } else if (sstring("rte_i40evf_pmd") == _dev_info.driver_name) {
1461 _dev_info.max_rx_queues = std::min(_dev_info.max_rx_queues, (uint16_t)16);
1462 }
1463
1464 // Hardware offload capabilities
1465 // https://github.com/DPDK/dpdk/blob/v19.05/lib/librte_ethdev/rte_ethdev.h#L993-L1074
1466
1467 // We want to support all available offload features
1468 // TODO: below features are implemented in 17.05, should support new ones
1469 const uint64_t tx_offloads_wanted =
1470 DEV_TX_OFFLOAD_VLAN_INSERT |
1471 DEV_TX_OFFLOAD_IPV4_CKSUM |
1472 DEV_TX_OFFLOAD_UDP_CKSUM |
1473 DEV_TX_OFFLOAD_TCP_CKSUM |
1474 DEV_TX_OFFLOAD_SCTP_CKSUM |
1475 DEV_TX_OFFLOAD_TCP_TSO |
1476 DEV_TX_OFFLOAD_UDP_TSO |
1477 DEV_TX_OFFLOAD_OUTER_IPV4_CKSUM |
1478 DEV_TX_OFFLOAD_QINQ_INSERT |
1479 DEV_TX_OFFLOAD_VXLAN_TNL_TSO |
1480 DEV_TX_OFFLOAD_GRE_TNL_TSO |
1481 DEV_TX_OFFLOAD_IPIP_TNL_TSO |
1482 DEV_TX_OFFLOAD_GENEVE_TNL_TSO |
1483 DEV_TX_OFFLOAD_MACSEC_INSERT;
1484
1485 _dev_info.default_txconf.offloads =
1486 _dev_info.tx_offload_capa & tx_offloads_wanted;
1487
1488 /* for port configuration all features are off by default */
1489 rte_eth_conf port_conf = { 0 };
1490
1491 /* setting tx offloads for port */
1492 port_conf.txmode.offloads = _dev_info.default_txconf.offloads;
1493
1494 printf("Port %d: max_rx_queues %d max_tx_queues %d\n",
1495 _port_idx, _dev_info.max_rx_queues, _dev_info.max_tx_queues);
1496
1497 _num_queues = std::min({_num_queues, _dev_info.max_rx_queues, _dev_info.max_tx_queues});
1498
1499 printf("Port %d: using %d %s\n", _port_idx, _num_queues,
1500 (_num_queues > 1) ? "queues" : "queue");
1501
1502 // Set RSS mode: enable RSS if seastar is configured with more than 1 CPU.
1503 // Even if port has a single queue we still want the RSS feature to be
1504 // available in order to make HW calculate RSS hash for us.
1505 if (smp::count > 1) {
1506 if (_dev_info.hash_key_size == 40) {
1507 _rss_key = default_rsskey_40bytes;
1508 } else if (_dev_info.hash_key_size == 52) {
1509 _rss_key = default_rsskey_52bytes;
1510 } else if (_dev_info.hash_key_size != 0) {
1511 // WTF?!!
1512 rte_exit(EXIT_FAILURE,
1513 "Port %d: We support only 40 or 52 bytes RSS hash keys, %d bytes key requested",
1514 _port_idx, _dev_info.hash_key_size);
1515 } else {
1516 _rss_key = default_rsskey_40bytes;
1517 _dev_info.hash_key_size = 40;
1518 }
1519
1520 port_conf.rxmode.mq_mode = ETH_MQ_RX_RSS;
1521 /* enable all supported rss offloads */
1522 port_conf.rx_adv_conf.rss_conf.rss_hf = _dev_info.flow_type_rss_offloads;
1523 if (_dev_info.hash_key_size) {
1524 port_conf.rx_adv_conf.rss_conf.rss_key = const_cast<uint8_t *>(_rss_key.data());
1525 port_conf.rx_adv_conf.rss_conf.rss_key_len = _dev_info.hash_key_size;
1526 }
1527 } else {
1528 port_conf.rxmode.mq_mode = ETH_MQ_RX_NONE;
1529 }
1530
1531 if (_num_queues > 1) {
1532 if (_dev_info.reta_size) {
1533 // RETA size should be a power of 2
1534 assert((_dev_info.reta_size & (_dev_info.reta_size - 1)) == 0);
1535
1536 // Set the RSS table to the correct size
1537 _redir_table.resize(_dev_info.reta_size);
1538 _rss_table_bits = std::lround(std::log2(_dev_info.reta_size));
1539 printf("Port %d: RSS table size is %d\n",
1540 _port_idx, _dev_info.reta_size);
1541 } else {
1542 _rss_table_bits = std::lround(std::log2(_dev_info.max_rx_queues));
1543 }
1544 } else {
1545 _redir_table.push_back(0);
1546 }
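
    // Example (illustrative): a NIC reporting reta_size == 512 gets a 512-entry
    // redirection table and _rss_table_bits == lround(log2(512)) == 9, so
    // hash2qid() effectively uses the low 9 bits of the RSS hash.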
1547
1548 // Set Rx VLAN stripping
1549 if (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_VLAN_STRIP) {
1550 port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_VLAN_STRIP;
1551 }
1552
1553 #ifdef RTE_ETHDEV_HAS_LRO_SUPPORT
1554 // Enable LRO
1555 if (_use_lro && (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_LRO)) {
1556 printf("LRO is on\n");
1557 port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_TCP_LRO;
1558 _hw_features.rx_lro = true;
1559 } else
1560 #endif
1561 printf("LRO is off\n");
1562
1563 // Check that all CSUM features are either all set all together or not set
1564 // all together. If this assumption breaks we need to rework the below logic
1565 // by splitting the csum offload feature bit into separate bits for IPv4,
1566 // TCP and UDP.
1567 assert(((_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
1568 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
1569 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) ||
1570 (!(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
1571 !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
1572 !(_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)));
1573
1574 // Set Rx checksum checking
1575 if ( (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_IPV4_CKSUM) &&
1576 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_UDP_CKSUM) &&
1577 (_dev_info.rx_offload_capa & DEV_RX_OFFLOAD_TCP_CKSUM)) {
1578 printf("RX checksum offload supported\n");
1579 port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_CHECKSUM;
1580 _hw_features.rx_csum_offload = 1;
1581 }
1582
1583 if ((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_IPV4_CKSUM)) {
1584 printf("TX ip checksum offload supported\n");
1585 _hw_features.tx_csum_ip_offload = 1;
1586 }
1587
1588 // TSO is supported starting from DPDK v1.8
1589 if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_TSO) {
1590 printf("TSO is supported\n");
1591 _hw_features.tx_tso = 1;
1592 }
1593
1594 // There is no UFO support in the PMDs yet.
1595 #if 0
1596 if (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_TSO) {
1597 printf("UFO is supported\n");
1598 _hw_features.tx_ufo = 1;
1599 }
1600 #endif
1601
1602 // Check that Tx TCP and UDP CSUM features are either all set all together
1603 // or not set all together. If this assumption breaks we need to rework the
1604 // below logic by splitting the csum offload feature bit into separate bits
1605 // for TCP and UDP.
1606 assert(((_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
1607 (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) ||
1608 (!(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
1609 !(_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)));
1610
1611 if ( (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_UDP_CKSUM) &&
1612 (_dev_info.tx_offload_capa & DEV_TX_OFFLOAD_TCP_CKSUM)) {
1613 printf("TX TCP&UDP checksum offload supported\n");
1614 _hw_features.tx_csum_l4_offload = 1;
1615 }
1616
1617 int retval;
1618
1619 printf("Port %u init ... ", _port_idx);
1620 fflush(stdout);
1621
1622 /*
1623 * Standard DPDK port initialisation - config port, then set up
1624 * rx and tx rings.
1625 */
1626 if ((retval = rte_eth_dev_configure(_port_idx, _num_queues, _num_queues,
1627 &port_conf)) != 0) {
1628 return retval;
1629 }
1630
1631 //rte_eth_promiscuous_enable(port_num);
1632 printf("done: \n");
1633
1634 return 0;
1635 }
1636
1637 void dpdk_device::set_hw_flow_control()
1638 {
1639 // Read the port's current/default flow control settings
1640 struct rte_eth_fc_conf fc_conf;
1641 auto ret = rte_eth_dev_flow_ctrl_get(_port_idx, &fc_conf);
1642
1643 if (ret == -ENOTSUP) {
1644 goto not_supported;
1645 }
1646
1647 if (ret < 0) {
1648 rte_exit(EXIT_FAILURE, "Port %u: failed to get hardware flow control settings: (error %d)\n", _port_idx, ret);
1649 }
1650
1651 if (_enable_fc) {
1652 fc_conf.mode = RTE_FC_FULL;
1653 } else {
1654 fc_conf.mode = RTE_FC_NONE;
1655 }
1656
1657 ret = rte_eth_dev_flow_ctrl_set(_port_idx, &fc_conf);
1658 if (ret == -ENOTSUP) {
1659 goto not_supported;
1660 }
1661
1662 if (ret < 0) {
1663 rte_exit(EXIT_FAILURE, "Port %u: failed to set hardware flow control (error %d)\n", _port_idx, ret);
1664 }
1665
1666 printf("Port %u: %s HW FC\n", _port_idx,
1667 (_enable_fc ? "Enabling" : "Disabling"));
1668 return;
1669
1670 not_supported:
1671 printf("Port %u: Changing HW FC settings is not supported\n", _port_idx);
1672 }
1673
1674 void dpdk_device::init_port_fini()
1675 {
1676 // Changing FC requires HW reset, so set it before the port is initialized.
1677 set_hw_flow_control();
1678
1679 if (rte_eth_dev_start(_port_idx) < 0) {
1680 rte_exit(EXIT_FAILURE, "Cannot start port %d\n", _port_idx);
1681 }
1682
1683     /* need to defer initializing xstats since NIC-specific xstat entries
1684        show up only after port initialization */
1685 _xstats.start();
1686
1687 _stats_collector.set_callback([&] {
1688 rte_eth_stats rte_stats = {};
1689 int rc = rte_eth_stats_get(_port_idx, &rte_stats);
1690
1691 if (rc) {
1692 printf("Failed to get port statistics: %s\n", strerror(rc));
1693 }
1694
1695 _stats.rx.good.mcast =
1696 _xstats.get_value(dpdk_xstats::xstat_id::rx_multicast_packets);
1697 _stats.rx.good.pause_xon =
1698 _xstats.get_value(dpdk_xstats::xstat_id::rx_xon_packets);
1699 _stats.rx.good.pause_xoff =
1700 _xstats.get_value(dpdk_xstats::xstat_id::rx_xoff_packets);
1701
1702 _stats.rx.bad.crc =
1703 _xstats.get_value(dpdk_xstats::xstat_id::rx_crc_errors);
1704 _stats.rx.bad.len =
1705 _xstats.get_value(dpdk_xstats::xstat_id::rx_length_errors) +
1706 _xstats.get_value(dpdk_xstats::xstat_id::rx_undersize_errors) +
1707 _xstats.get_value(dpdk_xstats::xstat_id::rx_oversize_errors);
1708 _stats.rx.bad.total = rte_stats.ierrors;
1709
1710 _stats.tx.good.pause_xon =
1711 _xstats.get_value(dpdk_xstats::xstat_id::tx_xon_packets);
1712 _stats.tx.good.pause_xoff =
1713 _xstats.get_value(dpdk_xstats::xstat_id::tx_xoff_packets);
1714
1715 _stats.tx.bad.total = rte_stats.oerrors;
1716 });
1717
1718 // TODO: replace deprecated filter api with generic flow api
1719 #pragma GCC diagnostic push
1720 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
1721 if (_num_queues > 1) {
1722 if (!rte_eth_dev_filter_supported(_port_idx, RTE_ETH_FILTER_HASH)) {
1723 printf("Port %d: HASH FILTER configuration is supported\n", _port_idx);
1724
1725             // Set up the HW to use the TOEPLITZ hash function as the RSS hash function
1726 struct rte_eth_hash_filter_info info = {};
1727
1728 info.info_type = RTE_ETH_HASH_FILTER_GLOBAL_CONFIG;
1729 info.info.global_conf.hash_func = RTE_ETH_HASH_FUNCTION_TOEPLITZ;
1730
1731 if (rte_eth_dev_filter_ctrl(_port_idx, RTE_ETH_FILTER_HASH,
1732 RTE_ETH_FILTER_SET, &info) < 0) {
1733 rte_exit(EXIT_FAILURE, "Cannot set hash function on a port %d\n", _port_idx);
1734 }
1735 }
1736
1737 set_rss_table();
1738 }
1739 #pragma GCC diagnostic pop
1740
1741 // Wait for a link
1742 check_port_link_status();
1743
1744 printf("Created DPDK device\n");
1745 }
1746
1747 template <bool HugetlbfsMemBackend>
1748 void* dpdk_qp<HugetlbfsMemBackend>::alloc_mempool_xmem(
1749 uint16_t num_bufs, uint16_t buf_sz, size_t& xmem_size)
1750 {
1751 using namespace memory;
1752 char* xmem;
1753 struct rte_mempool_objsz mp_obj_sz = {};
1754
1755 rte_mempool_calc_obj_size(buf_sz, 0, &mp_obj_sz);
1756
1757 xmem_size =
1758 get_mempool_xmem_size(num_bufs,
1759 mp_obj_sz.elt_size + mp_obj_sz.header_size +
1760 mp_obj_sz.trailer_size,
1761 page_bits);
1762
1763     // Aligning to 2M causes further failures in small allocations.
1764 // TODO: Check why - and fix.
1765 if (posix_memalign((void**)&xmem, page_size, xmem_size)) {
1766 printf("Can't allocate %ld bytes aligned to %ld\n",
1767 xmem_size, page_size);
1768 return nullptr;
1769 }
1770
1771 return xmem;
1772 }
1773
1774 template <bool HugetlbfsMemBackend>
1775 bool dpdk_qp<HugetlbfsMemBackend>::init_rx_mbuf_pool()
1776 {
1777 using namespace memory;
1778 sstring name = sstring(pktmbuf_pool_name) + to_sstring(_qid) + "_rx";
1779
1780 printf("Creating Rx mbuf pool '%s' [%u mbufs] ...\n",
1781 name.c_str(), mbufs_per_queue_rx);
1782
1783 //
1784 // If we have a hugetlbfs memory backend we may perform a virt2phys
1785     // translation and the memory is "pinned". Therefore we may provide external
1786     // memory for the DPDK pools and thereby significantly reduce the memory
1787     // needed by DPDK in this case.
1788 //
1789 if (HugetlbfsMemBackend) {
1790 size_t xmem_size;
1791
1792 _rx_xmem.reset(alloc_mempool_xmem(mbufs_per_queue_rx, mbuf_overhead,
1793 xmem_size));
1794 if (!_rx_xmem.get()) {
1795 printf("Can't allocate a memory for Rx buffers\n");
1796 return false;
1797 }
1798
1799 //
1800 // Don't pass single-producer/single-consumer flags to mbuf create as it
1801 // seems faster to use a cache instead.
1802 //
1803 struct rte_pktmbuf_pool_private roomsz = {};
1804 roomsz.mbuf_data_room_size = mbuf_data_size + RTE_PKTMBUF_HEADROOM;
1805 _pktmbuf_pool_rx =
1806 rte_mempool_create_empty(name.c_str(),
1807 mbufs_per_queue_rx, mbuf_overhead,
1808 mbuf_cache_size,
1809 sizeof(struct rte_pktmbuf_pool_private),
1810 rte_socket_id(), 0);
1811 if (!_pktmbuf_pool_rx) {
1812 printf("Failed to create mempool for Rx\n");
1813 exit(1);
1814 }
1815
1816 rte_pktmbuf_pool_init(_pktmbuf_pool_rx, as_cookie(roomsz));
1817
1818 if (rte_mempool_populate_virt(_pktmbuf_pool_rx,
1819 (char*)(_rx_xmem.get()), xmem_size,
1820 page_size,
1821 nullptr, nullptr) < 0) {
1822 printf("Failed to populate mempool for Rx\n");
1823 exit(1);
1824 }
1825
1826 rte_mempool_obj_iter(_pktmbuf_pool_rx, rte_pktmbuf_init, nullptr);
1827
1828 // Reserve memory for the Rx buffer containers
1829 _rx_free_pkts.reserve(mbufs_per_queue_rx);
1830 _rx_free_bufs.reserve(mbufs_per_queue_rx);
1831
1832 //
1833 // 1) Pull all entries from the pool.
1834 // 2) Bind data buffers to each of them.
1835 // 3) Return them back to the pool.
1836 //
1837 for (int i = 0; i < mbufs_per_queue_rx; i++) {
1838 rte_mbuf* m = rte_pktmbuf_alloc(_pktmbuf_pool_rx);
1839 assert(m);
1840 _rx_free_bufs.push_back(m);
1841 }
1842
1843 for (auto&& m : _rx_free_bufs) {
1844 if (!init_noninline_rx_mbuf(m)) {
1845 printf("Failed to allocate data buffers for Rx ring. "
1846 "Consider increasing the amount of memory.\n");
1847 exit(1);
1848 }
1849 }
1850
1851 rte_mempool_put_bulk(_pktmbuf_pool_rx, (void**)_rx_free_bufs.data(),
1852 _rx_free_bufs.size());
1853
1854 _rx_free_bufs.clear();
1855 } else {
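// Without the hugetlbfs backend the mbuf data is allocated inline from
// DPDK's own memory, so a single rte_mempool_create() call is enough.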
1856 struct rte_pktmbuf_pool_private roomsz = {};
1857 roomsz.mbuf_data_room_size = inline_mbuf_data_size + RTE_PKTMBUF_HEADROOM;
1858 _pktmbuf_pool_rx =
1859 rte_mempool_create(name.c_str(),
1860 mbufs_per_queue_rx, inline_mbuf_size,
1861 mbuf_cache_size,
1862 sizeof(struct rte_pktmbuf_pool_private),
1863 rte_pktmbuf_pool_init, as_cookie(roomsz),
1864 rte_pktmbuf_init, nullptr,
1865 rte_socket_id(), 0);
1866 }
1867
1868 return _pktmbuf_pool_rx != nullptr;
1869 }
1870
1871 // Map DMA address explicitly.
1872 // XXX: does NOT work with Mellanox NICs as they use IB libs instead of VFIO.
1873 template <bool HugetlbfsMemBackend>
1874 bool dpdk_qp<HugetlbfsMemBackend>::map_dma()
1875 {
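// Register seastar's entire memory layout with VFIO so the NIC can DMA
// directly into buffers allocated from it; the IOVA base is taken from
// DPDK's view (rte_mem_virt2iova) of the region's start address.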
1876 auto m = memory::get_memory_layout();
1877 rte_iova_t iova = rte_mem_virt2iova((const void*)m.start);
1878
1879 return rte_vfio_dma_map(m.start, iova, m.end - m.start) == 0;
1880 }
1881
1882 void dpdk_device::check_port_link_status()
1883 {
1884 using namespace std::literals::chrono_literals;
1885 int count = 0;
1886 constexpr auto check_interval = 100ms;
1887
1888 std::cout << "\nChecking link status " << std::endl;
1889 auto t = new timer<>;
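// The timer owns this callback, so the mutable 'count' capture persists
// across periodic invocations; the timer cancels and deletes itself once
// the link comes up or the check times out.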
1890 t->set_callback([this, count, t] () mutable {
1891 const int max_check_time = 90; /* 9s (90 * 100ms) in total */
1892 struct rte_eth_link link;
1893 memset(&link, 0, sizeof(link));
1894 rte_eth_link_get_nowait(_port_idx, &link);
1895
1896 if (link.link_status) {
1897 std::cout <<
1898 "done\nPort " << static_cast<unsigned>(_port_idx) <<
1899 " Link Up - speed " << link.link_speed <<
1900 " Mbps - " << ((link.link_duplex == ETH_LINK_FULL_DUPLEX) ?
1901 ("full-duplex") : ("half-duplex\n")) <<
1902 std::endl;
1903 _link_ready_promise.set_value();
1904
1905 // We may start collecting statistics only after the Link is UP.
1906 _stats_collector.arm_periodic(2s);
1907 } else if (count++ < max_check_time) {
1908 std::cout << "." << std::flush;
1909 return;
1910 } else {
1911 std::cout << "done\nPort " << _port_idx << " Link Down" << std::endl;
1912 }
1913 t->cancel();
1914 delete t;
1915 });
1916 t->arm_periodic(check_interval);
1917 }
1918
1919 // This function uses offsetof with non-POD types.
1920 #pragma GCC diagnostic push
1921 #pragma GCC diagnostic ignored "-Winvalid-offsetof"
1922
1923 template <bool HugetlbfsMemBackend>
1924 dpdk_qp<HugetlbfsMemBackend>::dpdk_qp(dpdk_device* dev, uint16_t qid,
1925 const std::string stats_plugin_name)
1926 : qp(true, stats_plugin_name, qid), _dev(dev), _qid(qid),
1927 _rx_gc_poller(reactor::poller::simple([&] { return rx_gc(); })),
1928 _tx_buf_factory(qid),
1929 _tx_gc_poller(reactor::poller::simple([&] { return _tx_buf_factory.gc(); }))
1930 {
1931 if (!init_rx_mbuf_pool()) {
1932 rte_exit(EXIT_FAILURE, "Cannot initialize mbuf pools\n");
1933 }
1934
1935 if (HugetlbfsMemBackend && !map_dma()) {
1936 rte_exit(EXIT_FAILURE, "Cannot map DMA\n");
1937 }
1938
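// tx_buf stores its private fields in the mbuf headroom and keeps the
// rte_mbuf as its first member so the two can be overlaid; the asserts
// below enforce both invariants.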
1939 static_assert(offsetof(class tx_buf, private_end) -
1940 offsetof(class tx_buf, private_start) <= RTE_PKTMBUF_HEADROOM,
1941 "RTE_PKTMBUF_HEADROOM is less than dpdk_qp::tx_buf size! "
1942 "Increase the headroom size in the DPDK configuration");
1943 static_assert(offsetof(class tx_buf, _mbuf) == 0,
1944 "There is a pad at the beginning of the tx_buf before _mbuf "
1945 "field!");
1946 static_assert((inline_mbuf_data_size & (inline_mbuf_data_size - 1)) == 0,
1947 "inline_mbuf_data_size has to be a power of two!");
1948
1949 if (rte_eth_rx_queue_setup(_dev->port_idx(), _qid, default_ring_size,
1950 rte_eth_dev_socket_id(_dev->port_idx()),
1951 _dev->def_rx_conf(), _pktmbuf_pool_rx) < 0) {
1952 rte_exit(EXIT_FAILURE, "Cannot initialize rx queue\n");
1953 }
1954
1955 if (rte_eth_tx_queue_setup(_dev->port_idx(), _qid, default_ring_size,
1956 rte_eth_dev_socket_id(_dev->port_idx()), _dev->def_tx_conf()) < 0) {
1957 rte_exit(EXIT_FAILURE, "Cannot initialize tx queue\n");
1958 }
1959
1960 // Register error statistics: Rx total and checksum errors
1961 namespace sm = seastar::metrics;
1962 _metrics.add_group(_stats_plugin_name, {
1963 sm::make_derive(_queue_name + "_rx_csum_errors", _stats.rx.bad.csum,
1964 sm::description("Counts a number of packets received by this queue that have a bad CSUM value. "
1965 "A non-zero value of this metric usually indicates a HW issue, e.g. a bad cable.")),
1966
1967 sm::make_derive(_queue_name + "_rx_errors", _stats.rx.bad.total,
1968 sm::description("Counts a total number of errors in the ingress path for this queue: CSUM errors, etc.")),
1969
1970 sm::make_derive(_queue_name + "_rx_no_memory_errors", _stats.rx.bad.no_mem,
1971 sm::description("Counts a number of ingress packets received by this HW queue but dropped by the SW due to low memory. "
1972 "A non-zero value indicates that seastar doesn't have enough memory to handle the packet reception or the memory is too fragmented.")),
1973 });
1974 }
1975
1976 #pragma GCC diagnostic pop
1977
1978 template <bool HugetlbfsMemBackend>
1979 void dpdk_qp<HugetlbfsMemBackend>::rx_start() {
1980 _rx_poller = reactor::poller::simple([&] { return poll_rx_once(); });
1981 }
1982
1983 template<>
1984 inline std::optional<packet>
1985 dpdk_qp<false>::from_mbuf_lro(rte_mbuf* m)
1986 {
1987 //
1988 // Try to allocate a buffer for the whole packet's data.
1989 // If we fail - drop the packet.
1990 // If we succeed - copy the data into this buffer, create a packet based on
1991 // this buffer and return the mbuf to its pool.
1992 //
1993 auto pkt_len = rte_pktmbuf_pkt_len(m);
1994 char* buf = (char*)malloc(pkt_len);
1995 if (buf) {
1996 // Copy the contents of the packet into the buffer we've just allocated
1997 size_t offset = 0;
1998 for (rte_mbuf* m1 = m; m1 != nullptr; m1 = m1->next) {
1999 char* data = rte_pktmbuf_mtod(m1, char*);
2000 auto len = rte_pktmbuf_data_len(m1);
2001
2002 rte_memcpy(buf + offset, data, len);
2003 offset += len;
2004 }
2005
2006 rte_pktmbuf_free(m);
2007
2008 return packet(fragment{buf, pkt_len}, make_free_deleter(buf));
2009 }
2010
2011 // Drop if allocation failed
2012 rte_pktmbuf_free(m);
2013
2014 return std::nullopt;
2015 }
2016
2017 template<>
2018 inline std::optional<packet>
2019 dpdk_qp<false>::from_mbuf(rte_mbuf* m)
2020 {
2021 if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
2022 //
2023 // Try to allocate a buffer for the packet's data. If we fail - drop
2024 // the packet. If we succeed - copy the data into this buffer, create
2025 // a packet based on this buffer and return the mbuf to
2026 // its pool.
2027 //
2028 auto len = rte_pktmbuf_data_len(m);
2029 char* buf = (char*)malloc(len);
2030
2031 if (!buf) {
2032 // Drop if allocation failed
2033 rte_pktmbuf_free(m);
2034
2035 return std::nullopt;
2036 } else {
2037 rte_memcpy(buf, rte_pktmbuf_mtod(m, char*), len);
2038 rte_pktmbuf_free(m);
2039
2040 return packet(fragment{buf, len}, make_free_deleter(buf));
2041 }
2042 } else {
2043 return from_mbuf_lro(m);
2044 }
2045 }
2046
2047 template<>
2048 inline std::optional<packet>
2049 dpdk_qp<true>::from_mbuf_lro(rte_mbuf* m)
2050 {
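// Zero-copy LRO path: hand every segment's data buffer to the packet as a
// fragment and free() it when the packet is destroyed; the mbuf headers
// themselves were already queued for recycling by from_mbuf()/rx_gc().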
2051 _frags.clear();
2052 _bufs.clear();
2053
2054 for (; m != nullptr; m = m->next) {
2055 char* data = rte_pktmbuf_mtod(m, char*);
2056
2057 _frags.emplace_back(fragment{data, rte_pktmbuf_data_len(m)});
2058 _bufs.push_back(data);
2059 }
2060
2061 return packet(_frags.begin(), _frags.end(),
2062 make_deleter(deleter(),
2063 [bufs_vec = std::move(_bufs)] {
2064 for (auto&& b : bufs_vec) {
2065 free(b);
2066 }
2067 }));
2068 }
2069
2070 template<>
2071 inline std::optional<packet> dpdk_qp<true>::from_mbuf(rte_mbuf* m)
2072 {
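// Queue the mbuf chain for later recycling by rx_gc() and, when the packet
// fits in a single segment, pass its data buffer to the stack without
// copying.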
2073 _rx_free_pkts.push_back(m);
2074 _num_rx_free_segs += m->nb_segs;
2075
2076 if (!_dev->hw_features_ref().rx_lro || rte_pktmbuf_is_contiguous(m)) {
2077 char* data = rte_pktmbuf_mtod(m, char*);
2078
2079 return packet(fragment{data, rte_pktmbuf_data_len(m)},
2080 make_free_deleter(data));
2081 } else {
2082 return from_mbuf_lro(m);
2083 }
2084 }
2085
2086 template <bool HugetlbfsMemBackend>
2087 inline bool dpdk_qp<HugetlbfsMemBackend>::refill_one_cluster(rte_mbuf* head)
2088 {
2089 for (; head != nullptr; head = head->next) {
2090 if (!refill_rx_mbuf(head)) {
2091 //
2092 // If we failed to allocate a new buffer - push the rest of the
2093 // cluster back to the free_packets list for a later retry.
2094 //
2095 _rx_free_pkts.push_back(head);
2096 return false;
2097 }
2098 _rx_free_bufs.push_back(head);
2099 }
2100
2101 return true;
2102 }
2103
2104 template <bool HugetlbfsMemBackend>
2105 bool dpdk_qp<HugetlbfsMemBackend>::rx_gc()
2106 {
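// Recycle Rx mbufs in batches: once enough free segments have accumulated,
// try to reattach a fresh data buffer to each collected mbuf and return
// the refilled mbufs to the mempool in bulk.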
2107 if (_num_rx_free_segs >= rx_gc_thresh) {
2108 while (!_rx_free_pkts.empty()) {
2109 //
2110 // Use back() + pop_back() semantics to avoid an extra
2111 // _rx_free_pkts.clear() at the end of the function - clear() has
2112 // linear complexity.
2113 //
2114 auto m = _rx_free_pkts.back();
2115 _rx_free_pkts.pop_back();
2116
2117 if (!refill_one_cluster(m)) {
2118 break;
2119 }
2120 }
2121
2122 if (_rx_free_bufs.size()) {
2123 rte_mempool_put_bulk(_pktmbuf_pool_rx,
2124 (void **)_rx_free_bufs.data(),
2125 _rx_free_bufs.size());
2126
2127 // TODO: assert() in a fast path! Remove me ASAP!
2128 assert(_num_rx_free_segs >= _rx_free_bufs.size());
2129
2130 _num_rx_free_segs -= _rx_free_bufs.size();
2131 _rx_free_bufs.clear();
2132
2133 // TODO: assert() in a fast path! Remove me ASAP!
2134 assert((_rx_free_pkts.empty() && !_num_rx_free_segs) ||
2135 (!_rx_free_pkts.empty() && _num_rx_free_segs));
2136 }
2137 }
2138
2139 return _num_rx_free_segs >= rx_gc_thresh;
2140 }
2141
2142
2143 template <bool HugetlbfsMemBackend>
2144 void dpdk_qp<HugetlbfsMemBackend>::process_packets(
2145 struct rte_mbuf **bufs, uint16_t count)
2146 {
2147 uint64_t nr_frags = 0, bytes = 0;
2148
2149 for (uint16_t i = 0; i < count; i++) {
2150 struct rte_mbuf *m = bufs[i];
2151 offload_info oi;
2152
2153 std::optional<packet> p = from_mbuf(m);
2154
2155 // Drop the packet if the translation above failed
2156 if (!p) {
2157 _stats.rx.bad.inc_no_mem();
2158 continue;
2159 }
2160
2161 nr_frags += m->nb_segs;
2162 bytes += m->pkt_len;
2163
2164 // Set stripped VLAN value if available
2165 if ((m->ol_flags & PKT_RX_VLAN_STRIPPED) &&
2166 (m->ol_flags & PKT_RX_VLAN)) {
2167
2168 oi.vlan_tci = m->vlan_tci;
2169 }
2170
2171 if (_dev->hw_features().rx_csum_offload) {
2172 if (m->ol_flags & (PKT_RX_IP_CKSUM_BAD | PKT_RX_L4_CKSUM_BAD)) {
2173 // Packet with bad checksum, just drop it.
2174 _stats.rx.bad.inc_csum_err();
2175 continue;
2176 }
2177 // Note that when _hw_features.rx_csum_offload is on, the receive
2178 // code for ip, tcp and udp will assume they don't need to check
2179 // the checksum again, because we did this here.
2180 }
2181
2182 (*p).set_offload_info(oi);
2183 if (m->ol_flags & PKT_RX_RSS_HASH) {
2184 (*p).set_rss_hash(m->hash.rss);
2185 }
2186
2187 _dev->l2receive(std::move(*p));
2188 }
2189
2190 _stats.rx.good.update_pkts_bunch(count);
2191 _stats.rx.good.update_frags_stats(nr_frags, bytes);
2192
2193 if (!HugetlbfsMemBackend) {
2194 _stats.rx.good.copy_frags = _stats.rx.good.nr_frags;
2195 _stats.rx.good.copy_bytes = _stats.rx.good.bytes;
2196 }
2197 }
2198
2199 template <bool HugetlbfsMemBackend>
2200 bool dpdk_qp<HugetlbfsMemBackend>::poll_rx_once()
2201 {
2202 struct rte_mbuf *buf[packet_read_size];
2203
2204 /* Read a burst of packets from this port's Rx queue */
2205 uint16_t rx_count = rte_eth_rx_burst(_dev->port_idx(), _qid,
2206 buf, packet_read_size);
2207
2208 /* Now process the NIC packets read */
2209 if (likely(rx_count > 0)) {
2210 process_packets(buf, rx_count);
2211 }
2212
2213 return rx_count;
2214 }
2215
2216 void dpdk_device::set_rss_table()
2217 {
2218 if (_dev_info.reta_size == 0)
2219 return;
2220
2221 int reta_conf_size =
2222 std::max(1, _dev_info.reta_size / RTE_RETA_GROUP_SIZE);
2223 std::vector<rte_eth_rss_reta_entry64> reta_conf(reta_conf_size);
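// Each rte_eth_rss_reta_entry64 covers RTE_RETA_GROUP_SIZE (64) RETA
// entries; a mask of all ones selects every entry in the group for update.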
2224
2225 // Configure the HW indirection table
2226 unsigned i = 0;
2227 for (auto& x : reta_conf) {
2228 x.mask = ~0ULL;
2229 for (auto& r: x.reta) {
2230 r = i++ % _num_queues;
2231 }
2232 }
2233
2234 if (rte_eth_dev_rss_reta_update(_port_idx, reta_conf.data(), _dev_info.reta_size)) {
2235 rte_exit(EXIT_FAILURE, "Port %d: Failed to update an RSS indirection table", _port_idx);
2236 }
2237
2238 // Fill our local indirection table. Do it in a separate loop to keep things simple.
2239 i = 0;
2240 for (auto& r : _redir_table) {
2241 r = i++ % _num_queues;
2242 }
2243 }
2244
2245 std::unique_ptr<qp> dpdk_device::init_local_queue(boost::program_options::variables_map opts, uint16_t qid) {
2246
2247 std::unique_ptr<qp> qp;
2248 if (opts.count("hugepages")) {
2249 qp = std::make_unique<dpdk_qp<true>>(this, qid,
2250 _stats_plugin_name + "-" + _stats_plugin_inst);
2251 } else {
2252 qp = std::make_unique<dpdk_qp<false>>(this, qid,
2253 _stats_plugin_name + "-" + _stats_plugin_inst);
2254 }
2255
2256 // FIXME: future is discarded
2257 (void)smp::submit_to(_home_cpu, [this] () mutable {
2258 if (++_queues_ready == _num_queues) {
2259 init_port_fini();
2260 }
2261 });
2262 return qp;
2263 }
2264 } // namespace dpdk
2265
2266 /******************************** Interface functions *************************/
2267
2268 std::unique_ptr<net::device> create_dpdk_net_device(
2269 uint16_t port_idx,
2270 uint16_t num_queues,
2271 bool use_lro,
2272 bool enable_fc)
2273 {
2274 static bool called = false;
2275
2276 assert(!called);
2277 assert(dpdk::eal::initialized);
2278
2279 called = true;
2280
2281 // Check that we have at least one DPDK-able port
2282 if (rte_eth_dev_count_avail() == 0) {
2283 rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n");
2284 } else {
2285 printf("ports number: %d\n", rte_eth_dev_count_avail());
2286 }
2287
2288 return std::make_unique<dpdk::dpdk_device>(port_idx, num_queues, use_lro,
2289 enable_fc);
2290 }
2291
2292 std::unique_ptr<net::device> create_dpdk_net_device(
2293 const hw_config& hw_cfg)
2294 {
2295 return create_dpdk_net_device(*hw_cfg.port_index, smp::count, hw_cfg.lro, hw_cfg.hw_fc);
2296 }
2297
2298
2299 boost::program_options::options_description
2300 get_dpdk_net_options_description()
2301 {
2302 boost::program_options::options_description opts(
2303 "DPDK net options");
2304
2305 opts.add_options()
2306 ("dpdk-port-index",
2307 boost::program_options::value<unsigned>()->default_value(0),
2308 "DPDK Port Index");
2309
2310 opts.add_options()
2311 ("hw-fc",
2312 boost::program_options::value<std::string>()->default_value("on"),
2313 "Enable HW Flow Control (on / off)");
2314 #if 0
2315 opts.add_options()
2316 ("csum-offload",
2317 boost::program_options::value<std::string>()->default_value("on"),
2318 "Enable checksum offload feature (on / off)")
2319 ("tso",
2320 boost::program_options::value<std::string>()->default_value("on"),
2321 "Enable TCP segment offload feature (on / off)")
2322 ("ufo",
2323 boost::program_options::value<std::string>()->default_value("on"),
2324 "Enable UDP fragmentation offload feature (on / off)")
2325 ;
2326 #endif
2327 return opts;
2328 }
2329
2330 }
2331
2332 #endif // SEASTAR_HAVE_DPDK