// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2014 Cloudius Systems, Ltd.
 */

#ifndef CEPH_DPDK_DEV_H
#define CEPH_DPDK_DEV_H

#include <memory>
#include <functional>
#include <rte_config.h>
#include <rte_common.h>
#include <rte_ethdev.h>
#include <rte_malloc.h>
#include <rte_version.h>

#include "include/page.h"
#include "common/Tub.h"
#include "common/perf_counters.h"
#include "msg/async/Event.h"
#include "const.h"
#include "circular_buffer.h"
#include "ethernet.h"
#include "Packet.h"
#include "stream.h"
#include "net.h"
#include "toeplitz.h"


struct free_deleter {
  void operator()(void* p) { ::free(p); }
};
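// A minimal usage sketch: free_deleter lets a std::unique_ptr own
// malloc()-allocated memory, releasing it with ::free() instead of delete:
//
//   std::unique_ptr<char, free_deleter> buf(
//     static_cast<char*>(::malloc(4096)));
//   // buf is released with ::free() when it goes out of scope.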


enum {
  l_dpdk_dev_first = 58800,
  l_dpdk_dev_rx_mcast,
  l_dpdk_dev_rx_total_errors,
  l_dpdk_dev_tx_total_errors,
  l_dpdk_dev_rx_badcrc_errors,
  l_dpdk_dev_rx_dropped_errors,
  l_dpdk_dev_rx_nombuf_errors,
  l_dpdk_dev_last
};

enum {
  l_dpdk_qp_first = 58900,
  l_dpdk_qp_rx_packets,
  l_dpdk_qp_tx_packets,
  l_dpdk_qp_rx_bad_checksum_errors,
  l_dpdk_qp_rx_no_memory_errors,
  l_dpdk_qp_rx_bytes,
  l_dpdk_qp_tx_bytes,
  l_dpdk_qp_rx_last_bunch,
  l_dpdk_qp_tx_last_bunch,
  l_dpdk_qp_rx_fragments,
  l_dpdk_qp_tx_fragments,
  l_dpdk_qp_rx_copy_ops,
  l_dpdk_qp_tx_copy_ops,
  l_dpdk_qp_rx_copy_bytes,
  l_dpdk_qp_tx_copy_bytes,
  l_dpdk_qp_rx_linearize_ops,
  l_dpdk_qp_tx_linearize_ops,
  l_dpdk_qp_tx_queue_length,
  l_dpdk_qp_last
};

class DPDKDevice;
class DPDKWorker;

class DPDKQueuePair {
  using packet_provider_type = std::function<Tub<Packet> ()>;
 public:
  void configure_proxies(const std::map<unsigned, float>& cpu_weights);
  // build the REdirection TAble for the cpu_weights map: target cpu -> weight
  void build_sw_reta(const std::map<unsigned, float>& cpu_weights);
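  // Illustrative sketch (not a normative description): build_sw_reta() fills
  // the 128-slot _sw_reta according to the given weights, e.g. with
  //
  //   std::map<unsigned, float> weights = {{0, 1.0f}, {2, 2.0f}};
  //   qp.build_sw_reta(weights);
  //
  // cpu 2 ends up owning roughly twice as many slots as cpu 0.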
  void proxy_send(Packet p) {
    _proxy_packetq.push_back(std::move(p));
  }
  void register_packet_provider(packet_provider_type func) {
    _pkt_providers.push_back(std::move(func));
  }
  bool poll_tx();
  friend class DPDKDevice;

  class tx_buf_factory;

  class tx_buf {
    friend class DPDKQueuePair;
   public:
    static tx_buf* me(rte_mbuf* mbuf) {
      return reinterpret_cast<tx_buf*>(mbuf);
    }

   private:
    /**
     * Checks if the original packet of a given cluster should be linearized
     * due to HW limitations.
     *
     * @param head head of a cluster to check
     *
     * @return TRUE if a packet should be linearized.
     */
    static bool i40e_should_linearize(rte_mbuf *head);

    /**
     * Sets the offload info in the head buffer of an rte_mbufs cluster.
     *
     * @param p an original packet the cluster is built for
     * @param qp QP handle
     * @param head a head of an rte_mbufs cluster
     */
    static void set_cluster_offload_info(const Packet& p, const DPDKQueuePair& qp, rte_mbuf* head);

    /**
     * Creates a tx_buf cluster representing a given packet in a "zero-copy"
     * way.
     *
     * @param p packet to translate
     * @param qp DPDKQueuePair handle
     *
     * @return the HEAD tx_buf of the cluster or nullptr in case of a
     *         failure
     */
    static tx_buf* from_packet_zc(
        CephContext *cct, Packet&& p, DPDKQueuePair& qp);

    /**
     * Copy the contents of the "packet" into the given cluster of
     * rte_mbuf's.
     *
     * @note Size of the cluster has to be big enough to accommodate all the
     *       contents of the given packet.
     *
     * @param p packet to copy
     * @param head head of the rte_mbuf's cluster
     */
    static void copy_packet_to_cluster(const Packet& p, rte_mbuf* head);

    /**
     * Creates a tx_buf cluster representing a given packet in a "copy" way.
     *
     * @param p packet to translate
     * @param qp DPDKQueuePair handle
     *
     * @return the HEAD tx_buf of the cluster or nullptr in case of a
     *         failure
     */
    static tx_buf* from_packet_copy(Packet&& p, DPDKQueuePair& qp);

    /**
     * Zero-copy handling of a single fragment.
     *
     * @param do_one_buf Functor responsible for a single rte_mbuf
     *                   handling
     * @param qp DPDKQueuePair handle (in)
     * @param frag Fragment to copy (in)
     * @param head Head of the cluster (out)
     * @param last_seg Last segment of the cluster (out)
     * @param nsegs Number of segments in the cluster (out)
     *
     * @return TRUE in case of success
     */
    template <class DoOneBufFunc>
    static bool do_one_frag(DoOneBufFunc do_one_buf, DPDKQueuePair& qp,
                            fragment& frag, rte_mbuf*& head,
                            rte_mbuf*& last_seg, unsigned& nsegs) {
      size_t len, left_to_set = frag.size;
      char* base = frag.base;

      rte_mbuf* m;
      // TODO: ceph_assert() in a fast path! Remove me ASAP!
      ceph_assert(frag.size);

      // Create the HEAD of the mbufs' cluster and set the first bytes into it
      len = do_one_buf(qp, head, base, left_to_set);
      if (!len) {
        return false;
      }

      left_to_set -= len;
      base += len;
      nsegs = 1;

      //
      // Set the rest of the data into the new mbufs and chain them to
      // the cluster.
      //
      rte_mbuf* prev_seg = head;
      while (left_to_set) {
        len = do_one_buf(qp, m, base, left_to_set);
        if (!len) {
          me(head)->recycle();
          return false;
        }

        left_to_set -= len;
        base += len;
        nsegs++;

        prev_seg->next = m;
        prev_seg = m;
      }

      // Return the last mbuf in the cluster
      last_seg = prev_seg;

      return true;
    }
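    //
    // A DoOneBufFunc is expected to match the shape of set_one_data_buf()
    // and copy_one_data_buf() below (sketch):
    //
    //   size_t do_one_buf(DPDKQueuePair& qp, rte_mbuf*& m,
    //                     char* va, size_t buf_len);
    //
    // It returns the number of bytes it placed in (or pointed) the mbuf at,
    // or 0 on allocation failure, in which case do_one_frag() recycles the
    // partially built cluster and fails.
    //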

    /**
     * Zero-copy handling of a single fragment.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param frag Fragment to copy (in)
     * @param head Head of the cluster (out)
     * @param last_seg Last segment of the cluster (out)
     * @param nsegs Number of segments in the cluster (out)
     *
     * @return TRUE in case of success
     */
    static bool translate_one_frag(DPDKQueuePair& qp, fragment& frag,
                                   rte_mbuf*& head, rte_mbuf*& last_seg,
                                   unsigned& nsegs) {
      return do_one_frag(set_one_data_buf, qp, frag, head,
                         last_seg, nsegs);
    }

    /**
     * Copies one fragment into the cluster of rte_mbuf's.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param frag Fragment to copy (in)
     * @param head Head of the cluster (out)
     * @param last_seg Last segment of the cluster (out)
     * @param nsegs Number of segments in the cluster (out)
     *
     * We return the "last_seg" to avoid traversing the cluster in order to
     * get it.
     *
     * @return TRUE in case of success
     */
    static bool copy_one_frag(DPDKQueuePair& qp, fragment& frag,
                              rte_mbuf*& head, rte_mbuf*& last_seg,
                              unsigned& nsegs) {
      return do_one_frag(copy_one_data_buf, qp, frag, head,
                         last_seg, nsegs);
    }

    /**
     * Allocates a single rte_mbuf and sets it to point to a given data
     * buffer.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param m New allocated rte_mbuf (out)
     * @param va virtual address of a data buffer (in)
     * @param buf_len length of the data to copy (in)
     *
     * @return The actual number of bytes that have been set in the mbuf
     */
    static size_t set_one_data_buf(
        DPDKQueuePair& qp, rte_mbuf*& m, char* va, size_t buf_len) {
      static constexpr size_t max_frag_len = 15 * 1024; // 15K

      // FIXME: currently all tx buffers are allocated without rte_malloc
      return copy_one_data_buf(qp, m, va, buf_len);
      //
      // Currently we break a buffer on a 15K boundary because 82599
      // devices have a 15.5K limitation on a maximum single fragment
      // size.
      //
      rte_iova_t pa = rte_malloc_virt2iova(va);
      if (!pa)
        return copy_one_data_buf(qp, m, va, buf_len);

      ceph_assert(buf_len);
      tx_buf* buf = qp.get_tx_buf();
      if (!buf) {
        return 0;
      }

      size_t len = std::min(buf_len, max_frag_len);

      buf->set_zc_info(va, pa, len);
      m = buf->rte_mbuf_p();

      return len;
    }

    /**
     * Allocates a single rte_mbuf and copies the given data into it.
     *
     * @param qp DPDKQueuePair handle (in)
     * @param m New allocated rte_mbuf (out)
     * @param data Data to copy from (in)
     * @param buf_len length of the data to copy (in)
     *
     * @return The actual number of bytes that have been copied
     */
    static size_t copy_one_data_buf(
        DPDKQueuePair& qp, rte_mbuf*& m, char* data, size_t buf_len);

    /**
     * Checks if the first fragment of the given packet satisfies the
     * zero-copy flow requirement: its first 128 bytes should not cross the
     * 4K page boundary. This is required in order to avoid splitting packet
     * headers.
     *
     * @param p packet to check
     *
     * @return TRUE if the packet is ok and FALSE otherwise.
     */
    static bool check_frag0(Packet& p)
    {
      //
      // The first frag is special - it has the headers that should not be
      // split. If the addressing is such that the first fragment has to be
      // split, then send this packet in a (non-zero) copy flow. We'll
      // check if the first 128 bytes of the first fragment reside in a
      // physically contiguous area. If that's the case - we are good to
      // go.
      //
      if (p.frag(0).size < 128)
        return false;

      return true;
    }
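    //
    // Note: only the size precondition is enforced above. A full check that
    // the first 128 bytes do not cross a 4K page boundary would look roughly
    // like this (sketch, assuming CEPH_PAGE_SIZE == 4096):
    //
    //   uintptr_t base = reinterpret_cast<uintptr_t>(p.frag(0).base);
    //   bool ok = (base / CEPH_PAGE_SIZE) == ((base + 127) / CEPH_PAGE_SIZE);
    //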

   public:
    tx_buf(tx_buf_factory& fc) : _fc(fc) {

      _buf_physaddr = _mbuf.buf_physaddr;
      _data_off = _mbuf.data_off;
    }

    rte_mbuf* rte_mbuf_p() { return &_mbuf; }

    void set_zc_info(void* va, phys_addr_t pa, size_t len) {
      // mbuf_put()
      _mbuf.data_len = len;
      _mbuf.pkt_len = len;

      // Set the mbuf to point to our data
      _mbuf.buf_addr = va;
      _mbuf.buf_physaddr = pa;
      _mbuf.data_off = 0;
      _is_zc = true;
    }

    void reset_zc() {

      //
      // If this mbuf was the last in a cluster and contains an
      // original packet object then call the destructor of the
      // original packet object.
      //
      if (_p) {
        //
        // Reset the Tub. This in particular is going to call the
        // "packet"'s destructor and reset the Tub's state to
        // "disengaged".
        //
        _p.destroy();

      } else if (!_is_zc) {
        return;
      }

      // Restore the rte_mbuf fields we trashed in set_zc_info()
      _mbuf.buf_physaddr = _buf_physaddr;
      _mbuf.buf_addr = rte_mbuf_to_baddr(&_mbuf);
      _mbuf.data_off = _data_off;

      _is_zc = false;
    }

    void recycle() {
      struct rte_mbuf *m = &_mbuf, *m_next;

      while (m != nullptr) {
        m_next = m->next;
        rte_pktmbuf_reset(m);
        _fc.put(me(m));
        m = m_next;
      }
    }

    void set_packet(Packet&& p) {
      _p = std::move(p);
    }

   private:
    struct rte_mbuf _mbuf;
    MARKER private_start;
    Tub<Packet> _p;
    phys_addr_t _buf_physaddr;
    uint16_t _data_off;
    // TRUE if the underlying mbuf has been used in the zero-copy flow
    bool _is_zc = false;
    // the buffers' factory this buffer came from
    tx_buf_factory& _fc;
    MARKER private_end;
  };

  class tx_buf_factory {
    //
    // Number of buffers to free in each GC iteration:
    // We want as many buffers as possible to stay allocated from the
    // mempool.
    //
    // On the other hand, if there is no Tx for some time we want the
    // completions to be eventually handled. Thus we choose the smallest
    // possible packet count here.
    //
    static constexpr int gc_count = 1;
   public:
    tx_buf_factory(CephContext *c, DPDKDevice *dev, uint8_t qid);
    ~tx_buf_factory() {
      // put all mbufs back into the mempool so that the next factory can work
      while (gc());
      rte_mempool_put_bulk(_pool, (void**)_ring.data(),
                           _ring.size());
    }


    /**
     * @note Should not be called if there are no free tx_buf's
     *
     * @return a free tx_buf object
     */
    tx_buf* get() {
      // Take completed from the HW first
      tx_buf *pkt = get_one_completed();
      if (pkt) {
        pkt->reset_zc();
        return pkt;
      }

      //
      // If there are no completed buffers at the moment - take from the
      // factory's cache.
      //
      if (_ring.empty()) {
        return nullptr;
      }

      pkt = _ring.back();
      _ring.pop_back();

      return pkt;
    }

    void put(tx_buf* buf) {
      buf->reset_zc();
      _ring.push_back(buf);
    }

    bool gc() {
      for (int cnt = 0; cnt < gc_count; ++cnt) {
        auto tx_buf_p = get_one_completed();
        if (!tx_buf_p) {
          return false;
        }

        put(tx_buf_p);
      }

      return true;
    }
   private:
    /**
     * Fill the mbufs circular buffer: after this the _pool will become
     * empty. We will use it to catch the completed buffers:
     *
     * - Underlying PMD drivers will "free" the mbufs once they are
     *   completed.
     * - We will poll the _pktmbuf_pool_tx till it's empty and release
     *   all the buffers from the freed mbufs.
     */
    void init_factory() {
      while (rte_mbuf* mbuf = rte_pktmbuf_alloc(_pool)) {
        _ring.push_back(new(tx_buf::me(mbuf)) tx_buf{*this});
      }
    }

    /**
     * PMD puts the completed buffers back into the mempool they have
     * originally come from.
     *
     * @note rte_pktmbuf_alloc() resets the mbuf so there is no need to call
     *       rte_pktmbuf_reset() here again.
     *
     * @return a single tx_buf that has been completed by the HW.
     */
    tx_buf* get_one_completed() {
      return tx_buf::me(rte_pktmbuf_alloc(_pool));
    }
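
    // Illustrative sketch of the trick above: init_factory() drains _pool
    // completely, so any mbuf that rte_pktmbuf_alloc(_pool) later hands back
    // must have been "freed" into the pool by the PMD, i.e. its transmission
    // has completed:
    //
    //   while (tx_buf* done = get_one_completed())
    //     put(done);  // this is what gc() does, gc_count buffers at a time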

   private:
    CephContext *cct;
    std::vector<tx_buf*> _ring;
    rte_mempool* _pool = nullptr;
  };

 public:
  explicit DPDKQueuePair(CephContext *c, EventCenter *cen, DPDKDevice* dev, uint8_t qid);
  ~DPDKQueuePair() {
    if (device_stat_time_fd) {
      center->delete_time_event(device_stat_time_fd);
    }
    rx_gc(true);
  }

  void rx_start() {
    _rx_poller.construct(this);
  }

  uint32_t send(circular_buffer<Packet>& pb) {
    // Zero-copy send
    return _send(pb, [&] (Packet&& p) {
      return tx_buf::from_packet_zc(cct, std::move(p), *this);
    });
  }
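  // Illustrative sketch of the send path: the caller queues Packets and
  // drains them with repeated send() calls; packets that could not be
  // placed into the TX burst stay in the buffer for a later call:
  //
  //   circular_buffer<Packet> pending;
  //   pending.push_back(std::move(pkt));
  //   while (!pending.empty() && qp.send(pending) > 0)
  //     ;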

  DPDKDevice& port() const { return *_dev; }
  tx_buf* get_tx_buf() { return _tx_buf_factory.get(); }

  void handle_stats();

 private:
  template <class Func>
  uint32_t _send(circular_buffer<Packet>& pb, Func &&packet_to_tx_buf_p) {
    if (_tx_burst.size() == 0) {
      for (auto&& p : pb) {
        // TODO: ceph_assert() in a fast path! Remove me ASAP!
        ceph_assert(p.len());

        tx_buf* buf = packet_to_tx_buf_p(std::move(p));
        if (!buf) {
          break;
        }

        _tx_burst.push_back(buf->rte_mbuf_p());
      }
    }

    uint16_t sent = rte_eth_tx_burst(_dev_port_idx, _qid,
                                     _tx_burst.data() + _tx_burst_idx,
                                     _tx_burst.size() - _tx_burst_idx);

    uint64_t nr_frags = 0, bytes = 0;

    for (int i = 0; i < sent; i++) {
      rte_mbuf* m = _tx_burst[_tx_burst_idx + i];
      bytes += m->pkt_len;
      nr_frags += m->nb_segs;
      pb.pop_front();
    }

    perf_logger->inc(l_dpdk_qp_tx_fragments, nr_frags);
    perf_logger->inc(l_dpdk_qp_tx_bytes, bytes);

    _tx_burst_idx += sent;

    if (_tx_burst_idx == _tx_burst.size()) {
      _tx_burst_idx = 0;
      _tx_burst.clear();
    }

    return sent;
  }

  /**
   * Point the mbuf at a fresh data buffer taken from "datas".
   *
   * Do some DPDK hacks to work with the PMD: it assumes that buf_addr
   * points RTE_PKTMBUF_HEADROOM bytes before the actual data buffer.
   *
   * @param m mbuf to update
   */
  static bool refill_rx_mbuf(rte_mbuf* m, size_t size,
                             std::vector<void*> &datas) {
    if (datas.empty())
      return false;
    void *data = datas.back();
    datas.pop_back();

    //
    // Set the mbuf to point to our data.
    //
    // Do some DPDK hacks to work with the PMD: it assumes that buf_addr
    // points RTE_PKTMBUF_HEADROOM bytes before the actual data buffer.
    //
    m->buf_addr = (char*)data - RTE_PKTMBUF_HEADROOM;
    m->buf_physaddr = rte_mem_virt2phy(data) - RTE_PKTMBUF_HEADROOM;
    return true;
  }
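  // Buffer layout assumed by the PMD (sketch): buf_addr is backed up by
  // RTE_PKTMBUF_HEADROOM bytes so that the packet data starts exactly at
  // the buffer taken from "datas":
  //
  //   buf_addr                           data
  //   |<----- RTE_PKTMBUF_HEADROOM ----->|<------ payload ------>|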

  bool init_rx_mbuf_pool();
  bool rx_gc(bool force = false);
  bool refill_one_cluster(rte_mbuf* head);

  /**
   * Polls for a burst of incoming packets. This function will not block and
   * will immediately return after processing all available packets.
   */
  bool poll_rx_once();

  /**
   * Translates rte_mbuf's into Packets and feeds them to _rx_stream.
   *
   * @param bufs An array of received rte_mbuf's
   * @param count Number of buffers in bufs[]
   */
  void process_packets(struct rte_mbuf **bufs, uint16_t count);

  /**
   * Translate an rte_mbuf into a "packet".
   * @param m mbuf to translate
   *
   * @return an "optional" object representing the newly received data if in
   *         an "engaged" state or an error if in a "disengaged" state.
   */
  Tub<Packet> from_mbuf(rte_mbuf* m);

  /**
   * Transform an LRO rte_mbuf cluster into the "packet" object.
   * @param m HEAD of the mbufs' cluster to transform
   *
   * @return an "optional" object representing the newly received LRO packet
   *         if in an "engaged" state or an error if in a "disengaged" state.
   */
  Tub<Packet> from_mbuf_lro(rte_mbuf* m);

 private:
  CephContext *cct;
  std::vector<packet_provider_type> _pkt_providers;
  Tub<std::array<uint8_t, 128>> _sw_reta;
  circular_buffer<Packet> _proxy_packetq;
  stream<Packet> _rx_stream;
  circular_buffer<Packet> _tx_packetq;
  std::vector<void*> _alloc_bufs;

  PerfCounters *perf_logger;
  DPDKDevice* _dev;
  uint8_t _dev_port_idx;
  EventCenter *center;
  uint8_t _qid;
  rte_mempool *_pktmbuf_pool_rx;
  std::vector<rte_mbuf*> _rx_free_pkts;
  std::vector<rte_mbuf*> _rx_free_bufs;
  std::vector<fragment> _frags;
  std::vector<char*> _bufs;
  size_t _num_rx_free_segs = 0;
  uint64_t device_stat_time_fd = 0;

#ifdef CEPH_PERF_DEV
  uint64_t rx_cycles = 0;
  uint64_t rx_count = 0;
  uint64_t tx_cycles = 0;
  uint64_t tx_count = 0;
#endif

  class DPDKTXPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKTXPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKTXPoller"), qp(qp) {}

    virtual int poll() {
      return qp->poll_tx();
    }
  } _tx_poller;

  class DPDKRXGCPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKRXGCPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKRXGCPoller"), qp(qp) {}

    virtual int poll() {
      return qp->rx_gc();
    }
  } _rx_gc_poller;
  tx_buf_factory _tx_buf_factory;
  class DPDKRXPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKRXPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKRXPoller"), qp(qp) {}

    virtual int poll() {
      return qp->poll_rx_once();
    }
  };
  Tub<DPDKRXPoller> _rx_poller;
  class DPDKTXGCPoller : public EventCenter::Poller {
    DPDKQueuePair *qp;

   public:
    explicit DPDKTXGCPoller(DPDKQueuePair *qp)
        : EventCenter::Poller(qp->center, "DPDK::DPDKTXGCPoller"), qp(qp) {}

    virtual int poll() {
      return qp->_tx_buf_factory.gc();
    }
  } _tx_gc_poller;
  std::vector<rte_mbuf*> _tx_burst;
  uint16_t _tx_burst_idx = 0;
};

class DPDKDevice {
 public:
  CephContext *cct;
  PerfCounters *perf_logger;
  std::vector<std::unique_ptr<DPDKQueuePair>> _queues;
  std::vector<DPDKWorker*> workers;
  size_t _rss_table_bits = 0;
  uint8_t _port_idx;
  uint16_t _num_queues;
  unsigned cores;
  hw_features _hw_features;
  uint8_t _queues_ready = 0;
  unsigned _home_cpu;
  bool _use_lro;
  bool _enable_fc;
  std::vector<uint8_t> _redir_table;
  rss_key_type _rss_key;
  bool _is_i40e_device = false;
  bool _is_vmxnet3_device = false;

 public:
  rte_eth_dev_info _dev_info = {};

  /**
   * The final stage of a port initialization.
   * @note Must be called *after* all queues from stage (2) have been
   *       initialized.
   */
  int init_port_fini();

 private:
  /**
   * Port initialization consists of 3 main stages:
   * 1) General port initialization which ends with a call to
   *    rte_eth_dev_configure() where we request the needed number of Rx and
   *    Tx queues.
   * 2) Individual queue initialization. This is done in the constructor of
   *    the DPDKQueuePair class. In particular the memory pools for the
   *    queues are allocated in this stage.
   * 3) The final stage of the initialization which starts with the call to
   *    rte_eth_dev_start() after which the port becomes fully functional.
   *    We will also wait for the link to come up in this stage.
   */
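  /*
   * Putting the three stages together (illustrative sketch, using the
   * helpers declared in this header):
   *
   *   auto dev = create_dpdk_net_device(cct, cores);          // stage 1
   *   auto qp  = dev->init_local_queue(cct, center, "", 0);   // stage 2
   *   dev->set_local_queue(0, std::move(qp));
   *   dev->init_port_fini();                                  // stage 3
   */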


  /**
   * First stage of the port initialization.
   *
   * @return 0 in case of success and an appropriate error code in case of an
   *         error.
   */
  int init_port_start();

  /**
   * Check the link status of our port for up to 9 seconds and finally print
   * the status.
   */
  int check_port_link_status();

  /**
   * Configures the HW Flow Control
   */
  void set_hw_flow_control();

 public:
  DPDKDevice(CephContext *c, uint8_t port_idx, uint16_t num_queues, bool use_lro, bool enable_fc):
      cct(c), _port_idx(port_idx), _num_queues(num_queues),
      _home_cpu(0), _use_lro(use_lro),
      _enable_fc(enable_fc) {
    _queues = std::vector<std::unique_ptr<DPDKQueuePair>>(_num_queues);
    /* now initialise the port we will use */
    int ret = init_port_start();
    if (ret != 0) {
      rte_exit(EXIT_FAILURE, "Cannot initialise port %u\n", _port_idx);
    }
    string name(std::string("port") + std::to_string(port_idx));
    PerfCountersBuilder plb(cct, name, l_dpdk_dev_first, l_dpdk_dev_last);

    plb.add_u64_counter(l_dpdk_dev_rx_mcast, "dpdk_device_receive_multicast_packets", "DPDK received multicast packets");
    plb.add_u64_counter(l_dpdk_dev_rx_badcrc_errors, "dpdk_device_receive_badcrc_errors", "DPDK received bad CRC errors");

    plb.add_u64_counter(l_dpdk_dev_rx_total_errors, "dpdk_device_receive_total_errors", "DPDK received total errors");
    plb.add_u64_counter(l_dpdk_dev_tx_total_errors, "dpdk_device_send_total_errors", "DPDK send total errors");
    plb.add_u64_counter(l_dpdk_dev_rx_dropped_errors, "dpdk_device_receive_dropped_errors", "DPDK received dropped errors");
    plb.add_u64_counter(l_dpdk_dev_rx_nombuf_errors, "dpdk_device_receive_nombuf_errors", "DPDK received RX mbuf allocation errors");

    perf_logger = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(perf_logger);
  }

  ~DPDKDevice() {
    rte_eth_dev_stop(_port_idx);
  }

  DPDKQueuePair& queue_for_cpu(unsigned cpu) { return *_queues[cpu]; }
  void l2receive(int qid, Packet p) {
    _queues[qid]->_rx_stream.produce(std::move(p));
  }
  subscription<Packet> receive(unsigned cpuid, std::function<int (Packet)> next_packet) {
    auto sub = _queues[cpuid]->_rx_stream.listen(std::move(next_packet));
    _queues[cpuid]->rx_start();
    return std::move(sub);
  }
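  // Illustrative sketch: attach a packet handler to a queue's RX stream and
  // start its poller; the returned subscription keeps the handler alive:
  //
  //   auto sub = dev->receive(cpu_id, [] (Packet p) {
  //     // ... consume the packet ...
  //     return 0;
  //   });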
  ethernet_address hw_address() {
    struct ether_addr mac;
    rte_eth_macaddr_get(_port_idx, &mac);

    return mac.addr_bytes;
  }
  hw_features get_hw_features() {
    return _hw_features;
  }
  const rss_key_type& rss_key() const { return _rss_key; }
  uint16_t hw_queues_count() { return _num_queues; }
  std::unique_ptr<DPDKQueuePair> init_local_queue(CephContext *c, EventCenter *center, string hugepages, uint16_t qid) {
    std::unique_ptr<DPDKQueuePair> qp;
    qp = std::unique_ptr<DPDKQueuePair>(new DPDKQueuePair(c, center, this, qid));
    return std::move(qp);
  }
  unsigned hash2qid(uint32_t hash) {
    // return hash % hw_queues_count();
    return _redir_table[hash & (_redir_table.size() - 1)];
  }
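  // Worked example (assuming a 128-entry redirection table): an RSS hash of
  // 0x1234 selects slot 0x1234 & 127 == 52, i.e. hash2qid(0x1234) returns
  // _redir_table[52].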
  void set_local_queue(unsigned i, std::unique_ptr<DPDKQueuePair> qp) {
    ceph_assert(!_queues[i]);
    _queues[i] = std::move(qp);
  }
  void unset_local_queue(unsigned i) {
    ceph_assert(_queues[i]);
    _queues[i].reset();
  }
  template <typename Func>
  unsigned forward_dst(unsigned src_cpuid, Func&& hashfn) {
    auto& qp = queue_for_cpu(src_cpuid);
    if (!qp._sw_reta)
      return src_cpuid;

    ceph_assert(qp._sw_reta);
    auto hash = hashfn() >> _rss_table_bits;
    auto& reta = *qp._sw_reta;
    return reta[hash % reta.size()];
  }
  unsigned hash2cpu(uint32_t hash) {
    // there is an assumption here that qid == get_id() which will
    // not necessarily be true in the future
    return forward_dst(hash2qid(hash), [hash] { return hash; });
  }

  hw_features& hw_features_ref() { return _hw_features; }

  const rte_eth_rxconf* def_rx_conf() const {
    return &_dev_info.default_rxconf;
  }

  const rte_eth_txconf* def_tx_conf() const {
    return &_dev_info.default_txconf;
  }

  /**
   * Set the RSS table in the device and store it in the internal vector.
   */
  void set_rss_table();

  uint8_t port_idx() { return _port_idx; }
  bool is_i40e_device() const {
    return _is_i40e_device;
  }
  bool is_vmxnet3_device() const {
    return _is_vmxnet3_device;
  }
};


std::unique_ptr<DPDKDevice> create_dpdk_net_device(
    CephContext *c, unsigned cores, uint8_t port_idx = 0,
    bool use_lro = true, bool enable_fc = true);


/**
 * @return Number of bytes needed for mempool objects of each QP.
 */
uint32_t qp_mempool_obj_size();

#endif // CEPH_DPDK_DEV_H