1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation
3 */
4
5 #include <stdbool.h>
6 #include <stddef.h>
7 #include <stdint.h>
8 #include <stdio.h>
9
10 #include <rte_string_fns.h>
11 #include <rte_branch_prediction.h>
12 #include <rte_debug.h>
13 #include <rte_lcore.h>
14 #include <rte_log.h>
15 #include <rte_malloc.h>
16 #include <rte_memcpy.h>
17 #include <rte_memory.h>
18 #include <rte_memzone.h>
19 #include <rte_eal_memconfig.h>
20
21 #include "opdl_ring.h"
22 #include "opdl_log.h"
23
24 #define LIB_NAME "opdl_ring"
25
26 #define OPDL_NAME_SIZE 64
27
28
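/* The masks below extract fields from the 64-bit rte_event metadata word, as
 * used by opdl_stage_claim_singlethread() and opdl_ring_cas_slot() below:
 * OPDL_EVENT_MASK and OPDL_FLOWID_MASK select the 20-bit flow id in the
 * lowest bits, while OPDL_OPA_MASK shifted by OPDL_OPA_OFFSET selects the
 * top eight bits (the rte_event impl_opaque field), which this driver uses
 * to record the id of the queue that last updated the event.
 */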
29 #define OPDL_EVENT_MASK (0x00000000000FFFFFULL)
30 #define OPDL_FLOWID_MASK (0xFFFFF)
31 #define OPDL_OPA_MASK (0xFF)
32 #define OPDL_OPA_OFFSET (0x38)
33
34 int opdl_logtype_driver;
35
36 /* Types of dependency between stages */
37 enum dep_type {
38 DEP_NONE = 0, /* no dependency */
39 DEP_DIRECT, /* stage has direct dependency */
40 DEP_INDIRECT, /* indirect dependency through other stage(s) */
41 DEP_SELF, /* stage dependency on itself, used to detect loops */
42 };
43
44 /* Shared section of stage state.
45  * Care is needed when accessing it, and the layout is important, especially
46  * to limit the adjacent cache-line HW prefetcher from impacting performance.
47 */
48 struct shared_state {
49 /* Last known minimum sequence number of dependencies, used for multi
50 * thread operation
51 */
52 uint32_t available_seq;
53 char _pad1[RTE_CACHE_LINE_SIZE * 3];
54 uint32_t head; /* Head sequence number (for multi thread operation) */
55 char _pad2[RTE_CACHE_LINE_SIZE * 3];
56 struct opdl_stage *stage; /* back pointer */
57 uint32_t tail; /* Tail sequence number */
58 char _pad3[RTE_CACHE_LINE_SIZE * 2];
59 } __rte_cache_aligned;
60
61 /* A structure to keep track of "unfinished" claims. This is only used for
62 * stages that are threadsafe. Each lcore accesses its own instance of this
63 * structure to record the entries it has claimed. This allows one lcore to make
64  * multiple claims without being blocked by another. When disclaiming, the
65  * shared tail is moved forward once it matches the tail value recorded
66  * here.
67 */
68 struct claim_manager {
69 uint32_t num_to_disclaim;
70 uint32_t num_claimed;
71 uint32_t mgr_head;
72 uint32_t mgr_tail;
73 struct {
74 uint32_t head;
75 uint32_t tail;
76 } claims[OPDL_DISCLAIMS_PER_LCORE];
77 } __rte_cache_aligned;
78
79 /* Context for each stage of opdl_ring.
80 * Calculations on sequence numbers need to be done with other uint32_t values
81 * so that results are modulus 2^32, and not undefined.
82 */
83 struct opdl_stage {
84 struct opdl_ring *t; /* back pointer, set at init */
85 uint32_t num_slots; /* Number of slots for entries, set at init */
86 uint32_t index; /* ID for this stage, set at init */
87 bool threadsafe; /* Set to true if this stage supports thread-safe use */
88 /* Last known min seq number of dependencies, used for single thread
89 * operation
90 */
91 uint32_t available_seq;
92 uint32_t head; /* Current head for single-thread operation */
93 uint32_t nb_instance; /* Number of instances */
94 uint32_t instance_id; /* ID of this stage instance */
95 uint16_t num_claimed; /* Number of slots claimed */
96 uint16_t num_event; /* Number of events */
97 uint32_t seq; /* sequence number */
98 uint32_t num_deps; /* Number of direct dependencies */
99 /* Keep track of all dependencies, used during init only */
100 enum dep_type *dep_tracking;
101 /* Direct dependencies of this stage */
102 struct shared_state **deps;
103 /* Other stages read this! */
104 struct shared_state shared __rte_cache_aligned;
105 /* For managing disclaims in multi-threaded processing stages */
106 struct claim_manager pending_disclaims[RTE_MAX_LCORE]
107 __rte_cache_aligned;
108 uint32_t shadow_head; /* Shadow head for single-thread operation */
109 uint32_t queue_id; /* ID of Queue which is assigned to this stage */
110 uint32_t pos; /* Atomic scan position */
111 } __rte_cache_aligned;
112
113 /* Context for opdl_ring */
114 struct opdl_ring {
115 char name[OPDL_NAME_SIZE]; /* OPDL queue instance name */
116 int socket; /* NUMA socket that memory is allocated on */
117 uint32_t num_slots; /* Number of slots for entries */
118 uint32_t mask; /* Mask for sequence numbers (num_slots - 1) */
119 uint32_t slot_size; /* Size of each slot in bytes */
120 uint32_t num_stages; /* Number of stages that have been added */
121 uint32_t max_num_stages; /* Max number of stages */
122 /* Stages indexed by ID */
123 struct opdl_stage *stages;
124 /* Memory for storing slot data */
125 uint8_t slots[0] __rte_cache_aligned;
126 };
127
128
129 /* Return the input stage of an opdl_ring */
130 static __rte_always_inline struct opdl_stage *
131 input_stage(const struct opdl_ring *t)
132 {
133 return &t->stages[0];
134 }
135
136 /* Check if a stage is the input stage */
137 static __rte_always_inline bool
138 is_input_stage(const struct opdl_stage *s)
139 {
140 return s->index == 0;
141 }
142
143 /* Get slot pointer from sequence number */
144 static __rte_always_inline void *
145 get_slot(const struct opdl_ring *t, uint32_t n)
146 {
147 return (void *)(uintptr_t)&t->slots[(n & t->mask) * t->slot_size];
148 }
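/* Worked example (illustrative only): with num_slots = 8 (so mask = 7) and
 * slot_size = 16, sequence number n = 11 maps to slot index 11 & 7 = 3,
 * i.e. byte offset 3 * 16 = 48 into t->slots[]. Because num_slots is a
 * power of two, the same masking stays correct after the 32-bit sequence
 * numbers wrap around.
 */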
149
150 /* Find how many entries are available for processing */
151 static __rte_always_inline uint32_t
152 available(const struct opdl_stage *s)
153 {
154 if (s->threadsafe == true) {
155 uint32_t n = __atomic_load_n(&s->shared.available_seq,
156 __ATOMIC_ACQUIRE) -
157 __atomic_load_n(&s->shared.head,
158 __ATOMIC_ACQUIRE);
159
160 /* Return 0 if available_seq needs to be updated */
161 return (n <= s->num_slots) ? n : 0;
162 }
163
164 /* Single threaded */
165 return s->available_seq - s->head;
166 }
167
168 /* Read sequence number of dependencies and find minimum */
169 static __rte_always_inline void
170 update_available_seq(struct opdl_stage *s)
171 {
172 uint32_t i;
173 uint32_t this_tail = s->shared.tail;
174 uint32_t min_seq = __atomic_load_n(&s->deps[0]->tail, __ATOMIC_ACQUIRE);
175 /* Input stage sequence numbers are greater than the sequence numbers of
176  * its dependencies, so an offset of t->num_slots is needed when
177  * calculating available slots, and the condition used to determine the
178  * dependencies' minimum sequence number must be reversed.
179 */
180 uint32_t wrap;
181
182 if (is_input_stage(s)) {
183 wrap = s->num_slots;
184 for (i = 1; i < s->num_deps; i++) {
185 uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
186 __ATOMIC_ACQUIRE);
187 if ((this_tail - seq) > (this_tail - min_seq))
188 min_seq = seq;
189 }
190 } else {
191 wrap = 0;
192 for (i = 1; i < s->num_deps; i++) {
193 uint32_t seq = __atomic_load_n(&s->deps[i]->tail,
194 __ATOMIC_ACQUIRE);
195 if ((seq - this_tail) < (min_seq - this_tail))
196 min_seq = seq;
197 }
198 }
199
200 if (s->threadsafe == false)
201 s->available_seq = min_seq + wrap;
202 else
203 __atomic_store_n(&s->shared.available_seq, min_seq + wrap,
204 __ATOMIC_RELEASE);
205 }
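/* Worked example (illustrative only) of the modulo-2^32 arithmetic above:
 * with this_tail = 0xFFFFFFFE and dependency tails 0xFFFFFFFF and 3, the
 * unsigned differences from this_tail are 1 and 5, so 0xFFFFFFFF is chosen
 * as the minimum even though it is numerically larger than 3. For the input
 * stage the comparison is reversed, since its sequence numbers run ahead of
 * its dependencies by up to num_slots.
 */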
206
207 /* Wait until the number of available slots reaches the number requested */
208 static __rte_always_inline void
209 wait_for_available(struct opdl_stage *s, uint32_t n)
210 {
211 while (available(s) < n) {
212 rte_pause();
213 update_available_seq(s);
214 }
215 }
216
217 /* Return number of slots to process based on number requested and mode */
218 static __rte_always_inline uint32_t
219 num_to_process(struct opdl_stage *s, uint32_t n, bool block)
220 {
221 /* Don't read tail sequences of dependencies if not needed */
222 if (available(s) >= n)
223 return n;
224
225 update_available_seq(s);
226
227 if (block == false) {
228 uint32_t avail = available(s);
229
230 if (avail == 0) {
231 rte_pause();
232 return 0;
233 }
234 return (avail <= n) ? avail : n;
235 }
236
237 if (unlikely(n > s->num_slots)) {
238 PMD_DRV_LOG(ERR, "%u entries is more than max (%u)",
239 n, s->num_slots);
240 return 0; /* Avoid infinite loop */
241 }
242 /* blocking */
243 wait_for_available(s, n);
244 return n;
245 }
246
247 /* Copy entries in to slots with wrap-around */
248 static __rte_always_inline void
249 copy_entries_in(struct opdl_ring *t, uint32_t start, const void *entries,
250 uint32_t num_entries)
251 {
252 uint32_t slot_size = t->slot_size;
253 uint32_t slot_index = start & t->mask;
254
255 if (slot_index + num_entries <= t->num_slots) {
256 rte_memcpy(get_slot(t, start), entries,
257 num_entries * slot_size);
258 } else {
259 uint32_t split = t->num_slots - slot_index;
260
261 rte_memcpy(get_slot(t, start), entries, split * slot_size);
262 rte_memcpy(get_slot(t, 0),
263 RTE_PTR_ADD(entries, split * slot_size),
264 (num_entries - split) * slot_size);
265 }
266 }
267
268 /* Copy entries out from slots with wrap-around */
269 static __rte_always_inline void
270 copy_entries_out(struct opdl_ring *t, uint32_t start, void *entries,
271 uint32_t num_entries)
272 {
273 uint32_t slot_size = t->slot_size;
274 uint32_t slot_index = start & t->mask;
275
276 if (slot_index + num_entries <= t->num_slots) {
277 rte_memcpy(entries, get_slot(t, start),
278 num_entries * slot_size);
279 } else {
280 uint32_t split = t->num_slots - slot_index;
281
282 rte_memcpy(entries, get_slot(t, start), split * slot_size);
283 rte_memcpy(RTE_PTR_ADD(entries, split * slot_size),
284 get_slot(t, 0),
285 (num_entries - split) * slot_size);
286 }
287 }
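/* Worked example (illustrative only) for both copy helpers above: with
 * num_slots = 8, start = 6 and num_entries = 5, slot_index is 6, so the
 * copy is split into 8 - 6 = 2 entries at slots 6-7 followed by the
 * remaining 3 entries at slots 0-2.
 */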
288
289 /* Input function optimised for single thread */
290 static __rte_always_inline uint32_t
291 opdl_ring_input_singlethread(struct opdl_ring *t, const void *entries,
292 uint32_t num_entries, bool block)
293 {
294 struct opdl_stage *s = input_stage(t);
295 uint32_t head = s->head;
296
297 num_entries = num_to_process(s, num_entries, block);
298 if (num_entries == 0)
299 return 0;
300
301 copy_entries_in(t, head, entries, num_entries);
302
303 s->head += num_entries;
304 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
305
306 return num_entries;
307 }
308
309 /* Convert head and tail of claim_manager into a valid index */
310 static __rte_always_inline uint32_t
311 claim_mgr_index(uint32_t n)
312 {
313 return n & (OPDL_DISCLAIMS_PER_LCORE - 1);
314 }
315
316 /* Check if there are available slots in claim_manager */
317 static __rte_always_inline bool
318 claim_mgr_available(struct claim_manager *mgr)
319 {
320 return (mgr->mgr_head < (mgr->mgr_tail + OPDL_DISCLAIMS_PER_LCORE)) ?
321 true : false;
322 }
323
324 /* Record a new claim. Only use after first checking an entry is available */
325 static __rte_always_inline void
326 claim_mgr_add(struct claim_manager *mgr, uint32_t tail, uint32_t head)
327 {
328 if ((mgr->mgr_head != mgr->mgr_tail) &&
329 (mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head ==
330 tail)) {
331 /* Combine with previous claim */
332 mgr->claims[claim_mgr_index(mgr->mgr_head - 1)].head = head;
333 } else {
334 mgr->claims[claim_mgr_index(mgr->mgr_head)].head = head;
335 mgr->claims[claim_mgr_index(mgr->mgr_head)].tail = tail;
336 mgr->mgr_head++;
337 }
338
339 mgr->num_claimed += (head - tail);
340 }
341
342 /* Read the oldest recorded claim */
343 static __rte_always_inline bool
344 claim_mgr_read(struct claim_manager *mgr, uint32_t *tail, uint32_t *head)
345 {
346 if (mgr->mgr_head == mgr->mgr_tail)
347 return false;
348
349 *head = mgr->claims[claim_mgr_index(mgr->mgr_tail)].head;
350 *tail = mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail;
351 return true;
352 }
353
354 /* Remove the oldest recorded claim. Only use after first reading the entry */
355 static __rte_always_inline void
356 claim_mgr_remove(struct claim_manager *mgr)
357 {
358 mgr->num_claimed -= (mgr->claims[claim_mgr_index(mgr->mgr_tail)].head -
359 mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail);
360 mgr->mgr_tail++;
361 }
362
363 /* Update tail in the oldest claim. Only use after first reading the entry */
364 static __rte_always_inline void
365 claim_mgr_move_tail(struct claim_manager *mgr, uint32_t num_entries)
366 {
367 mgr->num_claimed -= num_entries;
368 mgr->claims[claim_mgr_index(mgr->mgr_tail)].tail += num_entries;
369 }
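/* Sketch (illustrative only) of how the claim_manager helpers above are
 * driven by move_head_atomically() and opdl_stage_disclaim_multithread_n()
 * below; old_head, n and done are placeholder values:
 *
 *	struct claim_manager *mgr = &s->pending_disclaims[rte_lcore_id()];
 *	uint32_t tail, head;
 *
 * A claim records the range [old_head, old_head + n) if there is room:
 *
 *	if (claim_mgr_available(mgr))
 *		claim_mgr_add(mgr, old_head, old_head + n);
 *
 * A disclaim retires the oldest recorded claim, wholly or partially:
 *
 *	if (claim_mgr_read(mgr, &tail, &head)) {
 *		if (done >= head - tail)
 *			claim_mgr_remove(mgr);
 *		else
 *			claim_mgr_move_tail(mgr, done);
 *	}
 */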
370
371 static __rte_always_inline void
372 opdl_stage_disclaim_multithread_n(struct opdl_stage *s,
373 uint32_t num_entries, bool block)
374 {
375 struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
376 uint32_t head;
377 uint32_t tail;
378
379 while (num_entries) {
380 bool ret = claim_mgr_read(disclaims, &tail, &head);
381
382 if (ret == false)
383 break; /* nothing is claimed */
384 /* There should be no race condition here. If shared.tail
385 * matches, no other core can update it until this one does.
386 */
387 if (__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) ==
388 tail) {
389 if (num_entries >= (head - tail)) {
390 claim_mgr_remove(disclaims);
391 __atomic_store_n(&s->shared.tail, head,
392 __ATOMIC_RELEASE);
393 num_entries -= (head - tail);
394 } else {
395 claim_mgr_move_tail(disclaims, num_entries);
396 __atomic_store_n(&s->shared.tail,
397 num_entries + tail,
398 __ATOMIC_RELEASE);
399 num_entries = 0;
400 }
401 } else if (block == false)
402 break; /* blocked by other thread */
403 /* Keep going until num_entries are disclaimed. */
404 rte_pause();
405 }
406
407 disclaims->num_to_disclaim = num_entries;
408 }
409
410 /* Move head atomically, returning number of entries available to process and
411 * the original value of head. For non-input stages, the claim is recorded
412 * so that the tail can be updated later by opdl_stage_disclaim().
413 */
414 static __rte_always_inline void
415 move_head_atomically(struct opdl_stage *s, uint32_t *num_entries,
416 uint32_t *old_head, bool block, bool claim_func)
417 {
418 uint32_t orig_num_entries = *num_entries;
419 uint32_t ret;
420 struct claim_manager *disclaims = &s->pending_disclaims[rte_lcore_id()];
421
422 /* Attempt to disclaim any outstanding claims */
423 opdl_stage_disclaim_multithread_n(s, disclaims->num_to_disclaim,
424 false);
425
426 *old_head = __atomic_load_n(&s->shared.head, __ATOMIC_ACQUIRE);
427 while (true) {
428 bool success;
429 /* If called by opdl_ring_input(), claim does not need to be
430 * recorded, as there will be no disclaim.
431 */
432 if (claim_func) {
433 /* Check that the claim can be recorded */
434 ret = claim_mgr_available(disclaims);
435 if (ret == false) {
436 /* exit out if claim can't be recorded */
437 *num_entries = 0;
438 return;
439 }
440 }
441
442 *num_entries = num_to_process(s, orig_num_entries, block);
443 if (*num_entries == 0)
444 return;
445
446 success = __atomic_compare_exchange_n(&s->shared.head, old_head,
447 *old_head + *num_entries,
448 true, /* may fail spuriously */
449 __ATOMIC_RELEASE, /* memory order on success */
450 __ATOMIC_ACQUIRE); /* memory order on fail */
451 if (likely(success))
452 break;
453 rte_pause();
454 }
455
456 if (claim_func)
457 /* Store the claim record */
458 claim_mgr_add(disclaims, *old_head, *old_head + *num_entries);
459 }
460
461 /* Input function that supports multiple threads */
462 static __rte_always_inline uint32_t
463 opdl_ring_input_multithread(struct opdl_ring *t, const void *entries,
464 uint32_t num_entries, bool block)
465 {
466 struct opdl_stage *s = input_stage(t);
467 uint32_t old_head;
468
469 move_head_atomically(s, &num_entries, &old_head, block, false);
470 if (num_entries == 0)
471 return 0;
472
473 copy_entries_in(t, old_head, entries, num_entries);
474
475 /* If another thread started inputting before this one, but hasn't
476 * finished, we need to wait for it to complete to update the tail.
477 */
478 while (unlikely(__atomic_load_n(&s->shared.tail, __ATOMIC_ACQUIRE) !=
479 old_head))
480 rte_pause();
481
482 __atomic_store_n(&s->shared.tail, old_head + num_entries,
483 __ATOMIC_RELEASE);
484
485 return num_entries;
486 }
487
488 static __rte_always_inline uint32_t
489 opdl_first_entry_id(uint32_t start_seq, uint8_t nb_p_lcores,
490 uint8_t this_lcore)
491 {
492 return ((nb_p_lcores <= 1) ? 0 :
493 (nb_p_lcores - (start_seq % nb_p_lcores) + this_lcore) %
494 nb_p_lcores);
495 }
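/* Worked example (illustrative only): with nb_p_lcores = 3, start_seq = 7
 * and this_lcore = 1, the result is (3 - (7 % 3) + 1) % 3 = 0, so this
 * instance starts at the first entry of the burst and then steps through it
 * in strides of nb_p_lcores, while the other two instances start at offsets
 * 1 and 2.
 */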
496
497 /* Claim slots to process, optimised for single-thread operation */
498 static __rte_always_inline uint32_t
499 opdl_stage_claim_singlethread(struct opdl_stage *s, void *entries,
500 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
501 {
502 uint32_t i = 0, j = 0, offset;
503 uint32_t opa_id = 0;
504 uint32_t flow_id = 0;
505 uint64_t event = 0;
506 void *get_slots;
507 struct rte_event *ev;
508 RTE_SET_USED(seq);
509 struct opdl_ring *t = s->t;
510 uint8_t *entries_offset = (uint8_t *)entries;
511
512 if (!atomic) {
513
514 offset = opdl_first_entry_id(s->seq, s->nb_instance,
515 s->instance_id);
516
517 num_entries = s->nb_instance * num_entries;
518
519 num_entries = num_to_process(s, num_entries, block);
520
521 for (; offset < num_entries; offset += s->nb_instance) {
522 get_slots = get_slot(t, s->head + offset);
523 memcpy(entries_offset, get_slots, t->slot_size);
524 entries_offset += t->slot_size;
525 i++;
526 }
527 } else {
528 num_entries = num_to_process(s, num_entries, block);
529
530 for (j = 0; j < num_entries; j++) {
531 ev = (struct rte_event *)get_slot(t, s->head+j);
532
533 event = __atomic_load_n(&(ev->event),
534 __ATOMIC_ACQUIRE);
535
536 opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
537 flow_id = OPDL_FLOWID_MASK & event;
538
539 if (opa_id >= s->queue_id)
540 continue;
541
542 if ((flow_id % s->nb_instance) == s->instance_id) {
543 memcpy(entries_offset, ev, t->slot_size);
544 entries_offset += t->slot_size;
545 i++;
546 }
547 }
548 }
549 s->shadow_head = s->head;
550 s->head += num_entries;
551 s->num_claimed = num_entries;
552 s->num_event = i;
553 s->pos = 0;
554
555 /* automatically disclaim entries if number of rte_events is zero */
556 if (unlikely(i == 0))
557 opdl_stage_disclaim(s, 0, false);
558
559 return i;
560 }
561
562 /* Thread-safe version of function to claim slots for processing */
563 static __rte_always_inline uint32_t
564 opdl_stage_claim_multithread(struct opdl_stage *s, void *entries,
565 uint32_t num_entries, uint32_t *seq, bool block)
566 {
567 uint32_t old_head;
568 struct opdl_ring *t = s->t;
569 uint32_t i = 0, offset;
570 uint8_t *entries_offset = (uint8_t *)entries;
571
572 if (seq == NULL) {
573 PMD_DRV_LOG(ERR, "Invalid seq PTR");
574 return 0;
575 }
576 offset = opdl_first_entry_id(*seq, s->nb_instance, s->instance_id);
577 num_entries = offset + (s->nb_instance * num_entries);
578
579 move_head_atomically(s, &num_entries, &old_head, block, true);
580
581 for (; offset < num_entries; offset += s->nb_instance) {
582 memcpy(entries_offset, get_slot(t, s->head + offset),
583 t->slot_size);
584 entries_offset += t->slot_size;
585 i++;
586 }
587
588 *seq = old_head;
589
590 return i;
591 }
592
593 /* Claim and copy slot pointers, optimised for single-thread operation */
594 static __rte_always_inline uint32_t
595 opdl_stage_claim_copy_singlethread(struct opdl_stage *s, void *entries,
596 uint32_t num_entries, uint32_t *seq, bool block)
597 {
598 num_entries = num_to_process(s, num_entries, block);
599 if (num_entries == 0)
600 return 0;
601 copy_entries_out(s->t, s->head, entries, num_entries);
602 if (seq != NULL)
603 *seq = s->head;
604 s->head += num_entries;
605 return num_entries;
606 }
607
608 /* Thread-safe version of function to claim and copy pointers to slots */
609 static __rte_always_inline uint32_t
610 opdl_stage_claim_copy_multithread(struct opdl_stage *s, void *entries,
611 uint32_t num_entries, uint32_t *seq, bool block)
612 {
613 uint32_t old_head;
614
615 move_head_atomically(s, &num_entries, &old_head, block, true);
616 if (num_entries == 0)
617 return 0;
618 copy_entries_out(s->t, old_head, entries, num_entries);
619 if (seq != NULL)
620 *seq = old_head;
621 return num_entries;
622 }
623
624 static __rte_always_inline void
625 opdl_stage_disclaim_singlethread_n(struct opdl_stage *s,
626 uint32_t num_entries)
627 {
628 uint32_t old_tail = s->shared.tail;
629
630 if (unlikely(num_entries > (s->head - old_tail))) {
631 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
632 num_entries, s->head - old_tail);
633 num_entries = s->head - old_tail;
634 }
635 __atomic_store_n(&s->shared.tail, num_entries + old_tail,
636 __ATOMIC_RELEASE);
637 }
638
639 uint32_t
640 opdl_ring_input(struct opdl_ring *t, const void *entries, uint32_t num_entries,
641 bool block)
642 {
643 if (input_stage(t)->threadsafe == false)
644 return opdl_ring_input_singlethread(t, entries, num_entries,
645 block);
646 else
647 return opdl_ring_input_multithread(t, entries, num_entries,
648 block);
649 }
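/* Illustrative producer sketch (not part of the driver). It assumes the ring
 * was created with slot_size == sizeof(struct rte_event); example_produce()
 * and its variables are placeholder names.
 *
 *	static void
 *	example_produce(struct opdl_ring *t, const struct rte_event *evs,
 *			uint32_t nb_evs)
 *	{
 *		uint32_t done = 0;
 *
 *		while (done < nb_evs)
 *			done += opdl_ring_input(t, &evs[done],
 *					nb_evs - done, false);
 *	}
 *
 * With block = false the call returns 0 when no slots are free, so the loop
 * simply retries until the input stage's dependencies have moved on.
 */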
650
651 uint32_t
652 opdl_ring_copy_from_burst(struct opdl_ring *t, struct opdl_stage *s,
653 const void *entries, uint32_t num_entries, bool block)
654 {
655 uint32_t head = s->head;
656
657 num_entries = num_to_process(s, num_entries, block);
658
659 if (num_entries == 0)
660 return 0;
661
662 copy_entries_in(t, head, entries, num_entries);
663
664 s->head += num_entries;
665 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
666
667 return num_entries;
668
669 }
670
671 uint32_t
672 opdl_ring_copy_to_burst(struct opdl_ring *t, struct opdl_stage *s,
673 void *entries, uint32_t num_entries, bool block)
674 {
675 uint32_t head = s->head;
676
677 num_entries = num_to_process(s, num_entries, block);
678 if (num_entries == 0)
679 return 0;
680
681 copy_entries_out(t, head, entries, num_entries);
682
683 s->head += num_entries;
684 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
685
686 return num_entries;
687 }
688
689 uint32_t
690 opdl_stage_find_num_available(struct opdl_stage *s, uint32_t num_entries)
691 {
692 /* return (num_to_process(s, num_entries, false)); */
693
694 if (available(s) >= num_entries)
695 return num_entries;
696
697 update_available_seq(s);
698
699 uint32_t avail = available(s);
700
701 if (avail == 0) {
702 rte_pause();
703 return 0;
704 }
705 return (avail <= num_entries) ? avail : num_entries;
706 }
707
708 uint32_t
709 opdl_stage_claim(struct opdl_stage *s, void *entries,
710 uint32_t num_entries, uint32_t *seq, bool block, bool atomic)
711 {
712 if (s->threadsafe == false)
713 return opdl_stage_claim_singlethread(s, entries, num_entries,
714 seq, block, atomic);
715 else
716 return opdl_stage_claim_multithread(s, entries, num_entries,
717 seq, block);
718 }
719
720 uint32_t
721 opdl_stage_claim_copy(struct opdl_stage *s, void *entries,
722 uint32_t num_entries, uint32_t *seq, bool block)
723 {
724 if (s->threadsafe == false)
725 return opdl_stage_claim_copy_singlethread(s, entries,
726 num_entries, seq, block);
727 else
728 return opdl_stage_claim_copy_multithread(s, entries,
729 num_entries, seq, block);
730 }
731
732 void
733 opdl_stage_disclaim_n(struct opdl_stage *s, uint32_t num_entries,
734 bool block)
735 {
736
737 if (s->threadsafe == false) {
738 opdl_stage_disclaim_singlethread_n(s, s->num_claimed);
739 } else {
740 struct claim_manager *disclaims =
741 &s->pending_disclaims[rte_lcore_id()];
742
743 if (unlikely(num_entries > s->num_slots)) {
744 PMD_DRV_LOG(WARNING, "Attempt to disclaim (%u) more than claimed (%u)",
745 num_entries, disclaims->num_claimed);
746 num_entries = disclaims->num_claimed;
747 }
748
749 num_entries = RTE_MIN(num_entries + disclaims->num_to_disclaim,
750 disclaims->num_claimed);
751 opdl_stage_disclaim_multithread_n(s, num_entries, block);
752 }
753 }
754
755 int
756 opdl_stage_disclaim(struct opdl_stage *s, uint32_t num_entries, bool block)
757 {
758 if (num_entries != s->num_event) {
759 rte_errno = -EINVAL;
760 return 0;
761 }
762 if (s->threadsafe == false) {
763 __atomic_store_n(&s->shared.tail, s->head, __ATOMIC_RELEASE);
764 s->seq += s->num_claimed;
765 s->shadow_head = s->head;
766 s->num_claimed = 0;
767 } else {
768 struct claim_manager *disclaims =
769 &s->pending_disclaims[rte_lcore_id()];
770 opdl_stage_disclaim_multithread_n(s, disclaims->num_claimed,
771 block);
772 }
773 return num_entries;
774 }
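/* Illustrative worker sketch (not part of the driver) for a stage added with
 * threadsafe = true, pairing opdl_stage_claim_copy() with
 * opdl_stage_disclaim_n(); example_worker() and process() are placeholder
 * names, and slot_size is assumed to be sizeof(struct rte_event).
 *
 *	static void
 *	example_worker(struct opdl_stage *s)
 *	{
 *		struct rte_event evs[32];
 *		uint32_t seq, n;
 *
 *		n = opdl_stage_claim_copy(s, evs, RTE_DIM(evs), &seq, false);
 *		if (n == 0)
 *			return;
 *		process(evs, n);
 *		opdl_stage_disclaim_n(s, n, false);
 *	}
 *
 * Each lcore keeps its own pending_disclaims entry, so several lcores can run
 * example_worker() on the same thread-safe stage concurrently.
 */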
775
776 uint32_t
777 opdl_ring_available(struct opdl_ring *t)
778 {
779 return opdl_stage_available(&t->stages[0]);
780 }
781
782 uint32_t
783 opdl_stage_available(struct opdl_stage *s)
784 {
785 update_available_seq(s);
786 return available(s);
787 }
788
789 void
790 opdl_ring_flush(struct opdl_ring *t)
791 {
792 struct opdl_stage *s = input_stage(t);
793
794 wait_for_available(s, s->num_slots);
795 }
796
797 /******************** Non performance sensitive functions ********************/
798
799 /* Initial setup of a new stage's context */
800 static int
801 init_stage(struct opdl_ring *t, struct opdl_stage *s, bool threadsafe,
802 bool is_input)
803 {
804 uint32_t available = (is_input) ? t->num_slots : 0;
805
806 s->t = t;
807 s->num_slots = t->num_slots;
808 s->index = t->num_stages;
809 s->threadsafe = threadsafe;
810 s->shared.stage = s;
811
812 /* Alloc memory for deps */
813 s->dep_tracking = rte_zmalloc_socket(LIB_NAME,
814 t->max_num_stages * sizeof(enum dep_type),
815 0, t->socket);
816 if (s->dep_tracking == NULL)
817 return -ENOMEM;
818
819 s->deps = rte_zmalloc_socket(LIB_NAME,
820 t->max_num_stages * sizeof(struct shared_state *),
821 0, t->socket);
822 if (s->deps == NULL) {
823 rte_free(s->dep_tracking);
824 return -ENOMEM;
825 }
826
827 s->dep_tracking[s->index] = DEP_SELF;
828
829 if (threadsafe == true)
830 s->shared.available_seq = available;
831 else
832 s->available_seq = available;
833
834 return 0;
835 }
836
837 /* Add direct or indirect dependencies between stages */
838 static int
839 add_dep(struct opdl_stage *dependent, const struct opdl_stage *dependency,
840 enum dep_type type)
841 {
842 struct opdl_ring *t = dependent->t;
843 uint32_t i;
844
845 /* Add new direct dependency */
846 if ((type == DEP_DIRECT) &&
847 (dependent->dep_tracking[dependency->index] ==
848 DEP_NONE)) {
849 PMD_DRV_LOG(DEBUG, "%s:%u direct dependency on %u",
850 t->name, dependent->index, dependency->index);
851 dependent->dep_tracking[dependency->index] = DEP_DIRECT;
852 }
853
854 /* Add new indirect dependency or change direct to indirect */
855 if ((type == DEP_INDIRECT) &&
856 ((dependent->dep_tracking[dependency->index] ==
857 DEP_NONE) ||
858 (dependent->dep_tracking[dependency->index] ==
859 DEP_DIRECT))) {
860 PMD_DRV_LOG(DEBUG, "%s:%u indirect dependency on %u",
861 t->name, dependent->index, dependency->index);
862 dependent->dep_tracking[dependency->index] = DEP_INDIRECT;
863 }
864
865 /* Shouldn't happen... */
866 if ((dependent->dep_tracking[dependency->index] == DEP_SELF) &&
867 (dependent != input_stage(t))) {
868 PMD_DRV_LOG(ERR, "Loop in dependency graph %s:%u",
869 t->name, dependent->index);
870 return -EINVAL;
871 }
872
873 /* Keep walking the dependencies of the dependency, until the input stage */
874 if (dependency != input_stage(t))
875 for (i = 0; i < dependency->num_deps; i++) {
876 int ret = add_dep(dependent, dependency->deps[i]->stage,
877 DEP_INDIRECT);
878
879 if (ret < 0)
880 return ret;
881 }
882
883 /* Make list of sequence numbers for direct dependencies only */
884 if (type == DEP_DIRECT)
885 for (i = 0, dependent->num_deps = 0; i < t->num_stages; i++)
886 if (dependent->dep_tracking[i] == DEP_DIRECT) {
887 if ((i == 0) && (dependent->num_deps > 1))
888 rte_panic("%s:%u depends on more than the input stage",
889 t->name,
890 dependent->index);
891 dependent->deps[dependent->num_deps++] =
892 &t->stages[i].shared;
893 }
894
895 return 0;
896 }
897
898 struct opdl_ring *
899 opdl_ring_create(const char *name, uint32_t num_slots, uint32_t slot_size,
900 uint32_t max_num_stages, int socket)
901 {
902 struct opdl_ring *t;
903 char mz_name[RTE_MEMZONE_NAMESIZE];
904 int mz_flags = 0;
905 struct opdl_stage *st = NULL;
906 const struct rte_memzone *mz = NULL;
907 size_t alloc_size = RTE_CACHE_LINE_ROUNDUP(sizeof(*t) +
908 (num_slots * slot_size));
909
910 /* Compile time checking */
911 RTE_BUILD_BUG_ON((sizeof(struct shared_state) & RTE_CACHE_LINE_MASK) !=
912 0);
913 RTE_BUILD_BUG_ON((offsetof(struct opdl_stage, shared) &
914 RTE_CACHE_LINE_MASK) != 0);
915 RTE_BUILD_BUG_ON((offsetof(struct opdl_ring, slots) &
916 RTE_CACHE_LINE_MASK) != 0);
917 RTE_BUILD_BUG_ON(!rte_is_power_of_2(OPDL_DISCLAIMS_PER_LCORE));
918
919 /* Parameter checking */
920 if (name == NULL) {
921 PMD_DRV_LOG(ERR, "name param is NULL");
922 return NULL;
923 }
924 if (!rte_is_power_of_2(num_slots)) {
925 PMD_DRV_LOG(ERR, "num_slots (%u) for %s is not power of 2",
926 num_slots, name);
927 return NULL;
928 }
929
930 /* Alloc memory for stages */
931 st = rte_zmalloc_socket(LIB_NAME,
932 max_num_stages * sizeof(struct opdl_stage),
933 RTE_CACHE_LINE_SIZE, socket);
934 if (st == NULL)
935 goto exit_fail;
936
937 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
938
939 /* Alloc memory for memzone */
940 mz = rte_memzone_reserve(mz_name, alloc_size, socket, mz_flags);
941 if (mz == NULL)
942 goto exit_fail;
943
944 t = mz->addr;
945
946 /* Initialise opdl_ring queue */
947 memset(t, 0, sizeof(*t));
948 strlcpy(t->name, name, sizeof(t->name));
949 t->socket = socket;
950 t->num_slots = num_slots;
951 t->mask = num_slots - 1;
952 t->slot_size = slot_size;
953 t->max_num_stages = max_num_stages;
954 t->stages = st;
955
956 PMD_DRV_LOG(DEBUG, "Created %s at %p (num_slots=%u,socket=%i,slot_size=%u)",
957 t->name, t, num_slots, socket, slot_size);
958
959 return t;
960
961 exit_fail:
962 PMD_DRV_LOG(ERR, "Cannot reserve memory");
963 rte_free(st);
964 rte_memzone_free(mz);
965
966 return NULL;
967 }
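/* Illustrative setup sketch (not part of the driver): create a ring carrying
 * rte_event slots, add a thread-safe input stage plus one dependent worker
 * stage, and close the loop so the producer only reuses slots the worker has
 * finished with. Error handling is omitted and all names are placeholders.
 *
 *	struct opdl_ring *t;
 *	struct opdl_stage *input, *worker;
 *	struct opdl_stage *deps[1];
 *
 *	t = opdl_ring_create("example", 1024, sizeof(struct rte_event),
 *			2, rte_socket_id());
 *	input = opdl_stage_add(t, true, true);
 *	worker = opdl_stage_add(t, true, false);
 *	deps[0] = input;
 *	opdl_stage_deps_add(t, worker, 1, 0, deps, 1);
 *	deps[0] = worker;
 *	opdl_stage_set_deps(input, deps, 1);
 */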
968
969 void *
970 opdl_ring_get_slot(const struct opdl_ring *t, uint32_t index)
971 {
972 return get_slot(t, index);
973 }
974
975 bool
976 opdl_ring_cas_slot(struct opdl_stage *s, const struct rte_event *ev,
977 uint32_t index, bool atomic)
978 {
979 uint32_t i = 0, offset;
980 struct opdl_ring *t = s->t;
981 struct rte_event *ev_orig = NULL;
982 bool ev_updated = false;
983 uint64_t ev_temp = 0;
984 uint64_t ev_update = 0;
985
986 uint32_t opa_id = 0;
987 uint32_t flow_id = 0;
988 uint64_t event = 0;
989
990 if (index > s->num_event) {
991 PMD_DRV_LOG(ERR, "index is out of range");
992 return ev_updated;
993 }
994
995 ev_temp = ev->event & OPDL_EVENT_MASK;
996
997 if (!atomic) {
998 offset = opdl_first_entry_id(s->seq, s->nb_instance,
999 s->instance_id);
1000 offset += index*s->nb_instance;
1001 ev_orig = get_slot(t, s->shadow_head+offset);
1002 if ((ev_orig->event&OPDL_EVENT_MASK) != ev_temp) {
1003 ev_orig->event = ev->event;
1004 ev_updated = true;
1005 }
1006 if (ev_orig->u64 != ev->u64) {
1007 ev_orig->u64 = ev->u64;
1008 ev_updated = true;
1009 }
1010
1011 } else {
1012 for (i = s->pos; i < s->num_claimed; i++) {
1013 ev_orig = (struct rte_event *)
1014 get_slot(t, s->shadow_head+i);
1015
1016 event = __atomic_load_n(&(ev_orig->event),
1017 __ATOMIC_ACQUIRE);
1018
1019 opa_id = OPDL_OPA_MASK & (event >> OPDL_OPA_OFFSET);
1020 flow_id = OPDL_FLOWID_MASK & event;
1021
1022 if (opa_id >= s->queue_id)
1023 continue;
1024
1025 if ((flow_id % s->nb_instance) == s->instance_id) {
1026 ev_update = s->queue_id;
1027 ev_update = (ev_update << OPDL_OPA_OFFSET)
1028 | ev->event;
1029
1030 s->pos = i + 1;
1031
1032 if ((event & OPDL_EVENT_MASK) !=
1033 ev_temp) {
1034 __atomic_store_n(&(ev_orig->event),
1035 ev_update,
1036 __ATOMIC_RELEASE);
1037 ev_updated = true;
1038 }
1039 if (ev_orig->u64 != ev->u64) {
1040 ev_orig->u64 = ev->u64;
1041 ev_updated = true;
1042 }
1043
1044 break;
1045 }
1046 }
1047
1048 }
1049
1050 return ev_updated;
1051 }
1052
1053 int
1054 opdl_ring_get_socket(const struct opdl_ring *t)
1055 {
1056 return t->socket;
1057 }
1058
1059 uint32_t
1060 opdl_ring_get_num_slots(const struct opdl_ring *t)
1061 {
1062 return t->num_slots;
1063 }
1064
1065 const char *
1066 opdl_ring_get_name(const struct opdl_ring *t)
1067 {
1068 return t->name;
1069 }
1070
1071 /* Check dependency list is valid for a given opdl_ring */
1072 static int
1073 check_deps(struct opdl_ring *t, struct opdl_stage *deps[],
1074 uint32_t num_deps)
1075 {
1076 unsigned int i;
1077
1078 for (i = 0; i < num_deps; ++i) {
1079 if (!deps[i]) {
1080 PMD_DRV_LOG(ERR, "deps[%u] is NULL", i);
1081 return -EINVAL;
1082 }
1083 if (t != deps[i]->t) {
1084 PMD_DRV_LOG(ERR, "deps[%u] is in opdl_ring %s, not %s",
1085 i, deps[i]->t->name, t->name);
1086 return -EINVAL;
1087 }
1088 }
1089
1090 return 0;
1091 }
1092
1093 struct opdl_stage *
1094 opdl_stage_add(struct opdl_ring *t, bool threadsafe, bool is_input)
1095 {
1096 struct opdl_stage *s;
1097
1098 /* Parameter checking */
1099 if (!t) {
1100 PMD_DRV_LOG(ERR, "opdl_ring is NULL");
1101 return NULL;
1102 }
1103 if (t->num_stages == t->max_num_stages) {
1104 PMD_DRV_LOG(ERR, "%s has max number of stages (%u)",
1105 t->name, t->max_num_stages);
1106 return NULL;
1107 }
1108
1109 s = &t->stages[t->num_stages];
1110
1111 if (((uintptr_t)&s->shared & RTE_CACHE_LINE_MASK) != 0)
1112 PMD_DRV_LOG(WARNING, "Tail seq num (%p) of %s stage not cache aligned",
1113 &s->shared, t->name);
1114
1115 if (init_stage(t, s, threadsafe, is_input) < 0) {
1116 PMD_DRV_LOG(ERR, "Cannot reserve memory");
1117 return NULL;
1118 }
1119 t->num_stages++;
1120
1121 return s;
1122 }
1123
1124 uint32_t
1125 opdl_stage_deps_add(struct opdl_ring *t, struct opdl_stage *s,
1126 uint32_t nb_instance, uint32_t instance_id,
1127 struct opdl_stage *deps[],
1128 uint32_t num_deps)
1129 {
1130 uint32_t i;
1131 int ret = 0;
1132
1133 if ((num_deps > 0) && (!deps)) {
1134 PMD_DRV_LOG(ERR, "%s stage has NULL dependencies", t->name);
1135 return -1;
1136 }
1137 ret = check_deps(t, deps, num_deps);
1138 if (ret < 0)
1139 return ret;
1140
1141 for (i = 0; i < num_deps; i++) {
1142 ret = add_dep(s, deps[i], DEP_DIRECT);
1143 if (ret < 0)
1144 return ret;
1145 }
1146
1147 s->nb_instance = nb_instance;
1148 s->instance_id = instance_id;
1149
1150 return ret;
1151 }
1152
1153 struct opdl_stage *
1154 opdl_ring_get_input_stage(const struct opdl_ring *t)
1155 {
1156 return input_stage(t);
1157 }
1158
1159 int
1160 opdl_stage_set_deps(struct opdl_stage *s, struct opdl_stage *deps[],
1161 uint32_t num_deps)
1162 {
1163 unsigned int i;
1164 int ret;
1165
1166 if ((num_deps == 0) || (!deps)) {
1167 PMD_DRV_LOG(ERR, "cannot set NULL dependencies");
1168 return -EINVAL;
1169 }
1170
1171 ret = check_deps(s->t, deps, num_deps);
1172 if (ret < 0)
1173 return ret;
1174
1175 /* Update deps */
1176 for (i = 0; i < num_deps; i++)
1177 s->deps[i] = &deps[i]->shared;
1178 s->num_deps = num_deps;
1179
1180 return 0;
1181 }
1182
1183 struct opdl_ring *
1184 opdl_stage_get_opdl_ring(const struct opdl_stage *s)
1185 {
1186 return s->t;
1187 }
1188
1189 void
1190 opdl_stage_set_queue_id(struct opdl_stage *s,
1191 uint32_t queue_id)
1192 {
1193 s->queue_id = queue_id;
1194 }
1195
1196 void
1197 opdl_ring_dump(const struct opdl_ring *t, FILE *f)
1198 {
1199 uint32_t i;
1200
1201 if (t == NULL) {
1202 fprintf(f, "NULL OPDL!\n");
1203 return;
1204 }
1205 fprintf(f, "OPDL \"%s\": num_slots=%u; mask=%#x; slot_size=%u; num_stages=%u; socket=%i\n",
1206 t->name, t->num_slots, t->mask, t->slot_size,
1207 t->num_stages, t->socket);
1208 for (i = 0; i < t->num_stages; i++) {
1209 uint32_t j;
1210 const struct opdl_stage *s = &t->stages[i];
1211
1212 fprintf(f, " %s[%u]: threadsafe=%s; head=%u; available_seq=%u; tail=%u; deps=%u",
1213 t->name, i, (s->threadsafe) ? "true" : "false",
1214 (s->threadsafe) ? s->shared.head : s->head,
1215 (s->threadsafe) ? s->shared.available_seq :
1216 s->available_seq,
1217 s->shared.tail, (s->num_deps > 0) ?
1218 s->deps[0]->stage->index : 0);
1219 for (j = 1; j < s->num_deps; j++)
1220 fprintf(f, ",%u", s->deps[j]->stage->index);
1221 fprintf(f, "\n");
1222 }
1223 fflush(f);
1224 }
1225
1226 void
1227 opdl_ring_free(struct opdl_ring *t)
1228 {
1229 uint32_t i;
1230 const struct rte_memzone *mz;
1231 char mz_name[RTE_MEMZONE_NAMESIZE];
1232
1233 if (t == NULL) {
1234 PMD_DRV_LOG(DEBUG, "Freeing NULL OPDL Ring!");
1235 return;
1236 }
1237
1238 PMD_DRV_LOG(DEBUG, "Freeing %s opdl_ring at %p", t->name, t);
1239
1240 for (i = 0; i < t->num_stages; ++i) {
1241 rte_free(t->stages[i].deps);
1242 rte_free(t->stages[i].dep_tracking);
1243 }
1244
1245 rte_free(t->stages);
1246
1247 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, t->name);
1248 mz = rte_memzone_lookup(mz_name);
1249 if (rte_memzone_free(mz) != 0)
1250 PMD_DRV_LOG(ERR, "Cannot free memzone for %s", t->name);
1251 }
1252
1253 /* Search for an opdl_ring by its name */
1254 struct opdl_ring *
1255 opdl_ring_lookup(const char *name)
1256 {
1257 const struct rte_memzone *mz;
1258 char mz_name[RTE_MEMZONE_NAMESIZE];
1259
1260 snprintf(mz_name, sizeof(mz_name), "%s%s", LIB_NAME, name);
1261
1262 mz = rte_memzone_lookup(mz_name);
1263 if (mz == NULL)
1264 return NULL;
1265
1266 return mz->addr;
1267 }
1268
1269 void
1270 opdl_ring_set_stage_threadsafe(struct opdl_stage *s, bool threadsafe)
1271 {
1272 s->threadsafe = threadsafe;
1273 }