/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_cycles.h>
#include <rte_compat.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>

#include "rte_distributor_private.h"
#include "rte_distributor.h"
#include "rte_distributor_v20.h"
#include "rte_distributor_v1705.h"
TAILQ_HEAD(rte_dist_burst_list, rte_distributor);

static struct rte_tailq_elem rte_dist_burst_tailq = {
	.name = "RTE_DIST_BURST",
};
EAL_REGISTER_TAILQ(rte_dist_burst_tailq)

/**** APIs called by workers ****/

/**** Burst Packet APIs called by workers ****/
void
rte_distributor_request_pkt_v1705(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt,
		unsigned int count)
{
	struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
	unsigned int i;

	volatile int64_t *retptr64;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		rte_distributor_request_pkt_v20(d->d_v20,
			worker_id, oldpkt[0]);
		return;
	}

	retptr64 = &(buf->retptr64[0]);
	/* Spin while handshake bits are set (scheduler clears it) */
	while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
		rte_pause();
		uint64_t t = rte_rdtsc() + 100;

		while (rte_rdtsc() < t)
			rte_pause();
	}

	/*
	 * OK, if we've got here, then the scheduler has just cleared the
	 * handshake bits. Populate the retptrs with returning packets.
	 */

	/* Zero out any slots beyond the packets actually being returned */
	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
		buf->retptr64[i] = 0;

	/* Set Return bit for each packet returned */
	for (i = count; i-- > 0; )
		buf->retptr64[i] =
			(((int64_t)(uintptr_t)(oldpkt[i])) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

	/*
	 * Finally, set the GET_BUF to signal to distributor that cache
	 * line is ready for processing
	 */
	*retptr64 |= RTE_DISTRIB_GET_BUF;
}
BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt,
		unsigned int count),
		rte_distributor_request_pkt_v1705);
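
/*
 * Illustrative note (not part of the original code): each retptr64 slot
 * packs an mbuf pointer above RTE_DISTRIB_FLAG_BITS low bits, with the
 * low bits used as handshake flags, so both directions of the exchange
 * use the same encoding:
 *
 *	value = ((int64_t)(uintptr_t)mbuf << RTE_DISTRIB_FLAG_BITS)
 *			| RTE_DISTRIB_RETURN_BUF;
 *	mbuf  = (struct rte_mbuf *)(uintptr_t)
 *			(value >> RTE_DISTRIB_FLAG_BITS);
 */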
int
rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts)
{
	struct rte_distributor_buffer *buf = &d->bufs[worker_id];
	uint64_t ret;
	int count = 0;
	unsigned int i;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		pkts[0] = rte_distributor_poll_pkt_v20(d->d_v20, worker_id);
		return (pkts[0]) ? 1 : 0;
	}

	/* If bit is set, the distributor has not filled the line yet */
	if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
		return -1;

	/* since bufptr64 is signed, this should be an arithmetic shift */
	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
		}
	}

	/*
	 * so now we've got the contents of the cacheline into an array of
	 * mbuf pointers, so toggle the bit so scheduler can start working
	 * on the next cacheline while we're working.
	 */
	buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;

	return count;
}
BIND_DEFAULT_SYMBOL(rte_distributor_poll_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_poll_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts),
		rte_distributor_poll_pkt_v1705);
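
/*
 * A minimal polling sketch (illustrative only): a worker spins until the
 * distributor hands over a burst; -1 means the line is not ready yet.
 * rte_distributor_get_pkt() below wraps exactly this request/poll
 * sequence with a backoff.
 *
 *	int n;
 *	do {
 *		n = rte_distributor_poll_pkt(d, worker_id, pkts);
 *	} while (n == -1);
 */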
int
rte_distributor_get_pkt_v1705(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts,
		struct rte_mbuf **oldpkt, unsigned int return_count)
{
	int count;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		if (return_count <= 1) {
			pkts[0] = rte_distributor_get_pkt_v20(d->d_v20,
				worker_id, oldpkt[0]);
			return (pkts[0]) ? 1 : 0;
		} else
			return -EINVAL;
	}

	rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);

	count = rte_distributor_poll_pkt(d, worker_id, pkts);
	while (count == -1) {
		uint64_t t = rte_rdtsc() + 100;

		while (rte_rdtsc() < t)
			rte_pause();

		count = rte_distributor_poll_pkt(d, worker_id, pkts);
	}
	return count;
}
BIND_DEFAULT_SYMBOL(rte_distributor_get_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_get_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts,
		struct rte_mbuf **oldpkt, unsigned int return_count),
		rte_distributor_get_pkt_v1705);
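
/*
 * A minimal worker-loop sketch (illustrative only; "quit" and
 * "handle_packet" are hypothetical application symbols). The same array
 * is used for new and returned packets, since get_pkt consumes the old
 * burst before delivering the next one:
 *
 *	struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
 *	unsigned int num = 0, k;
 *
 *	while (!quit) {
 *		num = rte_distributor_get_pkt(d, worker_id, bufs, bufs, num);
 *		for (k = 0; k < num; k++)
 *			handle_packet(bufs[k]);
 *	}
 */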
int
rte_distributor_return_pkt_v1705(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
{
	struct rte_distributor_buffer *buf = &d->bufs[worker_id];
	unsigned int i;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		if (num == 1)
			return rte_distributor_return_pkt_v20(d->d_v20,
				worker_id, oldpkt[0]);
		else
			return -EINVAL;
	}

	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
		/* Switch off the return bit first */
		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;

	for (i = num; i-- > 0; )
		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

	/* set the GET_BUF bit even if we got no returns */
	buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;

	return 0;
}
BIND_DEFAULT_SYMBOL(rte_distributor_return_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_return_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt, int num),
		rte_distributor_return_pkt_v1705);
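
/*
 * Illustrative shutdown sketch: a worker that is exiting hands back any
 * packets it still holds without requesting more ("num" is the count of
 * packets still held, a hypothetical application variable):
 *
 *	rte_distributor_return_pkt(d, worker_id, bufs, num);
 */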
/**** APIs called on distributor core ***/

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor *d,
		unsigned int *ret_start, unsigned int *ret_count)
{
	if (!oldbuf)
		return;
	/* store returns in a circular buffer */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
}
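
/*
 * Worked example (illustrative, assuming RTE_DISTRIB_RETURNS_MASK is 127,
 * i.e. a ring of 128 slots): while the ring is not full, only *ret_count
 * grows; once *ret_count reaches the mask, *ret_start advances by one
 * instead, so the oldest entry is overwritten. Both updates are
 * branch-free comparisons rather than conditionals.
 */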
/*
 * Match the flow_ids (tags) of the incoming packets to the flow_ids
 * of the inflight packets (both inflight on the workers and in each worker
 * backlog). This will then allow us to pin those packets to the relevant
 * workers to give us our atomic flow pinning.
 */
void
find_match_scalar(struct rte_distributor *d,
			uint16_t *data_ptr,
			uint16_t *output_ptr)
{
	struct rte_distributor_backlog *bl;
	uint16_t i, j, w;

	/*
	 * Function overview:
	 * 1. Loop through all worker ID's
	 * 2. Compare the current inflights to the incoming tags
	 * 3. Compare the current backlog to the incoming tags
	 * 4. Add any matches to the output
	 */

	for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
		output_ptr[j] = 0;

	for (i = 0; i < d->num_workers; i++) {
		bl = &d->backlog[i];

		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
				if (d->in_flight_tags[i][j] == data_ptr[w]) {
					output_ptr[w] = i+1;
					break;
				}
		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
				if (bl->tags[j] == data_ptr[w]) {
					output_ptr[w] = i+1;
					break;
				}
	}

	/*
	 * At this stage, the output contains 8 16-bit values, with
	 * each non-zero value containing the worker ID on which the
	 * corresponding flow is pinned to.
	 */
}
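
/*
 * Worked example (illustrative): if the first incoming packet carries
 * tag 0x1235 and worker 2 already has that tag in flight, output slot 0
 * is set to 3 (worker ID plus one); a zero in the output means no worker
 * currently owns that flow, and the caller may assign it freely.
 */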
/*
 * When the handshake bits indicate that there are packets coming
 * back from the worker, this function is called to copy and store
 * the valid returned pointers (store_return).
 */
static unsigned int
handle_returns(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	uintptr_t oldbuf;
	unsigned int ret_start = d->returns.start,
			ret_count = d->returns.count;
	unsigned int count = 0;
	unsigned int i;

	if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
					RTE_DISTRIB_FLAG_BITS));
				/* store returns in a circular buffer */
				store_return(oldbuf, d, &ret_start,
						&ret_count);
				count++;
				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
			}
		}
		d->returns.start = ret_start;
		d->returns.count = ret_count;
		/* Clear for the worker to populate with more returns */
		buf->retptr64[0] = 0;
	}
	return count;
}
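
/*
 * Illustrative note: retptr64[0] doubles as the worker-to-distributor
 * handshake word. The worker sets RTE_DISTRIB_GET_BUF after filling in
 * its returns; zeroing the whole word here is what the worker's spin
 * loop in rte_distributor_request_pkt_v1705() above waits for.
 */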
/*
 * This function releases a burst (cache line) to a worker.
 * It is called from the process function when a cacheline is
 * full to make room for more packets for that worker, or when
 * all packets have been assigned to bursts and need to be flushed
 * to the workers.
 * It also needs to wait for any outstanding packets from the worker
 * before sending out new packets.
 */
static unsigned int
release(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	unsigned int i;

	/* Wait until the worker has finished with its previous burst */
	while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
		rte_pause();

	handle_returns(d, wkr);

	buf->count = 0;

	for (i = 0; i < d->backlog[wkr].count; i++) {
		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
	}
	buf->count = i;
	for ( ; i < RTE_DIST_BURST_SIZE; i++) {
		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
		d->in_flight_tags[wkr][i] = 0;
	}

	d->backlog[wkr].count = 0;

	/* Clear the GET bit */
	buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
	return buf->count;
}
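
/*
 * Illustrative note: each bufptr64 entry handed to the worker carries the
 * packet pointer in its high bits plus VALID_BUF, while GET_BUF on slot 0
 * acts as the ownership flag for the whole cache line, e.g. for a backlog
 * entry p:
 *
 *	bufptr64[i] = p | RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
 *
 * Clearing GET_BUF on slot 0 at the end transfers the line to the worker.
 */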
/* process a set of packets to distribute them to workers */
int
rte_distributor_process_v1705(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int num_mbufs)
{
	unsigned int next_idx = 0;
	static unsigned int wkr;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint16_t new_tag = 0;
	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
	unsigned int i, j, w, wid;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_process_v20(d->d_v20,
			mbufs, num_mbufs);
	}

	if (unlikely(num_mbufs == 0)) {
		/* Flush out all non-full cache-lines to workers. */
		for (wid = 0; wid < d->num_workers; wid++) {
			if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
				release(d, wid);
				handle_returns(d, wid);
			}
		}
		return 0;
	}

	while (next_idx < num_mbufs) {
		uint16_t matches[RTE_DIST_BURST_SIZE];
		unsigned int pkts;

		if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
			d->bufs[wkr].count = 0;

		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
			pkts = num_mbufs - next_idx;
		else
			pkts = RTE_DIST_BURST_SIZE;

		for (i = 0; i < pkts; i++) {
			if (mbufs[next_idx + i]) {
				/* flows have to be non-zero */
				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
			} else
				flows[i] = 0;
		}
		for (; i < RTE_DIST_BURST_SIZE; i++)
			flows[i] = 0;

		switch (d->dist_match_fn) {
		case RTE_DIST_MATCH_VECTOR:
			find_match_vec(d, &flows[0], &matches[0]);
			break;
		default:
			find_match_scalar(d, &flows[0], &matches[0]);
		}

		/*
		 * Matches array now contain the intended worker ID (+1) of
		 * the incoming packets. Any zeroes need to be assigned
		 * workers.
		 */

		for (j = 0; j < pkts; j++) {

			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb) <<
					RTE_DISTRIB_FLAG_BITS);
			/*
			 * Users are advised to set the tag value for each
			 * mbuf before calling rte_distributor_process.
			 * User-defined tags are used to identify flows,
			 * or sessions.
			 */
			/* flows MUST be non-zero */
			new_tag = (uint16_t)(next_mb->hash.usr) | 1;

			/*
			 * Uncommenting the next line will cause the find_match
			 * function to be optimized out, making this function
			 * do parallel (non-atomic) distribution
			 */
			/* matches[j] = 0; */

			if (matches[j]) {
				struct rte_distributor_backlog *bl =
						&d->backlog[matches[j]-1];
				if (unlikely(bl->count ==
						RTE_DIST_BURST_SIZE)) {
					release(d, matches[j]-1);
				}

				/* Add to worker that already has flow */
				unsigned int idx = bl->count++;

				bl->tags[idx] = new_tag;
				bl->pkts[idx] = next_value;

			} else {
				struct rte_distributor_backlog *bl =
						&d->backlog[wkr];
				if (unlikely(bl->count ==
						RTE_DIST_BURST_SIZE)) {
					release(d, wkr);
				}

				/* Add to current worker */
				unsigned int idx = bl->count++;

				bl->tags[idx] = new_tag;
				bl->pkts[idx] = next_value;
				/*
				 * Now that we've just added an unpinned flow
				 * to a worker, we need to ensure that all
				 * other packets with that same flow will go
				 * to the same worker in this burst.
				 */
				for (w = j; w < pkts; w++)
					if (flows[w] == new_tag)
						matches[w] = wkr+1;
			}
		}
		wkr++;
		if (wkr >= d->num_workers)
			wkr = 0;
	}

	/* Flush out all non-full cache-lines to workers. */
	for (wid = 0; wid < d->num_workers; wid++)
		if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
			release(d, wid);

	return num_mbufs;
}
BIND_DEFAULT_SYMBOL(rte_distributor_process, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_process(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int num_mbufs),
		rte_distributor_process_v1705);
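
/*
 * A minimal distributor-core sketch (illustrative only; "quit", "rx_pkts",
 * "rx_count" and "tx_pkts" are hypothetical application symbols): feed
 * bursts in and periodically drain the returns ring:
 *
 *	while (!quit) {
 *		rte_distributor_process(d, rx_pkts, rx_count);
 *		rte_distributor_returned_pkts(d, tx_pkts,
 *				2 * RTE_DIST_BURST_SIZE);
 *	}
 */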
/* return to the caller, packets returned from workers */
int
rte_distributor_returned_pkts_v1705(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned int retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned int i;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_returned_pkts_v20(d->d_v20,
				mbufs, max_mbufs);
	}

	for (i = 0; i < retval; i++) {
		unsigned int idx = (returns->start + i) &
				RTE_DISTRIB_RETURNS_MASK;

		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}
BIND_DEFAULT_SYMBOL(rte_distributor_returned_pkts, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_returned_pkts(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int max_mbufs),
		rte_distributor_returned_pkts_v1705);
/*
 * Return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned int
total_outstanding(const struct rte_distributor *d)
{
	unsigned int wkr, total_outstanding = 0;

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count;

	return total_outstanding;
}
/*
 * Flush the distributor, so that there are no outstanding packets in flight
 * or queued up.
 */
int
rte_distributor_flush_v1705(struct rte_distributor *d)
{
	unsigned int flushed;
	unsigned int wkr;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_flush_v20(d->d_v20);
	}

	flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process(d, NULL, 0);

	/*
	 * Send empty burst to all workers to allow them to exit
	 * gracefully, should they need to.
	 */
	rte_distributor_process(d, NULL, 0);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		handle_returns(d, wkr);

	return flushed;
}
BIND_DEFAULT_SYMBOL(rte_distributor_flush, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_flush(struct rte_distributor *d),
		rte_distributor_flush_v1705);
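
/*
 * Illustrative note: flushing works by processing zero-packet bursts until
 * every backlog drains. Since rte_distributor_process_v1705() keeps its
 * round-robin position in a function-local static ("wkr"), flush and
 * process should be called from the same single distributor lcore; the
 * burst API is not multi-thread safe on the distributor side.
 */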
/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns_v1705(struct rte_distributor *d)
{
	unsigned int wkr;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		rte_distributor_clear_returns_v20(d->d_v20);
		return;
	}

	/* throw away returns, so workers can exit */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		d->bufs[wkr].retptr64[0] = 0;
}
BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
		rte_distributor_clear_returns_v1705);
/* creates a distributor instance */
struct rte_distributor *
rte_distributor_create_v1705(const char *name,
		unsigned int socket_id,
		unsigned int num_workers,
		unsigned int alg_type)
{
	struct rte_distributor *d;
	struct rte_dist_burst_list *dist_burst_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;
	unsigned int i;

	/* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */

	/* compilation-time checks */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);

	if (alg_type == RTE_DIST_ALG_SINGLE) {
		d = malloc(sizeof(struct rte_distributor));
		if (d == NULL) {
			rte_errno = ENOMEM;
			return NULL;
		}
		d->d_v20 = rte_distributor_create_v20(name,
				socket_id, num_workers);
		if (d->d_v20 == NULL) {
			free(d);
			/* rte_errno will have been set */
			return NULL;
		}
		d->alg_type = alg_type;
		return d;
	}

	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
		rte_errno = EINVAL;
		return NULL;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;
	d->alg_type = alg_type;

	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
#if defined(RTE_ARCH_X86)
	d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
#endif

	/*
	 * Set up the backlog tags so they're pointing at the second cache
	 * line for performance during flow matching
	 */
	for (i = 0; i < num_workers; i++)
		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];

	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
					rte_dist_burst_list);

	rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
	rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

	return d;
}
BIND_DEFAULT_SYMBOL(rte_distributor_create, _v1705, 17.05);
MAP_STATIC_SYMBOL(struct rte_distributor *rte_distributor_create(
		const char *name, unsigned int socket_id,
		unsigned int num_workers, unsigned int alg_type),
		rte_distributor_create_v1705);
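
/*
 * A minimal creation sketch (illustrative only; error handling elided,
 * "PKT_DIST" is a hypothetical name): create a burst-mode distributor for
 * four workers on the local NUMA socket:
 *
 *	struct rte_distributor *dist = rte_distributor_create("PKT_DIST",
 *			rte_socket_id(), 4, RTE_DIST_ALG_BURST);
 *	if (dist == NULL)
 *		rte_panic("cannot create distributor: %d\n", rte_errno);
 */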