/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <rte_eal.h>
#include <rte_common.h>
#include <rte_debug.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_mbuf.h>
#include <rte_ring.h>
#include <rte_byteorder.h>

#include "args.h"
#include "main.h"
#include "init.h"
#include "../include/conf.h"
#ifdef QW_SOFTWARE_FC
#define SEND_PAUSE_FRAME(port_id, duration) send_pause_frame(port_id, duration)
#else
#define SEND_PAUSE_FRAME(port_id, duration) do { } while(0)
#endif

#define ETHER_TYPE_FLOW_CONTROL 0x8808

struct ether_fc_frame {
        uint16_t opcode;
        uint16_t param;
} __attribute__((__packed__));
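/*
 * An IEEE 802.3x PAUSE frame is addressed to the reserved multicast MAC
 * 01:80:C2:00:00:01, carries EtherType 0x8808 (MAC control), opcode 0x0001
 * and a 16-bit pause duration expressed in 512-bit-time quanta. The
 * ether_fc_frame structure above models the opcode/duration payload that
 * follows the Ethernet header; the QW_SOFTWARE_FC guard is assumed to be
 * provided by the shared ../include/conf.h.
 */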
int *quota;
unsigned int *low_watermark;

uint8_t port_pairs[RTE_MAX_ETHPORTS];

struct rte_ring *rings[RTE_MAX_LCORE][RTE_MAX_ETHPORTS];
struct rte_mempool *mbuf_pool;
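/*
 * quota and low_watermark are pointers rather than plain globals because
 * they are expected to live in a memory zone shared with the qwctl control
 * application (see setup_shared_variables()), which can then tune both
 * values while qw is running.
 */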
static void send_pause_frame(uint8_t port_id, uint16_t duration)
{
        struct rte_mbuf *mbuf;
        struct ether_fc_frame *pause_frame;
        struct ether_hdr *hdr;
        struct ether_addr mac_addr;

        RTE_LOG(DEBUG, USER1, "Sending PAUSE frame (duration=%d) on port %d\n",
                        duration, port_id);

        /* Get a mbuf from the pool */
        mbuf = rte_pktmbuf_alloc(mbuf_pool);
        if (unlikely(mbuf == NULL))
                return;

        /* Prepare a PAUSE frame */
        hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
        pause_frame = (struct ether_fc_frame *) &hdr[1];

        /* Destination: the 802.3x flow-control multicast address
         * 01:80:C2:00:00:01. The 8-byte store also covers the first two
         * bytes of s_addr, so the source MAC is filled in afterwards. */
        void *tmp = &hdr->d_addr.addr_bytes[0];
        *((uint64_t *)tmp) = 0x010000C28001ULL;

        rte_eth_macaddr_get(port_id, &mac_addr);
        ether_addr_copy(&mac_addr, &hdr->s_addr);

        hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_FLOW_CONTROL);

        pause_frame->opcode = rte_cpu_to_be_16(0x0001);
        pause_frame->param  = rte_cpu_to_be_16(duration);

        /* Pad to the minimum Ethernet frame size (60 bytes without FCS) */
        mbuf->pkt_len  = 60;
        mbuf->data_len = 60;

        rte_eth_tx_burst(port_id, 0, &mbuf, 1);
}
/**
 * Get the previous enabled lcore ID
 *
 * @param lcore_id
 *   The current lcore ID.
 * @return
 *   The previous enabled lcore_id or -1 if not found.
 */
static unsigned int
get_previous_lcore_id(unsigned int lcore_id)
{
        int i;

        for (i = lcore_id - 1; i >= 0; i--)
                if (rte_lcore_is_enabled(i))
                        return i;

        return -1;
}
/**
 * Get the last enabled lcore ID
 *
 * @return
 *   The last enabled lcore_id.
 */
static unsigned int
get_last_lcore_id(void)
{
        int i;

        for (i = RTE_MAX_LCORE - 1; i >= 0; i--)
                if (rte_lcore_is_enabled(i))
                        return i;

        return 0;
}
static void
receive_stage(__attribute__((unused)) void *args)
{
        int i, ret;

        uint8_t port_id;
        uint16_t nb_rx_pkts;

        unsigned int lcore_id;

        struct rte_mbuf *pkts[MAX_PKT_QUOTA];
        struct rte_ring *ring;
        enum ring_state ring_state[RTE_MAX_ETHPORTS] = { RING_READY };

        lcore_id = rte_lcore_id();

        RTE_LOG(INFO, USER1,
                        "%s() started on core %u\n", __func__, lcore_id);

        while (1) {

                /* Process each port round robin style */
                for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {

                        if (!is_bit_set(port_id, portmask))
                                continue;

                        ring = rings[lcore_id][port_id];

                        /* An overloaded ring is skipped until it drains
                         * below the low watermark again. */
                        if (ring_state[port_id] != RING_READY) {
                                if (rte_ring_count(ring) > *low_watermark)
                                        continue;
                                else
                                        ring_state[port_id] = RING_READY;
                        }

                        /* Enqueue received packets on the RX ring */
                        nb_rx_pkts = rte_eth_rx_burst(port_id, 0, pkts,
                                        (uint16_t) *quota);
                        ret = rte_ring_enqueue_bulk(ring, (void *) pkts, nb_rx_pkts);
                        if (ret == -EDQUOT) {
                                ring_state[port_id] = RING_OVERLOADED;
                                send_pause_frame(port_id, 1337);
                        }

                        else if (ret == -ENOBUFS) {

                                /* Return mbufs to the pool, effectively dropping packets */
                                for (i = 0; i < nb_rx_pkts; i++)
                                        rte_pktmbuf_free(pkts[i]);
                        }
                }
        }
}
static void
pipeline_stage(__attribute__((unused)) void *args)
{
        int i, ret;
        int nb_dq_pkts;

        uint8_t port_id;

        unsigned int lcore_id, previous_lcore_id;

        void *pkts[MAX_PKT_QUOTA];
        struct rte_ring *rx, *tx;
        enum ring_state ring_state[RTE_MAX_ETHPORTS] = { RING_READY };

        lcore_id = rte_lcore_id();
        previous_lcore_id = get_previous_lcore_id(lcore_id);

        RTE_LOG(INFO, USER1,
                        "%s() started on core %u - processing packets from core %u\n",
                        __func__, lcore_id, previous_lcore_id);

        while (1) {

                for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {

                        if (!is_bit_set(port_id, portmask))
                                continue;

                        tx = rings[lcore_id][port_id];
                        rx = rings[previous_lcore_id][port_id];

                        /* An overloaded TX ring is skipped until it drains
                         * below the low watermark again. */
                        if (ring_state[port_id] != RING_READY) {
                                if (rte_ring_count(tx) > *low_watermark)
                                        continue;
                                else
                                        ring_state[port_id] = RING_READY;
                        }

                        /* Dequeue up to quota mbuf from rx */
                        nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts, *quota);
                        if (unlikely(nb_dq_pkts < 0))
                                continue;

                        /* Enqueue them on tx */
                        ret = rte_ring_enqueue_bulk(tx, pkts, nb_dq_pkts);
                        if (ret == -EDQUOT)
                                ring_state[port_id] = RING_OVERLOADED;

                        else if (ret == -ENOBUFS) {

                                /* Return mbufs to the pool, effectively dropping packets */
                                for (i = 0; i < nb_dq_pkts; i++)
                                        rte_pktmbuf_free(pkts[i]);
                        }
                }
        }
}
static void
send_stage(__attribute__((unused)) void *args)
{
        uint16_t nb_dq_pkts;

        uint8_t port_id;
        uint8_t dest_port_id;

        unsigned int lcore_id, previous_lcore_id;

        struct rte_ring *tx;
        struct rte_mbuf *tx_pkts[MAX_PKT_QUOTA];

        lcore_id = rte_lcore_id();
        previous_lcore_id = get_previous_lcore_id(lcore_id);

        RTE_LOG(INFO, USER1,
                        "%s() started on core %u - processing packets from core %u\n",
                        __func__, lcore_id, previous_lcore_id);

        while (1) {

                /* Process each ring round robin style */
                for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {

                        if (!is_bit_set(port_id, portmask))
                                continue;

                        dest_port_id = port_pairs[port_id];
                        tx = rings[previous_lcore_id][port_id];

                        if (rte_ring_empty(tx))
                                continue;

                        /* Dequeue packets from tx and send them */
                        nb_dq_pkts = (uint16_t) rte_ring_dequeue_burst(tx,
                                        (void *) tx_pkts, *quota);
                        rte_eth_tx_burst(dest_port_id, 0, tx_pkts, nb_dq_pkts);

                        /* TODO: Check if nb_dq_pkts == nb_tx_pkts? */
                }
        }
}
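/*
 * Regarding the TODO above: rte_eth_tx_burst() may transmit fewer packets
 * than requested when the TX queue is full. A more complete version would
 * compare its return value with nb_dq_pkts and rte_pktmbuf_free() any
 * unsent mbufs, otherwise those mbufs are never returned to the pool.
 */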
int
main(int argc, char **argv)
{
        int ret;
        unsigned int lcore_id, master_lcore_id, last_lcore_id;

        uint8_t port_id;

        rte_set_log_level(RTE_LOG_INFO);

        ret = rte_eal_init(argc, argv);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Cannot initialize EAL\n");

        argc -= ret;
        argv += ret;

        init_dpdk();
        setup_shared_variables();

        /* Default quota and low watermark before parsing the arguments */
        *quota = 32;
        *low_watermark = 60 * RING_SIZE / 100;

        last_lcore_id   = get_last_lcore_id();
        master_lcore_id = rte_get_master_lcore();

        /* Parse the application's arguments */
        ret = parse_qw_args(argc, argv);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Invalid quota/watermark argument(s)\n");

        /* Create a pool of mbuf to store packets */
        mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL, 32, 0,
                        MBUF_DATA_SIZE, rte_socket_id());
        if (mbuf_pool == NULL)
                rte_panic("%s\n", rte_strerror(rte_errno));

        for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++)
                if (is_bit_set(port_id, portmask)) {
                        configure_eth_port(port_id);
                        init_ring(master_lcore_id, port_id);
                }

        pair_ports();

        /* Start pipeline_stage() on all the available slave lcores but the last */
        for (lcore_id = 0; lcore_id < last_lcore_id; lcore_id++) {
                if (rte_lcore_is_enabled(lcore_id) && lcore_id != master_lcore_id) {

                        for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++)
                                if (is_bit_set(port_id, portmask))
                                        init_ring(lcore_id, port_id);

                        /* typecast is a workaround for GCC 4.3 bug */
                        rte_eal_remote_launch((int (*)(void *))pipeline_stage,
                                        NULL, lcore_id);
                }
        }

        /* Start send_stage() on the last slave core */
        /* typecast is a workaround for GCC 4.3 bug */
        rte_eal_remote_launch((int (*)(void *))send_stage, NULL, last_lcore_id);

        /* Start receive_stage() on the master core */
        receive_stage(NULL);

        return 0;
}
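/*
 * Typical invocation, assuming the standard quota_watermark setup (adjust
 * the EAL coremask and the application portmask to the local machine):
 *
 *   ./qw -c 0xf -n 4 -- -p 0x3
 *
 * The -p portmask is parsed by parse_qw_args(); the companion qwctl
 * application can then adjust the quota and watermark values at runtime.
 */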