/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */
5 #include <rte_atomic.h>
6 #include <rte_cycles.h>
7 #include <rte_event_ring.h>
11 #define PORT_ENQUEUE_MAX_BURST_SIZE 64
14 sw_event_release(struct sw_port
*p
, uint8_t index
)
17 * Drops the next outstanding event in our history. Used on dequeue
18 * to clear any history before dequeuing more events.
22 /* create drop message */
24 ev
.op
= sw_qe_flag_map
[RTE_EVENT_OP_RELEASE
];
27 rte_event_ring_enqueue_burst(p
->rx_worker_ring
, &ev
, 1, &free_count
);
29 /* each release returns one credit */
30 p
->outstanding_releases
--;
31 p
->inflight_credits
++;
35 * special-case of rte_event_ring enqueue, with overriding the ops member on
36 * the events that get written to the ring.
38 static inline unsigned int
39 enqueue_burst_with_ops(struct rte_event_ring
*r
, const struct rte_event
*events
,
40 unsigned int n
, uint8_t *ops
)
42 struct rte_event tmp_evs
[PORT_ENQUEUE_MAX_BURST_SIZE
];
45 memcpy(tmp_evs
, events
, n
* sizeof(events
[0]));
46 for (i
= 0; i
< n
; i
++)
47 tmp_evs
[i
].op
= ops
[i
];
49 return rte_event_ring_enqueue_burst(r
, tmp_evs
, n
, NULL
);
53 sw_event_enqueue_burst(void *port
, const struct rte_event ev
[], uint16_t num
)
56 uint8_t new_ops
[PORT_ENQUEUE_MAX_BURST_SIZE
];
57 struct sw_port
*p
= port
;
58 struct sw_evdev
*sw
= (void *)p
->sw
;
59 uint32_t sw_inflights
= rte_atomic32_read(&sw
->inflights
);
60 uint32_t credit_update_quanta
= sw
->credit_update_quanta
;
63 if (num
> PORT_ENQUEUE_MAX_BURST_SIZE
)
64 num
= PORT_ENQUEUE_MAX_BURST_SIZE
;
66 for (i
= 0; i
< num
; i
++)
67 new += (ev
[i
].op
== RTE_EVENT_OP_NEW
);
69 if (unlikely(new > 0 && p
->inflight_max
< sw_inflights
))
72 if (p
->inflight_credits
< new) {
73 /* check if event enqueue brings port over max threshold */
74 if (sw_inflights
+ credit_update_quanta
> sw
->nb_events_limit
)
77 rte_atomic32_add(&sw
->inflights
, credit_update_quanta
);
78 p
->inflight_credits
+= (credit_update_quanta
);
80 /* If there are fewer inflight credits than new events, limit
81 * the number of enqueued events.
83 num
= (p
->inflight_credits
< new) ? p
->inflight_credits
: new;
86 for (i
= 0; i
< num
; i
++) {
88 int outstanding
= p
->outstanding_releases
> 0;
89 const uint8_t invalid_qid
= (ev
[i
].queue_id
>= sw
->qid_count
);
91 p
->inflight_credits
-= (op
== RTE_EVENT_OP_NEW
);
92 p
->inflight_credits
+= (op
== RTE_EVENT_OP_RELEASE
) *
95 new_ops
[i
] = sw_qe_flag_map
[op
];
96 new_ops
[i
] &= ~(invalid_qid
<< QE_FLAG_VALID_SHIFT
);
98 /* FWD and RELEASE packets will both resolve to taken (assuming
99 * correct usage of the API), providing very high correct
102 if ((new_ops
[i
] & QE_FLAG_COMPLETE
) && outstanding
)
103 p
->outstanding_releases
--;
105 /* error case: branch to avoid touching p->stats */
106 if (unlikely(invalid_qid
&& op
!= RTE_EVENT_OP_RELEASE
)) {
107 p
->stats
.rx_dropped
++;
108 p
->inflight_credits
++;
112 /* returns number of events actually enqueued */
113 uint32_t enq
= enqueue_burst_with_ops(p
->rx_worker_ring
, ev
, i
,
115 if (p
->outstanding_releases
== 0 && p
->last_dequeue_burst_sz
!= 0) {
116 uint64_t burst_ticks
= rte_get_timer_cycles() -
117 p
->last_dequeue_ticks
;
118 uint64_t burst_pkt_ticks
=
119 burst_ticks
/ p
->last_dequeue_burst_sz
;
120 p
->avg_pkt_ticks
-= p
->avg_pkt_ticks
/ NUM_SAMPLES
;
121 p
->avg_pkt_ticks
+= burst_pkt_ticks
/ NUM_SAMPLES
;
122 p
->last_dequeue_ticks
= 0;
125 /* Replenish credits if enough releases are performed */
126 if (p
->inflight_credits
>= credit_update_quanta
* 2) {
127 rte_atomic32_sub(&sw
->inflights
, credit_update_quanta
);
128 p
->inflight_credits
-= credit_update_quanta
;
/*
 * Enqueue a single event on the port: forwards to
 * sw_event_enqueue_burst() with a burst size of one and returns its result.
 *
 * NOTE(review): the return-type line was missing from the reviewed listing;
 * uint16_t is restored here to match the burst size type of the function
 * being wrapped. Confirm against the complete source.
 */
uint16_t
sw_event_enqueue(void *port, const struct rte_event *ev)
{
	const uint16_t single_event = 1;

	return sw_event_enqueue_burst(port, ev, single_event);
}
141 sw_event_dequeue_burst(void *port
, struct rte_event
*ev
, uint16_t num
,
145 struct sw_port
*p
= (void *)port
;
146 struct rte_event_ring
*ring
= p
->cq_worker_ring
;
148 /* check that all previous dequeues have been released */
149 if (p
->implicit_release
) {
150 struct sw_evdev
*sw
= (void *)p
->sw
;
151 uint32_t credit_update_quanta
= sw
->credit_update_quanta
;
152 uint16_t out_rels
= p
->outstanding_releases
;
154 for (i
= 0; i
< out_rels
; i
++)
155 sw_event_release(p
, i
);
157 /* Replenish credits if enough releases are performed */
158 if (p
->inflight_credits
>= credit_update_quanta
* 2) {
159 rte_atomic32_sub(&sw
->inflights
, credit_update_quanta
);
160 p
->inflight_credits
-= credit_update_quanta
;
164 /* returns number of events actually dequeued */
165 uint16_t ndeq
= rte_event_ring_dequeue_burst(ring
, ev
, num
, NULL
);
166 if (unlikely(ndeq
== 0)) {
172 p
->outstanding_releases
+= ndeq
;
173 p
->last_dequeue_burst_sz
= ndeq
;
174 p
->last_dequeue_ticks
= rte_get_timer_cycles();
175 p
->poll_buckets
[(ndeq
- 1) >> SW_DEQ_STAT_BUCKET_SHIFT
]++;
/*
 * Dequeue a single event from the port: forwards to
 * sw_event_dequeue_burst() with a burst size of one, passing wait through,
 * and returns its result.
 *
 * NOTE(review): the return-type line was missing from the reviewed listing;
 * uint16_t is restored here to match the burst size type of the function
 * being wrapped. Confirm against the complete source.
 */
uint16_t
sw_event_dequeue(void *port, struct rte_event *ev, uint64_t wait)
{
	const uint16_t single_event = 1;

	return sw_event_dequeue_burst(port, ev, single_event, wait);
}