/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <rte_ring.h>
#include <rte_hash_crc.h>
#include <rte_event_ring.h>
#include "sw_evdev.h"
#include "iq_chunk.h"

#define SW_IQS_MASK (SW_IQS_MAX-1)

/* Retrieve the highest priority IQ or -1 if no pkts available. Doing the
 * CLZ twice is faster than caching the value due to data dependencies
 */
#define PKT_MASK_TO_IQ(pkts) \
	(__builtin_ctz(pkts | (1 << SW_IQS_MAX)))
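/* Example: an iq_pkt_mask of 0x6 (IQs 1 and 2 non-empty) gives
 * __builtin_ctz(0x6 | 0x10) == 1, i.e. IQ 1, the highest-priority non-empty
 * IQ; an all-zero mask returns SW_IQS_MAX thanks to the sentinel bit.
 */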
#if SW_IQS_MAX != 4
#error Misconfigured PRIO_TO_IQ caused by SW_IQS_MAX value change
#endif
#define PRIO_TO_IQ(prio) (prio >> 6)

#define MAX_PER_IQ_DEQUEUE 48
#define FLOWID_MASK (SW_QID_NUM_FIDS-1)
/* use cheap bit mixing, we only need to lose a few bits */
#define SW_HASH_FLOWID(f) (((f) ^ (f >> 10)) & FLOWID_MASK)
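/* Atomic scheduling: each flow (the hashed flow_id) stays pinned to one CQ
 * (one port) while it has events in flight, preserving per-flow ordering and
 * exclusivity. Unpinned flows are assigned to the least-loaded mapped CQ;
 * events whose CQ is full, or whose port history list is exhausted, are put
 * back into the IQ as "blocked".
 */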
static inline uint32_t
sw_schedule_atomic_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count)
{
	struct rte_event qes[MAX_PER_IQ_DEQUEUE]; /* count <= MAX */
	struct rte_event blocked_qes[MAX_PER_IQ_DEQUEUE];
	uint32_t nb_blocked = 0;
	uint32_t i;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	iq_dequeue_burst(sw, &qid->iq[iq_num], qes, count);
	for (i = 0; i < count; i++) {
		const struct rte_event *qe = &qes[i];
		const uint16_t flow_id = SW_HASH_FLOWID(qes[i].flow_id);
		struct sw_fid_t *fid = &qid->fids[flow_id];
		int cq = fid->cq;

		if (cq < 0) {
			uint32_t cq_idx;

			if (qid->cq_next_tx >= qid->cq_num_mapped_cqs)
				qid->cq_next_tx = 0;
			cq_idx = qid->cq_next_tx++;

			cq = qid->cq_map[cq_idx];

			int cq_free_cnt = sw->cq_ring_space[cq];
			for (cq_idx = 0; cq_idx < qid->cq_num_mapped_cqs;
					cq_idx++) {
				int test_cq = qid->cq_map[cq_idx];
				int test_cq_free = sw->cq_ring_space[test_cq];
				if (test_cq_free > cq_free_cnt) {
					cq = test_cq;
					cq_free_cnt = test_cq_free;
				}
			}

			fid->cq = cq; /* this pins early */
		}

		if (sw->cq_ring_space[cq] == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST) {
			blocked_qes[nb_blocked++] = *qe;
			continue;
		}

		struct sw_port *p = &sw->ports[cq];

		/* at this point we can queue up the packet on the cq_buf */
		fid->pcount++;
		p->cq_buf[p->cq_buf_count++] = *qe;
		p->inflights++;
		sw->cq_ring_space[cq]--;

		int head = (p->hist_head++ & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = flow_id;
		p->hist_list[head].qid = qid_id;

		p->stats.tx_pkts++;
		qid->stats.tx_pkts++;

		/* if we just filled in the last slot, flush the buffer */
		if (sw->cq_ring_space[cq] == 0) {
			struct rte_event_ring *worker = p->cq_worker_ring;
			rte_event_ring_enqueue_burst(worker, p->cq_buf,
					p->cq_buf_count,
					&sw->cq_ring_space[cq]);
			p->cq_buf_count = 0;
		}
	}
	iq_put_back(sw, &qid->iq[iq_num], blocked_qes, nb_blocked);

	return count - nb_blocked;
}
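/* Parallel and ordered scheduling: events are distributed across the mapped
 * CQs in round-robin order with no flow pinning. When keep_order is set
 * (ordered QIDs), a reorder buffer entry is taken from the freelist and
 * recorded in the port's history list so the original order can be restored
 * by sw_schedule_reorder() when the events are released.
 */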
static inline uint32_t
sw_schedule_parallel_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count, int keep_order)
{
	uint32_t i;
	uint32_t cq_idx = qid->cq_next_tx;

	/* This is the QID ID. The QID ID is static, hence it can be
	 * used to identify the stage of processing in history lists etc
	 */
	uint32_t qid_id = qid->id;

	if (count > MAX_PER_IQ_DEQUEUE)
		count = MAX_PER_IQ_DEQUEUE;

	if (keep_order)
		/* only schedule as many as we have reorder buffer entries */
		count = RTE_MIN(count,
				rte_ring_count(qid->reorder_buffer_freelist));

	for (i = 0; i < count; i++) {
		const struct rte_event *qe = iq_peek(&qid->iq[iq_num]);
		uint32_t cq_check_count = 0;
		uint32_t cq;

		/*
		 * for parallel, just send to next available CQ in round-robin
		 * fashion. So scan for an available CQ. If all CQs are full
		 * just return and move on to next QID
		 */
		do {
			if (++cq_check_count > qid->cq_num_mapped_cqs)
				goto exit;
			if (cq_idx >= qid->cq_num_mapped_cqs)
				cq_idx = 0;
			cq = qid->cq_map[cq_idx++];

		} while (rte_event_ring_free_count(
				sw->ports[cq].cq_worker_ring) == 0 ||
				sw->ports[cq].inflights == SW_PORT_HIST_LIST);

		struct sw_port *p = &sw->ports[cq];
		if (sw->cq_ring_space[cq] == 0 ||
				p->inflights == SW_PORT_HIST_LIST)
			break;

		sw->cq_ring_space[cq]--;

		qid->stats.tx_pkts++;

		const int head = (p->hist_head & (SW_PORT_HIST_LIST-1));
		p->hist_list[head].fid = SW_HASH_FLOWID(qe->flow_id);
		p->hist_list[head].qid = qid_id;

		if (keep_order)
			rte_ring_sc_dequeue(qid->reorder_buffer_freelist,
					(void *)&p->hist_list[head].rob_entry);

		sw->ports[cq].cq_buf[sw->ports[cq].cq_buf_count++] = *qe;
		iq_pop(sw, &qid->iq[iq_num]);

		rte_compiler_barrier();
		p->inflights++;
		p->stats.tx_pkts++;
		p->hist_head++;
	}
exit:
	qid->cq_next_tx = cq_idx;
	return i;
}
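/* Directed (single-link) scheduling: a directed QID has exactly one mapped
 * CQ, so events are burst-copied straight from the IQ into that port's CQ
 * buffer, limited only by the cached CQ ring space.
 */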
static uint32_t
sw_schedule_dir_to_cq(struct sw_evdev *sw, struct sw_qid * const qid,
		uint32_t iq_num, unsigned int count __rte_unused)
{
	uint32_t cq_id = qid->cq_map[0];
	struct sw_port *port = &sw->ports[cq_id];

	/* get max burst enq size for cq_ring */
	uint32_t count_free = sw->cq_ring_space[cq_id];
	if (count_free == 0)
		return 0;

	/* burst dequeue from the QID IQ ring */
	struct sw_iq *iq = &qid->iq[iq_num];
	uint32_t ret = iq_dequeue_burst(sw, iq,
		&port->cq_buf[port->cq_buf_count], count_free);
	port->cq_buf_count += ret;

	/* Update QID, Port and Total TX stats */
	qid->stats.tx_pkts += ret;
	port->stats.tx_pkts += ret;

	/* Subtract credits from cached value */
	sw->cq_ring_space[cq_id] -= ret;

	return ret;
}
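/* Walk all QIDs in priority order and drain the highest-priority non-empty
 * IQ of each into its mapped CQs, dispatching to the directed, atomic or
 * parallel/ordered scheduler as appropriate. Returns the number of events
 * moved to CQs in this pass.
 */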
static uint32_t
sw_schedule_qid_to_cq(struct sw_evdev *sw)
{
	uint32_t pkts = 0;
	uint32_t qid_idx;

	sw->sched_cq_qid_called++;

	for (qid_idx = 0; qid_idx < sw->qid_count; qid_idx++) {
		struct sw_qid *qid = sw->qids_prioritized[qid_idx];

		int type = qid->type;
		int iq_num = PKT_MASK_TO_IQ(qid->iq_pkt_mask);

		/* zero mapped CQs indicates directed */
		if (iq_num >= SW_IQS_MAX || qid->cq_num_mapped_cqs == 0)
			continue;

		uint32_t pkts_done = 0;
		uint32_t count = iq_count(&qid->iq[iq_num]);

		if (count > 0) {
			if (type == SW_SCHED_TYPE_DIRECT)
				pkts_done += sw_schedule_dir_to_cq(sw, qid,
						iq_num, count);
			else if (type == RTE_SCHED_TYPE_ATOMIC)
				pkts_done += sw_schedule_atomic_to_cq(sw, qid,
						iq_num, count);
			else
				pkts_done += sw_schedule_parallel_to_cq(sw, qid,
						iq_num, count,
						type == RTE_SCHED_TYPE_ORDERED);
		}

		/* Check if the IQ that was polled is now empty, and unset it
		 * in the IQ mask if its empty.
		 */
		int all_done = (pkts_done == count);

		qid->iq_pkt_mask &= ~(all_done << (iq_num));
		pkts += pkts_done;
	}

	return pkts;
}
/* This function will perform re-ordering of packets, and injecting into
 * the appropriate QID IQ. As LB and DIR QIDs are in the same array, but *NOT*
 * contiguous in that array, this function accepts a "range" of QIDs to scan.
 */
static uint32_t
sw_schedule_reorder(struct sw_evdev *sw, int qid_start, int qid_end)
{
	/* Perform egress reordering */
	struct rte_event *qe;
	uint32_t pkts_iter = 0;

	for (; qid_start < qid_end; qid_start++) {
		struct sw_qid *qid = &sw->qids[qid_start];
		int i, num_entries_in_use;

		if (qid->type != RTE_SCHED_TYPE_ORDERED)
			continue;

		num_entries_in_use = rte_ring_free_count(
					qid->reorder_buffer_freelist);

		for (i = 0; i < num_entries_in_use; i++) {
			struct reorder_buffer_entry *entry;
			int j;

			entry = &qid->reorder_buffer[qid->reorder_buffer_index];

			if (!entry->ready)
				break;

			for (j = 0; j < entry->num_fragments; j++) {
				uint16_t dest_qid;
				uint16_t dest_iq;

				int idx = entry->fragment_index + j;
				qe = &entry->fragments[idx];

				dest_qid = qe->queue_id;
				dest_iq = PRIO_TO_IQ(qe->priority);

				if (dest_qid >= sw->qid_count) {
					sw->stats.rx_dropped++;
					continue;
				}

				pkts_iter++;

				struct sw_qid *q = &sw->qids[dest_qid];
				struct sw_iq *iq = &q->iq[dest_iq];

				/* we checked for space above, so enqueue must
				 * succeed
				 */
				iq_enqueue(sw, iq, qe);
				q->iq_pkt_mask |= (1 << (dest_iq));
				q->iq_pkt_count[dest_iq]++;
			}

			entry->ready = (j != entry->num_fragments);
			entry->num_fragments -= j;
			entry->fragment_index += j;

			if (!entry->ready) {
				entry->fragment_index = 0;

				rte_ring_sp_enqueue(
						qid->reorder_buffer_freelist,
						entry);

				qid->reorder_buffer_index++;
				qid->reorder_buffer_index %= qid->window_size;
			}
		}
	}
	return pkts_iter;
}
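/* Refill a port's "shadow ring" (pp_buf): burst-dequeue events from the
 * worker's rx ring into a local buffer so the pull functions below can
 * consume them one at a time without touching the ring for every event.
 */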
static __rte_always_inline void
sw_refill_pp_buf(struct sw_evdev *sw, struct sw_port *port)
{
	RTE_SET_USED(sw);
	struct rte_event_ring *worker = port->rx_worker_ring;
	port->pp_buf_start = 0;
	port->pp_buf_count = rte_event_ring_dequeue_burst(worker, port->pp_buf,
			RTE_DIM(port->pp_buf), NULL);
}
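/* Pull events from a load-balanced port's shadow ring and route them into
 * the destination QID IQs. A COMPLETE flag releases the oldest history-list
 * entry (and, for atomic flows, the flow-to-CQ pin once pcount drops to
 * zero); for ordered QIDs the matching reorder buffer entry is marked ready.
 * Note the branchless trick below: when there is no rob_entry, the "ready"
 * write is redirected at a static dummy_rob rather than taking a branch.
 */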
static __rte_always_inline uint32_t
__pull_port_lb(struct sw_evdev *sw, uint32_t port_id, int allow_reorder)
{
	static struct reorder_buffer_entry dummy_rob;
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		struct sw_hist_list_entry *hist_entry = NULL;
		uint8_t flags = qe->op;
		const uint16_t eop = !(flags & QE_FLAG_NOT_EOP);
		int needs_reorder = 0;
		/* if no-reordering, having PARTIAL == NEW */
		if (!allow_reorder && !eop)
			flags = QE_FLAG_VALID;

		/*
		 * if we don't have space for this packet in an IQ,
		 * then move on to next queue. Technically, for a
		 * packet that needs reordering, we don't need to check
		 * here, but it simplifies things not to special-case
		 */
		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];

		/* now process based on flags. Note that for directed
		 * queues, the enqueue_flush masks off all but the
		 * valid flag. This makes FWD and PARTIAL enqueues just
		 * NEW type, and makes DROPS no-op calls.
		 */
		if ((flags & QE_FLAG_COMPLETE) && port->inflights > 0) {
			const uint32_t hist_tail = port->hist_tail &
					(SW_PORT_HIST_LIST - 1);

			hist_entry = &port->hist_list[hist_tail];
			const uint32_t hist_qid = hist_entry->qid;
			const uint32_t hist_fid = hist_entry->fid;

			struct sw_fid_t *fid =
				&sw->qids[hist_qid].fids[hist_fid];
			fid->pcount -= eop;
			if (fid->pcount == 0)
				fid->cq = -1;

			if (allow_reorder) {
				/* set reorder ready if an ordered QID */
				uintptr_t rob_ptr =
					(uintptr_t)hist_entry->rob_entry;
				const uintptr_t valid = (rob_ptr != 0);
				needs_reorder = valid;
				rob_ptr |=
					((valid - 1) & (uintptr_t)&dummy_rob);
				struct reorder_buffer_entry *tmp_rob_ptr =
					(struct reorder_buffer_entry *)rob_ptr;
				tmp_rob_ptr->ready = eop * needs_reorder;
			}

			port->inflights -= eop;
			port->hist_tail += eop;
		}
		if (flags & QE_FLAG_VALID) {
			port->stats.rx_pkts++;

			if (allow_reorder && needs_reorder) {
				struct reorder_buffer_entry *rob_entry =
						hist_entry->rob_entry;

				hist_entry->rob_entry = NULL;
				/* Although fragmentation not currently
				 * supported by eventdev API, we support it
				 * here. Open: How do we alert the user that
				 * they've exceeded max frags?
				 */
				int num_frag = rob_entry->num_fragments;
				if (num_frag == SW_FRAGMENTS_MAX)
					sw->stats.rx_dropped++;
				else {
					int idx = rob_entry->num_fragments++;
					rob_entry->fragments[idx] = *qe;
				}
				goto end_qe;
			}

			/* Use the iq_num from above to push the QE
			 * into the qid at the right priority
			 */
			qid->iq_pkt_mask |= (1 << (iq_num));
			iq_enqueue(sw, &qid->iq[iq_num], qe);
			qid->iq_pkt_count[iq_num]++;
			qid->stats.rx_pkts++;
			pkts_iter++;
		}

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while (avail_qes) */

	return pkts_iter;
}
static uint32_t
sw_schedule_pull_port_lb(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 1);
}

static uint32_t
sw_schedule_pull_port_no_reorder(struct sw_evdev *sw, uint32_t port_id)
{
	return __pull_port_lb(sw, port_id, 0);
}
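/* Directed ports skip the history-list and reorder logic entirely: each
 * valid event is pushed straight into the destination QID's IQ.
 */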
static uint32_t
sw_schedule_pull_port_dir(struct sw_evdev *sw, uint32_t port_id)
{
	uint32_t pkts_iter = 0;
	struct sw_port *port = &sw->ports[port_id];

	/* If shadow ring has 0 pkts, pull from worker ring */
	if (port->pp_buf_count == 0)
		sw_refill_pp_buf(sw, port);

	while (port->pp_buf_count) {
		const struct rte_event *qe = &port->pp_buf[port->pp_buf_start];
		uint8_t flags = qe->op;

		if ((flags & QE_FLAG_VALID) == 0)
			goto end_qe;

		uint32_t iq_num = PRIO_TO_IQ(qe->priority);
		struct sw_qid *qid = &sw->qids[qe->queue_id];
		struct sw_iq *iq = &qid->iq[iq_num];

		port->stats.rx_pkts++;

		/* Use the iq_num from above to push the QE
		 * into the qid at the right priority
		 */
		qid->iq_pkt_mask |= (1 << (iq_num));
		iq_enqueue(sw, iq, qe);
		qid->iq_pkt_count[iq_num]++;
		qid->stats.rx_pkts++;
		pkts_iter++;

end_qe:
		port->pp_buf_start++;
		port->pp_buf_count--;
	} /* while port->pp_buf_count */

	return pkts_iter;
}
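/* Top-level scheduling entry point, called repeatedly by the scheduling
 * core. Each call pulls up to sched_quanta events from the producer (rx)
 * rings, runs the reorder pass for ordered QIDs, moves events from QID IQs
 * into port CQ buffers, and finally flushes those buffers to the worker
 * rings in bursts.
 */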
void
sw_event_schedule(struct rte_eventdev *dev)
{
	struct sw_evdev *sw = sw_pmd_priv(dev);
	uint32_t in_pkts, out_pkts;
	uint32_t out_pkts_total = 0, in_pkts_total = 0;
	int32_t sched_quanta = sw->sched_quanta;
	uint32_t i;

	sw->sched_called++;
	if (unlikely(!sw->started))
		return;

	do {
		uint32_t in_pkts_this_iteration = 0;

		/* Pull from rx_ring for ports */
		do {
			in_pkts = 0;
			for (i = 0; i < sw->port_count; i++) {
				/* ack the unlinks in progress as done */
				if (sw->ports[i].unlinks_in_progress)
					sw->ports[i].unlinks_in_progress = 0;

				if (sw->ports[i].is_directed)
					in_pkts += sw_schedule_pull_port_dir(sw, i);
				else if (sw->ports[i].num_ordered_qids > 0)
					in_pkts += sw_schedule_pull_port_lb(sw, i);
				else
					in_pkts += sw_schedule_pull_port_no_reorder(sw, i);
			}

			/* QID scan for re-ordered */
			in_pkts += sw_schedule_reorder(sw, 0,
					sw->qid_count);
			in_pkts_this_iteration += in_pkts;
		} while (in_pkts > 4 &&
				(int)in_pkts_this_iteration < sched_quanta);

		out_pkts = sw_schedule_qid_to_cq(sw);
		out_pkts_total += out_pkts;
		in_pkts_total += in_pkts_this_iteration;

		if (in_pkts == 0 && out_pkts == 0)
			break;
	} while ((int)out_pkts_total < sched_quanta);

	sw->stats.tx_pkts += out_pkts_total;
	sw->stats.rx_pkts += in_pkts_total;

	sw->sched_no_iq_enqueues += (in_pkts_total == 0);
	sw->sched_no_cq_enqueues += (out_pkts_total == 0);

	/* push all the internal buffered QEs in port->cq_ring to the
	 * worker cores: aka, do the ring transfers batched.
	 */
	for (i = 0; i < sw->port_count; i++) {
		struct rte_event_ring *worker = sw->ports[i].cq_worker_ring;
		rte_event_ring_enqueue_burst(worker, sw->ports[i].cq_buf,
				sw->ports[i].cq_buf_count,
				&sw->cq_ring_space[i]);
		sw->ports[i].cq_buf_count = 0;
	}
}