/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */
#include <stdio.h>
#include <sys/queue.h>
#include <string.h>

#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_compat.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>

#include "rte_distributor_v20.h"
#include "rte_distributor_private.h"
20 TAILQ_HEAD(rte_distributor_list
, rte_distributor_v20
);
22 static struct rte_tailq_elem rte_distributor_tailq
= {
23 .name
= "RTE_DISTRIBUTOR",
25 EAL_REGISTER_TAILQ(rte_distributor_tailq
)
/**** APIs called by workers ****/
30 rte_distributor_request_pkt_v20(struct rte_distributor_v20
*d
,
31 unsigned worker_id
, struct rte_mbuf
*oldpkt
)
33 union rte_distributor_buffer_v20
*buf
= &d
->bufs
[worker_id
];
34 int64_t req
= (((int64_t)(uintptr_t)oldpkt
) << RTE_DISTRIB_FLAG_BITS
)
35 | RTE_DISTRIB_GET_BUF
;
36 while (unlikely(buf
->bufptr64
& RTE_DISTRIB_FLAGS_MASK
))
40 VERSION_SYMBOL(rte_distributor_request_pkt
, _v20
, 2.0);
43 rte_distributor_poll_pkt_v20(struct rte_distributor_v20
*d
,
46 union rte_distributor_buffer_v20
*buf
= &d
->bufs
[worker_id
];
47 if (buf
->bufptr64
& RTE_DISTRIB_GET_BUF
)
50 /* since bufptr64 is signed, this should be an arithmetic shift */
51 int64_t ret
= buf
->bufptr64
>> RTE_DISTRIB_FLAG_BITS
;
52 return (struct rte_mbuf
*)((uintptr_t)ret
);
54 VERSION_SYMBOL(rte_distributor_poll_pkt
, _v20
, 2.0);
57 rte_distributor_get_pkt_v20(struct rte_distributor_v20
*d
,
58 unsigned worker_id
, struct rte_mbuf
*oldpkt
)
61 rte_distributor_request_pkt_v20(d
, worker_id
, oldpkt
);
62 while ((ret
= rte_distributor_poll_pkt_v20(d
, worker_id
)) == NULL
)
66 VERSION_SYMBOL(rte_distributor_get_pkt
, _v20
, 2.0);
69 rte_distributor_return_pkt_v20(struct rte_distributor_v20
*d
,
70 unsigned worker_id
, struct rte_mbuf
*oldpkt
)
72 union rte_distributor_buffer_v20
*buf
= &d
->bufs
[worker_id
];
73 uint64_t req
= (((int64_t)(uintptr_t)oldpkt
) << RTE_DISTRIB_FLAG_BITS
)
74 | RTE_DISTRIB_RETURN_BUF
;
78 VERSION_SYMBOL(rte_distributor_return_pkt
, _v20
, 2.0);
/**** APIs called on distributor core ***/
82 /* as name suggests, adds a packet to the backlog for a particular worker */
84 add_to_backlog(struct rte_distributor_backlog
*bl
, int64_t item
)
86 if (bl
->count
== RTE_DISTRIB_BACKLOG_SIZE
)
89 bl
->pkts
[(bl
->start
+ bl
->count
++) & (RTE_DISTRIB_BACKLOG_MASK
)]
94 /* takes the next packet for a worker off the backlog */
96 backlog_pop(struct rte_distributor_backlog
*bl
)
99 return bl
->pkts
[bl
->start
++ & RTE_DISTRIB_BACKLOG_MASK
];
102 /* stores a packet returned from a worker inside the returns array */
104 store_return(uintptr_t oldbuf
, struct rte_distributor_v20
*d
,
105 unsigned *ret_start
, unsigned *ret_count
)
107 /* store returns in a circular buffer - code is branch-free */
108 d
->returns
.mbufs
[(*ret_start
+ *ret_count
) & RTE_DISTRIB_RETURNS_MASK
]
110 *ret_start
+= (*ret_count
== RTE_DISTRIB_RETURNS_MASK
) & !!(oldbuf
);
111 *ret_count
+= (*ret_count
!= RTE_DISTRIB_RETURNS_MASK
) & !!(oldbuf
);
115 handle_worker_shutdown(struct rte_distributor_v20
*d
, unsigned int wkr
)
117 d
->in_flight_tags
[wkr
] = 0;
118 d
->in_flight_bitmask
&= ~(1UL << wkr
);
119 d
->bufs
[wkr
].bufptr64
= 0;
120 if (unlikely(d
->backlog
[wkr
].count
!= 0)) {
121 /* On return of a packet, we need to move the
122 * queued packets for this core elsewhere.
123 * Easiest solution is to set things up for
124 * a recursive call. That will cause those
125 * packets to be queued up for the next free
126 * core, i.e. it will return as soon as a
127 * core becomes free to accept the first
128 * packet, as subsequent ones will be added to
129 * the backlog for that core.
131 struct rte_mbuf
*pkts
[RTE_DISTRIB_BACKLOG_SIZE
];
133 struct rte_distributor_backlog
*bl
= &d
->backlog
[wkr
];
135 for (i
= 0; i
< bl
->count
; i
++) {
136 unsigned idx
= (bl
->start
+ i
) &
137 RTE_DISTRIB_BACKLOG_MASK
;
138 pkts
[i
] = (void *)((uintptr_t)(bl
->pkts
[idx
] >>
139 RTE_DISTRIB_FLAG_BITS
));
142 * Note that the tags were set before first level call
143 * to rte_distributor_process.
145 rte_distributor_process_v20(d
, pkts
, i
);
146 bl
->count
= bl
->start
= 0;
150 /* this function is called when process() fn is called without any new
151 * packets. It goes through all the workers and clears any returned packets
152 * to do a partial flush.
155 process_returns(struct rte_distributor_v20
*d
)
158 unsigned flushed
= 0;
159 unsigned ret_start
= d
->returns
.start
,
160 ret_count
= d
->returns
.count
;
162 for (wkr
= 0; wkr
< d
->num_workers
; wkr
++) {
164 const int64_t data
= d
->bufs
[wkr
].bufptr64
;
165 uintptr_t oldbuf
= 0;
167 if (data
& RTE_DISTRIB_GET_BUF
) {
169 if (d
->backlog
[wkr
].count
)
170 d
->bufs
[wkr
].bufptr64
=
171 backlog_pop(&d
->backlog
[wkr
]);
173 d
->bufs
[wkr
].bufptr64
= RTE_DISTRIB_GET_BUF
;
174 d
->in_flight_tags
[wkr
] = 0;
175 d
->in_flight_bitmask
&= ~(1UL << wkr
);
177 oldbuf
= data
>> RTE_DISTRIB_FLAG_BITS
;
178 } else if (data
& RTE_DISTRIB_RETURN_BUF
) {
179 handle_worker_shutdown(d
, wkr
);
180 oldbuf
= data
>> RTE_DISTRIB_FLAG_BITS
;
183 store_return(oldbuf
, d
, &ret_start
, &ret_count
);
186 d
->returns
.start
= ret_start
;
187 d
->returns
.count
= ret_count
;
192 /* process a set of packets to distribute them to workers */
194 rte_distributor_process_v20(struct rte_distributor_v20
*d
,
195 struct rte_mbuf
**mbufs
, unsigned num_mbufs
)
197 unsigned next_idx
= 0;
199 struct rte_mbuf
*next_mb
= NULL
;
200 int64_t next_value
= 0;
201 uint32_t new_tag
= 0;
202 unsigned ret_start
= d
->returns
.start
,
203 ret_count
= d
->returns
.count
;
205 if (unlikely(num_mbufs
== 0))
206 return process_returns(d
);
208 while (next_idx
< num_mbufs
|| next_mb
!= NULL
) {
210 int64_t data
= d
->bufs
[wkr
].bufptr64
;
211 uintptr_t oldbuf
= 0;
214 next_mb
= mbufs
[next_idx
++];
215 next_value
= (((int64_t)(uintptr_t)next_mb
)
216 << RTE_DISTRIB_FLAG_BITS
);
218 * User is advocated to set tag value for each
219 * mbuf before calling rte_distributor_process.
220 * User defined tags are used to identify flows,
223 new_tag
= next_mb
->hash
.usr
;
226 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
227 * then the size of match has to be expanded.
232 * to scan for a match use "xor" and "not" to get a 0/1
233 * value, then use shifting to merge to single "match"
234 * variable, where a one-bit indicates a match for the
235 * worker given by the bit-position
237 for (i
= 0; i
< d
->num_workers
; i
++)
238 match
|= (!(d
->in_flight_tags
[i
] ^ new_tag
)
241 /* Only turned-on bits are considered as match */
242 match
&= d
->in_flight_bitmask
;
246 unsigned worker
= __builtin_ctzl(match
);
247 if (add_to_backlog(&d
->backlog
[worker
],
253 if ((data
& RTE_DISTRIB_GET_BUF
) &&
254 (d
->backlog
[wkr
].count
|| next_mb
)) {
256 if (d
->backlog
[wkr
].count
)
257 d
->bufs
[wkr
].bufptr64
=
258 backlog_pop(&d
->backlog
[wkr
]);
261 d
->bufs
[wkr
].bufptr64
= next_value
;
262 d
->in_flight_tags
[wkr
] = new_tag
;
263 d
->in_flight_bitmask
|= (1UL << wkr
);
266 oldbuf
= data
>> RTE_DISTRIB_FLAG_BITS
;
267 } else if (data
& RTE_DISTRIB_RETURN_BUF
) {
268 handle_worker_shutdown(d
, wkr
);
269 oldbuf
= data
>> RTE_DISTRIB_FLAG_BITS
;
272 /* store returns in a circular buffer */
273 store_return(oldbuf
, d
, &ret_start
, &ret_count
);
275 if (++wkr
== d
->num_workers
)
278 /* to finish, check all workers for backlog and schedule work for them
279 * if they are ready */
280 for (wkr
= 0; wkr
< d
->num_workers
; wkr
++)
281 if (d
->backlog
[wkr
].count
&&
282 (d
->bufs
[wkr
].bufptr64
& RTE_DISTRIB_GET_BUF
)) {
284 int64_t oldbuf
= d
->bufs
[wkr
].bufptr64
>>
285 RTE_DISTRIB_FLAG_BITS
;
286 store_return(oldbuf
, d
, &ret_start
, &ret_count
);
288 d
->bufs
[wkr
].bufptr64
= backlog_pop(&d
->backlog
[wkr
]);
291 d
->returns
.start
= ret_start
;
292 d
->returns
.count
= ret_count
;
295 VERSION_SYMBOL(rte_distributor_process
, _v20
, 2.0);
297 /* return to the caller, packets returned from workers */
299 rte_distributor_returned_pkts_v20(struct rte_distributor_v20
*d
,
300 struct rte_mbuf
**mbufs
, unsigned max_mbufs
)
302 struct rte_distributor_returned_pkts
*returns
= &d
->returns
;
303 unsigned retval
= (max_mbufs
< returns
->count
) ?
304 max_mbufs
: returns
->count
;
307 for (i
= 0; i
< retval
; i
++) {
308 unsigned idx
= (returns
->start
+ i
) & RTE_DISTRIB_RETURNS_MASK
;
309 mbufs
[i
] = returns
->mbufs
[idx
];
316 VERSION_SYMBOL(rte_distributor_returned_pkts
, _v20
, 2.0);
318 /* return the number of packets in-flight in a distributor, i.e. packets
319 * being worked on or queued up in a backlog.
321 static inline unsigned
322 total_outstanding(const struct rte_distributor_v20
*d
)
324 unsigned wkr
, total_outstanding
;
326 total_outstanding
= __builtin_popcountl(d
->in_flight_bitmask
);
328 for (wkr
= 0; wkr
< d
->num_workers
; wkr
++)
329 total_outstanding
+= d
->backlog
[wkr
].count
;
331 return total_outstanding
;
334 /* flush the distributor, so that there are no outstanding packets in flight or
337 rte_distributor_flush_v20(struct rte_distributor_v20
*d
)
339 const unsigned flushed
= total_outstanding(d
);
341 while (total_outstanding(d
) > 0)
342 rte_distributor_process_v20(d
, NULL
, 0);
346 VERSION_SYMBOL(rte_distributor_flush
, _v20
, 2.0);
348 /* clears the internal returns array in the distributor */
350 rte_distributor_clear_returns_v20(struct rte_distributor_v20
*d
)
352 d
->returns
.start
= d
->returns
.count
= 0;
354 memset(d
->returns
.mbufs
, 0, sizeof(d
->returns
.mbufs
));
357 VERSION_SYMBOL(rte_distributor_clear_returns
, _v20
, 2.0);
359 /* creates a distributor instance */
360 struct rte_distributor_v20
*
361 rte_distributor_create_v20(const char *name
,
363 unsigned num_workers
)
365 struct rte_distributor_v20
*d
;
366 struct rte_distributor_list
*distributor_list
;
367 char mz_name
[RTE_MEMZONE_NAMESIZE
];
368 const struct rte_memzone
*mz
;
370 /* compilation-time checks */
371 RTE_BUILD_BUG_ON((sizeof(*d
) & RTE_CACHE_LINE_MASK
) != 0);
372 RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS
& 7) != 0);
373 RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS
>
374 sizeof(d
->in_flight_bitmask
) * CHAR_BIT
);
376 if (name
== NULL
|| num_workers
>= RTE_DISTRIB_MAX_WORKERS
) {
381 snprintf(mz_name
, sizeof(mz_name
), RTE_DISTRIB_PREFIX
"%s", name
);
382 mz
= rte_memzone_reserve(mz_name
, sizeof(*d
), socket_id
, NO_FLAGS
);
389 strlcpy(d
->name
, name
, sizeof(d
->name
));
390 d
->num_workers
= num_workers
;
392 distributor_list
= RTE_TAILQ_CAST(rte_distributor_tailq
.head
,
393 rte_distributor_list
);
395 rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK
);
396 TAILQ_INSERT_TAIL(distributor_list
, d
, next
);
397 rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK
);
401 VERSION_SYMBOL(rte_distributor_create
, _v20
, 2.0);