// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */

#ifndef CEPH_DPDK_DEV_H
#define CEPH_DPDK_DEV_H

#include <functional>
#include <memory>
#include <optional>
#include <rte_config.h>
#include <rte_common.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
#include <rte_malloc.h>
#include <rte_version.h>

#include "include/page.h"
#include "common/perf_counters.h"
#include "common/admin_socket.h"
#include "msg/async/Event.h"
#include "const.h"
#include "circular_buffer.h"
#include "ethernet.h"
#include "Packet.h"
#include "stream.h"
#include "net.h"
#include "toeplitz.h"

struct free_deleter {
  void operator()(void* p) { ::free(p); }
};


enum {
  l_dpdk_dev_first = 58800,
  l_dpdk_dev_rx_mcast,
  l_dpdk_dev_rx_total_errors,
  l_dpdk_dev_tx_total_errors,
  l_dpdk_dev_rx_badcrc_errors,
  l_dpdk_dev_rx_dropped_errors,
  l_dpdk_dev_rx_nombuf_errors,
  l_dpdk_dev_last
};

enum {
  l_dpdk_qp_first = 58900,
  l_dpdk_qp_rx_packets,
  l_dpdk_qp_tx_packets,
  l_dpdk_qp_rx_bad_checksum_errors,
  l_dpdk_qp_rx_no_memory_errors,
  l_dpdk_qp_rx_bytes,
  l_dpdk_qp_tx_bytes,
  l_dpdk_qp_rx_last_bunch,
  l_dpdk_qp_tx_last_bunch,
  l_dpdk_qp_rx_fragments,
  l_dpdk_qp_tx_fragments,
  l_dpdk_qp_rx_copy_ops,
  l_dpdk_qp_tx_copy_ops,
  l_dpdk_qp_rx_copy_bytes,
  l_dpdk_qp_tx_copy_bytes,
  l_dpdk_qp_rx_linearize_ops,
  l_dpdk_qp_tx_linearize_ops,
  l_dpdk_qp_tx_queue_length,
  l_dpdk_qp_last
};

class DPDKDevice;
class DPDKWorker;


#ifndef MARKER
typedef void *MARKER[0]; /**< generic marker for a point in a structure */
#endif

class DPDKQueuePair {
  using packet_provider_type = std::function<std::optional<Packet> ()>;
 public:
  void configure_proxies(const std::map<unsigned, float>& cpu_weights);
  // build REdirection TAble for cpu_weights map: target cpu -> weight
  void build_sw_reta(const std::map<unsigned, float>& cpu_weights);
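  //
  // Illustrative sketch (not part of the interface; the weights are
  // arbitrary): a map that steers twice as much traffic to CPU 1 as to
  // CPU 0, assuming the caller owns a DPDKQueuePair `qp`:
  //
  //   std::map<unsigned, float> weights = {{0, 1.0f}, {1, 2.0f}};
  //   qp.build_sw_reta(weights);   // fills the 128-entry software RETA
  //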
  void proxy_send(Packet p) {
    _proxy_packetq.push_back(std::move(p));
  }
  void register_packet_provider(packet_provider_type func) {
    _pkt_providers.push_back(std::move(func));
  }
  bool poll_tx();
  friend class DPDKDevice;

  class tx_buf_factory;

  class tx_buf {
    friend class DPDKQueuePair;
   public:
    static tx_buf* me(rte_mbuf* mbuf) {
      return reinterpret_cast<tx_buf*>(mbuf);
    }

   private:
    /**
     * Checks if the original packet of a given cluster should be linearized
     * due to HW limitations.
     *
     * @param head head of a cluster to check
     *
     * @return TRUE if a packet should be linearized.
     */
    static bool i40e_should_linearize(rte_mbuf *head);

    /**
     * Sets the offload info in the head buffer of an rte_mbufs cluster.
     *
     * @param p an original packet the cluster is built for
     * @param qp QP handle
     * @param head a head of an rte_mbufs cluster
     */
    static void set_cluster_offload_info(const Packet& p, const DPDKQueuePair& qp, rte_mbuf* head);

    /**
     * Creates a tx_buf cluster representing a given packet in a "zero-copy"
     * way.
     *
     * @param p packet to translate
     * @param qp DPDKQueuePair handle
     *
     * @return the HEAD tx_buf of the cluster or nullptr in case of a
     *         failure
     */
    static tx_buf* from_packet_zc(
        CephContext *cct, Packet&& p, DPDKQueuePair& qp);

    /**
     * Copy the contents of the "packet" into the given cluster of
     * rte_mbuf's.
     *
     * @note Size of the cluster has to be big enough to accommodate all the
     *       contents of the given packet.
     *
     * @param p packet to copy
     * @param head head of the rte_mbuf's cluster
     */
    static void copy_packet_to_cluster(const Packet& p, rte_mbuf* head);

    /**
     * Creates a tx_buf cluster representing a given packet in a "copy" way.
     *
     * @param p packet to translate
     * @param qp DPDKQueuePair handle
     *
     * @return the HEAD tx_buf of the cluster or nullptr in case of a
     *         failure
     */
    static tx_buf* from_packet_copy(Packet&& p, DPDKQueuePair& qp);

    /**
     * Zero-copy handling of a single fragment.
     *
     * @param do_one_buf Functor responsible for a single rte_mbuf
     *                   handling
     * @param qp DPDKQueuePair handle (in)
     * @param frag Fragment to copy (in)
     * @param head Head of the cluster (out)
     * @param last_seg Last segment of the cluster (out)
     * @param nsegs Number of segments in the cluster (out)
     *
     * @return TRUE in case of success
     */
    template <class DoOneBufFunc>
    static bool do_one_frag(DoOneBufFunc do_one_buf, DPDKQueuePair& qp,
                            fragment& frag, rte_mbuf*& head,
                            rte_mbuf*& last_seg, unsigned& nsegs) {
      size_t len, left_to_set = frag.size;
      char* base = frag.base;

      rte_mbuf* m;

      // TODO: ceph_assert() in a fast path! Remove me ASAP!
      ceph_assert(frag.size);

      // Create a HEAD of mbufs' cluster and set the first bytes into it
      len = do_one_buf(qp, head, base, left_to_set);
      if (!len) {
        return false;
      }

      left_to_set -= len;
      base += len;
      nsegs = 1;

      //
      // Set the rest of the data into the new mbufs and chain them to
      // the cluster.
      //
      rte_mbuf* prev_seg = head;
      while (left_to_set) {
        len = do_one_buf(qp, m, base, left_to_set);
        if (!len) {
          me(head)->recycle();
          return false;
        }

        left_to_set -= len;
        base += len;
        nsegs++;

        prev_seg->next = m;
        prev_seg = m;
      }

      // Return the last mbuf in the cluster
      last_seg = prev_seg;

      return true;
    }
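    //
    // Sketch of the DoOneBufFunc contract assumed above (set_one_data_buf()
    // and copy_one_data_buf() below both follow it): the functor fills one
    // rte_mbuf with up to `left` bytes starting at `base`, stores the mbuf
    // in `m`, and returns how many bytes it consumed, or 0 on failure.
    // The hypothetical signature would look like:
    //
    //   size_t one_buf(DPDKQueuePair& qp, rte_mbuf*& m, char* base, size_t left);
    //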

    /**
     * Zero-copy handling of a single fragment.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param frag Fragment to copy (in)
     * @param head Head of the cluster (out)
     * @param last_seg Last segment of the cluster (out)
     * @param nsegs Number of segments in the cluster (out)
     *
     * @return TRUE in case of success
     */
    static bool translate_one_frag(DPDKQueuePair& qp, fragment& frag,
                                   rte_mbuf*& head, rte_mbuf*& last_seg,
                                   unsigned& nsegs) {
      return do_one_frag(set_one_data_buf, qp, frag, head,
                         last_seg, nsegs);
    }

    /**
     * Copies one fragment into the cluster of rte_mbuf's.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param frag Fragment to copy (in)
     * @param head Head of the cluster (out)
     * @param last_seg Last segment of the cluster (out)
     * @param nsegs Number of segments in the cluster (out)
     *
     * We return the "last_seg" to avoid traversing the cluster in order to get
     * it.
     *
     * @return TRUE in case of success
     */
    static bool copy_one_frag(DPDKQueuePair& qp, fragment& frag,
                              rte_mbuf*& head, rte_mbuf*& last_seg,
                              unsigned& nsegs) {
      return do_one_frag(copy_one_data_buf, qp, frag, head,
                         last_seg, nsegs);
    }

    /**
     * Allocates a single rte_mbuf and sets it to point to a given data
     * buffer.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param m New allocated rte_mbuf (out)
     * @param va virtual address of a data buffer (in)
     * @param buf_len length of the data to copy (in)
     *
     * @return The actual number of bytes that has been set in the mbuf
     */
    static size_t set_one_data_buf(
        DPDKQueuePair& qp, rte_mbuf*& m, char* va, size_t buf_len) {
      static constexpr size_t max_frag_len = 15 * 1024; // 15K

      // FIXME: currently all tx buffers are allocated without rte_malloc
      return copy_one_data_buf(qp, m, va, buf_len);
      //
      // Currently we break a buffer on a 15K boundary because 82599
      // devices have a 15.5K limitation on a maximum single fragment
      // size.
      //
      rte_iova_t pa = rte_malloc_virt2iova(va);
      if (!pa)
        return copy_one_data_buf(qp, m, va, buf_len);

      ceph_assert(buf_len);
      tx_buf* buf = qp.get_tx_buf();
      if (!buf) {
        return 0;
      }

      size_t len = std::min(buf_len, max_frag_len);

      buf->set_zc_info(va, pa, len);
      m = buf->rte_mbuf_p();

      return len;
    }

    /**
     * Allocates a single rte_mbuf and copies a given data into it.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param m New allocated rte_mbuf (out)
     * @param data Data to copy from (in)
     * @param buf_len length of the data to copy (in)
     *
     * @return The actual number of bytes that has been copied
     */
    static size_t copy_one_data_buf(
        DPDKQueuePair& qp, rte_mbuf*& m, char* data, size_t buf_len);

    /**
     * Checks if the first fragment of the given packet satisfies the
     * zero-copy flow requirement: its first 128 bytes should not cross the
     * 4K page boundary. This is required in order to avoid splitting packet
     * headers.
     *
     * @param p packet to check
     *
     * @return TRUE if packet is ok and FALSE otherwise.
     */
    static bool check_frag0(Packet& p)
    {
      //
      // First frag is special - it has headers that should not be split.
      // If the addressing is such that the first fragment has to be
      // split, then send this packet in a (non-zero) copy flow. We'll
      // check if the first 128 bytes of the first fragment reside in the
      // physically contiguous area. If that's the case - we are good to
      // go.
      //
      if (p.frag(0).size < 128)
        return false;

      return true;
    }
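    //
    // Note: the code above only enforces the 128-byte minimum. A boundary
    // check matching the comment would look roughly like the following
    // sketch (illustrative only, not part of the current implementation):
    //
    //   uintptr_t va = reinterpret_cast<uintptr_t>(p.frag(0).base);
    //   bool same_4k_page = (va >> 12) == ((va + 127) >> 12);
    //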

   public:
    tx_buf(tx_buf_factory& fc) : _fc(fc) {

      _buf_physaddr = _mbuf.buf_iova;
      _data_off = _mbuf.data_off;
    }

    rte_mbuf* rte_mbuf_p() { return &_mbuf; }

    void set_zc_info(void* va, phys_addr_t pa, size_t len) {
      // mbuf_put()
      _mbuf.data_len = len;
      _mbuf.pkt_len = len;

      // Set the mbuf to point to our data
      _mbuf.buf_addr = va;
      _mbuf.buf_iova = pa;
      _mbuf.data_off = 0;
      _is_zc = true;
    }

    void reset_zc() {

      //
      // If this mbuf was the last in a cluster and contains an
      // original packet object then call the destructor of the
      // original packet object.
      //
      if (_p) {
        //
        // Reset the std::optional. This in particular is going
        // to call the "packet"'s destructor and reset the
        // "optional" state to "disengaged".
        //
        _p.reset();

      } else if (!_is_zc) {
        return;
      }

      // Restore the rte_mbuf fields we trashed in set_zc_info()
      _mbuf.buf_iova = _buf_physaddr;
      _mbuf.buf_addr = rte_mbuf_to_baddr(&_mbuf);
      _mbuf.data_off = _data_off;

      _is_zc = false;
    }

    void recycle() {
      struct rte_mbuf *m = &_mbuf, *m_next;

      while (m != nullptr) {
        m_next = m->next;
        rte_pktmbuf_reset(m);
        _fc.put(me(m));
        m = m_next;
      }
    }

    void set_packet(Packet&& p) {
      _p = std::move(p);
    }

   private:
    struct rte_mbuf _mbuf;
    MARKER private_start;
    std::optional<Packet> _p;
    phys_addr_t _buf_physaddr;
    uint16_t _data_off;
    // TRUE if underlying mbuf has been used in the zero-copy flow
    bool _is_zc = false;
    // buffers' factory the buffer came from
    tx_buf_factory& _fc;
    MARKER private_end;
  };

  class tx_buf_factory {
    //
    // Number of buffers to free in each GC iteration:
    // We want as many buffers as possible to stay allocated from the
    // mempool.
    //
    // On the other hand, if there is no Tx for some time we want the
    // completions to be eventually handled. Thus we choose the smallest
    // possible count here.
    //
    static constexpr int gc_count = 1;
   public:
    tx_buf_factory(CephContext *c, DPDKDevice *dev, uint8_t qid);
    ~tx_buf_factory() {
      // put all mbufs back into the mempool so that the next factory can work
      while (gc());
      rte_mempool_put_bulk(_pool, (void**)_ring.data(),
                           _ring.size());
    }


    /**
     * @note Should not be called if there are no free tx_buf's
     *
     * @return a free tx_buf object
     */
    tx_buf* get() {
      // Take completed from the HW first
      tx_buf *pkt = get_one_completed();
      if (pkt) {
        pkt->reset_zc();
        return pkt;
      }

      //
      // If there are no completed at the moment - take from the
      // factory's cache.
      //
      if (_ring.empty()) {
        return nullptr;
      }

      pkt = _ring.back();
      _ring.pop_back();

      return pkt;
    }

    void put(tx_buf* buf) {
      buf->reset_zc();
      _ring.push_back(buf);
    }

    unsigned ring_size() const {
      return _ring.size();
    }

    bool gc() {
      for (int cnt = 0; cnt < gc_count; ++cnt) {
        auto tx_buf_p = get_one_completed();
        if (!tx_buf_p) {
          return false;
        }

        put(tx_buf_p);
      }

      return true;
    }
   private:
    /**
     * Fill the mbufs circular buffer: after this the _pool will become
     * empty. We will use it to catch the completed buffers:
     *
     * - Underlying PMD drivers will "free" the mbufs once they are
     *   completed.
     * - We will poll the _pktmbuf_pool_tx till it's empty and release
     *   all the buffers from the freed mbufs.
     */
    void init_factory() {
      while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) {
        _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this});
      }
    }

    /**
     * PMD puts the completed buffers back into the mempool they have
     * originally come from.
     *
     * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call
     *       rte_pktmbuf_reset() here again.
     *
     * @return a single tx_buf that has been completed by HW.
     */
    tx_buf* get_one_completed() {
      return tx_buf::me(rte_pktmbuf_alloc(_pool));
    }

   private:
    CephContext *cct;
    std::vector<tx_buf*> _ring;
    rte_mempool* _pool = nullptr;
  };

 public:
  explicit DPDKQueuePair(CephContext *c, EventCenter *cen, DPDKDevice* dev, uint8_t qid);
  ~DPDKQueuePair() {
    if (device_stat_time_fd) {
      center->delete_time_event(device_stat_time_fd);
    }
    rx_gc(true);
  }

  void rx_start() {
    _rx_poller.emplace(this);
  }

  uint32_t send(circular_buffer<Packet>& pb) {
    // Zero-copy send
    return _send(pb, [&] (Packet&& p) {
      return tx_buf::from_packet_zc(cct, std::move(p), *this);
    });
  }

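  //
  // Usage sketch (illustrative only; the caller and the packet source are
  // assumptions, not part of this header): the TX path queues Packets in a
  // circular_buffer and drains it with send(), which returns how many
  // packets were handed to the NIC in this burst and pops them off:
  //
  //   circular_buffer<Packet> pending;
  //   pending.push_back(std::move(pkt));    // pkt built elsewhere
  //   uint32_t pushed = qp.send(pending);
  //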
  DPDKDevice& port() const { return *_dev; }
  tx_buf* get_tx_buf() { return _tx_buf_factory.get(); }

  void handle_stats();

 private:
  template <class Func>
  uint32_t _send(circular_buffer<Packet>& pb, Func &&packet_to_tx_buf_p) {
    if (_tx_burst.size() == 0) {
      for (auto&& p : pb) {
        // TODO: ceph_assert() in a fast path! Remove me ASAP!
        ceph_assert(p.len());

        tx_buf* buf = packet_to_tx_buf_p(std::move(p));
        if (!buf) {
          break;
        }

        _tx_burst.push_back(buf->rte_mbuf_p());
      }
    }

    uint16_t sent = rte_eth_tx_burst(_dev_port_idx, _qid,
                                     _tx_burst.data() + _tx_burst_idx,
                                     _tx_burst.size() - _tx_burst_idx);

    uint64_t nr_frags = 0, bytes = 0;

    for (int i = 0; i < sent; i++) {
      rte_mbuf* m = _tx_burst[_tx_burst_idx + i];
      bytes += m->pkt_len;
      nr_frags += m->nb_segs;
      pb.pop_front();
    }

    perf_logger->inc(l_dpdk_qp_tx_fragments, nr_frags);
    perf_logger->inc(l_dpdk_qp_tx_bytes, bytes);

    _tx_burst_idx += sent;

    if (_tx_burst_idx == _tx_burst.size()) {
      _tx_burst_idx = 0;
      _tx_burst.clear();
    }

    return sent;
  }

  /**
   * Allocate a new data buffer and set the mbuf to point to it.
   *
   * Do some DPDK hacks to work with the PMD: it assumes that buf_addr
   * points to RTE_PKTMBUF_HEADROOM bytes of private headroom before the
   * actual data buffer.
   *
   * @param m mbuf to update
   */
  static bool refill_rx_mbuf(rte_mbuf* m, size_t size,
                             std::vector<void*> &datas) {
    if (datas.empty())
      return false;
    void *data = datas.back();
    datas.pop_back();

    //
    // Set the mbuf to point to our data.
    //
    // Do some DPDK hacks to work with the PMD: it assumes that buf_addr
    // points to RTE_PKTMBUF_HEADROOM bytes of private headroom before
    // the actual data buffer.
    //
    m->buf_addr = (char*)data - RTE_PKTMBUF_HEADROOM;
    m->buf_iova = rte_mem_virt2iova(data) - RTE_PKTMBUF_HEADROOM;
    return true;
  }
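  //
  // Layout assumed by the refill above (a sketch only; it follows directly
  // from the two assignments in refill_rx_mbuf()):
  //
  //   m->buf_addr                         data
  //        |<-- RTE_PKTMBUF_HEADROOM -->|<-- packet bytes -->|
  //
  // so with data_off == RTE_PKTMBUF_HEADROOM, rte_pktmbuf_mtod(m, char*)
  // yields exactly the `data` pointer taken from `datas`.
  //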

  bool init_rx_mbuf_pool();
  bool rx_gc(bool force=false);
  bool refill_one_cluster(rte_mbuf* head);

  /**
   * Polls for a burst of incoming packets. This function will not block and
   * will immediately return after processing all available packets.
   *
   */
  bool poll_rx_once();

  /**
   * Translates rte_mbuf's into packets and feeds them to _rx_stream.
   *
   * @param bufs An array of received rte_mbuf's
   * @param count Number of buffers in the bufs[]
   */
  void process_packets(struct rte_mbuf **bufs, uint16_t count);

  /**
   * Translate rte_mbuf into the "packet".
   * @param m mbuf to translate
   *
   * @return an "optional" object representing the newly received data if in an
   *         "engaged" state or an error if in a "disengaged" state.
   */
  std::optional<Packet> from_mbuf(rte_mbuf* m);

  /**
   * Transform an LRO rte_mbuf cluster into the "packet" object.
   * @param m HEAD of the mbufs' cluster to transform
   *
   * @return an "optional" object representing the newly received LRO packet if
   *         in an "engaged" state or an error if in a "disengaged" state.
   */
  std::optional<Packet> from_mbuf_lro(rte_mbuf* m);

 private:
  CephContext *cct;
  std::vector<packet_provider_type> _pkt_providers;
  std::optional<std::array<uint8_t, 128>> _sw_reta;
  circular_buffer<Packet> _proxy_packetq;
  stream<Packet> _rx_stream;
  circular_buffer<Packet> _tx_packetq;
  std::vector<void*> _alloc_bufs;

  PerfCounters *perf_logger;
  DPDKDevice* _dev;
  uint8_t _dev_port_idx;
  EventCenter *center;
  uint8_t _qid;
  rte_mempool *_pktmbuf_pool_rx;
  std::vector<rte_mbuf*> _rx_free_pkts;
  std::vector<rte_mbuf*> _rx_free_bufs;
  std::vector<fragment> _frags;
  std::vector<char*> _bufs;
  size_t _num_rx_free_segs = 0;
  uint64_t device_stat_time_fd = 0;

#ifdef CEPH_PERF_DEV
  uint64_t rx_cycles = 0;
  uint64_t rx_count = 0;
  uint64_t tx_cycles = 0;
  uint64_t tx_count = 0;
#endif

  class DPDKTXPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKTXPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKTXPoller"), qp(qp) {}

    virtual int poll() {
      return qp->poll_tx();
    }
  } _tx_poller;

  class DPDKRXGCPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKRXGCPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKRXGCPoller"), qp(qp) {}

    virtual int poll() {
      return qp->rx_gc();
    }
  } _rx_gc_poller;
  tx_buf_factory _tx_buf_factory;
  class DPDKRXPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKRXPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKRXPoller"), qp(qp) {}

    virtual int poll() {
      return qp->poll_rx_once();
    }
  };
  std::optional<DPDKRXPoller> _rx_poller;
  class DPDKTXGCPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKTXGCPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKTXGCPoller"), qp(qp) {}

    virtual int poll() {
      return qp->_tx_buf_factory.gc();
    }
  } _tx_gc_poller;
  std::vector<rte_mbuf*> _tx_burst;
  uint16_t _tx_burst_idx = 0;
};

class DPDKDevice {
 public:
  CephContext *cct;
  PerfCounters *perf_logger;
  std::vector<std::unique_ptr<DPDKQueuePair>> _queues;
  std::vector<DPDKWorker*> workers;
  size_t _rss_table_bits = 0;
  uint8_t _port_idx;
  uint16_t _num_queues;
  unsigned cores;
  hw_features _hw_features;
  uint8_t _queues_ready = 0;
  unsigned _home_cpu;
  bool _use_lro;
  bool _enable_fc;
  std::vector<uint16_t> _redir_table;
  rss_key_type _rss_key;
  struct rte_flow *_flow = nullptr;
  bool _is_i40e_device = false;
  bool _is_vmxnet3_device = false;
  std::unique_ptr<AdminSocketHook> dfx_hook;

 public:
  rte_eth_dev_info _dev_info = {};

  /**
   * The final stage of a port initialization.
   * @note Must be called *after* all queues from stage (2) have been
   *       initialized.
   */
  int init_port_fini();

  void nic_stats_dump(Formatter *f);
  void nic_xstats_dump(Formatter *f);
 private:
  /**
   * Port initialization consists of 3 main stages:
   * 1) General port initialization which ends with a call to
   *    rte_eth_dev_configure() where we request the needed number of Rx and
   *    Tx queues.
   * 2) Individual queues initialization. This is done in the constructor of
   *    DPDKQueuePair class. In particular the memory pools for queues are
   *    allocated in this stage.
   * 3) The final stage of the initialization which starts with the call of
   *    rte_eth_dev_start() after which the port becomes fully functional. We
   *    will also wait for a link to get up in this stage.
   */

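  //
  // Rough shape of those stages in DPDK terms (a sketch of the flow only;
  // the real configuration lives in init_port_start()/init_port_fini() and
  // in the DPDKQueuePair constructor):
  //
  //   rte_eth_dev_configure(port, nb_rxq, nb_txq, &port_conf);    // stage 1
  //   rte_eth_rx_queue_setup(port, qid, nb_rxd, socket, &rxconf, mp);
  //   rte_eth_tx_queue_setup(port, qid, nb_txd, socket, &txconf); // stage 2
  //   rte_eth_dev_start(port);                                    // stage 3
  //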

  /**
   * First stage of the port initialization.
   *
   * @return 0 in case of success and an appropriate error code in case of an
   *         error.
   */
  int init_port_start();

  /**
   * Check the link status of our port for up to 9 seconds and finally print
   * the result.
   */
  int check_port_link_status();

  /**
   * Configures the HW Flow Control
   */
  void set_hw_flow_control();

 public:
  DPDKDevice(CephContext *c, uint8_t port_idx, uint16_t num_queues, bool use_lro, bool enable_fc):
      cct(c), _port_idx(port_idx), _num_queues(num_queues),
      _home_cpu(0), _use_lro(use_lro),
      _enable_fc(enable_fc) {
    _queues = std::vector<std::unique_ptr<DPDKQueuePair>>(_num_queues);
    /* now initialise the port we will use */
    int ret = init_port_start();
    if (ret != 0) {
      ceph_assert(false && "Cannot initialise port\n");
    }
    std::string name(std::string("port") + std::to_string(port_idx));
    PerfCountersBuilder plb(cct, name, l_dpdk_dev_first, l_dpdk_dev_last);

    plb.add_u64_counter(l_dpdk_dev_rx_mcast, "dpdk_device_receive_multicast_packets", "DPDK received multicast packets");
    plb.add_u64_counter(l_dpdk_dev_rx_badcrc_errors, "dpdk_device_receive_badcrc_errors", "DPDK received bad crc errors");

    plb.add_u64_counter(l_dpdk_dev_rx_total_errors, "dpdk_device_receive_total_errors", "DPDK received total_errors");
    plb.add_u64_counter(l_dpdk_dev_tx_total_errors, "dpdk_device_send_total_errors", "DPDK send total_errors");
    plb.add_u64_counter(l_dpdk_dev_rx_dropped_errors, "dpdk_device_receive_dropped_errors", "DPDK received dropped errors");
    plb.add_u64_counter(l_dpdk_dev_rx_nombuf_errors, "dpdk_device_receive_nombuf_errors", "DPDK received RX mbuf allocation errors");

    perf_logger = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(perf_logger);
  }

  ~DPDKDevice() {
    cct->get_admin_socket()->unregister_commands(dfx_hook.get());
    dfx_hook.reset();
    if (_flow)
      rte_flow_destroy(_port_idx, _flow, nullptr);
    rte_eth_dev_stop(_port_idx);
  }

  DPDKQueuePair& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
  void l2receive(int qid, Packet p) {
    _queues[qid]->_rx_stream.produce(std::move(p));
  }
  subscription<Packet> receive(unsigned cpuid, std::function<int (Packet)> next_packet) {
    auto sub = _queues[cpuid]->_rx_stream.listen(std::move(next_packet));
    _queues[cpuid]->rx_start();
    return sub;
  }
  ethernet_address hw_address() {
    struct rte_ether_addr mac;
    rte_eth_macaddr_get(_port_idx, &mac);

    return mac.addr_bytes;
  }
  hw_features get_hw_features() {
    return _hw_features;
  }
  const rss_key_type& rss_key() const { return _rss_key; }
  uint16_t hw_queues_count() { return _num_queues; }
  std::unique_ptr<DPDKQueuePair> init_local_queue(CephContext *c,
      EventCenter *center, std::string hugepages, uint16_t qid) {
    std::unique_ptr<DPDKQueuePair> qp;
    qp = std::unique_ptr<DPDKQueuePair>(new DPDKQueuePair(c, center, this, qid));
    return qp;
  }
  unsigned hash2qid(uint32_t hash) {
    // return hash % hw_queues_count();
    return _redir_table[hash & (_redir_table.size() - 1)];
  }
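  //
  // Example of the masking above (illustrative; the bitmask requires
  // _redir_table to have a power-of-two size): with a 128-entry table,
  // hash = 0x12345678 selects entry 0x12345678 & 127 == 0x78 == 120.
  //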
  void set_local_queue(unsigned i, std::unique_ptr<DPDKQueuePair> qp) {
    ceph_assert(!_queues[i]);
    _queues[i] = std::move(qp);
  }
  void unset_local_queue(unsigned i) {
    ceph_assert(_queues[i]);
    _queues[i].reset();
  }
  template <typename Func>
  unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) {
    auto& qp = queue_for_cpu(src_cpuid);
    if (!qp._sw_reta)
      return src_cpuid;

    ceph_assert(qp._sw_reta);
    auto hash = hashfn() >> _rss_table_bits;
    auto& reta = *qp._sw_reta;
    return reta[hash % reta.size()];
  }
  unsigned hash2cpu(uint32_t hash) {
    // there is an assumption here that qid == get_id() which will
    // not necessarily be true in the future
    return forward_dst(hash2qid(hash), [hash] { return hash; });
  }

  hw_features& hw_features_ref() { return _hw_features; }

  const rte_eth_rxconf* def_rx_conf() const {
    return &_dev_info.default_rxconf;
  }

  const rte_eth_txconf* def_tx_conf() const {
    return &_dev_info.default_txconf;
  }

  /**
   * Set the RSS table in the device and store it in the internal vector.
   */
  void set_rss_table();

  uint8_t port_idx() { return _port_idx; }
  bool is_i40e_device() const {
    return _is_i40e_device;
  }
  bool is_vmxnet3_device() const {
    return _is_vmxnet3_device;
  }
};


std::unique_ptr<DPDKDevice> create_dpdk_net_device(
    CephContext *c, unsigned cores, uint8_t port_idx = 0,
    bool use_lro = true, bool enable_fc = true);


/**
 * @return Number of bytes needed for mempool objects of each QP.
 */
uint32_t qp_mempool_obj_size();
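//
// End-to-end usage sketch (illustrative only; `cct`, `center`, the core
// count and the packet callback are assumptions supplied by the caller,
// and error handling is omitted):
//
//   auto dev = create_dpdk_net_device(cct, /*cores=*/4, /*port_idx=*/0);
//   auto qp  = dev->init_local_queue(cct, center, "", /*qid=*/0);
//   dev->set_local_queue(0, std::move(qp));
//   auto sub = dev->receive(0, [] (Packet p) { /* consume p */ return 0; });
//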

#endif // CEPH_DPDK_DEV_H