/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#include <rte_string_fns.h>
#include <rte_branch_prediction.h>
#include <rte_debug.h>
#include <rte_lcore.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_memcpy.h>
#include <rte_memory.h>
#include <rte_memzone.h>
#include <rte_eal_memconfig.h>

#include "opdl_ring.h"
#include "opdl_log.h"

#define LIB_NAME "opdl_ring"

#define OPDL_NAME_SIZE 64

#define OPDL_EVENT_MASK   (0x00000000000FFFFFULL)
#define OPDL_FLOWID_MASK  (0xFFFFF)
#define OPDL_OPA_MASK     (0xFF)
#define OPDL_OPA_OFFSET   (0x38)
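
/* Example (illustrative only): the masks above carve fields out of the
 * 64-bit rte_event word as used later in this file, e.g.
 *
 *	uint64_t event   = ev->event;
 *	uint32_t flow_id = OPDL_FLOWID_MASK & event;                    (low 20 bits)
 *	uint32_t opa_id  = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);  (bits 56-63)
 */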
int opdl_logtype_driver;

/* Types of dependency between stages */
enum dep_type {
	DEP_NONE = 0,  /* no dependency */
	DEP_DIRECT,  /* stage has direct dependency */
	DEP_INDIRECT,  /* indirect dependency through other stage(s) */
	DEP_SELF,  /* stage dependency on itself, used to detect loops */
};
/* Shared section of stage state.
 * Care is needed when accessing and the layout is important, especially to
 * limit the adjacent cache-line HW prefetcher from impacting performance.
 */
struct shared_state {
	/* Last known minimum sequence number of dependencies, used for multi
	 * thread operation
	 */
	uint32_t available_seq;
	char _pad1[RTE_CACHE_LINE_SIZE * 3];
	uint32_t head;  /* Head sequence number (for multi thread operation) */
	char _pad2[RTE_CACHE_LINE_SIZE * 3];
	struct opdl_stage *stage;  /* back pointer */
	uint32_t tail;  /* Tail sequence number */
	char _pad3[RTE_CACHE_LINE_SIZE * 2];
} __rte_cache_aligned;
/* A structure to keep track of "unfinished" claims. This is only used for
 * stages that are threadsafe. Each lcore accesses its own instance of this
 * structure to record the entries it has claimed. This allows one lcore to make
 * multiple claims without being blocked by another. When disclaiming it moves
 * forward the shared tail when the shared tail matches the tail value recorded
 * here.
 */
struct claim_manager {
	uint32_t num_to_disclaim;
	uint32_t num_claimed;
	uint32_t mgr_head;
	uint32_t mgr_tail;
	struct {
		uint32_t head;
		uint32_t tail;
	} claims[OPDL_DISCLAIMS_PER_LCORE];
} __rte_cache_aligned;
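
/* Example (illustrative only): with the bookkeeping above, a claim of slots
 * [tail=100, head=110) followed by one of [110, 115) on the same lcore is
 * merged by claim_mgr_add() into a single record [100, 115), since the new
 * claim's tail equals the previous record's head; num_claimed is then 15
 * until those slots are disclaimed.
 */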
/* Context for each stage of opdl_ring.
 * Calculations on sequence numbers need to be done with other uint32_t values
 * so that results are modulus 2^32, and not undefined.
 */
struct opdl_stage {
	struct opdl_ring *t;  /* back pointer, set at init */
	uint32_t num_slots;  /* Number of slots for entries, set at init */
	uint32_t index;  /* ID for this stage, set at init */
	bool threadsafe;  /* Set to 1 if this stage supports threadsafe use */
	/* Last known min seq number of dependencies, used for single thread
	 * operation
	 */
	uint32_t available_seq;
	uint32_t head;  /* Current head for single-thread operation */
	uint32_t nb_instance;  /* Number of instances */
	uint32_t instance_id;  /* ID of this stage instance */
	uint16_t num_claimed;  /* Number of slots claimed */
	uint16_t num_event;  /* Number of events */
	uint32_t seq;  /* sequence number */
	uint32_t num_deps;  /* Number of direct dependencies */
	/* Keep track of all dependencies, used during init only */
	enum dep_type *dep_tracking;
	/* Direct dependencies of this stage */
	struct shared_state **deps;
	/* Other stages read this! */
	struct shared_state shared __rte_cache_aligned;
	/* For managing disclaims in multi-threaded processing stages */
	struct claim_manager pending_disclaims[RTE_MAX_LCORE]
					       __rte_cache_aligned;
	uint32_t shadow_head;  /* Shadow head for single-thread operation */
	uint32_t queue_id;  /* ID of Queue which is assigned to this stage */
	uint32_t pos;  /* Atomic scan position */
} __rte_cache_aligned;
/* Context for opdl_ring */
struct opdl_ring {
	char name[OPDL_NAME_SIZE];  /* OPDL queue instance name */
	int socket;  /* NUMA socket that memory is allocated on */
	uint32_t num_slots;  /* Number of slots for entries */
	uint32_t mask;  /* Mask for sequence numbers (num_slots - 1) */
	uint32_t slot_size;  /* Size of each slot in bytes */
	uint32_t num_stages;  /* Number of stages that have been added */
	uint32_t max_num_stages;  /* Max number of stages */
	/* Stages indexed by ID */
	struct opdl_stage *stages;
	/* Memory for storing slot data */
	uint8_t slots[0] __rte_cache_aligned;
};
/* Return the input stage of an opdl_ring */
static __rte_always_inline struct opdl_stage *
input_stage(const struct opdl_ring *t)
{
	return &t->stages[0];
}

/* Check if a stage is the input stage */
static __rte_always_inline bool
is_input_stage(const struct opdl_stage *s)
{
	return s->index == 0;
}

/* Get slot pointer from sequence number */
static __rte_always_inline void *
get_slot(const struct opdl_ring *t, uint32_t n)
{
	return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
}
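
/* Example (illustrative only): sequence numbers are free-running and are
 * mapped onto the ring by masking, so with num_slots = 8 (mask = 7) and
 * slot_size = 16, sequence number 10 resolves to byte offset
 * (10 & 7) * 16 = 32, i.e. slot 2.
 */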
/* Find how many entries are available for processing */
static __rte_always_inline uint32_t
available(const struct opdl_stage *s)
{
	if (s->threadsafe == true) {
		uint32_t n = __atomic_load_n(&s->shared.available_seq,
				__ATOMIC_ACQUIRE) -
				__atomic_load_n(&s->shared.head,
				__ATOMIC_ACQUIRE);

		/* Return 0 if available_seq needs to be updated */
		return (n <= s->num_slots) ? n : 0;
	}

	/* Single threaded */
	return s->available_seq - s->head;
}
/* Read sequence number of dependencies and find minimum */
static __rte_always_inline void
update_available_seq(struct opdl_stage *s)
{
	uint32_t i;
	uint32_t this_tail = s->shared.tail;
	uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
	/* Input stage sequence numbers are greater than the sequence numbers of
	 * its dependencies, so an offset of t->num_slots is needed when
	 * calculating available slots, and the condition used to determine the
	 * dependencies' minimum sequence number must be reversed.
	 */
	uint32_t wrap;

	if (is_input_stage(s)) {
		wrap = s->num_slots;
		for (i = 1; i < s->num_deps; i++) {
			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
					__ATOMIC_ACQUIRE);
			if ((this_tail - seq) > (this_tail - min_seq))
				min_seq = seq;
		}
	} else {
		wrap = 0;
		for (i = 1; i < s->num_deps; i++) {
			uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
					__ATOMIC_ACQUIRE);
			if ((seq - this_tail) < (min_seq - this_tail))
				min_seq = seq;
		}
	}

	if (s->threadsafe == false)
		s->available_seq = min_seq + wrap;
	else
		__atomic_store_n(&s->shared.available_seq, min_seq + wrap,
				__ATOMIC_RELEASE);
}
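
/* Example (illustrative only): for a ring with num_slots = 256, if the
 * slowest dependency of the input stage has tail = 1000, the input stage may
 * produce up to sequence number 1000 + 256, so its available_seq is stored
 * as min_seq + num_slots. For every other stage wrap is 0 and available_seq
 * is simply the minimum tail of its direct dependencies.
 */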
/* Wait until the number of available slots reaches number requested */
static __rte_always_inline void
wait_for_available(struct opdl_stage *s, uint32_t n)
{
	while (available(s) < n) {
		rte_pause();
		update_available_seq(s);
	}
}
/* Return number of slots to process based on number requested and mode */
static __rte_always_inline uint32_t
num_to_process(struct opdl_stage *s, uint32_t n, bool block)
{
	/* Don't read tail sequences of dependencies if not needed */
	if (available(s) >= n)
		return n;

	update_available_seq(s);

	if (block == false) {
		uint32_t avail = available(s);

		if (avail == 0) {
			rte_pause();
			return 0;
		}
		return (avail <= n) ? avail : n;
	}

	if (unlikely(n > s->num_slots)) {
		PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
				n, s->num_slots);
		return 0;  /* Avoid infinite loop */
	}
	wait_for_available(s, n);
	return n;
}
/* Copy entries in to slots with wrap-around */
static __rte_always_inline void
copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
		uint32_t num_entries)
{
	uint32_t slot_size = t->slot_size;
	uint32_t slot_index = start & t->mask;

	if (slot_index + num_entries <= t->num_slots) {
		rte_memcpy(get_slot(t, start), entries,
				num_entries * slot_size);
	} else {
		uint32_t split = t->num_slots - slot_index;

		rte_memcpy(get_slot(t, start), entries, split * slot_size);
		rte_memcpy(get_slot(t, 0),
				RTE_PTR_ADD(entries, split * slot_size),
				(num_entries - split) * slot_size);
	}
}
/* Copy entries out from slots with wrap-around */
static __rte_always_inline void
copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
		uint32_t num_entries)
{
	uint32_t slot_size = t->slot_size;
	uint32_t slot_index = start & t->mask;

	if (slot_index + num_entries <= t->num_slots) {
		rte_memcpy(entries, get_slot(t, start),
				num_entries * slot_size);
	} else {
		uint32_t split = t->num_slots - slot_index;

		rte_memcpy(entries, get_slot(t, start), split * slot_size);
		rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
				get_slot(t, 0),
				(num_entries - split) * slot_size);
	}
}
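
/* Example (illustrative only): with num_slots = 8, copying 5 entries
 * starting at sequence number 6 gives slot_index = 6, so split = 8 - 6 = 2
 * entries are copied at the end of the ring and the remaining 3 wrap around
 * to slot 0.
 */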
/* Input function optimised for single thread */
static __rte_always_inline uint32_t
opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
		uint32_t num_entries, bool block)
{
	struct opdl_stage *s = input_stage(t);
	uint32_t head = s->head;

	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
		return 0;

	copy_entries_in(t, head, entries, num_entries);

	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);

	return num_entries;
}
/* Convert head and tail of claim_manager into valid index */
static __rte_always_inline uint32_t
claim_mgr_index(uint32_t n)
{
	return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
}

/* Check if there are available slots in claim_manager */
static __rte_always_inline bool
claim_mgr_available(struct claim_manager *mgr)
{
	return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
			true : false;
}
/* Record a new claim. Only use after first checking an entry is available */
static __rte_always_inline void
claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
{
	if ((mgr->mgr_head != mgr->mgr_tail) &&
			(mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
			tail)) {
		/* Combine with previous claim */
		mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
	} else {
		mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
		mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
		mgr->mgr_head++;
	}

	mgr->num_claimed += (head - tail);
}
/* Read the oldest recorded claim */
static __rte_always_inline bool
claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
{
	if (mgr->mgr_head == mgr->mgr_tail)
		return false;

	*head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
	*tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
	return true;
}
/* Remove the oldest recorded claim. Only use after first reading the entry */
static __rte_always_inline void
claim_mgr_remove(struct claim_manager *mgr)
{
	mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
			mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
	mgr->mgr_tail++;
}
/* Update tail in the oldest claim. Only use after first reading the entry */
static __rte_always_inline void
claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
{
	mgr->num_claimed -= num_entries;
	mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
}
static __rte_always_inline void
opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
		uint32_t num_entries, bool block)
{
	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
	uint32_t head;
	uint32_t tail;

	while (num_entries) {
		bool ret = claim_mgr_read(disclaims, &tail, &head);

		if (ret == false)
			break;  /* nothing is claimed */
		/* There should be no race condition here. If shared.tail
		 * matches, no other core can update it until this one does.
		 */
		if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
				tail) {
			if (num_entries >= (head - tail)) {
				claim_mgr_remove(disclaims);
				__atomic_store_n(&s->shared.tail, head,
						__ATOMIC_RELEASE);
				num_entries -= (head - tail);
			} else {
				claim_mgr_move_tail(disclaims, num_entries);
				__atomic_store_n(&s->shared.tail,
						num_entries + tail,
						__ATOMIC_RELEASE);
				num_entries = 0;
			}
		} else if (block == false)
			break;  /* blocked by other thread */
		/* Keep going until num_entries are disclaimed. */
		rte_pause();
	}

	disclaims->num_to_disclaim = num_entries;
}
/* Move head atomically, returning number of entries available to process and
 * the original value of head. For non-input stages, the claim is recorded
 * so that the tail can be updated later by opdl_stage_disclaim().
 */
static __rte_always_inline void
move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
		uint32_t *old_head, bool block, bool claim_func)
{
	bool ret;
	uint32_t orig_num_entries = *num_entries;

	struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];

	/* Attempt to disclaim any outstanding claims */
	opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
			false);

	*old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
	while (true) {
		bool success;

		/* If called by opdl_ring_input(), claim does not need to be
		 * recorded, as there will be no disclaim.
		 */
		if (claim_func) {
			/* Check that the claim can be recorded */
			ret = claim_mgr_available(disclaims);
			if (ret == false) {
				/* exit out if claim can't be recorded */
				*num_entries = 0;
				return;
			}
		}

		*num_entries = num_to_process(s, orig_num_entries, block);
		if (*num_entries == 0)
			return;

		success = __atomic_compare_exchange_n(&s->shared.head, old_head,
				*old_head + *num_entries,
				true,  /* may fail spuriously */
				__ATOMIC_RELEASE,  /* memory order on success */
				__ATOMIC_ACQUIRE);  /* memory order on fail */
		if (likely(success))
			break;
		rte_pause();
	}

	if (claim_func)
		/* Store the claim record */
		claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
}
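
/* Example (illustrative only): if two lcores enter move_head_atomically()
 * while shared.head is 50 and both read old_head = 50, only one CAS succeeds
 * (say claiming [50, 60)); the loser has old_head updated to 60 by the
 * failed __atomic_compare_exchange_n() and retries from there.
 */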
/* Input function that supports multiple threads */
static __rte_always_inline uint32_t
opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
		uint32_t num_entries, bool block)
{
	struct opdl_stage *s = input_stage(t);
	uint32_t old_head;

	move_head_atomically(s, &num_entries, &old_head, block, false);
	if (num_entries == 0)
		return 0;

	copy_entries_in(t, old_head, entries, num_entries);

	/* If another thread started inputting before this one, but hasn't
	 * finished, we need to wait for it to complete to update the tail.
	 */
	while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
			old_head))
		rte_pause();

	__atomic_store_n(&s->shared.tail, old_head + num_entries,
			__ATOMIC_RELEASE);

	return num_entries;
}
static __rte_always_inline uint32_t
opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
		uint8_t this_lcore)
{
	return ((nb_p_lcores <= 1) ? 0 :
			(nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
			nb_p_lcores);
}
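
/* Example (illustrative only): with nb_p_lcores = 3 and start_seq = 7,
 * instance 1 gets offset (3 - (7 % 3) + 1) % 3 = 0 and starts at sequence
 * number 7 (7 % 3 == 1), instance 0 gets offset 2 (sequence 9) and instance
 * 2 gets offset 1 (sequence 8). Each instance then steps by nb_p_lcores,
 * giving a disjoint round-robin split of the claimed slots.
 */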
/* Claim slots to process, optimised for single-thread operation */
static __rte_always_inline uint32_t
opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
{
	uint32_t i = 0, j = 0, offset;
	uint32_t opa_id = 0;
	uint32_t flow_id = 0;
	uint64_t event = 0;
	void *get_slots;
	struct rte_event *ev;
	RTE_SET_USED(seq);
	struct opdl_ring *t = s->t;
	uint8_t *entries_offset = (uint8_t *)entries;

	if (!atomic) {

		offset = opdl_first_entry_id(s->seq, s->nb_instance,
				s->instance_id);

		num_entries = s->nb_instance * num_entries;

		num_entries = num_to_process(s, num_entries, block);

		for (; offset < num_entries; offset += s->nb_instance) {
			get_slots = get_slot(t, s->head + offset);
			memcpy(entries_offset, get_slots, t->slot_size);
			entries_offset += t->slot_size;
			i++;
		}
	} else {
		num_entries = num_to_process(s, num_entries, block);

		for (j = 0; j < num_entries; j++) {
			ev = (struct rte_event *)get_slot(t, s->head + j);

			event = __atomic_load_n(&(ev->event),
					__ATOMIC_ACQUIRE);

			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
			flow_id = OPDL_FLOWID_MASK & event;

			if (opa_id >= s->queue_id)
				continue;

			if ((flow_id % s->nb_instance) == s->instance_id) {
				memcpy(entries_offset, ev, t->slot_size);
				entries_offset += t->slot_size;
				i++;
			}
		}
	}
	s->shadow_head = s->head;
	s->head += num_entries;
	s->num_claimed = num_entries;
	s->num_event = i;
	s->pos = 0;

	/* automatically disclaim entries if number of rte_events is zero */
	if (unlikely(i == 0))
		opdl_stage_disclaim(s, 0, false);

	return i;
}
/* Thread-safe version of function to claim slots for processing */
static __rte_always_inline uint32_t
opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	uint32_t old_head;
	struct opdl_ring *t = s->t;
	uint32_t i = 0, offset;
	uint8_t *entries_offset = (uint8_t *)entries;

	if (seq == NULL) {
		PMD_DRV_LOG(ERR, "Invalid seq PTR");
		return 0;
	}
	offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
	num_entries = offset + (s->nb_instance * num_entries);

	move_head_atomically(s, &num_entries, &old_head, block, true);

	for (; offset < num_entries; offset += s->nb_instance) {
		memcpy(entries_offset, get_slot(t, s->head + offset),
				t->slot_size);
		entries_offset += t->slot_size;
		i++;
	}

	*seq = old_head;

	return i;
}
/* Claim and copy slot pointers, optimised for single-thread operation */
static __rte_always_inline uint32_t
opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
		return 0;

	copy_entries_out(s->t, s->head, entries, num_entries);

	if (seq != NULL)
		*seq = s->head;

	s->head += num_entries;

	return num_entries;
}
/* Thread-safe version of function to claim and copy pointers to slots */
static __rte_always_inline uint32_t
opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	uint32_t old_head;

	move_head_atomically(s, &num_entries, &old_head, block, true);
	if (num_entries == 0)
		return 0;

	copy_entries_out(s->t, old_head, entries, num_entries);

	if (seq != NULL)
		*seq = old_head;

	return num_entries;
}
static __rte_always_inline void
opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
		uint32_t num_entries)
{
	uint32_t old_tail = s->shared.tail;

	if (unlikely(num_entries > (s->head - old_tail))) {
		PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
				num_entries, s->head - old_tail);
		num_entries = s->head - old_tail;
	}
	__atomic_store_n(&s->shared.tail, num_entries + old_tail,
			__ATOMIC_RELEASE);
}
uint32_t
opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
		bool block)
{
	if (input_stage(t)->threadsafe == false)
		return opdl_ring_input_singlethread(t, entries, num_entries,
				block);
	else
		return opdl_ring_input_multithread(t, entries, num_entries,
				block);
}
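
/* Usage sketch (illustrative only; assumes a ring "r" created elsewhere with
 * slot_size == sizeof(struct rte_event) and a caller-defined BURST):
 *
 *	struct rte_event evs[BURST];
 *	uint32_t n = opdl_ring_input(r, evs, BURST, false);
 *
 * In non-blocking mode n may be less than BURST; the caller retries the
 * remainder later.
 */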
uint32_t
opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
		const void *entries, uint32_t num_entries, bool block)
{
	uint32_t head = s->head;

	num_entries = num_to_process(s, num_entries, block);

	if (num_entries == 0)
		return 0;

	copy_entries_in(t, head, entries, num_entries);

	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);

	return num_entries;
}
uint32_t
opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
		void *entries, uint32_t num_entries, bool block)
{
	uint32_t head = s->head;

	num_entries = num_to_process(s, num_entries, block);
	if (num_entries == 0)
		return 0;

	copy_entries_out(t, head, entries, num_entries);

	s->head += num_entries;
	__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);

	return num_entries;
}
uint32_t
opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
{
	/* return (num_to_process(s, num_entries, false)); */

	if (available(s) >= num_entries)
		return num_entries;

	update_available_seq(s);

	uint32_t avail = available(s);

	if (avail == 0) {
		rte_pause();
		return 0;
	}
	return (avail <= num_entries) ? avail : num_entries;
}
uint32_t
opdl_stage_claim(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
{
	if (s->threadsafe == false)
		return opdl_stage_claim_singlethread(s, entries, num_entries,
				seq, block, atomic);
	else
		return opdl_stage_claim_multithread(s, entries, num_entries,
				seq, block);
}
uint32_t
opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
		uint32_t num_entries, uint32_t *seq, bool block)
{
	if (s->threadsafe == false)
		return opdl_stage_claim_copy_singlethread(s, entries,
				num_entries, seq, block);
	else
		return opdl_stage_claim_copy_multithread(s, entries,
				num_entries, seq, block);
}
void
opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
		bool block)
{
	if (s->threadsafe == false) {
		opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
	} else {
		struct claim_manager *disclaims =
			&s->pending_disclaims[rte_lcore_id()];

		if (unlikely(num_entries > s->num_slots)) {
			PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
					num_entries, disclaims->num_claimed);
			num_entries = disclaims->num_claimed;
		}

		num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
				disclaims->num_claimed);
		opdl_stage_disclaim_multithread_n(s, num_entries, block);
	}
}
int
opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
{
	if (num_entries != s->num_event) {
		return 0;
	}
	if (s->threadsafe == false) {
		__atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
		s->seq += s->num_claimed;
		s->shadow_head = s->head;
		s->num_claimed = 0;
	} else {
		struct claim_manager *disclaims =
			&s->pending_disclaims[rte_lcore_id()];
		opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
				block);
	}
	return num_entries;
}
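
/* Usage sketch (illustrative only; BURST and seq are caller-defined): a
 * single-threaded worker stage typically pairs a claim with a disclaim of
 * the same count:
 *
 *	struct rte_event evs[BURST];
 *	uint32_t n = opdl_stage_claim(s, evs, BURST, &seq, false, false);
 *	... process the n events in evs ...
 *	opdl_stage_disclaim(s, n, false);
 */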
uint32_t
opdl_ring_available(struct opdl_ring *t)
{
	return opdl_stage_available(&t->stages[0]);
}

uint32_t
opdl_stage_available(struct opdl_stage *s)
{
	update_available_seq(s);
	return available(s);
}

void
opdl_ring_flush(struct opdl_ring *t)
{
	struct opdl_stage *s = input_stage(t);

	wait_for_available(s, s->num_slots);
}
/******************** Non performance sensitive functions ********************/

/* Initial setup of a new stage's context */
static int
init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
		bool is_input)
{
	uint32_t available = (is_input) ? t->num_slots : 0;

	s->t = t;
	s->num_slots = t->num_slots;
	s->index = t->num_stages;
	s->threadsafe = threadsafe;
	s->shared.stage = s;

	/* Alloc memory for deps */
	s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
			t->max_num_stages * sizeof(enum dep_type),
			0, t->socket);
	if (s->dep_tracking == NULL)
		return -ENOMEM;

	s->deps = rte_zmalloc_socket(LIB_NAME,
			t->max_num_stages * sizeof(struct shared_state *),
			0, t->socket);
	if (s->deps == NULL) {
		rte_free(s->dep_tracking);
		return -ENOMEM;
	}

	s->dep_tracking[s->index] = DEP_SELF;

	if (threadsafe == true)
		s->shared.available_seq = available;
	else
		s->available_seq = available;

	return 0;
}
/* Add direct or indirect dependencies between stages */
static int
add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
		enum dep_type type)
{
	struct opdl_ring *t = dependent->t;
	uint32_t i;

	/* Add new direct dependency */
	if ((type == DEP_DIRECT) &&
			(dependent->dep_tracking[dependency->index] ==
					DEP_NONE)) {
		PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
				t->name, dependent->index, dependency->index);
		dependent->dep_tracking[dependency->index] = DEP_DIRECT;
	}

	/* Add new indirect dependency or change direct to indirect */
	if ((type == DEP_INDIRECT) &&
			((dependent->dep_tracking[dependency->index] ==
			DEP_NONE) ||
			(dependent->dep_tracking[dependency->index] ==
			DEP_DIRECT))) {
		PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
				t->name, dependent->index, dependency->index);
		dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
	}

	/* Shouldn't happen... */
	if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
			(dependent != input_stage(t))) {
		PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
				t->name, dependent->index);
		return -EINVAL;
	}

	/* Keep going to dependencies of the dependency, until input stage */
	if (dependency != input_stage(t))
		for (i = 0; i < dependency->num_deps; i++) {
			int ret = add_dep(dependent, dependency->deps[i]->stage,
					DEP_INDIRECT);

			if (ret < 0)
				return ret;
		}

	/* Make list of sequence numbers for direct dependencies only */
	if (type == DEP_DIRECT)
		for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
			if (dependent->dep_tracking[i] == DEP_DIRECT) {
				if ((i == 0) && (dependent->num_deps > 1))
					rte_panic("%s:%u depends on > input",
							t->name,
							dependent->index);
				dependent->deps[dependent->num_deps++] =
						&t->stages[i].shared;
			}

	return 0;
}
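
/* Example (illustrative only): for a pipeline input -> A -> B, calling
 * add_dep(B, A, DEP_DIRECT) records A as a direct dependency of B and then
 * walks A's own dependencies, marking the input stage as an indirect
 * dependency of B. Only direct dependencies end up in B->deps[], which is
 * what update_available_seq() polls.
 */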
struct opdl_ring *
opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
		uint32_t max_num_stages, int socket)
{
	struct opdl_ring *t;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	int mz_flags = 0;
	struct opdl_stage *st = NULL;
	const struct rte_memzone *mz = NULL;
	size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
			(num_slots * slot_size));

	/* Compile time checking */
	RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
			0);
	RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
			RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
			RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));

	/* Parameter checking */
	if (name == NULL) {
		PMD_DRV_LOG(ERR, "name param is NULL");
		goto exit_fail;
	}
	if (!rte_is_power_of_2(num_slots)) {
		PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
				num_slots, name);
		goto exit_fail;
	}

	/* Alloc memory for stages */
	st = rte_zmalloc_socket(LIB_NAME,
			max_num_stages * sizeof(struct opdl_stage),
			RTE_CACHE_LINE_SIZE, socket);
	if (st == NULL)
		goto exit_fail;

	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);

	/* Alloc memory for memzone */
	mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
	if (mz == NULL)
		goto exit_fail;

	t = mz->addr;

	/* Initialise opdl_ring queue */
	memset(t, 0, sizeof(*t));
	strlcpy(t->name, name, sizeof(t->name));
	t->socket = socket;
	t->num_slots = num_slots;
	t->mask = num_slots - 1;
	t->slot_size = slot_size;
	t->max_num_stages = max_num_stages;
	t->stages = st;

	PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
			t->name, t, num_slots, socket, slot_size);

	return t;

exit_fail:
	PMD_DRV_LOG(ERR, "Cannot reserve memory");
	rte_free(st);
	rte_memzone_free(mz);

	return NULL;
}
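
/* Setup sketch (illustrative only; error handling omitted, names are
 * placeholders): a two-stage pipeline on a ring of 1024 rte_event slots
 * could be assembled as:
 *
 *	struct opdl_ring *r = opdl_ring_create("pipe", 1024,
 *			sizeof(struct rte_event), 4, rte_socket_id());
 *	struct opdl_stage *in = opdl_stage_add(r, false, true);
 *	struct opdl_stage *w  = opdl_stage_add(r, false, false);
 *	opdl_stage_set_deps(w, &in, 1);
 */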
void *
opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
{
	return get_slot(t, index);
}
bool
opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
		uint32_t index, bool atomic)
{
	uint32_t i = 0, offset;
	struct opdl_ring *t = s->t;
	struct rte_event *ev_orig = NULL;
	bool ev_updated = false;
	uint64_t ev_temp = 0;
	uint64_t ev_update = 0;

	uint32_t opa_id = 0;
	uint32_t flow_id = 0;
	uint64_t event = 0;

	if (index > s->num_event) {
		PMD_DRV_LOG(ERR, "index is overflow");
		return ev_updated;
	}

	ev_temp = ev->event & OPDL_EVENT_MASK;

	if (!atomic) {
		offset = opdl_first_entry_id(s->seq, s->nb_instance,
				s->instance_id);
		offset += index * s->nb_instance;
		ev_orig = get_slot(t, s->shadow_head + offset);
		if ((ev_orig->event & OPDL_EVENT_MASK) != ev_temp) {
			ev_orig->event = ev->event;
			ev_updated = true;
		}
		if (ev_orig->u64 != ev->u64) {
			ev_orig->u64 = ev->u64;
			ev_updated = true;
		}

	} else {
		for (i = s->pos; i < s->num_claimed; i++) {
			ev_orig = (struct rte_event *)
				get_slot(t, s->shadow_head + i);

			event = __atomic_load_n(&(ev_orig->event),
					__ATOMIC_ACQUIRE);

			opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
			flow_id = OPDL_FLOWID_MASK & event;

			if (opa_id >= s->queue_id)
				continue;

			if ((flow_id % s->nb_instance) == s->instance_id) {
				ev_update = s->queue_id;
				ev_update = (ev_update << OPDL_OPA_OFFSET)
					| ev->event;

				s->pos = i + 1;

				if ((event & OPDL_EVENT_MASK) !=
						ev_temp) {
					__atomic_store_n(&(ev_orig->event),
							ev_update,
							__ATOMIC_RELEASE);
					ev_updated = true;
				}
				if (ev_orig->u64 != ev->u64) {
					ev_orig->u64 = ev->u64;
					ev_updated = true;
				}

				break;
			}
		}
	}

	return ev_updated;
}
int
opdl_ring_get_socket(const struct opdl_ring *t)
{
	return t->socket;
}

uint32_t
opdl_ring_get_num_slots(const struct opdl_ring *t)
{
	return t->num_slots;
}

const char *
opdl_ring_get_name(const struct opdl_ring *t)
{
	return t->name;
}
1073 check_deps(struct opdl_ring
*t
, struct opdl_stage
*deps
[],
1078 for (i
= 0; i
< num_deps
; ++i
) {
1080 PMD_DRV_LOG(ERR
, "deps[%u] is NULL", i
);
1083 if (t
!= deps
[i
]->t
) {
1084 PMD_DRV_LOG(ERR
, "deps[%u] is in opdl_ring %s, not %s",
1085 i
, deps
[i
]->t
->name
, t
->name
);
struct opdl_stage *
opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
{
	struct opdl_stage *s;

	/* Parameter checking */
	if (!t) {
		PMD_DRV_LOG(ERR, "opdl_ring is NULL");
		return NULL;
	}
	if (t->num_stages == t->max_num_stages) {
		PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
				t->name, t->max_num_stages);
		return NULL;
	}

	s = &t->stages[t->num_stages];

	if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
		PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
				&s->shared, t->name);

	if (init_stage(t, s, threadsafe, is_input) < 0) {
		PMD_DRV_LOG(ERR, "Cannot reserve memory");
		return NULL;
	}
	t->num_stages++;

	return s;
}
uint32_t
opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
		uint32_t nb_instance, uint32_t instance_id,
		struct opdl_stage *deps[],
		uint32_t num_deps)
{
	uint32_t i;
	int ret = 0;

	if ((num_deps > 0) && (!deps)) {
		PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
		return -1;
	}
	ret = check_deps(t, deps, num_deps);
	if (ret < 0)
		return ret;

	for (i = 0; i < num_deps; i++) {
		ret = add_dep(s, deps[i], DEP_DIRECT);
		if (ret < 0)
			return ret;
	}

	s->nb_instance = nb_instance;
	s->instance_id = instance_id;

	return ret;
}
struct opdl_stage *
opdl_ring_get_input_stage(const struct opdl_ring *t)
{
	return input_stage(t);
}
int
opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
		uint32_t num_deps)
{
	unsigned int i;
	int ret = 0;

	if ((num_deps == 0) || (!deps)) {
		PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
		return -EINVAL;
	}

	ret = check_deps(s->t, deps, num_deps);
	if (ret < 0)
		return ret;

	/* Update deps */
	for (i = 0; i < num_deps; i++)
		s->deps[i] = &deps[i]->shared;
	s->num_deps = num_deps;

	return ret;
}
struct opdl_ring *
opdl_stage_get_opdl_ring(const struct opdl_stage *s)
{
	return s->t;
}

void
opdl_stage_set_queue_id(struct opdl_stage *s,
		uint32_t queue_id)
{
	s->queue_id = queue_id;
}
void
opdl_ring_dump(const struct opdl_ring *t, FILE *f)
{
	uint32_t i;

	if (t == NULL) {
		fprintf(f, "NULL OPDL!\n");
		return;
	}
	fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
			t->name, t->num_slots, t->mask, t->slot_size,
			t->num_stages, t->socket);
	for (i = 0; i < t->num_stages; i++) {
		uint32_t j;
		const struct opdl_stage *s = &t->stages[i];

		fprintf(f, "  %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
				t->name, i, (s->threadsafe) ? "true" : "false",
				(s->threadsafe) ? s->shared.head : s->head,
				(s->threadsafe) ? s->shared.available_seq :
				s->available_seq,
				s->shared.tail, (s->num_deps > 0) ?
				s->deps[0]->stage->index : 0);
		for (j = 1; j < s->num_deps; j++)
			fprintf(f, ",%u", s->deps[j]->stage->index);
		fprintf(f, "\n");
	}
	fflush(f);
}
void
opdl_ring_free(struct opdl_ring *t)
{
	uint32_t i;
	const struct rte_memzone *mz;
	char mz_name[RTE_MEMZONE_NAMESIZE];

	if (t == NULL) {
		PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
		return;
	}

	PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);

	for (i = 0; i < t->num_stages; ++i) {
		rte_free(t->stages[i].deps);
		rte_free(t->stages[i].dep_tracking);
	}

	rte_free(t->stages);

	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
	mz = rte_memzone_lookup(mz_name);
	if (rte_memzone_free(mz) != 0)
		PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
}
/* Search an opdl_ring by its name */
struct opdl_ring *
opdl_ring_lookup(const char *name)
{
	const struct rte_memzone *mz;
	char mz_name[RTE_MEMZONE_NAMESIZE];

	snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);

	mz = rte_memzone_lookup(mz_name);
	if (mz == NULL)
		return NULL;

	return mz->addr;
}

void
opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
{
	s->threadsafe = threadsafe;
}